罗西的思考 (Rossi's Thinking): [Source Code Analysis] Dynomite Distributed Storage Engine: DynoJedisC
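Earlier articles introduced the Amazon Dynamo system architecture and Netflix Dynomite.
Today we look at how DynoJedisClient, the Java client of Netflix Dynomite, is implemented. The client is worth analyzing because it plays a role much like a cluster master: the Java driver exposes several strategy interfaces that can be used to tune its behavior, including load balancing, request retries, and node connection management.
Dynomite itself is too large and too low-level for this article, and DynoJedisClient is tightly coupled with it, so we start from the simplest functional points of DynoJedisClient. The functional points that come to mind are:
So the rest of this article is organized around these basic functional points.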
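While its business was growing, Amazon ran into problems caused mainly by the limited scalability and availability of relational databases. It therefore developed a new database based on the key-value storage model, named Dynamo, which adopts a fully distributed, decentralized architecture.
Compared with a traditional relational database such as MySQL, Dynamo's functional goals differ somewhat. For example, most of Amazon's business scenarios do not need complex queries, but they do require tolerance of single-node failures, eventual consistency (giving up strong consistency in favor of availability), and good scalability.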
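Dynomite is Netflix's open-source, general-purpose implementation of Amazon's Dynamo distributed storage engine. It is designed to work not only with in-memory K/V databases but also with persistent stores such as MySQL, BerkeleyDB, and LevelDB, and it is simple, efficient, and supports cross-datacenter data replication.
Dynomite's ultimate goal is to provide the simple, efficient, cross-datacenter data replication that the storage engines themselves do not offer. Dynomite currently supports Redis and Memcached.
Netflix chose Dynomite because:
it offers performance, multi-datacenter replication, and high availability;
Dynomite provides sharding and pluggable data storage engines, allowing vertical and horizontal scaling as data requirements grow;
on top of Redis, Dynomite provides high availability, peer-to-peer replication, and consistency features, which are used to build a distributed cluster queue.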
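Dyno provides connection pooling for persistent connections;
Dyno's connection pool can be configured to be topology-aware;
Failover: Dyno pins the application to a specific local rack. A client in us-east-1a connects to Dynomite/Redis nodes in the same zone unless such a node is unavailable, in which case the client fails over. This property is used to partition queues by zone.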
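The rack-affinity and failover behavior described above can be illustrated with a small sketch. It is not Dyno's implementation: the `RackAwareSelector` class, its `Node` type, and the simple "local rack first, then any healthy remote node" order are hypothetical assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch (not Dyno code): prefer a healthy node in the client's own
// rack (zone), otherwise fail over to a healthy node in another rack.
final class RackAwareSelector {

    // Minimal node description: address, rack (e.g. "us-east-1a") and health flag.
    static final class Node {
        final String address;
        final String rack;
        volatile boolean up;

        Node(String address, String rack, boolean up) {
            this.address = address;
            this.rack = rack;
            this.up = up;
        }
    }

    private final String localRack;
    private final List<Node> nodes;

    RackAwareSelector(String localRack, List<Node> nodes) {
        this.localRack = localRack;
        this.nodes = new ArrayList<>(nodes);
    }

    Optional<Node> select() {
        // 1. Try a healthy node in the local rack first.
        for (Node n : nodes) {
            if (n.up && n.rack.equals(localRack)) {
                return Optional.of(n);
            }
        }
        // 2. Fail over: any healthy node in a remote rack.
        for (Node n : nodes) {
            if (n.up && !n.rack.equals(localRack)) {
                return Optional.of(n);
            }
        }
        // 3. No healthy node anywhere.
        return Optional.empty();
    }
}
```

A client in us-east-1a keeps talking to its local node while it is up; once that node is marked down, `select()` returns a node from another rack instead.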
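For this article, Dynomite itself is too low-level.
So we focus on how DynoJedisClient implements the last few points above. Of course, these points cannot be fully separated from Dynomite either; we simply try our best to isolate them.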
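A Data Center is a logical collection of multiple Racks.
A Data Center can be a machine room, or a group of devices in one region.
A Rack is a logical collection of several nodes located close to each other, for example all the physical machines in one rack; it can simply be thought of as the cabinet that holds the servers.
What is the relationship between data centers and racks: N:1, 1:N, or M:N? Per the definition above, a data center is made up of multiple racks, i.e. 1:N.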
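The data managed by the cluster forms a ring. Each node in the ring is assigned one or more data ranges described by tokens, which determine the node's position in the ring.
A token is a 64-bit integer ID identifying a partition, ranging from -2^63 to 2^63-1. A hash algorithm computes the hash of a partition key, which determines the node on which that key is stored.
Tokens also determine the range of data each node stores: a node holds the keys whose hash falls in the half-open interval (previous node's token, this node's token], and all nodes together form a ring joined end to end.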
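To make the half-open interval rule concrete, here is a minimal sketch of a token ring built on `java.util.TreeMap`. The `TokenRing` class and the sample tokens are hypothetical, and the key's token is assumed to have already been computed by some hash function.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: a node owns the keys whose token t satisfies
// previousToken < t <= ownToken; past the largest token the ring wraps around.
final class TokenRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    void addNode(long token, String node) {
        ring.put(token, node);
    }

    String ownerOf(long keyToken) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("empty ring");
        }
        // Smallest node token >= keyToken is the right end of the (prev, own] interval.
        Map.Entry<Long, String> e = ring.ceilingEntry(keyToken);
        // No such token: wrap around to the first node of the ring.
        return (e != null ? e : ring.firstEntry()).getValue();
    }
}
```

With nodes A, B, C at tokens 100, 200, 300, token 100 belongs to A (interval (300, 100] after wrapping), 150 and 200 belong to B, and 301 wraps back to A.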
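Because it shields upper layers from all of this, DynoJedisClient has to handle many complex details and needs deep knowledge of the system, for example:
Hence the DynoJedisClient design: the Java driver exposes several strategy interfaces for tuning its behavior, including load balancing, request retries, and node connection management.
The sample code is as follows: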
public static void main(String[] args) throws IOException {
final String clusterName = args[0];
int version = Integer.parseInt(args[1]);
final DynoQueueDemo demo = new DynoQueueDemo(clusterName, "us-east-1e");
Properties props = new Properties();
props.load(DynoQueueDemo.class.getResourceAsStream("/demo.properties"));
for (String name : props.stringPropertyNames()) {
System.setProperty(name, props.getProperty(name));
}
try {
demo.initWithRemoteClusterFromEurekaUrl(args[0], 8102, false);
if (version == 1) {
demo.runSimpleV1Demo(demo.client);
} else if (version == 2) {
demo.runSimpleV2QueueDemo(demo.client);
}
Thread.sleep(10000);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
demo.stop();
logger.info("Done");
}
}
And the helper functions:
public void initWithRemoteClusterFromEurekaUrl(final String clusterName, final int port, boolean lock) throws Exception {
initWithRemoteCluster(clusterName, getHostsFromDiscovery(clusterName), port, lock);
}
private void initWithRemoteCluster(String clusterName, final List<Host> hosts, final int port, boolean lock) throws Exception {
final HostSupplier clusterHostSupplier = () -> hosts;
if (lock)
initDynoLockClient(clusterHostSupplier, null, "test", clusterName);
else
init(clusterHostSupplier, port, null);
}
public void initDynoLockClient(HostSupplier hostSupplier, TokenMapSupplier tokenMapSupplier, String appName,
String clusterName) {
dynoLockClient = new DynoLockClient.Builder().withApplicationName(appName)
.withDynomiteClusterName(clusterName)
.withTimeoutUnit(TimeUnit.MILLISECONDS)
.withTimeout(10000)
.withHostSupplier(hostSupplier)
.withTokenMapSupplier(tokenMapSupplier).build();
}
DynoJedisClient has the following important configuration classes.
ConnectionPoolConfigurationImpl mainly provides the default configuration.
public class ConnectionPoolConfigurationImpl implements ConnectionPoolConfiguration {
// DEFAULTS
private static final LoadBalancingStrategy DEFAULT_LB_STRATEGY = LoadBalancingStrategy.TokenAware;
private static final CompressionStrategy DEFAULT_COMPRESSION_STRATEGY = CompressionStrategy.NONE;
private HostSupplier hostSupplier;
private TokenMapSupplier tokenSupplier;
private HostConnectionPoolFactory hostConnectionPoolFactory;
private HashPartitioner hashPartitioner;
private LoadBalancingStrategy lbStrategy = DEFAULT_LB_STRATEGY;
private CompressionStrategy compressionStrategy = DEFAULT_COMPRESSION_STRATEGY;
}
ArchaiusConnectionPoolConfiguration mainly adds several strategies, including load balancing, compression, and retry. Its definition is as follows:
public class ArchaiusConnectionPoolConfiguration extends ConnectionPoolConfigurationImpl {
......
private final LoadBalancingStrategy loadBalanceStrategy;
private final CompressionStrategy compressionStrategy;
private final ErrorRateMonitorConfig errorRateConfig;
private final RetryPolicyFactory retryPolicyFactory;
private final DynamicBooleanProperty failOnStartupIfNoHosts;
private final DynamicIntProperty lockVotingSize;
......
}
DynoJedisClient is defined as follows; its most important member variable is the connection pool, ConnectionPool.
public class DynoJedisClient implements JedisCommands, BinaryJedisCommands, MultiKeyCommands,ScriptingCommands, MultiKeyBinaryCommands, DynoJedisCommands {
private final String appName;
private final String clusterName;
private final ConnectionPool<Jedis> connPool;
private final AtomicReference<DynoJedisPipelineMonitor> pipelineMonitor = new AtomicReference<DynoJedisPipelineMonitor>();
protected final DynoOPMonitor opMonitor;
protected final ConnectionPoolMonitor cpMonitor;
}
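Because the main job of DynoJedisClient is managing the connection pool, we first introduce the logical connection pool, ConnectionPoolImpl.
The connection pool layer abstracts away all connection management from the application. This is where everything is configured, for example pool options, the load-balancing strategy, the retry policy, or the default consistency level.
ConnectionPoolImpl is the core class. Its main functions are:
Its definition is as follows: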
public class ConnectionPoolImpl<CL> implements ConnectionPool<CL>, TopologyView {
private final ConcurrentHashMap<Host, HostConnectionPool<CL>> cpMap = new ConcurrentHashMap<Host, HostConnectionPool<CL>>();
private final ConnectionPoolHealthTracker<CL> cpHealthTracker;
private final HostConnectionPoolFactory<CL> hostConnPoolFactory;
private final ConnectionFactory<CL> connFactory;
private final ConnectionPoolConfiguration cpConfiguration;
private final ConnectionPoolMonitor cpMonitor;
private final ScheduledExecutorService idleThreadPool = Executors.newSingleThreadScheduledExecutor();
private final HostsUpdater hostsUpdater;
private final ScheduledExecutorService connPoolThreadPool = Executors.newScheduledThreadPool(1);
private HostSelectionWithFallback<CL> selectionStrategy;
private Type poolType;
}
The structure at this point is:
+------------------------+
|DynoJedisClient |
| |
| | +------------------------+
| | | |
| connPool +--------------> | ConnectionPoolImpl |
| | | |
| | +------------------------+
+------------------------+
The connection pool's startup logic is as follows:
@Override
public Future<Boolean> start() throws DynoException {
HostSupplier hostSupplier = cpConfiguration.getHostSupplier();
HostStatusTracker hostStatus = hostsUpdater.refreshHosts();
cpMonitor.setHostCount(hostStatus.getHostCount());
Collection<Host> hostsUp = hostStatus.getActiveHosts();
final ExecutorService threadPool = Executors.newFixedThreadPool(Math.max(10, hostsUp.size()));
final List<Future<Void>> futures = new ArrayList<Future<Void>>();
// Create a connection pool for each host returned by hostsUpdater, in parallel
for (final Host host : hostsUp) {
// Add host connection pool, but don't init the load balancer yet
futures.add(threadPool.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
addHost(host, false);
return null;
}
}));
}
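// Only the first call to start() initializes the host selection strategy, starts the
// health tracker (error-rate tracking) and schedules the periodic host refresh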
boolean success = started.compareAndSet(false, true);
if (success) {
selectionStrategy = initSelectionStrategy();
cpHealthTracker.start();
connPoolThreadPool.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
HostStatusTracker hostStatus = hostsUpdater.refreshHosts();
cpMonitor.setHostCount(hostStatus.getHostCount());
Logger.debug(hostStatus.toString());
updateHosts(hostStatus.getActiveHosts(), hostStatus.getInactiveHosts());
}
}, 15 * 1000, 30 * 1000, TimeUnit.MILLISECONDS);
MonitorConsole.getInstance().registerConnectionPool(this);
registerMonitorConsoleMBean(MonitorConsole.getInstance());
}
return getEmptyFutureTask(true);
}
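The scheduled task above hands the active and inactive host lists to updateHosts, whose body is not shown here. A minimal sketch of what such a reconciliation step might look like, assuming hosts are plain strings and pools are string placeholders; `HostRefresher` and its methods are hypothetical:

```java
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch: periodically reconcile the per-host pools with the
// latest active/inactive host lists (pools are placeholders, not real pools).
final class HostRefresher {
    // Host -> pool placeholder
    final ConcurrentHashMap<String, String> pools = new ConcurrentHashMap<>();

    void updateHosts(Collection<String> active, Collection<String> inactive) {
        for (String host : active) {
            // Only create a pool for hosts we don't know yet.
            pools.putIfAbsent(host, "pool-" + host);
        }
        for (String host : inactive) {
            // Drop the pools of hosts that went down.
            pools.remove(host);
        }
    }

    ScheduledExecutorService schedule(Supplier<Collection<String>> activeHosts,
                                      Supplier<Collection<String>> inactiveHosts,
                                      long initialDelayMs, long delayMs) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        // Same pattern as start(): first run after initialDelayMs, then delayMs
        // after each run completes.
        ses.scheduleWithFixedDelay(() -> updateHosts(activeHosts.get(), inactiveHosts.get()),
                initialDelayMs, delayMs, TimeUnit.MILLISECONDS);
        return ses;
    }
}
```

scheduleWithFixedDelay measures the delay from the end of one run to the start of the next, so a slow refresh never overlaps with the following one.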
During startup, hosts are added as follows:
public boolean addHost(Host host, boolean refreshLoadBalancer) {
HostConnectionPool<CL> connPool = cpMap.get(host);
if (connPool != null) {
// A pool for this host already exists; nothing to add
return false;
}
final HostConnectionPool<CL> hostPool = hostConnPoolFactory.createHostConnectionPool(host, this);
HostConnectionPool<CL> prevPool = cpMap.putIfAbsent(host, hostPool);
if (prevPool == null) {
// This is the first time we are adding this pool.
try {
int primed = hostPool.primeConnections();
if (hostPool.isActive()) {
if (refreshLoadBalancer) {
selectionStrategy.addHost(host, hostPool);
}
cpHealthTracker.initializePingHealthchecksForPool(hostPool);
cpMonitor.hostAdded(host, hostPool);
} else {
cpMap.remove(host);
}
return primed > 0;
} catch (DynoException e) {
cpMap.remove(host);
return false;
}
} else {
// Another thread registered a pool for this host first
return false;
}
}
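The essence of addHost is a lock-free "create, putIfAbsent, warm up, roll back on failure" pattern. The sketch below isolates it; `PoolRegistry`, its `Pool` interface, and the use of RuntimeException as the failure signal are hypothetical simplifications, not Dyno types.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of the addHost pattern: publish a new per-host pool with
// putIfAbsent, warm it up, and roll the map entry back if warm-up fails.
final class PoolRegistry<H> {
    interface Pool {
        int primeConnections();   // returns the number of connections opened
        boolean isActive();
    }

    final ConcurrentHashMap<H, Pool> cpMap = new ConcurrentHashMap<>();
    private final Function<H, Pool> factory;

    PoolRegistry(Function<H, Pool> factory) {
        this.factory = factory;
    }

    boolean addHost(H host) {
        if (cpMap.containsKey(host)) {
            return false;                       // already registered
        }
        Pool pool = factory.apply(host);
        if (cpMap.putIfAbsent(host, pool) != null) {
            return false;                       // another thread won the race
        }
        try {
            int primed = pool.primeConnections();
            if (!pool.isActive()) {
                cpMap.remove(host, pool);       // remove only our own entry
            }
            return primed > 0;
        } catch (RuntimeException e) {
            cpMap.remove(host, pool);           // warm-up failed: roll back
            return false;
        }
    }
}
```

Using remove(key, value) rather than remove(key) ensures a thread only rolls back the entry it inserted itself.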
There are two HostConnectionPool factories, a synchronous one and an asynchronous one:
private class SyncHostConnectionPoolFactory implements HostConnectionPoolFactory<CL> {
@Override
public HostConnectionPool<CL> createHostConnectionPool(Host host, ConnectionPoolImpl<CL> parentPoolImpl) {
return new HostConnectionPoolImpl<CL>(host, connFactory, cpConfiguration, cpMonitor);
}
}
private class AsyncHostConnectionPoolFactory implements HostConnectionPoolFactory<CL> {
@Override
public HostConnectionPool<CL> createHostConnectionPool(Host host, ConnectionPoolImpl<CL> parentPoolImpl) {
return new SimpleAsyncConnectionPoolImpl<CL>(host, connFactory, cpConfiguration, cpMonitor);
}
}
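Keeping pool creation behind a factory interface lets ConnectionPoolImpl pick the sync or async variant once and treat every host pool the same way afterwards. A hedged sketch of that selection, assuming the choice is keyed on a pool type similar to the `poolType` field shown earlier; `PoolFactories`, `Type`, and `HostPool` are hypothetical names:

```java
import java.util.function.Function;

// Hypothetical sketch: choose a per-host pool factory once, based on the pool type,
// so the rest of the code depends only on the factory function.
final class PoolFactories {
    enum Type { SYNC, ASYNC }

    // Stand-in for HostConnectionPool: just describes itself.
    interface HostPool {
        String describe();
    }

    static Function<String, HostPool> factoryFor(Type type) {
        switch (type) {
            case SYNC:
                return host -> () -> "sync pool for " + host;
            case ASYNC:
                return host -> () -> "async pool for " + host;
            default:
                throw new IllegalArgumentException("unknown pool type: " + type);
        }
    }
}
```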
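The logical connection pool has two execution methods: executeWithRing and executeWithFailover.
executeWithRing is used less often, so it is not covered in detail here.
executeWithFailover uses selectionStrategy to obtain a Connection and runs the operation on it. If the operation fails, it retries as long as the retry policy allows.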
public <R> OperationResult<R> executeWithFailover(Operation<CL, R> op) throws DynoException {
// Start recording the operation latency
long startTime = System.currentTimeMillis();
RetryPolicy retry = cpConfiguration.getRetryPolicyFactory().getRetryPolicy();
retry.begin();
DynoException lastException = null;
do {
Connection<CL> connection = null;
try {
connection = selectionStrategy.getConnectionUsingRetryPolicy(op,
cpConfiguration.getMaxTimeoutWhenExhausted(), TimeUnit.MILLISECONDS, retry);
updateConnectionContext(connection.getContext(), connection.getHost());
OperationResult<R> result = connection.execute(op);
// Add context to the result from the successful execution
result.setNode(connection.getHost()).addMetadata(connection.getContext().getAll());
retry.success();
cpMonitor.incOperationSuccess(connection.getHost(), System.currentTimeMillis() - startTime);
return result;
} catch (DynoException e) {
// Record the failure; the loop condition decides whether to fail over and retry
retry.failure(e);
lastException = e;
} finally {
if (connection != null) {
if (connection.getLastException() != null
&& connection.getLastException() instanceof FatalConnectionException) {
connection.getParentConnectionPool().recycleConnection(connection);
// note - don't increment connection closed metric here;
// it's done in closeConnection
} else {
connection.getContext().reset();
connection.getParentConnectionPool().returnConnection(connection);
}
}
}
} while (retry.allowRetry());
throw lastException;
}
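The control flow of executeWithFailover (begin, attempt, record success or failure, loop while allowRetry(), then rethrow the last exception) can be reproduced in a self-contained sketch. `SimpleRetryPolicy`, `FailoverExecutor`, and the round-robin choice of the next host are hypothetical; in Dyno the next connection comes from selectionStrategy.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of execute-with-failover: a policy allowing N retries,
// and a loop that moves to the next host after each failure.
final class FailoverExecutor {

    // Simplified policy in the spirit of begin/success/failure/allowRetry.
    static final class SimpleRetryPolicy {
        private final int maxRetries;
        private int failures;
        private boolean succeeded;

        SimpleRetryPolicy(int maxRetries) { this.maxRetries = maxRetries; }

        void begin() { failures = 0; succeeded = false; }
        void success() { succeeded = true; }
        void failure() { failures++; }
        // maxRetries retries means at most maxRetries + 1 attempts in total.
        boolean allowRetry() { return !succeeded && failures <= maxRetries; }
        int failures() { return failures; }
    }

    static <R> R executeWithFailover(List<String> hosts, Function<String, R> op,
                                     SimpleRetryPolicy retry) {
        retry.begin();
        RuntimeException lastException = null;
        int next = 0;
        do {
            // Round-robin over hosts so each retry lands on a different node.
            String host = hosts.get(next++ % hosts.size());
            try {
                R result = op.apply(host);
                retry.success();
                return result;
            } catch (RuntimeException e) {
                retry.failure();
                lastException = e;
            }
        } while (retry.allowRetry());
        // Only reached after at least one failure, so lastException is non-null.
        throw lastException;
    }
}
```

With hosts h1, h2, h3, a two-retry policy, and only h3 healthy, the call fails on h1 and h2 and then returns the result from h3.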
The structure at this point is:
+----------------------+
+-------------------+ |ConnectionPoolImpl |
|DynoJedisClient | | |
| | | | +--------------+
| | | hostsUpdater +--------> | HostSupplier |
| | | | +--------------+
| connPool +---------> | |
| | | | +--------------------------+
| | | cpMap +--------> |[Host, HostConnectionPool]|
+-------------------+ | | | + |
+----------------------+ | | |
+--------------------------+
|
|
|
v
+---------------+-----+
| |
| HostConnectionPool |
| |
+---------------------+
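HostConnectionPool is the concrete connection pool implementation: each instance maintains a pool of live connections to one Host node.
Its definition is as follows: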
public class HostConnectionPoolImpl<CL> implements HostConnectionPool<CL> {
// The connections available for this connection pool
private final LinkedBlockingQueue<Connection<CL>> availableConnections = new LinkedBlockingQueue<Connection<CL>>();
// Private members required by this class
private final Host host;
private final ConnectionFactory<CL> connFactory;
private final ConnectionPoolConfiguration cpConfig;
private final ConnectionPoolMonitor monitor;
// states that dictate the behavior of the pool
// cp not inited is the starting state of the pool. The pool will not allow connections to be borrowed in this state
private final ConnectionPoolState<CL> cpNotInited = new ConnectionPoolNotInited();
// cp active is where connections of the pool can be borrowed and returned
private final ConnectionPoolState<CL> cpActive = new ConnectionPoolActive(this);
// cp reconnecting is where connections cannot be borrowed and all returning connections will be shutdown
private final ConnectionPoolState<CL> cpReconnecting = new ConnectionPoolReconnectingOrDown();
// similar to reconnecting
private final ConnectionPoolState<CL> cpDown = new ConnectionPoolReconnectingOrDown();
// The thread safe reference to the pool state
private final AtomicReference<ConnectionPoolState<CL>> cpState = new AtomicReference<ConnectionPoolState<CL>>(cpNotInited);
}
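HostConnectionPoolImpl keeps its current state in an AtomicReference so that state transitions are thread-safe. The following sketch shows the same idea with an enum; `PoolStateMachine` and its transition methods are hypothetical, and unlike Dyno's state objects the enum here only decides whether borrowing is allowed.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the state idea in HostConnectionPoolImpl: an
// AtomicReference holds the current state, and behavior depends on it.
final class PoolStateMachine {
    enum State {
        NOT_INITED(false), ACTIVE(true), RECONNECTING(false), DOWN(false);

        final boolean canBorrow;

        State(boolean canBorrow) { this.canBorrow = canBorrow; }
    }

    private final AtomicReference<State> state = new AtomicReference<>(State.NOT_INITED);

    State current() { return state.get(); }

    // compareAndSet: only one thread wins each transition; others get false.
    boolean activate() { return state.compareAndSet(State.NOT_INITED, State.ACTIVE); }
    boolean markReconnecting() { return state.compareAndSet(State.ACTIVE, State.RECONNECTING); }
    boolean reconnected() { return state.compareAndSet(State.RECONNECTING, State.ACTIVE); }
    void markDown() { state.set(State.DOWN); }

    String borrow() {
        State s = state.get();
        if (!s.canBorrow) {
            throw new IllegalStateException("cannot borrow in state " + s);
        }
        return "connection";   // placeholder for a real connection
    }
}
```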
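First, let's see how a Connection is created: roughly, it is obtained directly from connFactory, opened, added to the queue of available connections, and then the monitoring counters are updated.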
@Override
public Connection<CL> createConnection() {
try {
Connection<CL> connection;
if (cpConfig.isConnectToDatastore()) {
// Create a connection that talks to the datastore port directly
connection = connFactory.createConnectionWithDataStore(pool);
} else if (cpConfig.isConnectionPoolConsistencyProvided()) {
connection = connFactory.createConnectionWithConsistencyLevel(pool, cpConfig.getConnectionPoolConsistency());
} else {
connection = connFactory.createConnection(pool);
}
connection.open();
availableConnections.add(connection);
monitor.incConnectionCreated(host);
numActiveConnections.incrementAndGet();
return connection;
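} catch (DynoConnectException e) {
// Record the failed attempt for monitoring, then let the caller handle it
monitor.incConnectionCreateFailed(host, e);
throw e;
}
}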
The createConnectionWithDataStore method of JedisConnectionFactory performs the actual connection setup; it builds on Jedis, which many readers will already know.
A simplified version of the code:
public class JedisConnectionFactory implements ConnectionFactory<Jedis> {
private final OperationMonitor opMonitor;
private final SSLSocketFactory sslSocketFactory;
public JedisConnectionFactory(OperationMonitor monitor, SSLSocketFactory sslSocketFactory) {
this.opMonitor = monitor;
this.sslSocketFactory = sslSocketFactory;
}
@Override
public Connection<Jedis> createConnectionWithDataStore(HostConnectionPool<Jedis> pool) {
return new JedisConnection(pool, true);
}
// TODO: raghu compose redisconnection with jedisconnection in it
public class JedisConnection implements Connection<Jedis> {
private final HostConnectionPool<Jedis> hostPool;
private final Jedis jedisClient;
public JedisConnection(HostConnectionPool<Jedis> hostPool, boolean connectDataStore) {
this.hostPool = hostPool;
Host host = hostPool.getHost();
int port = connectDataStore ? host.getDatastorePort() : host.getPort();
if (sslSocketFactory == null) {
JedisShardInfo shardInfo = new JedisShardInfo(host.getHostAddress(), port,
hostPool.getConnectionTimeout(), hostPool.getSocketTimeout(), Sharded.DEFAULT_WEIGHT);
jedisClient = new Jedis(shardInfo);
} else {
JedisShardInfo shardInfo = new JedisShardInfo(host.getHostAddress(), port,
hostPool.getConnectionTimeout(), hostPool.getSocketTimeout(), Sharded.DEFAULT_WEIGHT,
true, sslSocketFactory, new SSLParameters(), null);
jedisClient = new Jedis(shardInfo);
}
}
@Override
public HostConnectionPool<Jedis> getParentConnectionPool() {
return hostPool;
}
public Jedis getClient() {
return jedisClient;
}
}
}
The structure at this point is:
+----------------------+
+-------------------+ |ConnectionPoolImpl |
|DynoJedisClient | | |
| | | | +--------------+
| | | hostsUpdater +--------> | HostSupplier |
| | | | +--------------+
| connPool +---------> | |
| | | | +--------------------------+
| | | cpMap +--------> |[Host, HostConnectionPool]|
+-------------------+ | | | + |
+----------------------+ | | |
+--------------------------+
|
|
+-----------------------------+ |
| JedisConnectionFactory | v
| | +---------------+-------------------------------------------+
| | createConnectionWithDataStore | HostConnectionPool |
| | | |
| sslSocketFactory | <------------------------------------------------+ connFactory Host |
| | | |
| | | LinkedBlockingQueue<Connection<CL>> availableConnections |
+-----------------------------+ | |
+------------------------------+----------------------------+
+ ^
| +----------------------------------------+ |
| |JedisConnection | |
| | | |
| return | | return |
| | HostConnectionPool<Jedis> hostPool | |
+---------------> | | +--------------------------------+
| Jedis(shardInfo) jedisClient |
| |
+----------------------------------------+
Callers use borrowConnection to obtain a connection; the time spent waiting is recorded as a metric.
@Override
public Connection<CL> borrowConnection(int duration, TimeUnit unit) {
// Start recording how long it takes to get the connection - for insight/metrics
long startTime = System.nanoTime() / 1000;
Connection<CL> conn = null;
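try {
// wait on the connection pool with a timeout
conn = availableConnections.poll(duration, unit);
} catch (InterruptedException e) {
// restore the interrupt flag and surface the failure to the caller
Thread.currentThread().interrupt();
throw new DynoConnectException(e);
}
long delay = System.nanoTime() / 1000 - startTime;
if (conn == null) {
// the timeout elapsed without a free connection
throw new PoolTimeoutException("Fast fail waiting for connection from pool");
}
monitor.incConnectionBorrowed(host, delay);
return conn;
}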
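The borrow path boils down to a bounded wait on a LinkedBlockingQueue plus a latency measurement. A self-contained sketch, with the hypothetical class `SimpleHostPool` and IllegalStateException standing in for Dyno's own exception types:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of borrow/release on a per-host pool: a blocking queue of
// idle connections, a bounded wait on borrow, and the wait time kept as a metric.
final class SimpleHostPool<C> {
    private final LinkedBlockingQueue<C> available = new LinkedBlockingQueue<>();
    private volatile long lastBorrowDelayMicros;

    void add(C conn) { available.add(conn); }

    C borrow(long timeout, TimeUnit unit) {
        long start = System.nanoTime();
        C conn;
        try {
            conn = available.poll(timeout, unit);    // null if the timeout elapses
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();      // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting for a connection", e);
        }
        lastBorrowDelayMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start);
        if (conn == null) {
            throw new IllegalStateException("timed out waiting for a connection");
        }
        return conn;
    }

    void release(C conn) { available.offer(conn); }

    long lastBorrowDelayMicros() { return lastBorrowDelayMicros; }

    int idleCount() { return available.size(); }
}
```

The latency is recorded even when the wait times out, which is what makes pool exhaustion visible in the metrics.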
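Topology here mainly refers to the token ring, so let's review the concepts.
In Dynomite, the data managed by the cluster forms a ring. Each node in the ring is assigned one or more data ranges described by tokens, and the tokens determine the node's position in the ring.
A token is a 64-bit integer ID identifying a partition, ranging from -2^63 to 2^63-1. A hash algorithm computes the hash of a partition key, which determines the node that stores it.
Tokens determine the range of data each node stores: a node holds the keys whose hash falls in the half-open interval (previous node's token, this node's token], and all nodes form a ring joined end to end.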
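Combining the token ring with rack-level failover gives token-aware routing. The sketch below assumes each rack holds a full copy of the data on its own ring, uses String.hashCode() widened to an unsigned 32-bit value as a stand-in for the real partitioner hash, and tries racks in insertion order with the local rack first; `TokenAwareRouter` and `RackRing` are hypothetical names, not Dyno classes.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of token-aware routing with rack fallback. Assumes every
// rack holds a full copy of the data, laid out on its own token ring.
final class TokenAwareRouter {

    // One rack's ring: sorted tokens and the host owning each token.
    static final class RackRing {
        final long[] tokens;     // must be sorted ascending
        final String[] hosts;    // hosts[i] owns (tokens[i-1], tokens[i]]

        RackRing(long[] tokens, String[] hosts) {
            this.tokens = tokens;
            this.hosts = hosts;
        }

        String ownerOf(long token) {
            int i = Arrays.binarySearch(tokens, token);
            if (i < 0) {
                i = -i - 1;      // insertion point = index of first token > key token
            }
            return hosts[i == tokens.length ? 0 : i];   // wrap around past the last token
        }
    }

    private final Map<String, RackRing> racks;   // local rack first, in iteration order
    private final Set<String> downHosts;

    TokenAwareRouter(LinkedHashMap<String, RackRing> racks, Set<String> downHosts) {
        this.racks = racks;
        this.downHosts = downHosts;
    }

    // Stand-in hash: String.hashCode() widened to an unsigned 32-bit value.
    static long tokenOf(String key) {
        return key.hashCode() & 0xFFFFFFFFL;
    }

    String route(String key) {
        long token = tokenOf(key);
        for (RackRing ring : racks.values()) {
            String host = ring.ownerOf(token);
            if (!downHosts.contains(host)) {
                return host;     // first healthy owner, local rack preferred
            }
        }
        throw new IllegalStateException("no healthy replica for key " + key);
    }
}
```

Arrays.binarySearch returns an encoded insertion point for a missing token, and that insertion point is exactly the index of the first node whose token is greater than the key's, i.e. the owner of the (previous, own] interval.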