当前位置 博文首页 > Ocean曈的博客:RocketMq之存储模型

    Ocean曈的博客:RocketMq之存储模型

    作者:[db:作者] 时间:2021-06-21 12:40

    (一)RocketMq简介

    在当前微服务大行其道的时代,对于消息中间件的高可用、高吞吐量、以及消息触达率都有着更高要求。而rocketmq在阿里的双十一这种亿级流量的大规模部署的集群环境中应运而生。让它天生带有了集群的功能特性,并且对标了业界成名已久的老大哥级别的kafka这一产品。其实很多思路也是借鉴kafka的。其他的废话我也不多说,想知道更多rocketmq的相关性能秀的同学可以去看官网
    话不多说,还是直接进入今天的主题,rocketmq的集群设计到底是怎样的。

    (二)角色介绍

    RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

    producer

    负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

    consumer

    负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

    nameserver

    名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。单独提一句nameserver是一个无状态的路由服务器

    broker

    消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

    (三)集群设计

    在这里插入图片描述

    从上图我们看出 broker、producer、consumer、nameserver都是可以集群部署的。那么作为一个消息中间件,rocketmq是如何做到集群环境下让consumer正确消费,并且把流量打到不同的服务上的呢?这里就要引入两个新的概念:topic跟queue。

    topic、queue

    什么是topic?按照官方的解释是:表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
    那么queue是什么?也可以用官方的话来解释:Message Queue 用于存储消息的物理地址,但是queue其实不是真正的物理地址,真正的物理地址,翻阅源码来看每个broker中将所有的topice写在了commitLog类型的文件中,而每个message queue对应一个逻辑的索引地址,指向commitlog文件,标识commitlog文件中哪些消息是当前queue下面的,已经消息的offset。

    每个topic包含至少一个以上的queue,默认创建一个topic,就自动创建4个queue与之对应。

    在这里插入图片描述
    从上图我们可以看到每个topic有多个对应的queue。这些queue分布在不同的broker下面。用多个queue的分步不同机器的方式来达到负载。但是这个时候就产生了一个问题?作为一个consumer,或者producer,订阅了一个topic之后,是如何消费消息的?

    (四)两种消费模式

    1.cluster 集群模式

    集群消费是每个订阅topic每一个consumer分组都能成功消费一次。这种确保每个group分组能消费一次,由broker确保。
    不同的消费者group之间,消费互不影响。意思就是,一个消费组groupA已经消费了Topic28条消息,如果此时重启一个消费组groupB,还是会从0开始消费,一直消费到最后一条。

    2.broadcast 广播模式

    广播模式,group中的每一个consumer都能消费到一次。这种消费确认由每个consumer自己确定。

    (五)消息存储机制

    按照topic的分片模型我们可以看到,一个topic对应多个queue,一个broker对应多个topic。但是只是rocketmq的逻辑储存模型,而实际的储存模式并非如此。

    在这里插入图片描述

    commitLog: 每个broker都会有commitlog,所有消息的实际储存在当前的commitlog文件中。
    consumequeue:每个topic会在cosumequeue文件中生成一个对应名字的文件夹,在文件夹中根据topic下面的queue id生成相应的文件夹。文件夹里面根据相应的commitLog文件生成对应的索引文件。
    在这里插入图片描述
    index:跟consumequeue类似,不过index记录的按照时间顺序的对commitLog 里面信息的索引。

    config:记录broker的一些信息,记录每个consumer集群,进行集群消费时对于topic消费offset,延时消费offset,broker中topic的信息。
    在这里插入图片描述
    上图就是一个consumer记录在 broker中的消费进度记录。其中记录了包括每个topic下consumer集群在每个queue下面的消费进度。

    (六)源码分析

    源码git地址:https://github.com/apache/rocketmq.git
    压缩包地址:https://github.com/apache/rocketmq/archive/master.zip
    直接通过idea打开,然后根据maven的方式引入一下包即可,其目录结构如下
    在这里插入图片描述

    1.broker是如何写commitlog源码解析

    我们可以从源码中看到broker文件下,org.apache.rocketmq.broker.BrokerStartup 这个类即为broker的启动类。

    主要刷盘逻辑在 org.apache.rocketmq.store.DefaultMessageStore

    我们先看看其构造方法:

    public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
            final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
            this.messageArrivingListener = messageArrivingListener;
            this.brokerConfig = brokerConfig;
            this.messageStoreConfig = messageStoreConfig;
            this.brokerStatsManager = brokerStatsManager;
            this.allocateMappedFileService = new AllocateMappedFileService(this);
            //主要msg刷盘逻辑
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
            	// 用raft协议实现的 高可用主备同步方案,主节点挂了能自动选举恢复新节点
                this.commitLog = new DLedgerCommitLog(this);
            } else {
            	//刷盘类
                this.commitLog = new CommitLog(this);
            }
            this.consumeQueueTable = new ConcurrentHashMap<>(32);
    		//持有所有的conumer的消费消息的记录情况
            this.flushConsumeQueueService = new FlushConsumeQueueService();
            this.cleanCommitLogService = new CleanCommitLogService();
            this.cleanConsumeQueueService = new CleanConsumeQueueService();
            //记录当前msg的刷盘状态,节点关掉,重新启动时可用
            this.storeStatsService = new StoreStatsService();
            //记录msg的时间索引
            this.indexService = new IndexService(this);
            //高可用实现方案
            if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                this.haService = new HAService(this);
            } else {
                this.haService = null;
            }
            this.reputMessageService = new ReputMessageService();
    
          。。。。。。。
        }
    

    在org.apache.rocketmq.store.CommitLog#CommitLog

    public CommitLog(final DefaultMessageStore defaultMessageStore) {
         //所有在/store/commitlog 下面的所有commitlog 文件队列
         this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
             defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
         this.defaultMessageStore = defaultMessageStore;
         //刷盘操作service
         if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
             this.flushCommitLogService = new GroupCommitService();
         } else {
             this.flushCommitLogService = new FlushRealTimeService();
         }
         //异步刷盘service,如果是刷盘是SYNC_FLUSH同步模式,这个类就没用  
         this.commitLogService = new CommitRealTimeService();
         
         this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
         batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
             @Override
             protected MessageExtBatchEncoder initialValue() {
                 return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
             }
         };
         this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
    
     }
    

    然后我根据DefaultMessageStore里面的源码画了一张详细的代码架构逻辑图

    在这里插入图片描述

    最终无论是commitlog 、 consumerQueue、index 的刷盘的逻辑会落在MappedFile里面最终实现。通过MappedByteBuffer 内存映射技术实现Zero Copy刷盘方案,保证了rockmq高效率的刷盘。