当前位置 博文首页 > 罗西的思考:Dyno-queues 分布式延迟队列 之 基本功能
本系列我们会以设计分布式延迟队列时重点考虑的模块为主线,穿插灌输一些消息队列的特性实现方法,通过分析Dyno-queues 分布式延迟队列的源码来具体看看设计实现一个分布式延迟队列的方方面面。
Dyno-queues 是 Netflix 实现的基于 Dynomite 和 Redis 构建的队列。
Dynomite是一种通用的实现,可以与许多不同的 key-value 存储引擎一起使用。目前它提供了对Redis序列化协议(RESP)和Memcached写协议的支持。
具体设计目标依据业务系统不同而不同。
Dyno-queues 的业务背景是:在 Netflix 的平台上运行着许多的业务流程,这些流程的任务是通过异步编排进行驱动,现在要实现一个分布式延迟队列,这个延迟队列具有如下特点:
Netflix 选择 Dynomite,是因为:
Netflix选择Redis作为构建队列的存储引擎是因为:
查询模型:基于Key-Value模型,而不是SQL,即关系模型。存储对象比较小。
ACID属性:传统的关系数据库中,用ACID(A原子性、C一致性、I 隔离性、D持久性)来保证事务,在保证ACID的前提下往往有很差的可用性。Dynamo用弱一致性C来达到高可用,不提供数据隔离 I,只允许单Key更新。
其实所有的高可用,是可以依赖于RPC和存储的高可用来实现的。
Netflix 选择 Dynomite,是因为:
所以 Dyno-queues 的高可用就自动解决了。
怎么保证幂等呢?最简单的方式莫过于共享存储。broker多机器共享一个DB或者一个分布式文件/kv系统,则处理消息自然是幂等的。就算有单点故障,其他节点可以立刻顶上。
对于不共享存储的队列,如Kafka使用分区加主备模式,就略微麻烦一些。需要保证每一个分区内的高可用性,也就是每一个分区至少要有一个主备且需要做数据的同步。
Dynomite 使用 redis 集群这个共享存储 做了幂等保证。
消息到达服务端后,如果不经过任何处理就到接收者,broker就失去了它的意义。为了满足我们错峰/流控/最终可达等一系列需求,把消息存储下来,然后选择时机投递就显得是顺理成章的了。
这个存储可以做成很多方式。比如存储在内存里,存储在分布式KV里,存储在磁盘里,存储在数据库里等等。但归结起来,主要有持久化和非持久化两种。
持久化的形式能更大程度地保证消息的可靠性(如断电等不可抗外力),并且理论上能承载更大限度的消息堆积(外存的空间远大于内存)。
但并不是每种消息都需要持久化存储。很多消息对于投递性能的要求大于可靠性的要求,且数量极大(如日志)。这时候,消息不落地直接暂存内存,尝试几次failover,最终投递出去也未尝不可。
Dynomite 使用 redis 集群这个共享存储 在一定程度上缓解了消息堆积问题。
我们来看看如果需要数据落地的情况下各种存储子系统的选择。理论上,从速度来看,文件系统 > 分布式KV(持久化)> 分布式文件系统 > 数据库,而可靠性却截然相反。还是要从支持的业务场景出发作出最合理的选择。
如果你们的消息队列是用来支持支付/交易等对可靠性要求非常高,但对性能和量的要求没有这么高,而且没有时间精力专门做文件存储系统的研究,DB是最好的选择。
但是DB受制于IOPS,如果要求单broker 5位数以上的QPS性能,基于文件的存储是比较好的解决方案。整体上可以采用数据文件 + 索引文件的方式处理。
分布式KV(如MongoDB,HBase)等,或者持久化的Redis,由于其编程接口较友好,性能也比较可观,如果在可靠性要求不是那么高的场景,也不失为一个不错的选择。
因为 场景是 可靠性要求不那么高,所以 Dynomite 使用 redis 集群这个存储子系统 也是可以的。
下一个重要的事情就是解析发送接收关系,进行正确的消息投递了。抛开现象看本质,发送接收关系无外乎是单播与广播的区别。所谓单播,就是点到点;而广播,是一点对多点。
一般比较通用的设计是支持组间广播,不同的组注册不同的订阅。组内的不同机器,如果注册一个相同的ID,则单播;如果注册不同的ID(如IP地址+端口),则广播。
至于广播关系的维护,一般由于消息队列本身都是集群,所以都维护在公共存储上,如 config server、zookeeper等。维护广播关系所要做的事情基本是一致的:
本文后续会介绍如何维护发送关系。
数据分片的逻辑既可以实现在客户端,也可以实现在 Proxy
层,取决于你的架构如何设计。
传统的数据库中间件大多将分片逻辑实现在客户端,通过改写物理 SQL
访问不同的 MySQL
库;而在 NewSQL
数据库倡导的计算存储分离架构中,通常将分片逻辑实现在计算层,即 Proxy
层,通过无状态的计算节点转发用户请求到正确的存储节点。
在 Dynomite 之中,队列根据可用区域进行分片,将数据推送到队列时,通过轮训机制确定分片,这种机制可以确保所有分片的数据是平衡的,每个分片都代表Redis中的有序集合,有序集中的 key 是 queueName 和 AVAILABILITY _ZONE 的组合。
public class RoundRobinStrategy implements ShardingStrategy {
private final AtomicInteger nextShardIndex = new AtomicInteger(0);
/**
* Get shard based on round robin strategy.
* @param allShards
*/
@Override
public String getNextShard(List<String> allShards, Message message) {
int index = nextShardIndex.incrementAndGet();
if (index >= allShards.size()) {
nextShardIndex.set(0);
index = 0;
}
return allShards.get(index);
}
}
Dyno-queues 队列是在 Dynomite 的JAVA客户端 Dyno 之上建立的,Dyno 为持久连接提供连接池,并且可以配置为拓扑感知。关于 Dyno 具体可以参见前文:
源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(1)
源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(2)
Dyno为应用程序提供特定的本地机架(在AWS中,机架是一个区域,例如 us-east-1a、us-east-1b等),us-east-1a的客户端将连接到相同区域的Dynomite/Redis节点,除非该节点不可用,在这种情况下该客户端将进行故障转移。这个属性被用于通过区域划分队列。
队列根据可用区域进行分片,将数据推送到队列时,通过轮训机制确定分片,这种机制可以确保所有分片的数据是平衡的,每个分片都代表Redis中的有序集合,有序集中的key是queueName和AVAILABILITY _ZONE的组合。
具体机制举例如下:
public class RoundRobinStrategy implements ShardingStrategy {
private final AtomicInteger nextShardIndex = new AtomicInteger(0);
/**
* Get shard based on round robin strategy.
* @param allShards
*/
@Override
public String getNextShard(List<String> allShards, Message message) {
int index = nextShardIndex.incrementAndGet();
if (index >= allShards.size()) {
nextShardIndex.set(0);
index = 0;
}
return allShards.get(index);
}
}
在分布式系统中有个CAP理论,对于P(分区容忍性)而言,是实际存在 从而无法避免的。因为分布系统中的处理不是在本机,而是网络中的许多机器相互通信,故网络分区、网络通信故障问题无法避免。因此,只能尽量地在C 和 A 之间寻求平衡。
对于数据存储而言,为了提高可用性(Availability),采用了副本备份,比如对于HDFS,默认每块数据存三份。某数据块所在的机器宕机了,就去该数据块副本所在的机器上读取(从这可以看出,数据分布方式是按“数据块”为单位分布的)
但是问题来了,当需要修改数据时,就需要更新所有的副本数据,这样才能保证数据的一致性(Consistency)。因此,就需要在 C(Consistency) 和 A(Availability) 之间权衡。
而Quorum机制,就是这样的一种权衡机制,一种将“读写转化”的模型。
显然,我们更想要做到强一致性的这种效果,那么有哪些方式可以实现呢,其中最为简单直接的就是 WARO,也就是Write All Read one。
WARO 是一种简单的副本控制协议,当 Client 请求向某副本写数据时(更新数据),只有当所有的副本都更新成功之后,这次写操作才算成功,否则视为失败。这样的话,只需要读任何一个副本上的数据即可。但是WARO带来的影响是写服务的可用性较低,因为只要有一个副本更新失败,此次写操作就视为失败了。
Quorum 的定义如下:假设有 N 个副本,更新操作 wi 在 W 个副本中更新成功之后,则认为此次更新操作 wi 成功,把这次成功提交的更新操作对应的数据叫做:“成功提交的数据”。对于读操作而言,至少需要读 R 个副本,其中,W+R>N ,即 W 和 R 有重叠,一般,W+R=N+1。
Quorum机制认为每次写入的机器数目达到大多数(W)时,就认为本次写操作成功了。即Quorum机制能够不需要更新完全部的数据,但又保证返回给用户的是有效数据的解决方案。
我们以 ES 为例。
我们在发送任何一个增删改操作的时候,都可以带上一个consistency参数,指明我们想要的写一致性是什么。
quorum = int((primary shard+number_of_replicas)/2)+1
如果节点数少于quorum,可能导致querum不齐全,进而导致无法执行任何写操作。quorum不齐全时,会进行等待。默认等待时间为1分钟,期待活跃的shard数量可以增加,最后实在不行,就会timeout。
Dynomite 能够将最终一致性(eventual consistency)扩展为协调一致性(tunable consistency)。
关于QUORUM,Dynomite有如下配置:
由测试得到的结果,Dynomite能从3,6,12,24一路扩展到48个节点,在DC_ONE和DC_QUORUM模式下,吞吐率都能线性地增长。与此同时,Dynomite在延迟方面只增加了很少的开支,即便在DC_QUORUM模式下,(延迟)也只有几毫秒。DC_QUORUM模式在延迟和吞吐量方面处于劣势,但是能为客户提供更好的读写保证。
对于Dyno-queues来说,则是在实现中有所体现。比如在 RedisQueues 中,有如下成员变量:
private final JedisCommands quorumConn;
private final JedisCommands nonQuorumConn;
在构建 RedisQueues 时,就需要注明使用哪一种。
而从注释中我们可知,
@param quorumConn
Dyno connection with dc_quorum enabled,就是 采用了Quorum的Redis;@param nonQuorumConn
Dyno connection to local Redis,就是本地Redis;生成 RedisQueues 的代码如下(注意其中注释):
/**
* @param quorumConn Dyno connection with dc_quorum enabled
* @param nonQuorumConn Dyno connection to local Redis
*/
public RedisQueues(JedisCommands quorumConn, JedisCommands nonQuorumConn, String redisKeyPrefix, ShardSupplier shardSupplier, int unackTime, int unackHandlerIntervalInMS, ShardingStrategy shardingStrategy) {
this(Clock.systemDefaultZone(), quorumConn, nonQuorumConn, redisKeyPrefix, shardSupplier, unackTime, unackHandlerIntervalInMS, shardingStrategy);
}
在有分片时,就从nonQuorumConn(就是本地Redis)提取。
使用nonQuorumConn来预取的原因是:最终一致性(eventual consistency)。
因为 replication lag,在某一时刻不同分片的数据可能不一样,所以需要先预取。这就需要使用 nonQuorumConn 来预取,因为本地 redis 的数据才是正确的。
private Set<String> doPeekIdsFromShardHelper(final String queueShardName, final double peekTillTs, final int offset,final int count) {
return nonQuorumConn.zrangeByScore(queueShardName, 0, peekTillTs, offset, count);
}
再比如处理没有 ack 的消息时,先从 nonQuorumConn 读取信息ID,再从 quorumConn 读取消息内容。
这就是因为一致性导致的,所以如下:
@Override
public void processUnacks() {
execute("processUnacks", keyName, () -> {
Set<Tuple> unacks = nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0, now, 0, batchSize);
for (Tuple unack : unacks) {
double score = unack.getScore();
String member = unack.getElement();
String payload = quorumConn.hget(messageStoreKey, member);
long added_back = quorumConn.zadd(localQueueShard, score, member);
}
});
}
再比如从本地提取消息就使用了 nonQuorumConn。
@Override
public Message localGet(String messageId) {
try {
return execute("localGet", messageStoreKey, () -> {
String json = nonQuorumConn.hget(messageStoreKey, messageId);
Message msg = om.readValue(json, Message.class);
return msg;
});
}
}
再比如 popWithMsgIdHelper 也是先读取 nonQuorumConn,再从 quorumConn 读取其他内容。
public Message popWithMsgIdHelper(String messageId, String targetShard, boolean warnIfNotExists) {
try {
return execute("popWithMsgId", targetShard, () -> {
String queueShardName = getQueueShardKey(queueName, targetShard);
double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();
String unackShardName = getUnackKey(queueName, targetShard);
ZAddParams zParams = ZAddParams.zAddParams().nx();
Long exists = nonQuorumConn.zrank(queueShardName, messageId);
// If we get back a null type, then the element doesn't exist.
if (exists == null) {
// We only have a 'warnIfNotExists' check for this call since not all messages are present in
// all shards. So we want to avoid a log spam. If any of the following calls return 'null' or '0',
// we may have hit an inconsistency (because it's in the queue, but other calls have failed),
// so make sure to log those.
monitor.misses.increment();
return null;
}
String json = quorumConn.hget(messageStoreKey, messageId);
if (json == null) {
monitor.misses.increment();
return null;
}
long added = quorumConn.zadd(unackShardName, unackScore, messageId, zParams);
if (added == 0) {
monitor.misses.increment();
return null;
}
long removed = quorumConn.zrem(queueShardName, messageId);
if (removed == 0) {
monitor.misses.increment();
return null;
}
Message msg = om.readValue(json, Message.class);
return msg;
});
}
}
RedisQueues是为用户提供的外部接口,从其成员变量可以看出来其内部机制,比如各种策略。
public class RedisQueues implements Closeable {
private final Clock clock;
private final JedisCommands quorumConn;
private final JedisCommands nonQuorumConn;
private final Set<String> allShards;
private final String shardName;
private final String redisKeyPrefix;
private final int unackTime;
private final int unackHandlerIntervalInMS;
private final ConcurrentHashMap<String, DynoQueue> queues;
private final ShardingStrategy shardingStrategy;
private final boolean singleRingTopology;
}
用户通过get方法来得到DynoQueue:DynoQueue V1Queue = queues.get("simpleQueue")
。
public DynoQueue get(String queueName) {
String key = queueName.intern();
return queues.computeIfAbsent(key, (keyToCompute) -> new RedisDynoQueue(clock, redisKeyPrefix, queueName, allShards, shardName, unackHandlerIntervalInMS, shardingStrategy, singleRingTopology)
.withUnackTime(unackTime)
.withNonQuorumConn(nonQuorumConn)
.withQuorumConn(quorumConn));
}
我们看看 Dyno-queues 中几种数据结构。
一个完整的消息队列应该定义清楚自己可以投递的消息类型,如事务型消息,本地非持久型消息,以及服务端不落地的非可靠消息等。对不同的业务场景做不同的选择。
Dyno-queues 只有服务端落地的可靠消息。每个延时消息必须包括以下参数:
public class Message {
private String id;
private String payload;
private long timeout;
private int priority;
private String shard;
}
Dyno-queues 关于存储的总体思路是:hash 记录消息内容, zset 实现按到期时间排序的队列
,即:
具体逻辑如图,这里的虚线指的是两者通过 msg id 来进行逻辑上的管理,物理没有关联:
+----------+----------+----------+-----+----------+
| | | | | |
zset | msg id 1 | msg id 2 | msg id 3 | ... | msg id n |
| | | | | |
+---+------+----+-----+----+-----+-----+----+-----+
| | | |
| | | |
v v v v
+---+---+ +---+---+ +--+----+ +--+--+
hash | msg 1 | | msg 2 | | msg 3 | |msg n|
+-------+ +-------+ +-------+ +-----+
具体到代码,则是:
quorumConn.hset(messageStoreKey, message.getId(), json);
double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority;
具体参见如下:
for (Message message : messages) {
String json = om.writeValueAsString(message);
quorumConn.hset(messageStoreKey, message.getId(), json);
double priority = message.getPriority() / 100.0;
double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority;
String shard = shardingStrategy.getNextShard(allShards, message);
String queueShard = getQueueShardKey(queueName, shard);
quorumConn.zadd(queueShard, score, message.getId());
}
RedisDynoQueue是 Dyno-queues 延迟队列的主要实现。
从Redis角度来看,对于每个队列,维护三组Redis数据结构:
这三组Redis数据结构在RedisDynoQueue内部其实没有对应的成员变量,对于RedisDynoQueue 来说,看起来是逻辑概念,而事实上它们存在于Redis的内部存储中,由Dynomite负责高可用等等。
具体如下:
message list
zset +----------+----------+----------+-----+----------+
| | | | | |
| msg id 1 | msg id 2 | msg id 3 | ... | msg id 9 |
| | | | | |
+---+------+----+-----+----+-----+-----+----+-----+
| | | |
| | | |
v v v v
hash +---+---+ +---+---+ +--+----+ +--+--+
| msg 1 | | msg 2 | | msg 3 | |msg 9|
+-------+ +-------+ +-------+ +-----+
unack list
+------------+-------------+--------------+
zset | | | |
| msg id 11 | msg id 12 | msg id 13 |
| | | |
+------------+-------------+--------------+
RedisDynoQueue 的成员变量可以分类如下:
ObjectMapper om:用来把消息序列化,写到redis中;
Clock clock:用以为分数生成时间戳;
String redisKeyPrefix:每个queue的用户会给自己定义key;
String messageStoreKey:对于每个Redis hash来说,可以设定自己的field(字段),比如:
this.messageStoreKey = redisKeyPrefix + ".MESSAGE." + queueName;
quorumConn.hget(messageStoreKey, messageId)
List
String localQueueShard:本地分区;
ShardingStrategy shardingStrategy:分区策略;
ConcurrentLinkedQueue
Map<String, ConcurrentLinkedQueue
this.unsafePrefetchedIdsAllShardsMap = new HashMap<>();
for (String shard : allShards) {
unsafePrefetchedIdsAllShardsMap.put(getQueueShardKey(queueName, shard), new ConcurrentLinkedQueue<>());
}
int retryCount = 2:重试次数;
int unackTime = 60:用以生成ack队列的分数。
double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();
long added = quorumConn.zadd(unackShardName, unackScore, messageId, zParams);
ScheduledExecutorService schedulerForUnacksProcessing:用以生成线程,来定期ack
schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);
if (this.singleRingTopology) {
schedulerForUnacksProcessing.scheduleAtFixedRate(() -> atomicProcessUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
} else {
schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
}
QueueMonitor monitor:监控与统计;
具体代码如下:
public class RedisDynoQueue implements DynoQueue {
private final Clock clock;
private final String queueName;
private final List<String> allShards;
private final String shardName;
private final String redisKeyPrefix;
private final String messageStoreKey;
private final String localQueueShard;
private volatile int unackTime = 60;
private final QueueMonitor monitor;
private final ObjectMapper om;
private volatile JedisCommands quorumConn;
private volatile JedisCommands nonQuorumConn;
private final ConcurrentLinkedQueue<String> prefetchedIds;
private final Map<String, ConcurrentLinkedQueue<String>> unsafePrefetchedIdsAllShardsMap;
private final ScheduledExecutorService schedulerForUnacksProcessing;
private final int retryCount = 2;
private final ShardingStrategy shardingStrategy;
private final boolean singleRingTopology;
}
至此,Dyno-queues 基本功能初步分析完毕,我们下期继续介绍消息产生,消费。
干货分享 | 如何从零开始设计一个消息队列
消息队列的理解,几种常见消息队列对比,新手也能看得懂!----分布式中间件消息队列
消息队列设计精要
有赞延迟队列设计
基于Dynomite的分布式延迟队列
http://blog.mikebabineau.com/2013/02/09/delay-queues-in-redis/
http://stackoverflow.com/questions/17014584/how-to-create-a-delayed-queue-in-rabbitmq
http://activemq.apache.org/delay-and-schedule-message-delivery.html
源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(1)
源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(2)
原创 Amazon Dynamo系统架构
Netlix Dynomite性能基准测试,基于AWS和Redis
为什么分布式一定要有延时任务?