当前位置 博文首页 > 罗西的思考:[源码分析] Dynomite 分布式存储引擎 之 DynoJedisC

    罗西的思考:[源码分析] Dynomite 分布式存储引擎 之 DynoJedisC

    作者:罗西的思考 时间:2021-02-03 22:27

    本文剖析 NetFlix Dynomite 的 Java 客户端 DynoJedisClient 如何实现。分析客户端是因为,此客户端的作用很类似于集群master,其思路是:java驱动提供多个策略接口,可以用来驱动程序行为调优。包括负载均衡,重试请求,管理节点连接等等。

    [源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(1)

    目录
    • [源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(1)
      • 0x00 摘要
      • 0x01 背景概念
        • 1.1 Amazon Dynamo
        • 1.2 NetFlix Dynomite
      • 0x02 Netflix选型思路
      • 0x03 基础知识
        • 3.1 Data Center
        • 3.2 Rack
          • 3.2 Rings and Tokens
      • 0x04 需求 & 思路
      • 0x05 使用
      • 0x06 配置
        • 6.1 缺省配置
        • 6.2 策略配置
      • 0x07 定义
      • 0x08 逻辑连接池
        • 8.1 启动
        • 8.2 配置Host
        • 8.3 获取HostConnectionPool
        • 8.4 执行
      • 0x09 具体连接池
        • 9.1 生成Connection
        • 9.2 JedisConnectionFactory
        • 9.3 获取Connection
      • 0x10 拓扑
        • 10.1 只读视图
        • 10.2 具体实现
        • 10.3 如何使用
          • 10.3.1 ConnectionPoolImpl
          • 10.3.2 HostSelectionWithFallback
      • 0xFF 参考

    0x00 摘要

    前面我们有文章介绍了Amazon Dynamo系统架构 和 NetFlix Dynomite。

    我们今天来看看 NetFlix Dynomite 的 Java 客户端 DynoJedisClient 如何实现。分析客户端是因为,此客户端的作用很类似于集群master,其思路是:java驱动提供多个策略接口,可以用来驱动程序行为调优。包括负载均衡,重试请求,管理节点连接等等。

    因为 Dynomite 对于本文来说,过于庞大&底层,而且 DynoJedisClient 与 Dynomite 耦合过于紧密, 所以我们从最简单的功能点出发看看 DynoJedisClient,于是我们可以想到的功能点是:

    • 如何提供基本功能,即提供数据库连接池;
    • 如何管理节点连接;
    • 如何拓扑感知;
    • 如何负载均衡;
    • 如何故障转移;
    • 故障转移;

    所以我们接下来就围绕这些基本功能点进行分析。

    0x01 背景概念

    1.1 Amazon Dynamo

    亚马逊在业务发展期间面临一些问题,主要受限于关系型数据库的可扩展性和高可用性,因此研发了一套新的、基于 KV 存储模型的数据库,将之命名为 Dynamo,其主要采取完全的分布式、去中心化的架构。

    相较于传统的关系型数据库 MySQLDynamo 的功能目标与之有一些细小的差别,例如: Amazon 的业务场景多数情况并不需要支持复杂查询,却要求必要的单节点故障容错性、数据最终一致性(即牺牲数据强一致优先保障可用性)、较强的可扩展性等。

    1.2 NetFlix Dynomite

    Dynomite 是 NetFlix 对亚马逊分布式存储引擎 Dynamo 的一个开源通用实现,它不仅支持基于内存的 K/V 数据库,还支持持久化的 Mysql、BerkeleyDb、LevelDb 等数据库,并具有简单、高效、支持跨数据中心的数据复制等优点。

    Dynomite 的最终目标是提供数据库存储引擎不能提供的简单、高效、跨数据中心的数据复制功能。目前,Dynomite 已经实现了对 Redis 和 Memcached 的支持。

    0x02 Netflix选型思路

    Netflix选择Dynomite,是因为:

    • 其具有性能,多数据中心复制和高可用性的特点;

    • Dynomite提供分片和可插拔的数据存储引擎,允许在数据需求增加垂直和水平扩展;

    • Dynomite在Redis之上提供了高可用性、对等复制以及一致性等特性,用于构建分布式集群队列。

    • Dyno为持久连接提供连接池;

    • Dyno可以为连接池配置为拓扑感知;

    • 故障转移:Dyno为应用程序提供特定的本地机架,us-east-1a的客户端将连接到相同区域的Dynomite/Redis节点,除非该节点不可用,在这种情况下该客户端将进行故障转移。这个属性被用于通过区域划分队列。

    Dynomite对于本文来说,过于底层。

    所以我们重点就看看 DynoJedisClient 如何实现后面几点,当然,这几点其实也无法脱离Dynomite,我们只是力争剥离出来

    0x03 基础知识

    3.1 Data Center

    Data Center 是由多个Rack组成的逻辑集合。

    Data Center 可以是一个机房或者一个区域的设备组合。

    3.2 Rack

    这是一个逻辑集合,有多个彼此临近node的组成。比如一个机架上的所有物理机器。可简单的理解为存放服务器的机柜。

    数据中心与机架是什么关系呢?N:1,1:N,M:N。

    • 如果只需要几台服务器就能满足业务需求,这些服务器至少有2个数据中心,那这种情况下多个数据中心可以放在1个机架上,不过这种情况对数据灾备来说是不太保险的。
    • 第2种情况是1个数据中心相当于1个机房,那机房里会有多个机架。
    • 第3种情况M:N为多个机房的多个数据中心置于多个机架上。

    3.2 Rings and Tokens

    由集群管理的数据就是一个环。环中的每个节点被分配一个或多个由token描述的数据范围,确定在环中的位置。

    Token是用于标识每个分区的64位整数ID,范围是-2^63 -- 2^63-1。通过hash算法计算partition key的hash值,以此确定存放在哪个节点。

    Token也决定了每个节点存储的数据的分布范围,每个节点保存的数据的key在(前一个节点Token,本节点Token]的半开半闭区间内,所有的节点形成一个首尾相接的环。

    0x04 需求 & 思路

    因为要为上层屏蔽信息,所以 DynoJedisClient 就需要应对各种复杂信息,需要对系统有深刻的了解,比如:

    • 如何维护连接,为持久连接提供连接池;
    • 如何维护拓扑;
    • 如何负载均衡;
    • 如何故障转移;
    • 如何自动重试及发现,比如自动重试挂掉的主机。自动发现集群中的其他主机。
    • 如何监控底层机架状态;

    因此,DynoJedisClient 的思路是:java驱动提供多个策略接口,可以用来驱动程序行为调优。包括负载均衡,重试请求,管理节点连接等等

    0x05 使用

    示例代码如下:

    public static void main(String[] args) throws IOException {
        final String clusterName = args[0];
        int version = Integer.parseInt(args[1]);
        final DynoQueueDemo demo = new DynoQueueDemo(clusterName, "us-east-1e");
        Properties props = new Properties();
        props.load(DynoQueueDemo.class.getResourceAsStream("/demo.properties"));
        for (String name : props.stringPropertyNames()) {
            System.setProperty(name, props.getProperty(name));
        }
        try {
            demo.initWithRemoteClusterFromEurekaUrl(args[0], 8102, false);
            if (version == 1) {
                demo.runSimpleV1Demo(demo.client);
            } else if (version == 2) {
                demo.runSimpleV2QueueDemo(demo.client);
            }
            Thread.sleep(10000);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            demo.stop();
            logger.info("Done");
        }
    }
    

    以及辅助函数:

    public void initWithRemoteClusterFromEurekaUrl(final String clusterName, final int port, boolean lock) throws Exception {
            initWithRemoteCluster(clusterName, getHostsFromDiscovery(clusterName), port, lock);
    }
        
    private void initWithRemoteCluster(String clusterName, final List<Host> hosts, final int port, boolean lock) throws Exception {
            final HostSupplier clusterHostSupplier = () -> hosts;
    
            if (lock)
                initDynoLockClient(clusterHostSupplier, null, "test", clusterName);
            else
                init(clusterHostSupplier, port, null);
    }
        
    public void initDynoLockClient(HostSupplier hostSupplier, TokenMapSupplier tokenMapSupplier, String appName,
                                       String clusterName) {
            dynoLockClient = new DynoLockClient.Builder().withApplicationName(appName)
                    .withDynomiteClusterName(clusterName)
                    .withTimeoutUnit(TimeUnit.MILLISECONDS)
                    .withTimeout(10000)
                    .withHostSupplier(hostSupplier)
                    .withTokenMapSupplier(tokenMapSupplier).build();
    }
    

    0x06 配置

    在 DynoJedisClient 之中,有如下重要配置类。

    6.1 缺省配置

    ConnectionPoolConfigurationImpl主要是提供缺省配置。

    public class ConnectionPoolConfigurationImpl implements ConnectionPoolConfiguration {
        // DEFAULTS
        private static final LoadBalancingStrategy DEFAULT_LB_STRATEGY = LoadBalancingStrategy.TokenAware;
        private static final CompressionStrategy DEFAULT_COMPRESSION_STRATEGY = CompressionStrategy.NONE;
    
        private HostSupplier hostSupplier;
        private TokenMapSupplier tokenSupplier;
        private HostConnectionPoolFactory hostConnectionPoolFactory;
        private HashPartitioner hashPartitioner;
        private LoadBalancingStrategy lbStrategy = DEFAULT_LB_STRATEGY;
        private CompressionStrategy compressionStrategy = DEFAULT_COMPRESSION_STRATEGY;
    }
    

    6.2 策略配置

    ArchaiusConnectionPoolConfiguration最主要是提供了若干策略,包括负载,压缩,重试:

    • LoadBalancingStrategy parseLBStrategy(String propertyPrefix) 是负载策略;
    • CompressionStrategy parseCompressionStrategy(String propertyPrefix) 是压缩策略;
    • RetryPolicyFactory parseRetryPolicyFactory(String propertyPrefix) 是重试策略;

    具体如下:

    public class ArchaiusConnectionPoolConfiguration extends ConnectionPoolConfigurationImpl {
        ......
    
        private final LoadBalancingStrategy loadBalanceStrategy;
        private final CompressionStrategy compressionStrategy;
        private final ErrorRateMonitorConfig errorRateConfig;
        private final RetryPolicyFactory retryPolicyFactory;
        private final DynamicBooleanProperty failOnStartupIfNoHosts;
        private final DynamicIntProperty lockVotingSize;
        
        ......
    }
    

    0x07 定义

    DynoJedisClient 定义如下,我们可以看到最重要的成员变量就是连接池ConnectionPool。

    public class DynoJedisClient implements JedisCommands, BinaryJedisCommands, MultiKeyCommands,ScriptingCommands, MultiKeyBinaryCommands, DynoJedisCommands {
    
        private final String appName;
        private final String clusterName;
        
        private final ConnectionPool<Jedis> connPool;
        
        private final AtomicReference<DynoJedisPipelineMonitor> pipelineMonitor = new AtomicReference<DynoJedisPipelineMonitor>();
    
        protected final DynoOPMonitor opMonitor;
        protected final ConnectionPoolMonitor cpMonitor;
    }
    

    0x08 逻辑连接池

    因为 DynoJedisClient 最主要是管理连接池,所以我们首先介绍 逻辑连接池 ConnectionPoolImpl。

    连接池层为应用程序抽象所有连接管理。在这里,我们可以配置所有内容,例如指定池选项,负载平衡策略,重试策略或默认一致性级别。

    ConnectionPoolImpl 是核心类,其主要功能是:

    • 对于从HostSupplier获得的各种HostConnectionPool进行维护,形成一个HostConnectionPool集合;
    • 对于HostSupplier检测到的hosts,进行添加删除;
    • 从HostConnectionPool提取Connection,进行Operation的执行;
    • 在执行Operation时,采用HostSelectionStrategy,比如:basically Round Robin 或者 TokenAware策略;
    • 使用health check monitor来进行错误率跟踪。health check monitor可以决定重用HostConnectionPool,以及fallback到remote数据中心的HostConnectionPools执行;
    • 使用RetryPolicy来执行operation;

    具体定义如下:

    public class ConnectionPoolImpl<CL> implements ConnectionPool<CL>, TopologyView {
    
        private final ConcurrentHashMap<Host, HostConnectionPool<CL>> cpMap = new ConcurrentHashMap<Host, HostConnectionPool<CL>>();
      
        private final ConnectionPoolHealthTracker<CL> cpHealthTracker;
    
        private final HostConnectionPoolFactory<CL> hostConnPoolFactory;
        private final ConnectionFactory<CL> connFactory;
        private final ConnectionPoolConfiguration cpConfiguration;
        private final ConnectionPoolMonitor cpMonitor;
    
        private final ScheduledExecutorService idleThreadPool = Executors.newSingleThreadScheduledExecutor();
    
        private final HostsUpdater hostsUpdater;
        private final ScheduledExecutorService connPoolThreadPool = Executors.newScheduledThreadPool(1);
    
        private HostSelectionWithFallback<CL> selectionStrategy;
    
        private Type poolType;
    }
    

    此时逻辑如下:

    +------------------------+
    |DynoJedisClient         |
    |                        |
    |                        |            +------------------------+
    |                        |            |                        |
    |          connPool +-------------->  |   ConnectionPoolImpl   |
    |                        |            |                        |
    |                        |            +------------------------+
    +------------------------+
    

    8.1 启动

    连接池 启动逻辑是:

    • 利用hostsUpdater来获取到的host进行配置添加;
    • 启用health check monitor来进行错误率跟踪;

    具体如下:

    @Override
    public Future<Boolean> start() throws DynoException {
    
            HostSupplier hostSupplier = cpConfiguration.getHostSupplier();
            HostStatusTracker hostStatus = hostsUpdater.refreshHosts();
            cpMonitor.setHostCount(hostStatus.getHostCount());
    
            Collection<Host> hostsUp = hostStatus.getActiveHosts();
            final ExecutorService threadPool = Executors.newFixedThreadPool(Math.max(10, hostsUp.size()));
            final List<Future<Void>> futures = new ArrayList<Future<Void>>();
    
        	// 利用hostsUpdater来获取到的host进行配置添加
            for (final Host host : hostsUp) {
                // Add host connection pool, but don't init the load balancer yet
                futures.add(threadPool.submit(new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        addHost(host, false);
                        return null;
                    }
                }));
            }
    
        	// 启用health check monitor来进行错误率跟踪
            boolean success = started.compareAndSet(false, true);
            if (success) {
           
                selectionStrategy = initSelectionStrategy();
                cpHealthTracker.start();
                connPoolThreadPool.scheduleWithFixedDelay(new Runnable() {
    
                    @Override
                    public void run() {
                            HostStatusTracker hostStatus = hostsUpdater.refreshHosts();
                            cpMonitor.setHostCount(hostStatus.getHostCount());
                            Logger.debug(hostStatus.toString());
                            updateHosts(hostStatus.getActiveHosts(), hostStatus.getInactiveHosts());
                    }
    
                }, 15 * 1000, 30 * 1000, TimeUnit.MILLISECONDS);
    
                MonitorConsole.getInstance().registerConnectionPool(this);
                registerMonitorConsoleMBean(MonitorConsole.getInstance());
            }
            return getEmptyFutureTask(true);
    }
    

    8.2 配置Host

    启动过程中,添加host逻辑如下:

    • 依据host获取HostConnectionPool;
    • 把HostConnectionPool加入到集合;
    • 把 host,HostConnectionPool加入到选择策略selectionStrategy;
    • 依据host设置health check monitor;

    具体如下:

    public boolean addHost(Host host, boolean refreshLoadBalancer) {
    
            HostConnectionPool<CL> connPool = cpMap.get(host);
    
            final HostConnectionPool<CL> hostPool = hostConnPoolFactory.createHostConnectionPool(host, this);
    
            HostConnectionPool<CL> prevPool = cpMap.putIfAbsent(host, hostPool);
            if (prevPool == null) {
                // This is the first time we are adding this pool.
                try {
                    int primed = hostPool.primeConnections();
                    if (hostPool.isActive()) {
                        if (refreshLoadBalancer) {
                            selectionStrategy.addHost(host, hostPool);
                        }
                        cpHealthTracker.initializePingHealthchecksForPool(hostPool);
                        cpMonitor.hostAdded(host, hostPool);
                    } else {
                        cpMap.remove(host);
                    }
                    return primed > 0;
                } catch (DynoException e) {
                    cpMap.remove(host);
                    return false;
                }
            } 
    }
    
    

    8.3 获取HostConnectionPool

    关于获取HostConnectionPool,有同步和异步 两种实现方式,具体如下。

    private class SyncHostConnectionPoolFactory implements HostConnectionPoolFactory<CL> {
            @Override
            public HostConnectionPool<CL> createHostConnectionPool(Host host, ConnectionPoolImpl<CL> parentPoolImpl) {
                return new HostConnectionPoolImpl<CL>(host, connFactory, cpConfiguration, cpMonitor);
            }
    }
    
    private class AsyncHostConnectionPoolFactory implements HostConnectionPoolFactory<CL> {
            @Override
            public HostConnectionPool<CL> createHostConnectionPool(Host host, ConnectionPoolImpl<CL> parentPoolImpl) {
                return new SimpleAsyncConnectionPoolImpl<CL>(host, connFactory, cpConfiguration, cpMonitor);
            }
    }
    
    

    8.4 执行

    逻辑连接池 有两种执行方式:executeWithRing 与 executeWithFailover。

    • executeWithRing使用较少,所以不详细介绍。

    • executeWithFailover 是 利用selectionStrategy获取Connection,在此Connection之上进行执行。如果失败就各种重试。

    public <R> OperationResult<R> executeWithFailover(Operation<CL, R> op) throws DynoException {
    
            RetryPolicy retry = cpConfiguration.getRetryPolicyFactory().getRetryPolicy();
            retry.begin();
    
            do {
                Connection<CL> connection = null;
    
                try {
                    connection = selectionStrategy.getConnectionUsingRetryPolicy(op,
                            cpConfiguration.getMaxTimeoutWhenExhausted(), TimeUnit.MILLISECONDS, retry);
    
                    updateConnectionContext(connection.getContext(), connection.getHost());
    
                    OperationResult<R> result = connection.execute(op);
    
                    // Add context to the result from the successful execution
                    result.setNode(connection.getHost()).addMetadata(connection.getContext().getAll());
    
                    retry.success();
                    cpMonitor.incOperationSuccess(connection.getHost(), System.currentTimeMillis() - startTime);
    
                    return result;
    
                } finally {
                    if (connection != null) {
                        if (connection.getLastException() != null
                                && connection.getLastException() instanceof FatalConnectionException) {
                            connection.getParentConnectionPool().recycleConnection(connection);
                            // note - don't increment connection closed metric here;
                            // it's done in closeConnection
                        } else {
                            connection.getContext().reset();
                            connection.getParentConnectionPool().returnConnection(connection);
                        }
                    }
                }
    
            } while (retry.allowRetry());
            throw lastException;
        }
    
    

    此时逻辑如下:

                               +----------------------+
    +-------------------+      |ConnectionPoolImpl    |
    |DynoJedisClient    |      |                      |
    |                   |      |                      |         +--------------+
    |                   |      |       hostsUpdater +-------->  | HostSupplier |
    |                   |      |                      |         +--------------+
    |     connPool +---------> |                      |
    |                   |      |                      |         +--------------------------+
    |                   |      |              cpMap +-------->  |[Host, HostConnectionPool]|
    +-------------------+      |                      |         |               +          |
                               +----------------------+         |               |          |
                                                                +--------------------------+
                                                                                |
                                                                                |
                                                                                |
                                                                                v
                                                                +---------------+-----+
                                                                |                     |
                                                                | HostConnectionPool  |
                                                                |                     |
                                                                +---------------------+
    
    

    0x09 具体连接池

    HostConnectionPool 是具体连接池实现,此类为每一个Host节点维护一个有效连接池

    具体是:

    • HostConnectionPool 使用 LinkedBlockingQueue availableConnections 来维护所有有效连接,当client需要一个连接,需要从queue中提取。
    • 所以,availableConnections 就是有效连接池。
    • availableConnections 之中每一个 连接就是一个 Connection;
    • 这个 Connection (JedisConnection)是通过 JedisConnectionFactory 建立的;
    • 另外,每一个 JedisConnection 里面有:
      • HostConnectionPool hostPool;
      • Jedis jedisClient;

    具体如下:

    public class HostConnectionPoolImpl<CL> implements HostConnectionPool<CL> {
    
        // The connections available for this connection pool
        private final LinkedBlockingQueue<Connection<CL>> availableConnections = new LinkedBlockingQueue<Connection<CL>>(); 
    
        // Private members required by this class
        private final Host host;
        private final ConnectionFactory<CL> connFactory;
        private final ConnectionPoolConfiguration cpConfig;
        private final ConnectionPoolMonitor monitor;
    
        // states that dictate the behavior of the pool
    
        // cp not inited is the starting state of the pool. The pool will not allow connections to be borrowed in this state
        private final ConnectionPoolState<CL> cpNotInited = new ConnectionPoolNotInited();
        // cp active is where connections of the pool can be borrowed and returned
        private final ConnectionPoolState<CL> cpActive = new ConnectionPoolActive(this);
        // cp reconnecting is where connections cannot be borrowed and all returning connections will be shutdown
        private final ConnectionPoolState<CL> cpReconnecting = new ConnectionPoolReconnectingOrDown();
        // similar to reconnecting
        private final ConnectionPoolState<CL> cpDown = new ConnectionPoolReconnectingOrDown();
    
        // The thread safe reference to the pool state
        private final AtomicReference<ConnectionPoolState<CL>> cpState = new AtomicReference<ConnectionPoolState<CL>>(cpNotInited);
    }
    
    

    9.1 生成Connection

    首先我们要看看 如何生成 Connection,大致就是从 connFactory 中直接获取,然后执行监控等相应操作。

    @Override
    public Connection<CL> createConnection() {
    
                try {
                    Connection<CL> connection;
                    if (cpConfig.isConnectToDatastore()) {
                        
                        // 具体建立连接操作
                        connection = connFactory.createConnectionWithDataStore(pool);
                        
                    } else if (cpConfig.isConnectionPoolConsistencyProvided()) {
                        connection = connFactory.createConnectionWithConsistencyLevel(pool, cpConfig.getConnectionPoolConsistency());
                    } else {
                        connection = connFactory.createConnection(pool);
                    }
    
                    connection.open();
                    availableConnections.add(connection);
    
                    monitor.incConnectionCreated(host);
                    numActiveConnections.incrementAndGet();
    
                    return connection;
                } 
    }
    
    

    9.2 JedisConnectionFactory

    JedisConnectionFactory 的 createConnectionWithDataStore 函数执行了具体 建立连接操作,涉及到 Jedis 很多朋友应该都很熟悉。

    简略版代码如下:

    public class JedisConnectionFactory implements ConnectionFactory<Jedis> {
    
        private final OperationMonitor opMonitor;
        private final SSLSocketFactory sslSocketFactory;
    
        public JedisConnectionFactory(OperationMonitor monitor, SSLSocketFactory sslSocketFactory) {
            this.opMonitor = monitor;
            this.sslSocketFactory = sslSocketFactory;
        }
    
        @Override
        public Connection<Jedis> createConnectionWithDataStore(HostConnectionPool<Jedis> pool) {
            return new JedisConnection(pool, true);
        }
    
        // TODO: raghu compose redisconnection with jedisconnection in it
        public class JedisConnection implements Connection<Jedis> {
    
            private final HostConnectionPool<Jedis> hostPool;
            private final Jedis jedisClient;
    
            public JedisConnection(HostConnectionPool<Jedis> hostPool, boolean connectDataStore) {
                this.hostPool = hostPool;
                Host host = hostPool.getHost();
    
                int port = connectDataStore ? host.getDatastorePort() : host.getPort();
    
                if (sslSocketFactory == null) {
                    JedisShardInfo shardInfo = new JedisShardInfo(host.getHostAddress(), port,
                            hostPool.getConnectionTimeout(), hostPool.getSocketTimeout(), Sharded.DEFAULT_WEIGHT);
    
                    jedisClient = new Jedis(shardInfo);
                } else {
                    JedisShardInfo shardInfo = new JedisShardInfo(host.getHostAddress(), port,
                            hostPool.getConnectionTimeout(), hostPool.getSocketTimeout(), Sharded.DEFAULT_WEIGHT,
                            true, sslSocketFactory, new SSLParameters(), null);
    
                    jedisClient = new Jedis(shardInfo);
                }
            }
    
            @Override
            public HostConnectionPool<Jedis> getParentConnectionPool() {
                return hostPool;
            }
    
            public Jedis getClient() {
                return jedisClient;
            }
        }
    }
    
    

    此时逻辑如下:

                                                  +----------------------+
                       +-------------------+      |ConnectionPoolImpl    |
                       |DynoJedisClient    |      |                      |
                       |                   |      |                      |         +--------------+
                       |                   |      |       hostsUpdater +-------->  | HostSupplier |
                       |                   |      |                      |         +--------------+
                       |     connPool +---------> |                      |
                       |                   |      |                      |         +--------------------------+
                       |                   |      |              cpMap +-------->  |[Host, HostConnectionPool]|
                       +-------------------+      |                      |         |               +          |
                                                  +----------------------+         |               |          |
                                                                                   +--------------------------+
                                                                                                   |
                                                                                                   |
    +-----------------------------+                                                                |
    | JedisConnectionFactory      |                                                                v
    |                             |                                                +---------------+-------------------------------------------+
    |                             |          createConnectionWithDataStore         | HostConnectionPool                                        |
    |                             |                                                |                                                           |
    |  sslSocketFactory           |  <------------------------------------------------+ connFactory      Host                                  |
    |                             |                                                |                                                           |
    |                             |                                                |  LinkedBlockingQueue<Connection<CL<> availableConnections |
    +-----------------------------+                                                |                                                           |
                                                                                   +------------------------------+----------------------------+
                   +                                                                                              ^
                   |                  +----------------------------------------+                                  |
                   |                  |JedisConnection                         |                                  |
                   |                  |                                        |                                  |
                   |   return         |                                        |   return                         |
                   |                  |     HostConnectionPool<Jedis> hostPool |                                  |
                   +--------------->  |                                        | +--------------------------------+
                                      |     Jedis(shardInfo) jedisClient       |
                                      |                                        |
                                      +----------------------------------------+
    
    
    

    手机上如下:

    9.3 获取Connection

    用户使用 borrowConnection 来得到 连接,并且做监控。

    @Override
    public Connection<CL> borrowConnection(int duration, TimeUnit unit) {
                // Start recording how long it takes to get the connection - for insight/metrics
                long startTime = System.nanoTime() / 1000;
                Connection<CL> conn = null;
                // wait on the connection pool with a timeout
                conn = availableConnections.poll(duration, unit);
                long delay = System.nanoTime() / 1000 - startTime;
                monitor.incConnectionBorrowed(host, delay);
    }
    
    

    0x10 拓扑

    这里拓扑主要指的是token环,我们再复习下概念。

    在 Dynomite 之中,由集群管理的数据就是一个环。环中的每个节点被分配一个或多个由token描述的数据范围,toekn 可以确定在环中的位置。

    Token是用于标识每个分区的64位整数ID,范围是-2^63 -- 2^63-1。通过hash算法计算partition key的hash值,以此确定存放在哪个节点。

    Token决定了每个节点存储的数据的分布范围,每个节点保存的数据的key在(前一个节点Token,本节点Token]的半开半闭区间内,所有的节点形成一个首尾相接的环。