📦 Rocket
版本 4.9.6
🏢 官方安装文档: https://rocketmq.apache.org/zh/docs/4.x/quickstart/03quickstartWithDockercompose
💻 使用的系统是 ubuntu22.04
🏆 RocketMQ的消息的发送和消费方式
⭐️发送方式
1️⃣同步发送
同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。
在快速入门就是使用的同步发送方式
2️⃣异步发送
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
异步发送需要实现异步发送回调接口(
SendCallback
)。
消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
代码演示
@Test
public void ASync() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ASyncProducerGroup");
//连接nameserver
producer.setNamesrvAddr(export.NameSrv);
//启动生产者
producer.start();
//设置异步消息发送失败重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);
//创建消息
Message aSyncTopic = new Message("ASyncTopic", "这是一条异步消息".getBytes());
//异步发送消息
producer.send(aSyncTopic, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功!");
}
@Override
public void onException(Throwable e) {
System.out.println("发送失败!");
}
});
System.out.println("执行顺序");
//异步发送,如果要求可靠传输,必须要等回调接口返回明确结果后才能结束逻辑,否则立即关闭Producer可能导致部分消息尚未传输成功
Thread.sleep(10000);
//关闭生产者
producer.shutdown();
}
异步发送与同步发送代码唯一区别在于调用send接口的参数不同,异步发送不会等待发送返回,取而代之的是 send
方法需要传入 SendCallback
的实现,SendCallback
接口主要有 onSuccess
和 onException
两个方法,表示消息发送成功和消息发送失败。
3️⃣单向发送
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
代码演示,使用 sendOneway
不会对返回结果有任何等待和处理
@Test
public void OneWay() throws InterruptedException, RemotingException, MQClientException {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("OneWayProducerGroup");
//连接nameserver
producer.setNamesrvAddr(export.NameSrv);
//启动生产者
producer.start();
//设置异步消息发送失败重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);
//创建消息
Message OneWayTopic = new Message("OneWayTopic", "这是一条异步消息".getBytes());
//发送单向消息
producer.sendOneway(OneWayTopic);
//关闭生产者
producer.shutdown();
}
4️⃣延迟发送
延迟消息发送是指消息发送到
Apache RocketMQ
后,并不期望立马投递这条消息,而是延迟一定时间后才投递到 Consumer
进行消费。
在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的延时事件触发。使用
RocketMQ
的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力
延时消息约束:Apache RocketMQ 一共支持18个等级的延迟投递,具体时间如下:
投递等级(delay level) | 延迟时间 | 投递等级(delay level) | 延迟时间 |
---|---|---|---|
1 | 1s 1秒 | 10 | 6min 6分钟 |
2 | 5s 5秒 | 11 | 7min 7分钟 |
3 | 10s 10秒 | 12 | 8min 8分钟 |
4 | 30s 30多岁 | 13 | 9min 9分钟 |
5 | 1min 1分钟 | 14 | 10min 10分钟 |
6 | 2min 2分钟 | 15 | 20min 20分钟 |
7 | 3min 3分钟 | 16 | 30min 30分钟 |
8 | 4min 4分钟 | 17 | 1h 1小时 |
9 | 5min 5分钟 | 18 | 2h 2小时 |
代码示例
1️⃣首先创建一个生产者吗,设置延迟等级为3,也就是延迟10s,发送后输出发送时间
@Test
public void Delay() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("DelayProducerGroup");
//连接nameserver
producer.setNamesrvAddr(export.NameSrv);
//启动生产者
producer.start();
//创建消息
Message message = new Message("DelayTopic","这是一条延迟10s发送的消息".getBytes());
//设置消息延迟发送时间
message.setDelayTimeLevel(3);
//发送消息
SendResult send = producer.send(message);
//输出发送时间
System.out.println("发送时间: "+new Date());
//关闭生产者
producer.shutdown();
}
2️⃣ 消费者接收,接收后输出接收时间
@Test
public void Consumer() throws Exception {
//创建一个消费者
DefaultMQPushConsumer defaultMQConsumer = new DefaultMQPushConsumer("DelayConsumerGroup");
//设置NameServer地址
defaultMQConsumer.setNamesrvAddr(export.NameSrv);
//订阅消息,构建方法使用topic和消息匹配规则,* 表示任何消息
defaultMQConsumer.subscribe("DelayTopic","*");
//注册回调接口来处理从Broker中收到的消息, MessageListenerConcurrently 是多线程消费,默认20个线程,可以参看consumer.setConsumeThreadMax()
defaultMQConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("消息: "+ new String(msgs.get(0).getBody()));
System.out.println("收到消息的时间: "+ new Date());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动Consumer
defaultMQConsumer.start();
//挂起jvm
Thread.sleep(100000);
}
3️⃣ 结果
5️⃣批量发送
在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数
代码演示
@Test
public void Batch() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroup");
//连接nameserver
producer.setNamesrvAddr(export.NameSrv);
//启动生产者
producer.start();
//创建消息
List<Message> messages = new ArrayList<>();
messages.add(new Message("BatchTopic", "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message("BatchTopic", "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message("BatchTopic", "Tag", "OrderID003", "Hello world 2".getBytes()));
//发送消息
SendResult send = producer.send(messages);
//输出发送结果
System.out.println("发送时间: "+new Date());
//关闭生产者
producer.shutdown();
}
执行查看 dashabord
,发现消息都在同一个队列中
这里调用非常简单,将消息打包成
Collection<Message> msgs
传入方法中即可,需要注意的是批量消息的大小不能超过 1MiB(否则需要自行分割),其次同一批 batch 中 topic 必须相同
批量方式消费
某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。批量消费不仅仅只用于批量发送场景,适用于出顺序发送的发任何场景
6️⃣顺序发送
顺序消息是一种对消息发送和消费顺序有严格要求的消息。
对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。在 Apache RocketMQ 中支持分区顺序消息,如下图所示。我们可以按照某一个标准对消息进行分区(比如图中的
ShardingKey
),同一个 ShardingKey
的消息会被分配到同一个队列中,并按照顺序被消费。
需要注意的是 RocketMQ 消息的顺序性分为两部分,生产顺序性和消费顺序性。只有同时满足了生产顺序性和消费顺序性才能达到上述的FIFO效果。
生产顺序性: RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:
- 单一生产者: 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。
- 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。
满足以上条件的生产者,将顺序消息发送至服务端后,会保证设置了同一分区键的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:
顺序消息的应用场景也非常广泛,在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。
例如创建订单的场景,需要保证同一个订单的生成、付款和发货,这三个操作被顺序执行。如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时将
ShardingKey
相同(同一订单号)的消息序路由到一个逻辑队列中。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个
queue
中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的 queue
只有一个,则是全局有序;如果多个 queue
参与,则为分区有序,即相对每个queue,消息都是有序的。
消费者在默认情况下采用的是并发多线程的模式来接收消息比如
MessageListenerConcurrently
,它默认会开启20个线程来读取消息,那么这样并不能保证消息的一致性,所以需要采用单线程的模式来读取消息,需要顺序发送的消息会发送到同一个队列中去,然后通过消费者单线程来读取消息,也就是安装顺序一个一个读取
代码示例
1️⃣ 生产者代码
/**
* 顺序发送
* @throws Exception
*/
@Test
public void Order() throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("OrderProducerGroup");
producer.setNamesrvAddr(export.NameSrv);
producer.start();
//创建顺序消息
for (int i = 1; i <= 10; i++) {
//所有消息的id为1表示所有消息都会发送到同一个队列中
int orderId = 1;
Message message = new Message("OrderTopic", ("我是第" + i + "条消息").getBytes());
producer.send(message, new MessageQueueSelector() {
@Override
//这里的arg就是后面orderId的值(可以将某一个信息中的唯一值传入)
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//将i的值转换为哈希code(这里可以将某一个值的唯一值转为哈希值)
int argHs = arg.hashCode();
//在取模决定发送给哪个
int queue = argHs % mqs.size();
//return 决定发送的队列,那么相同orderId的消息会被发送在同一个队列中
return mqs.get(queue);
}
}, orderId);
};
System.out.println("发送完成");
producer.shutdown();
}
这里的区别主要是调用了 SendResult send(Message msg, MessageQueueSelector selector, Object arg)
方法,MessageQueueSelector
是队列选择器,arg
是一个 Java Object 对象,可以传入作为消息发送分区的分类标准
MessageQueueSelector的接口如下:
public interface MessageQueueSelector { MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg); }
其中 mqs 是可以发送的队列,msg是消息,arg是上述send接口中传入的Object对象,返回的是该消息需要发送到的队列。上述例子里,是以orderId作为分区分类标准,对所有队列个数取余,来对将相同orderId的消息发送到同一个队列中。
生产环境中建议选择最细粒度的分区键进行拆分,例如,将订单ID、用户ID作为分区键关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序
2️⃣ 消费者代码
/**
* 接收需要变成单线程接收
*/
@Test
public void Consumer() throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumerGroup");
consumer.setNamesrvAddr(export.NameSrv);
consumer.subscribe("OrderTopic","*");
//使用顺序消费MessageListenerOrderly,这里并不是说只开启一个线程,而是保证同一个orderId的消息在一个线程中读取
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
//输出当前线程id
System.out.println(Thread.currentThread().getId());
System.out.println(new String(msgs.get(0).getBody()));
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
Thread.sleep(100000);
consumer.shutdown();
}
运行代码
顺序消息的一致性
如果一个Broker掉线,那么此时队列总数是否会发化?
如果发生变化,那么同一个 ShardingKey 的消息就会发送到不同的队列上,造成乱序。如果不发生变化,那消息将会发送到掉线Broker的队列上,必然是失败的。因此 Apache RocketMQ 提供了两种模式,如果要保证严格顺序而不是可用性,创建 Topic 是要指定 -o
参数(--order)为true,表示顺序消息:
$ sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -o true -n 127.0.0.1:9876
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=true, attributes=null]
其次要保证NameServer中的配置 orderMessageEnable
和 returnOrderTopicConfigToBroker
必须是 true。如果上述任意一个条件不满足,则是保证可用性而不是严格顺序。
7️⃣事务发送
⭐️消费方式
1️⃣Push消费
在快速入门就使用的是 push
方法来获取消息,也介绍了步骤和代码
设置集群模式和广播模式
我们可以通过以下代码来设置采用集群模式,RocketMQ Push Consumer默认为集群模式,同一个消费组内的消费者分担消费。
consumer.setMessageModel(MessageModel.CLUSTERING);
通过以下代码来设置采用广播模式,广播模式下,消费组内的每一个消费者都会消费全量消息。
consumer.setMessageModel(MessageModel.BROADCASTING);
消息过滤
Tag过滤
Tag在生产者章节已经介绍过,用于对某个Topic下的消息进行分类。生产者在发送消息时,指定消息的Tag,消费者需根据已经指定的Tag来进行订阅。
以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,以如下消息为例:
- 订单消息
- 支付消息
- 物流消息
这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的系统所订阅,以如下系统为例:
- 支付系统:只需订阅支付消息。
- 物流系统:只需订阅物流消息。
- 实时计算系统:需要订阅所有和交易相关的消息。
- 交易成功率分析系统:需订阅订单和支付消息。
过滤示意图如下所示
对于物流系统和支付系统来说,它们都只订阅单个Tag,此时只需要在调用subcribe接口时明确标明Tag即可。
consumer.subscribe("TagFilterTest", "TagA");
对于实时计算系统来说,它订阅交易Topic下所有的消息,Tag用星号(*)表示即可。
consumer.subscribe("TagFilterTest", "*");
对于交易成功率分析系统来说,它订阅了订单和支付两个Tag的消息,在多个Tag之间用两个竖线(||)分隔即可。
consumer.subscribe("TagFilterTest", "TagA||TagB");
这里需要注意的是,如果同一个消费者多次订阅某个Topic下的Tag,以最后一次订阅为准。
//如下错误代码中,Consumer只能订阅到TagFilterTest下TagB的消息,而不能订阅TagA的消息。
consumer.subscribe("TagFilterTest", "TagA");
consumer.subscribe("TagFilterTest", "TagB");
代码演示
1️⃣ 生产者代码
@Test
public void Tags() throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("TagProducerGroup");
producer.setNamesrvAddr(export.NameSrv);
producer.start();
//设置tag
Message message1 = new Message("TagsTopic","tag1","tag1的消息".getBytes());
Message message2 = new Message("TagsTopic","tag2","tag2的消息".getBytes());
producer.send(message2);
producer.send(message1);
System.out.println("发送完毕");
producer.shutdown();
}
使用的构建方法为:
public Message(String topic, String tags, byte[] body) { this(topic, tags, "", 0, body, true); }
2️⃣ 消费者代码
/**
* 接收带tag1的消息的消费者
*/
@Test
public void Consumer1() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagsConsumerGroup");
consumer.setNamesrvAddr(export.NameSrv);
consumer.subscribe("TagsTopic","tag1");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt x = msgs.get(0);
System.out.println(new String(x.getBody()));
System.out.println(x.getTags());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
Thread.sleep(100000);
consumer.shutdown();
}
/**
* 接收带tag2和tag1的消息的消费者
*/
@Test
public void Consumer2() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagsConsumerGroup");
consumer.setNamesrvAddr(export.NameSrv);
consumer.subscribe("TagsTopic","tag2 || tag1");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt x = msgs.get(0);
System.out.println(new String(x.getBody()));
System.out.println(x.getTags());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
Thread.sleep(100000);
consumer.shutdown();
}
SQL92过滤
SQL92过滤是在消息发送时设置消息的Tag或自定义属性,消费者订阅时使用SQL语法设置过滤表达式,根据自定义属性或Tag过滤消息。
Tag属于一种特殊的消息属性,在SQL语法中,Tag的属性值为TAGS。 开启属性过滤首先要在Broker端设置配置enablePropertyFilter=true,该值默认为false。
以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,按照类型将消息分为订单消息和物流消息,其中给物流消息定义地域属性,按照地域分为杭州和上海:
- 订单消息
- 物流消息
- 物流消息且地域为杭州
- 物流消息且地域为上海
这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的系统所订阅,以如下系统为例:
- 物流系统1:只需订阅物流消息且消息地域为杭州。
- 物流系统2:只需订阅物流消息且消息地域为杭州或上海。
- 订单跟踪系统:只需订阅订单消息。
SQL92过滤示意图如下所示:
地域将作为自定义属性设置在消息中。
- 消息发送端: 设置消息的自定义属性。
Message msg = new Message("topic", "tagA", "Hello MQ".getBytes());
// 设置自定义属性A,属性值为1。
msg.putUserProperties("a", "1");
- 消息消费端: 使用SQL语法设置过滤表达式,并根据自定义属性过滤消息。
consumer.subscribe("SqlFilterTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
消息唯一性(keys)
在rocketmq中的消息,默认会有一个messageId当做消息的唯一标识,我们也可以给消息携带一个key,用作唯一标识或者业务标识,包括在控制面板查询的时候也可以使用messageId或者key来进行查询
1️⃣ 生产者代码
通过消息对象来设置 key
值
@Test
public void testKeyProducer() throws Exception {
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("test-group");
// 设置nameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动实例
producer.start();
Message msg = new Message("TopicTest","tagA","key", "我是一个带标记和key的消息".getBytes());
SendResult send = producer.send(msg);
System.out.println(send);
// 关闭实例
producer.shutdown();
}
2️⃣ 消费者代码
@Test
public void testKeyConsumer() throws Exception {
// 创建默认消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
// 设置nameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个主题来消费 表达式,默认是*,支持"tagA || tagB || tagC" 这样或者的写法 只要是符合任何一个标签都可以消费
consumer.subscribe("TopicTest", "tagA || tagB || tagC");
// 注册一个消费监听 MessageListenerConcurrently是并发消费
// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 这里执行消费的代码 默认是多线程消费
System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
System.out.println(msgs.get(0).getTags());
System.out.println(msgs.get(0).getKeys());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
2️⃣Pull消费
在RocketMQ中有两种Pull方式,一种是比较原始 Pull Consumer
,它不提供相关的订阅方法,需要调用pull方法时指定队列进行拉取,并需要自己更新位点。另一种是 Lite Pull Consumer
,它提供了Subscribe和Assign两种方式,使用起来更加方便。
Pull Consumer
Pull Consumer示例如下
public class PullConsumerTest {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
try {
MessageQueue mq = new MessageQueue();
mq.setQueueId(0);
mq.setTopic("TopicTest");
mq.setBrokerName("jinrongtong-MacBook-Pro.local");
long offset = 26;
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {
System.out.printf("%s%n", pullResult.getMsgFoundList());
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
}
} catch (Exception e) {
e.printStackTrace();
}
consumer.shutdown();
}
}
首先需要初始化 DefaultMQPullConsumer
并启动,然后构造需要拉取的队列 MessageQueue
,除了构造外也可以如下所示调用 fetchSubscribeMessageQueues
方法获取某个Topic的所有队列,然后挑选队列进行拉取。
Set<MessageQueue> queueSet = consumer.fetchSubscribeMessageQueues("TopicTest");
找到或者构造完队列之后,调用pull方法就可以进行拉取,需要传入拉取的队列,过滤表达式,拉取的位点,最大拉取消息条数等参数。拉取完成后会返回拉取结果 PullResult
,PullResult中的PullStatus表示结果状态,如下所示
public enum PullStatus {
/**
* Founded
*/
FOUND,
/**
* No new message can be pull
*/
NO_NEW_MSG,
/**
* Filtering results can not match
*/
NO_MATCHED_MSG,
/**
* Illegal offset,may be too big or too small
*/
OFFSET_ILLEGAL
}
FOUND表示拉取到消息,NO_NEW_MSG表示没有发现新消息,NO_MATCHED_MSG表示没有匹配的消息,OFFSET_ILLEGAL表示传入的拉取位点是非法的,有可能偏大或偏小。如果拉取状态是FOUND,我们可以通过 pullResult
的 getMsgFoundList
方法获取拉取到的消息列表。最后,如果消费完成,通过 updateConsumeOffset
方法更新消费位点。
Lite Pull Consumer
Lite Pull Consumer是RocketMQ 4.6.0推出的Pull Consumer,相比于原始的Pull Consumer更加简单易用,它提供了Subscribe和Assign两种模式,Subscribe模式示例如下
public class LitePullConsumerSubscribe {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.setPullBatchSize(20);
litePullConsumer.start();
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
} finally {
litePullConsumer.shutdown();
}
}
}
首先还是初始化 DefaultLitePullConsumer
并设置 ConsumerGroupName
,调用subscribe方法订阅topic并启动。与Push Consumer不同的是,LitePullConsumer
拉取消息调用的是轮询poll接口,如果能拉取到消息则返回对应的消息列表,否则返回null。通过 setPullBatchSize
可以设置每一次拉取的最大消息数量,此外如果不额外设置,LitePullConsumer
默认是自动提交位点。在subscribe模式下,同一个消费组下的多个 LitePullConsumer
会负载均衡消费,与PushConsumer一致。
如下是Assign模式的示例
public class LitePullConsumerAssign {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
litePullConsumer.setAutoCommit(false);
litePullConsumer.start();
Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
List<MessageQueue> list = new ArrayList<>(mqSet);
List<MessageQueue> assignList = new ArrayList<>();
for (int i = 0; i < list.size() / 2; i++) {
assignList.add(list.get(i));
}
litePullConsumer.assign(assignList);
litePullConsumer.seek(assignList.get(0), 10);
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s %n", messageExts);
litePullConsumer.commitSync();
}
} finally {
litePullConsumer.shutdown();
}
}
}
Assign模式一开始仍然是初始化 DefaultLitePullConsumer
,这里我们采用手动提交位点的方式,因此设置AutoCommit为false,然后启动consumer。与Subscribe模式不同的是,Assign模式下没有自动的负载均衡机制,需要用户自行指定需要拉取的队列,因此在例子中,先用fetchMessageQueues获取了Topic下的队列,再取前面的一半队列进行拉取,示例中还调用了seek方法,将第一个队列拉取的位点设置从10开始。紧接着进入循环不停地调用poll方法拉取消息,拉取到消息后调用commitSync方法手动提交位点。