当前位置 博文首页 > 龚厂长的博客:RocketMQ-DefaultMQPullConsumer消费消息原理详解

    龚厂长的博客:RocketMQ-DefaultMQPullConsumer消费消息原理详解

    作者:[db:作者] 时间:2021-07-26 17:48

    本文基于RocketMQ 4.7.1版本

    在文章《RocketMQ-如何创建消费者》中展示如何使用DefaultMQPullConsumer与DefaultMQPushConsumer消费消息。DefaultMQPullConsumer与DefaultMQPushConsumer相比最大的区别是,消费哪些队列的消息,从哪个位移开始消费,以及何时提交消费位移都是由程序自己的控制的。这给开发带来了极大的自由,同时由于都是自己维护,也给编程工作带来了困难。下面来介绍一下DefaultMQPullConsumer的内部原理。
    首先我们将《RocketMQ-如何创建消费者》文章中已经介绍过的代码在这里再展示一遍,不过代码中会省略一些内容,只展示关键信息。

    private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>();
    public static void main(String[] args) throws MQClientException {
    	//创建消费者
    	DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer-A");
    	consumer.start();//启动
    	//获取主题下所有的消息队列,这里是根据主题从nameserver获取的
    	//这里我们可以修改为从其他位置获取队列信息
    	Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicTest");
    	//遍历队列
    	for(MessageQueue mq:mqs){
    		try {
    			//获取当前队列的消费位移,第二个参数表示位移是从本地内存获取,还是从broker获取
    			//true表示从broker获取
    			long offset = consumer.fetchConsumeOffset(mq,true);
    			while(true){
    				//第二个参数表示可以消费哪些tag的消息
    				//第三个参数表示从哪个位移开始消费消息
    				//第四个参数表示一次最大拉多少个消息
    				PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, 
    						getMessageQueueOffset(mq), 32);
    				//代码省略,这里是记录消息位移
    				//代码省略,这里是消费消息
    			}
    		} catch (Exception e) {
    		}
    	}
    	consumer.shutdown();
    }
    //保存下次消费消息的位移,这里将位移保存到内存,也可以使用数据库
    private static void putMessageQueueOffset(MessageQueue mq,
    		long nextBeginOffset) {}
    
    //获取本次要消费消息的位移
    private static Long getMessageQueueOffset(MessageQueue mq) {}
    

    本文将针对上面的代码介绍DefaultMQPullConsumer的几个关键方法执行原理。

    文章目录

    • 一、启动DefaultMQPullConsumer
    • 二、获取主题下读队列信息fetchSubscribeMessageQueues
    • 三、获取队列消费位移fetchConsumeOffset
    • 四、拉取消息pullBlockIfNotFound
    cs
    下一篇:没有了