📦 Rocket版本 4.9.6

🏢 官方文档: https://rocketmq.apache.org/zh/docs/4.x/

💻 使用的系统是 ubuntu22.04

🏆 重复机制和死信消息

⭐️ 重复机制

1️⃣ 生产者重复

// 失败的情况重发3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);

2️⃣ 消费者重试

Consumer消费某条消息失败(在 return ConsumeConcurrentlyStatus.RECONSUME_LATER后),则 RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列

消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息,MQ在广播模式下只负责广播发送数据,至于消费者消费成功与否它都不管,不会记录消费点位

  • 最大重试次数:消息消费失败后,可被重复投递的最大次数。
consumer.setMaxReconsumeTimes(10);
  • 重试间隔:消息消费失败后再次被投递给Consumer消费的间隔时间,只在顺序消费中起作用。

    consumer.setSuspendCurrentQueueTimeMillis(5000);
    

顺序消费和并发消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。

并发消费失败后并不是投递回原Topic,而是投递到一个特殊 Topic,其命名为 %RETRY%ConsumerGroupName,集群模式下并发消费每一个 ConsumerGroup会对应一个特殊Topic,并会订阅该 Topic。 两者参数差别如下

消费类型 重试间隔 最大重试次数
顺序消费 间隔时间可通过自定义设置,SuspendCurrentQueueTimeMillis 最大重试次数可通过自定义参数 MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为 Integer.MAX
并发消费 间隔时间根据重试次数阶梯变化,取值范围:1秒~2小时。不支持自定义配置 最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值

并发消费重试间隔如下,可以看到与延迟消息第三个等级开始的时间完全一致。

第几次重试 与上次重试的间隔时间 第几次重试 与上次重试的间隔时间
1 10s 9 7min
2 30s 10 8min
3 1min 11 9min
4 2min 12 10min
5 3min 13 20min
6 4min 14 30min
7 5min 15 1h
8 6min 16 2h

但是在实际生产过程中,一般重试3-5次,如果还没有消费成功,则可以把消息签收了,通知人工等处理

代码演示

/**
 * 测试消费者
 *
 * @throws Exception
 */
@Test
public void testConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费   *表示没有过滤参数 表示这个主题的任何消息
    consumer.subscribe("TopicTest", "*");
    // 注册一个消费监听
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            try {
                // 这里执行消费的代码
                System.out.println(Thread.currentThread().getName() + "----" + msgs);
                // 这里制造一个错误
                int i = 10 / 0;
            } catch (Exception e) {
                // 出现问题 判断重试的次数
                MessageExt messageExt = msgs.get(0);
                // 获取重试的次数 失败一次消息中的失败次数会累加一次
                int reconsumeTimes = messageExt.getReconsumeTimes();
                if (reconsumeTimes >= 3) {
                    // 则把消息确认了,可以将这条消息记录到日志或者数据库 通知人工处理
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } else {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

⭐️ 死信队列

当一条消息初次消费失败,

RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),死信队列是死信Topic下分区数唯一的单独队列。如果产生了死信消息,那对应的 ConsumerGroup的死信 Topic名称为 %DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。可以利用 RocketMQ Admin工具或者 RocketMQ Dashboard上查询到对应死信消息的信息

演示代码:

1️⃣ 生产者代码

@Test
    public void Producer() throws Exception{
        //创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("DealProducerGroup");
        //连接nameserver
        producer.setNamesrvAddr(export.NameSrv);
        //启动生产者
        producer.start();
        //创建消息
        Message message = new Message("DealTopic","这是一条死信".getBytes());
        //发送消息
        SendResult send = producer.send(message);

        //输出发送结果
        System.out.println("发送成功");
        //关闭生产者
        producer.shutdown();
    }

2️⃣消费者代码

 @Test
    public void Consumer() throws Exception {
        //创建一个消费者
        DefaultMQPushConsumer defaultMQConsumer = new DefaultMQPushConsumer("DelayConsumerGroup");
        //设置NameServer地址
        defaultMQConsumer.setNamesrvAddr(export.NameSrv);
        //订阅消息,构建方法使用topic和消息匹配规则,* 表示任何消息
        defaultMQConsumer.subscribe("DealTopic","*");
        //消息重试次数2次
        defaultMQConsumer.setMaxReconsumeTimes(2);
        //注册回调接口来处理从Broker中收到的消息, MessageListenerConcurrently 是多线程消费,默认20个线程,可以参看consumer.setConsumeThreadMax()
        defaultMQConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("消息消费失败");
                //在消费失败两次后就加入到死信队列
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        // 启动Consumer
        defaultMQConsumer.start();
        //挂起jvm
        Thread.sleep(100000);
    }

执行代码后,结果为消费者在重试消费两次之后如果继续失败就将消息加入死信队列

image-20240803124214415

image-20240803124448863