当前位置 博文首页 > 龚厂长的博客:RocketMQ-DefaultMQPullConsumer消费消息原理详解
本文基于RocketMQ 4.7.1版本
在文章《RocketMQ-如何创建消费者》中展示如何使用DefaultMQPullConsumer与DefaultMQPushConsumer消费消息。DefaultMQPullConsumer与DefaultMQPushConsumer相比最大的区别是,消费哪些队列的消息,从哪个位移开始消费,以及何时提交消费位移都是由程序自己的控制的。这给开发带来了极大的自由,同时由于都是自己维护,也给编程工作带来了困难。下面来介绍一下DefaultMQPullConsumer的内部原理。
首先我们将《RocketMQ-如何创建消费者》文章中已经介绍过的代码在这里再展示一遍,不过代码中会省略一些内容,只展示关键信息。
private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>();
public static void main(String[] args) throws MQClientException {
//创建消费者
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer-A");
consumer.start();//启动
//获取主题下所有的消息队列,这里是根据主题从nameserver获取的
//这里我们可以修改为从其他位置获取队列信息
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicTest");
//遍历队列
for(MessageQueue mq:mqs){
try {
//获取当前队列的消费位移,第二个参数表示位移是从本地内存获取,还是从broker获取
//true表示从broker获取
long offset = consumer.fetchConsumeOffset(mq,true);
while(true){
//第二个参数表示可以消费哪些tag的消息
//第三个参数表示从哪个位移开始消费消息
//第四个参数表示一次最大拉多少个消息
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null,
getMessageQueueOffset(mq), 32);
//代码省略,这里是记录消息位移
//代码省略,这里是消费消息
}
} catch (Exception e) {
}
}
consumer.shutdown();
}
//保存下次消费消息的位移,这里将位移保存到内存,也可以使用数据库
private static void putMessageQueueOffset(MessageQueue mq,
long nextBeginOffset) {}
//获取本次要消费消息的位移
private static Long getMessageQueueOffset(MessageQueue mq) {}
本文将针对上面的代码介绍DefaultMQPullConsumer的几个关键方法执行原理。