📦 Rocket版本 4.9.6

🏢 官方安装文档: https://rocketmq.apache.org/zh/docs/4.x/quickstart/03quickstartWithDockercompose

💻 使用的系统是 ubuntu22.04

🏆 RocketMQ的消息的发送和消费方式

⭐️发送方式

1️⃣同步发送

同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。

在快速入门就是使用的同步发送方式

同步发送

2️⃣异步发送

同步发送异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。

异步发送需要实现异步发送回调接口SendCallback)。

消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

代码演示

    @Test
    public void ASync() throws Exception {
        //创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("ASyncProducerGroup");
        //连接nameserver
        producer.setNamesrvAddr(export.NameSrv);
        //启动生产者
        producer.start();
        //设置异步消息发送失败重试次数
        producer.setRetryTimesWhenSendAsyncFailed(0);
        //创建消息
        Message aSyncTopic = new Message("ASyncTopic", "这是一条异步消息".getBytes());
        //异步发送消息
        producer.send(aSyncTopic, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送成功!");
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("发送失败!");
            }
        });
        System.out.println("执行顺序");
        //异步发送,如果要求可靠传输,必须要等回调接口返回明确结果后才能结束逻辑,否则立即关闭Producer可能导致部分消息尚未传输成功
        Thread.sleep(10000);
        //关闭生产者
        producer.shutdown();
    }

异步发送与同步发送代码唯一区别在于调用send接口的参数不同,异步发送不会等待发送返回,取而代之的是 send方法需要传入 SendCallback 的实现,SendCallback 接口主要有 onSuccessonException 两个方法,表示消息发送成功和消息发送失败。

3️⃣单向发送

同步发送

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集

代码演示,使用 sendOneway 不会对返回结果有任何等待和处理

    @Test
    public void OneWay() throws InterruptedException, RemotingException, MQClientException {
        //创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("OneWayProducerGroup");
        //连接nameserver
        producer.setNamesrvAddr(export.NameSrv);
        //启动生产者
        producer.start();
        //设置异步消息发送失败重试次数
        producer.setRetryTimesWhenSendAsyncFailed(0);
        //创建消息
        Message OneWayTopic = new Message("OneWayTopic", "这是一条异步消息".getBytes());
        //发送单向消息
        producer.sendOneway(OneWayTopic);
        //关闭生产者
        producer.shutdown();
    }

4️⃣延迟发送

延迟消息发送是指消息发送到

Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到 Consumer进行消费。

在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的延时事件触发。使用

RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力

定时消息场景一

延时消息约束:Apache RocketMQ 一共支持18个等级的延迟投递,具体时间如下:

投递等级(delay level) 延迟时间 投递等级(delay level) 延迟时间
1 1s 1秒 10 6min 6分钟
2 5s 5秒 11 7min 7分钟
3 10s 10秒 12 8min 8分钟
4 30s 30多岁 13 9min 9分钟
5 1min 1分钟 14 10min 10分钟
6 2min 2分钟 15 20min 20分钟
7 3min 3分钟 16 30min 30分钟
8 4min 4分钟 17 1h 1小时
9 5min 5分钟 18 2h 2小时

代码示例

1️⃣首先创建一个生产者吗,设置延迟等级为3,也就是延迟10s,发送后输出发送时间

@Test
    public void Delay() throws Exception {
        //创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("DelayProducerGroup");
        //连接nameserver
        producer.setNamesrvAddr(export.NameSrv);
        //启动生产者
        producer.start();
        //创建消息
        Message message = new Message("DelayTopic","这是一条延迟10s发送的消息".getBytes());
        //设置消息延迟发送时间
        message.setDelayTimeLevel(3);
        //发送消息
        SendResult send = producer.send(message);

        //输出发送时间
        System.out.println("发送时间: "+new Date());
        //关闭生产者
        producer.shutdown();
    }

2️⃣ 消费者接收,接收后输出接收时间

 @Test
    public void Consumer() throws Exception {
        //创建一个消费者
        DefaultMQPushConsumer defaultMQConsumer = new DefaultMQPushConsumer("DelayConsumerGroup");
        //设置NameServer地址
        defaultMQConsumer.setNamesrvAddr(export.NameSrv);
        //订阅消息,构建方法使用topic和消息匹配规则,* 表示任何消息
        defaultMQConsumer.subscribe("DelayTopic","*");
        //注册回调接口来处理从Broker中收到的消息, MessageListenerConcurrently 是多线程消费,默认20个线程,可以参看consumer.setConsumeThreadMax()
        defaultMQConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("消息: "+ new String(msgs.get(0).getBody()));
                System.out.println("收到消息的时间: "+ new Date());
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动Consumer
        defaultMQConsumer.start();
        //挂起jvm
        Thread.sleep(100000);
    }

3️⃣ 结果

image-20240801210922630

5️⃣批量发送

在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数

batch

代码演示

    @Test
    public void Batch() throws Exception {
        //创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroup");
        //连接nameserver
        producer.setNamesrvAddr(export.NameSrv);
        //启动生产者
        producer.start();
        //创建消息
        List<Message> messages = new ArrayList<>();
        messages.add(new Message("BatchTopic", "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message("BatchTopic", "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message("BatchTopic", "Tag", "OrderID003", "Hello world 2".getBytes()));
        //发送消息
        SendResult send = producer.send(messages);

        //输出发送结果
        System.out.println("发送时间: "+new Date());
        //关闭生产者
        producer.shutdown();
    }

执行查看 dashabord,发现消息都在同一个队列中

image-20240801212507848

这里调用非常简单,将消息打包成 Collection<Message> msgs 传入方法中即可,需要注意的是批量消息的大小不能超过 1MiB(否则需要自行分割),其次同一批 batch 中 topic 必须相同

批量方式消费

某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。批量消费不仅仅只用于批量发送场景,适用于出顺序发送的发任何场景

6️⃣顺序发送

顺序消息是一种对消息发送和消费顺序有严格要求的消息。

对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。在 Apache RocketMQ 中支持分区顺序消息,如下图所示。我们可以按照某一个标准对消息进行分区(比如图中的

ShardingKey),同一个 ShardingKey的消息会被分配到同一个队列中,并按照顺序被消费。

需要注意的是 RocketMQ 消息的顺序性分为两部分,生产顺序性和消费顺序性。只有同时满足了生产顺序性和消费顺序性才能达到上述的FIFO效果。


生产顺序性: RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:

  • 单一生产者: 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。
  • 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

满足以上条件的生产者,将顺序消息发送至服务端后,会保证设置了同一分区键的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

顺序消息发送

顺序消息的应用场景也非常广泛,在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。
例如创建订单的场景,需要保证同一个订单的生成、付款和发货,这三个操作被顺序执行。如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时将

ShardingKey相同(同一订单号)的消息序路由到一个逻辑队列中。

顺序消息场景一

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个

queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的 queue只有一个,则是全局有序;如果多个 queue参与,则为分区有序,即相对每个queue,消息都是有序的。

消费者在默认情况下采用的是并发多线程的模式来接收消息比如

MessageListenerConcurrently,它默认会开启20个线程来读取消息,那么这样并不能保证消息的一致性,所以需要采用单线程的模式来读取消息,需要顺序发送的消息会发送到同一个队列中去,然后通过消费者单线程来读取消息,也就是安装顺序一个一个读取

代码示例

1️⃣ 生产者代码

  /**
     * 顺序发送
     * @throws Exception
     */
    @Test
    public void Order() throws  Exception{
        DefaultMQProducer producer = new DefaultMQProducer("OrderProducerGroup");
        producer.setNamesrvAddr(export.NameSrv);
        producer.start();
        //创建顺序消息
        for (int i = 1; i <= 10; i++) {
            //所有消息的id为1表示所有消息都会发送到同一个队列中
            int orderId = 1;
            Message message = new Message("OrderTopic", ("我是第" + i + "条消息").getBytes());
            producer.send(message, new MessageQueueSelector() {
                @Override
                //这里的arg就是后面orderId的值(可以将某一个信息中的唯一值传入)
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    //将i的值转换为哈希code(这里可以将某一个值的唯一值转为哈希值)
                    int argHs = arg.hashCode();
                    //在取模决定发送给哪个
                    int queue = argHs % mqs.size();
                    //return 决定发送的队列,那么相同orderId的消息会被发送在同一个队列中
                    return mqs.get(queue);
                }
            }, orderId);
        };

        System.out.println("发送完成");
        producer.shutdown();
    }

这里的区别主要是调用了 SendResult send(Message msg, MessageQueueSelector selector, Object arg)方法,MessageQueueSelector 是队列选择器,arg 是一个 Java Object 对象,可以传入作为消息发送分区的分类标准

MessageQueueSelector的接口如下:

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

其中 mqs 是可以发送的队列,msg是消息,arg是上述send接口中传入的Object对象,返回的是该消息需要发送到的队列。上述例子里,是以orderId作为分区分类标准,对所有队列个数取余,来对将相同orderId的消息发送到同一个队列中。

生产环境中建议选择最细粒度的分区键进行拆分,例如,将订单ID、用户ID作为分区键关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序

2️⃣ 消费者代码

/**
     * 接收需要变成单线程接收
     */
    @Test
    public void Consumer() throws  Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumerGroup");
        consumer.setNamesrvAddr(export.NameSrv);
        consumer.subscribe("OrderTopic","*");
        //使用顺序消费MessageListenerOrderly,这里并不是说只开启一个线程,而是保证同一个orderId的消息在一个线程中读取
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                //输出当前线程id
                System.out.println(Thread.currentThread().getId());
                System.out.println(new String(msgs.get(0).getBody()));
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        Thread.sleep(100000);
        consumer.shutdown();
    }

运行代码

image-20240802154404330

顺序消息的一致性

如果一个Broker掉线,那么此时队列总数是否会发化?

如果发生变化,那么同一个 ShardingKey 的消息就会发送到不同的队列上,造成乱序。如果不发生变化,那消息将会发送到掉线Broker的队列上,必然是失败的。因此 Apache RocketMQ 提供了两种模式,如果要保证严格顺序而不是可用性,创建 Topic 是要指定 -o 参数(--order)为true,表示顺序消息:

$ sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -o true -n 127.0.0.1:9876
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=true, attributes=null]

其次要保证NameServer中的配置 orderMessageEnablereturnOrderTopicConfigToBroker 必须是 true。如果上述任意一个条件不满足,则是保证可用性而不是严格顺序。

7️⃣事务发送

⭐️消费方式

1️⃣Push消费

在快速入门就使用的是 push方法来获取消息,也介绍了步骤和代码

设置集群模式和广播模式

我们可以通过以下代码来设置采用集群模式,RocketMQ Push Consumer默认为集群模式,同一个消费组内的消费者分担消费。

consumer.setMessageModel(MessageModel.CLUSTERING);

通过以下代码来设置采用广播模式,广播模式下,消费组内的每一个消费者都会消费全量消息。

consumer.setMessageModel(MessageModel.BROADCASTING);

消息过滤

Tag过滤

Tag在生产者章节已经介绍过,用于对某个Topic下的消息进行分类。生产者在发送消息时,指定消息的Tag,消费者需根据已经指定的Tag来进行订阅。

以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,以如下消息为例:

  • 订单消息
  • 支付消息
  • 物流消息

这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的系统所订阅,以如下系统为例:

  • 支付系统:只需订阅支付消息。
  • 物流系统:只需订阅物流消息。
  • 实时计算系统:需要订阅所有和交易相关的消息。
  • 交易成功率分析系统:需订阅订单和支付消息。

过滤示意图如下所示

Tag过滤

对于物流系统和支付系统来说,它们都只订阅单个Tag,此时只需要在调用subcribe接口时明确标明Tag即可。

consumer.subscribe("TagFilterTest", "TagA");

对于实时计算系统来说,它订阅交易Topic下所有的消息,Tag用星号(*)表示即可。

consumer.subscribe("TagFilterTest", "*");

对于交易成功率分析系统来说,它订阅了订单和支付两个Tag的消息,在多个Tag之间用两个竖线(||)分隔即可。

consumer.subscribe("TagFilterTest", "TagA||TagB");

这里需要注意的是,如果同一个消费者多次订阅某个Topic下的Tag,以最后一次订阅为准。

//如下错误代码中,Consumer只能订阅到TagFilterTest下TagB的消息,而不能订阅TagA的消息。
consumer.subscribe("TagFilterTest", "TagA");
consumer.subscribe("TagFilterTest", "TagB");

代码演示

1️⃣ 生产者代码

 @Test
    public void Tags() throws Exception{
        DefaultMQProducer producer = new DefaultMQProducer("TagProducerGroup");
        producer.setNamesrvAddr(export.NameSrv);
        producer.start();
		//设置tag
        Message message1 = new Message("TagsTopic","tag1","tag1的消息".getBytes());
        Message message2 = new Message("TagsTopic","tag2","tag2的消息".getBytes());
        producer.send(message2);
        producer.send(message1);

        System.out.println("发送完毕");
        producer.shutdown();

    }

使用的构建方法为:

public Message(String topic, String tags, byte[] body) {
    this(topic, tags, "", 0, body, true);
}

2️⃣ 消费者代码

/**
     * 接收带tag1的消息的消费者
     */
    @Test
    public void Consumer1() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagsConsumerGroup");
        consumer.setNamesrvAddr(export.NameSrv);
        consumer.subscribe("TagsTopic","tag1");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt x = msgs.get(0);
                System.out.println(new String(x.getBody()));
                System.out.println(x.getTags());
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        Thread.sleep(100000);
        consumer.shutdown();
    }
    /**
     * 接收带tag2和tag1的消息的消费者
     */
    @Test
    public void Consumer2() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagsConsumerGroup");
        consumer.setNamesrvAddr(export.NameSrv);
        consumer.subscribe("TagsTopic","tag2 || tag1");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt x = msgs.get(0);
                System.out.println(new String(x.getBody()));
                System.out.println(x.getTags());
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        Thread.sleep(100000);
        consumer.shutdown();
    }

SQL92过滤

SQL92过滤是在消息发送时设置消息的Tag或自定义属性,消费者订阅时使用SQL语法设置过滤表达式,根据自定义属性或Tag过滤消息。

Tag属于一种特殊的消息属性,在SQL语法中,Tag的属性值为TAGS。 开启属性过滤首先要在Broker端设置配置enablePropertyFilter=true,该值默认为false。

以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,按照类型将消息分为订单消息和物流消息,其中给物流消息定义地域属性,按照地域分为杭州和上海:

  • 订单消息
  • 物流消息
    • 物流消息且地域为杭州
    • 物流消息且地域为上海

这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的系统所订阅,以如下系统为例:

  • 物流系统1:只需订阅物流消息且消息地域为杭州。
  • 物流系统2:只需订阅物流消息且消息地域为杭州或上海。
  • 订单跟踪系统:只需订阅订单消息。

SQL92过滤示意图如下所示:

SQL92过滤

地域将作为自定义属性设置在消息中。

  • 消息发送端: 设置消息的自定义属性。
Message msg = new Message("topic", "tagA", "Hello MQ".getBytes());
// 设置自定义属性A,属性值为1。
msg.putUserProperties("a", "1");
  • 消息消费端: 使用SQL语法设置过滤表达式,并根据自定义属性过滤消息。
consumer.subscribe("SqlFilterTest",
    MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
        "and (a is not null and a between 0 and 3)"));

消息唯一性(keys)

在rocketmq中的消息,默认会有一个messageId当做消息的唯一标识,我们也可以给消息携带一个key,用作唯一标识或者业务标识,包括在控制面板查询的时候也可以使用messageId或者key来进行查询

1️⃣ 生产者代码

通过消息对象来设置 key

@Test
public void testKeyProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    Message msg = new Message("TopicTest","tagA","key", "我是一个带标记和key的消息".getBytes());
    SendResult send = producer.send(msg);
    System.out.println(send);
    // 关闭实例
    producer.shutdown();
}

2️⃣ 消费者代码

@Test
public void testKeyConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费   表达式,默认是*,支持"tagA || tagB || tagC" 这样或者的写法 只要是符合任何一个标签都可以消费
    consumer.subscribe("TopicTest", "tagA || tagB || tagC");
    // 注册一个消费监听 MessageListenerConcurrently是并发消费
    // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            // 这里执行消费的代码 默认是多线程消费
            System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
            System.out.println(msgs.get(0).getTags());
            System.out.println(msgs.get(0).getKeys());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

2️⃣Pull消费

在RocketMQ中有两种Pull方式,一种是比较原始 Pull Consumer,它不提供相关的订阅方法,需要调用pull方法时指定队列进行拉取,并需要自己更新位点。另一种是 Lite Pull Consumer,它提供了Subscribe和Assign两种方式,使用起来更加方便。

Pull Consumer

Pull Consumer示例如下

public class PullConsumerTest {
  public static void main(String[] args) throws MQClientException {
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.start();
    try {
      MessageQueue mq = new MessageQueue();
      mq.setQueueId(0);
      mq.setTopic("TopicTest");
      mq.setBrokerName("jinrongtong-MacBook-Pro.local");
      long offset = 26;
      PullResult pullResult = consumer.pull(mq, "*", offset, 32);
      if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {
        System.out.printf("%s%n", pullResult.getMsgFoundList());
        consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    consumer.shutdown();
  }
}

首先需要初始化 DefaultMQPullConsumer并启动,然后构造需要拉取的队列 MessageQueue,除了构造外也可以如下所示调用 fetchSubscribeMessageQueues方法获取某个Topic的所有队列,然后挑选队列进行拉取。

Set<MessageQueue> queueSet =  consumer.fetchSubscribeMessageQueues("TopicTest");

找到或者构造完队列之后,调用pull方法就可以进行拉取,需要传入拉取的队列,过滤表达式,拉取的位点,最大拉取消息条数等参数。拉取完成后会返回拉取结果 PullResult,PullResult中的PullStatus表示结果状态,如下所示

public enum PullStatus {
    /**
     * Founded
     */
    FOUND,
    /**
     * No new message can be pull
     */
    NO_NEW_MSG,
    /**
     * Filtering results can not match
     */
    NO_MATCHED_MSG,
    /**
     * Illegal offset,may be too big or too small
     */
    OFFSET_ILLEGAL
}

FOUND表示拉取到消息,NO_NEW_MSG表示没有发现新消息,NO_MATCHED_MSG表示没有匹配的消息,OFFSET_ILLEGAL表示传入的拉取位点是非法的,有可能偏大或偏小。如果拉取状态是FOUND,我们可以通过 pullResultgetMsgFoundList方法获取拉取到的消息列表。最后,如果消费完成,通过 updateConsumeOffset方法更新消费位点。

Lite Pull Consumer

Lite Pull Consumer是RocketMQ 4.6.0推出的Pull Consumer,相比于原始的Pull Consumer更加简单易用,它提供了Subscribe和Assign两种模式,Subscribe模式示例如下

public class LitePullConsumerSubscribe {
    public static volatile boolean running = true;
    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
        litePullConsumer.subscribe("TopicTest", "*");
        litePullConsumer.setPullBatchSize(20);
        litePullConsumer.start();
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s%n", messageExts);
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
}

首先还是初始化 DefaultLitePullConsumer并设置 ConsumerGroupName,调用subscribe方法订阅topic并启动。与Push Consumer不同的是,LitePullConsumer拉取消息调用的是轮询poll接口,如果能拉取到消息则返回对应的消息列表,否则返回null。通过 setPullBatchSize可以设置每一次拉取的最大消息数量,此外如果不额外设置,LitePullConsumer默认是自动提交位点。在subscribe模式下,同一个消费组下的多个 LitePullConsumer会负载均衡消费,与PushConsumer一致。

如下是Assign模式的示例

public class LitePullConsumerAssign {
    public static volatile boolean running = true;
    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
        litePullConsumer.setAutoCommit(false);
        litePullConsumer.start();
        Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
        List<MessageQueue> list = new ArrayList<>(mqSet);
        List<MessageQueue> assignList = new ArrayList<>();
        for (int i = 0; i < list.size() / 2; i++) {
            assignList.add(list.get(i));
        }
        litePullConsumer.assign(assignList);
        litePullConsumer.seek(assignList.get(0), 10);
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s %n", messageExts);
                litePullConsumer.commitSync();
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
}

Assign模式一开始仍然是初始化 DefaultLitePullConsumer,这里我们采用手动提交位点的方式,因此设置AutoCommit为false,然后启动consumer。与Subscribe模式不同的是,Assign模式下没有自动的负载均衡机制,需要用户自行指定需要拉取的队列,因此在例子中,先用fetchMessageQueues获取了Topic下的队列,再取前面的一半队列进行拉取,示例中还调用了seek方法,将第一个队列拉取的位点设置从10开始。紧接着进入循环不停地调用poll方法拉取消息,拉取到消息后调用commitSync方法手动提交位点。