当前位置 博文首页 > Ocean曈的博客:RocketMq之存储模型
在当前微服务大行其道的时代,对于消息中间件的高可用、高吞吐量、以及消息触达率都有着更高要求。而rocketmq在阿里的双十一这种亿级流量的大规模部署的集群环境中应运而生。让它天生带有了集群的功能特性,并且对标了业界成名已久的老大哥级别的kafka这一产品。其实很多思路也是借鉴kafka的。其他的废话我也不多说,想知道更多rocketmq的相关性能秀的同学可以去看官网
话不多说,还是直接进入今天的主题,rocketmq的集群设计到底是怎样的。
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。单独提一句nameserver是一个无状态的路由服务器
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
从上图我们看出 broker、producer、consumer、nameserver都是可以集群部署的。那么作为一个消息中间件,rocketmq是如何做到集群环境下让consumer正确消费,并且把流量打到不同的服务上的呢?这里就要引入两个新的概念:topic跟queue。
什么是topic?按照官方的解释是:表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
那么queue是什么?也可以用官方的话来解释:Message Queue 用于存储消息的物理地址,但是queue其实不是真正的物理地址,真正的物理地址,翻阅源码来看每个broker中将所有的topice写在了commitLog类型的文件中,而每个message queue对应一个逻辑的索引地址,指向commitlog文件,标识commitlog文件中哪些消息是当前queue下面的,已经消息的offset。
每个topic包含至少一个以上的queue,默认创建一个topic,就自动创建4个queue与之对应。
从上图我们可以看到每个topic有多个对应的queue。这些queue分布在不同的broker下面。用多个queue的分步不同机器的方式来达到负载。但是这个时候就产生了一个问题?作为一个consumer,或者producer,订阅了一个topic之后,是如何消费消息的?
集群消费是每个订阅topic每一个consumer分组都能成功消费一次。这种确保每个group分组能消费一次,由broker确保。
不同的消费者group之间,消费互不影响。意思就是,一个消费组groupA已经消费了Topic28条消息,如果此时重启一个消费组groupB,还是会从0开始消费,一直消费到最后一条。
广播模式,group中的每一个consumer都能消费到一次。这种消费确认由每个consumer自己确定。
按照topic的分片模型我们可以看到,一个topic对应多个queue,一个broker对应多个topic。但是只是rocketmq的逻辑储存模型,而实际的储存模式并非如此。
commitLog: 每个broker都会有commitlog,所有消息的实际储存在当前的commitlog文件中。
consumequeue:每个topic会在cosumequeue文件中生成一个对应名字的文件夹,在文件夹中根据topic下面的queue id生成相应的文件夹。文件夹里面根据相应的commitLog文件生成对应的索引文件。
index:跟consumequeue类似,不过index记录的按照时间顺序的对commitLog 里面信息的索引。
config:记录broker的一些信息,记录每个consumer集群,进行集群消费时对于topic消费offset,延时消费offset,broker中topic的信息。
上图就是一个consumer记录在 broker中的消费进度记录。其中记录了包括每个topic下consumer集群在每个queue下面的消费进度。
源码git地址:https://github.com/apache/rocketmq.git
压缩包地址:https://github.com/apache/rocketmq/archive/master.zip
直接通过idea打开,然后根据maven的方式引入一下包即可,其目录结构如下
我们可以从源码中看到broker文件下,org.apache.rocketmq.broker.BrokerStartup 这个类即为broker的启动类。
主要刷盘逻辑在 org.apache.rocketmq.store.DefaultMessageStore
我们先看看其构造方法:
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
this.brokerConfig = brokerConfig;
this.messageStoreConfig = messageStoreConfig;
this.brokerStatsManager = brokerStatsManager;
this.allocateMappedFileService = new AllocateMappedFileService(this);
//主要msg刷盘逻辑
if (messageStoreConfig.isEnableDLegerCommitLog()) {
// 用raft协议实现的 高可用主备同步方案,主节点挂了能自动选举恢复新节点
this.commitLog = new DLedgerCommitLog(this);
} else {
//刷盘类
this.commitLog = new CommitLog(this);
}
this.consumeQueueTable = new ConcurrentHashMap<>(32);
//持有所有的conumer的消费消息的记录情况
this.flushConsumeQueueService = new FlushConsumeQueueService();
this.cleanCommitLogService = new CleanCommitLogService();
this.cleanConsumeQueueService = new CleanConsumeQueueService();
//记录当前msg的刷盘状态,节点关掉,重新启动时可用
this.storeStatsService = new StoreStatsService();
//记录msg的时间索引
this.indexService = new IndexService(this);
//高可用实现方案
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService = new HAService(this);
} else {
this.haService = null;
}
this.reputMessageService = new ReputMessageService();
。。。。。。。
}
在org.apache.rocketmq.store.CommitLog#CommitLog
public CommitLog(final DefaultMessageStore defaultMessageStore) {
//所有在/store/commitlog 下面的所有commitlog 文件队列
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
this.defaultMessageStore = defaultMessageStore;
//刷盘操作service
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
this.flushCommitLogService = new FlushRealTimeService();
}
//异步刷盘service,如果是刷盘是SYNC_FLUSH同步模式,这个类就没用
this.commitLogService = new CommitRealTimeService();
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
@Override
protected MessageExtBatchEncoder initialValue() {
return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
}
然后我根据DefaultMessageStore里面的源码画了一张详细的代码架构逻辑图
最终无论是commitlog 、 consumerQueue、index 的刷盘的逻辑会落在MappedFile里面最终实现。通过MappedByteBuffer 内存映射技术实现Zero Copy刷盘方案,保证了rockmq高效率的刷盘。