📦 Rocket版本 4.9.6

🏢 官方文档: https://rocketmq.apache.org/zh/docs/4.x/

💻 使用的系统是 ubuntu22.04

🏆消息重复消费问题

什么是重复消费?

BROADCASTING(广播)模式下,所有注册的消费者都会消费,而这些消费者通常是集群部署的一个个微服务,这样就会多台机器重复消费,当然这个是根据需要来选择。

CLUSTERING(负载均衡)模式下,如果一个 topic被多个 consumerGroup消费,也会重复消费。

即使是在 CLUSTERING模式下,同一个 consumerGroup下,一个队列只会分配给一个消费者,看起来好像是不会重复消费。但是,有个特殊情况:

  1. 一个消费者新上线后,同组的所有消费者要重新负载均衡(反之一个消费者掉线后,也一样)。一个队列所对应的新的消费者要获取之前消费的 offset(偏移量,也就是消息消费的点位),此时之前的消费者可能已经消费了一条消息,但是并没有把 offset提交给 broker,那么新的消费者可能会重新消费一次。虽然orderly模式是前一个消费者先解锁,后一个消费者加锁再消费的模式,比起concurrently要严格了,但是加锁的线程和提交offset的线程不是同一个,所以还是会出现极端情况下的重复消费。
  2. 还有在发送批量消息的时候,会被当做一条消息进行处理,那么如果批量消息中有一条业务处理成功,其他失败了,还是会被重新消费一次。
  3. 生产者发送消息给队列的时候,在发送完成后队列需要回应,如果在回应的时候恰巧 mq和生产者联系断掉了,那么生产者会在重新发送消息,这样在队列中有可能出现两天一样的消息

那么如果在CLUSTERING(负载均衡)模式下,并且在同一个消费者组中,不希望一条消息被重复消费,改怎么办呢?我们可以想到去重操作,找到消息唯一的标识,可以是 msgId也可以是你自定义的唯一的key,这样就可以去重了

RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)

msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同 msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费

幂等性: 多次执行结果和第一次执行结果一致

解决方法

目前有两种解决方法:

  1. 使用 Redis或者 mysql去保存唯一值 key,并且需要设置约束在表中唯一(UNIQUE),在消费时先向数据库中插入这个唯一的 key,如果插入成功就继续执行业务逻辑,如果失败直接签收(失败就表示这条消息时重复消息)
  2. 布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。在 hutool的工具中我们可以直接使用,当然你自己使用 redisbitmap类型手写一个也是可以的,或者用 set,但是 bit最好

如果业务逻辑报错需要在 redis或者 mysql中删除这条key

布隆过滤器代码演示

1️⃣ 生产者

 @Test
    public void Repeat() throws Exception {
        //创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("RepeatProducerGroup");
        //连接nameserver
        producer.setNamesrvAddr(export.NameSrv);
        //启动生产者
        producer.start();
        //创建两个一模一样的消息
        String key = UUID.randomUUID().toString();
        //key一样的消息
        Message message1 = new Message("RepeatTopic", null, key, "我是一条重复消息".getBytes());
        Message message2 = new Message("RepeatTopic", null, key, "我是一条重复消息".getBytes());
        //发送
        producer.send(message1);
        producer.send(message2);
        System.out.println("发送完成");
        producer.shutdown();
    }

2️⃣ 消费者

@Test
    public void Consumer() throws Exception {
        // 创建第一个消费者实例
        DefaultMQPushConsumer consumer1 = new DefaultMQPushConsumer("RepeatConsumerGroup");
        consumer1.setNamesrvAddr(export.NameSrv);

        consumer1.subscribe("RepeatTopic", "*");
        consumer1.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt messageExt = msgs.get(0);
                String keys = messageExt.getKeys();
                //判断消息是否来过
                if(export.bloomFilter.contains(keys)) {
                    System.out.println("消息重复了");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                System.out.println("消息: " + new String(messageExt.getBody()));
                System.out.println("key : " + keys);
                export.bloomFilter.add(keys);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer1.start();
        System.in.read();
    }