当前位置 博文首页 > [从源码学设计]蚂蚁金服SOFARegistry 之 服务注册和操作日志

    [从源码学设计]蚂蚁金服SOFARegistry 之 服务注册和操作日志

    作者:罗西的思考 时间:2021-01-09 09:01

    SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。 本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。 本文为第十四篇,介绍SOFARegistry服务上线和操作日志。上文是从Session Server角度,本文从 Data Server 角度介绍。

    [从源码学设计]蚂蚁金服SOFARegistry之服务注册和操作日志

    目录
    • [从源码学设计]蚂蚁金服SOFARegistry之服务注册和操作日志
      • 0x00 摘要
      • 0x01 整体业务流程
        • 1.1 服务注册过程
        • 1.2 数据分片
      • 0x02 基础数据结构
        • 2.1 Publisher
        • 2.2 Datum
        • 2.3 DatumCache
        • 2.4 Operator
        • 2.5 Acceptor
        • 2.6 总结
      • 0x03 Datum的来龙去脉
        • 3.1 Session Server 内部
        • 3.2 PublishDataHandler
        • 3.3 DataChangeEventCenter
        • 3.4 DataChangeEventQueue
        • 3.5 DataChangeHandler
      • 0x04 DataChangeHandler处理
      • 0x05 AbstractAcceptorStore存储
        • 5.1 Bean
        • 5.2 StoreServiceFactory
        • 5.3 AbstractAcceptorStore
        • 5.4 加入
        • 5.5 使用
          • 5.5.1 Scheduler
          • 5.5.2 changeDataCheck
            • 5.5.2.1 通知NotifyDataSyncRequest
          • 5.5.3 checkAcceptorsChangAndExpired
      • 0x06 Acceptor日志操作
        • 6.1 appendOperator
        • 6.2 checkExpired
      • 0x07 NotifyDataSyncRequest通知数据同步
        • 7.1 NotifyDataSyncHandler
          • 7.1.1 doHandle
          • 7.1.2 executorRequest
          • 7.1.3 GetSyncDataHandler
          • 7.1.4 SyncDataCallback
      • 0x08 SyncDataRequest回送通知
        • 8.1 SyncDataRequest
          • 8.1.1 SyncDataRequest 从哪里来
        • 8.2 syncDataHandler
        • 8.3 SyncDataServiceImpl
        • 8.4 AbstractAcceptorStore
        • 8.5 Acceptor
      • 0x09 SyncDataCallback接受者回调
        • 9.1 DataChangeHandler
      • 0x10 总结
      • 0xFF 参考

    0x00 摘要

    SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。

    本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。

    本文为第十四篇,介绍SOFARegistry服务上线和操作日志。上文是从Session Server角度,本文从 Data Server 角度介绍。

    0x01 整体业务流程

    我们首先回顾总体业务流程,这部分属于数据分片。

    1.1 服务注册过程

    回顾下“一次服务注册过程”的服务数据在内部流转过程。

    1. Client 调用 publisher.register 向 SessionServer 注册服务。
    2. SessionServer 收到服务数据 (PublisherRegister) 后,将其写入内存 (SessionServer 会存储 Client 的数据到内存,用于后续可以跟 DataServer 做定期检查),再根据 dataInfoId 的一致性 Hash 寻找对应的 DataServer,将 PublisherRegister 发给 DataServer。
    3. DataServer 接收到 PublisherRegister 数据,首先也是将数据写入内存 ,DataServer 会以 dataInfoId 的维度汇总所有 PublisherRegister。同时,DataServer 将该 dataInfoId 的变更事件通知给所有 SessionServer,变更事件的内容是 dataInfoId 和版本号信息 version。
    4. 同时,异步地,DataServer 以 dataInfoId 维度增量地同步数据给其他副本。因为 DataServer 在一致性 Hash 分片的基础上,对每个分片保存了多个副本(默认是3个副本)。
    5. SessionServer 接收到变更事件通知后,对比 SessionServer 内存中存储的 dataInfoId 的 version,若发现比 DataServer 发过来的小,则主动向 DataServer 获取 dataInfoId 的完整数据,即包含了所有该 dataInfoId 具体的 PublisherRegister 列表。
    6. 最后,SessionServer 将数据推送给相应的 Client,Client 就接收到这一次服务注册之后的最新的服务列表数据。

    因为篇幅所限,上文讨论的是前两点,本文介绍第三,第四点

    1.2 数据分片

    当服务上线时,会计算新增服务的 dataInfoId Hash 值,从而对该服务进行分片,最后寻找最近的一个节点,存储到相应的节点上。

    DataServer 服务在启动时添加了 publishDataProcessor 来处理相应的服务发布者数据发布请求,该 publishDataProcessor 就是 PublishDataHandler。当有新的服务发布者上线,DataServer 的 PublishDataHandler 将会被触发。

    该 Handler 首先会判断当前节点的状态,若是非工作状态则返回请求失败。若是工作状态,则触发数据变化事件中心 DataChangeEventCenter 的 onChange 方法。

    DataChangeEventQueue 中维护着一个 DataChangeEventQueue 队列数组,数组中的每个元素是一个事件队列。当上文中的 onChange 方法被触发时,会计算该变化服务的 dataInfoId 的 Hash 值,从而进一步确定出该服务注册数据所在的队列编号,进而把该变化的数据封装成一个数据变化对象,传入到队列中。

    DataChangeEventQueue#start 方法在 DataChangeEventCenter 初始化的时候被一个新的线程调用,该方法会源源不断地从队列中获取新增事件,并且进行分发。新增数据会由此添加进节点内,实现分片。

    与此同时,DataChangeHandler 会把这个事件变更信息通过 ChangeNotifier 对外发布,通知其他节点进行数据同步

    0x02 基础数据结构

    这里需要首先讲解几个相关数据结构。

    2.1 Publisher

    Publisher是数据发布者信息

    public class Publisher extends BaseInfo {
        private List<ServerDataBox> dataList;
        private PublishType         publishType      = PublishType.NORMAL;
    }
    

    2.2 Datum

    从SOFARegistry本身出发而汇集的数据发布者信息,里面核心是 :

    • dataInfoId:服务唯一标识,由``<分组 group><租户 instanceId>构成,例如在 SOFARPC 的场景下,一个 dataInfoId 通常是 com.alipay.sofa.rpc.example.HelloService#@#SOFA#@#00001`,其中SOFA 是 group 名称,00001 是租户 id。group 和 instance 主要是方便对服务数据做逻辑上的切分,使不同 group 和 instance 的服务数据在逻辑上完全独立。模型里有 group 和 instanceId 字段,但这里不额外列出来,读者只要理解 dataInfoId 的含义即可;
    • dataCenter:一个物理机房,包含多个逻辑单元(zone)。zone:是一种单元化架构下的概念,代表一个机房内的逻辑单元。在服务发现场景下,发布服务时需指定逻辑单元(zone),而订阅服务者可以订阅逻辑单元(zone)维度的服务数据,也可以订阅物理机房(datacenter)维度的服务数据,即订阅该 datacenter 下的所有 zone 的服务数据。;
    • pubMap:包括的Publisher;
    • version:对应的版本

    具体代码如下:

    public class Datum implements Serializable {
        private String                                dataInfoId;
        private String                                dataCenter;
        private String                                dataId;
        private String                                instanceId;
        private String                                group;
        private Map<String/*registerId*/, Publisher> pubMap = new ConcurrentHashMap<>();
        private long                                  version;
        private boolean                               containsUnPub    = false;
    }
    

    2.3 DatumCache

    DatumCache 是最新的Datum。

    public class DatumCache {
        @Autowired
        private DatumStorage localDatumStorage;
    }
    

    具体存储是在LocalDatumStorage中完成。

    public class LocalDatumStorage implements DatumStorage {
        /**
         * row:     dataCenter
         * column:  dataInfoId
         * value:   datum
         */
        protected final Map<String, Map<String, Datum>>     DATUM_MAP            = new ConcurrentHashMap<>();
    
        /**
         * all datum index
         *
         * row:     ip:port
         * column:  registerId
         * value:   publisher
         */
        protected final Map<String, Map<String, Publisher>> ALL_CONNECT_ID_INDEX = new ConcurrentHashMap<>();
    
        @Autowired
        private DataServerConfig                            dataServerConfig;
    }
    

    2.4 Operator

    Operator 是每一步Datum对应的操作

    public class Operator {
        private Long               version;
        private Long               sourceVersion;
        private Datum              datum;
        private DataSourceTypeEnum sourceType;
    }
    

    2.5 Acceptor

    记录了所有的Datum操作。其中:

    • logOperatorsOrder记录了操作的顺序;
    • logOperators是所有的操作;
    public class Acceptor {
        private final String                    dataInfoId;
        private final String                    dataCenter;
        private int                             maxBufferSize;
        static final int                        DEFAULT_DURATION_SECS = 30;
        private final Deque<Long/*version*/>   logOperatorsOrder = new ConcurrentLinkedDeque<>();
        private Map<Long/*version*/, Operator> logOperators = new ConcurrentHashMap<>();
        private final DatumCache                datumCache;
    }
    

    2.6 总结

    总结下这几个数据结构的联系:

    • Publisher是数据发布者信息
    • Datum是从SOFARegistry本身出发而汇集的数据发布者信息
    • DatumCache 是最新的Datum
    • Operator 是每一步Datum对应的操作
    • Acceptor记录了所有的Datum操作

    0x03 Datum的来龙去脉

    我们先回顾下 Datum 的来龙去脉。

    3.1 Session Server 内部

    首先,我们讲讲Session Server 内部如何获取Datum

    在 Session Server 内部,Datum存储在 SessionCacheService 之中。

    比如在 DataChangeFetchCloudTask 内部,可以这样获取 Datum。

    private Map<String, Datum> getDatumsCache() {
        Map<String, Datum> map = new HashMap<>();
        NodeManager nodeManager = NodeManagerFactory.getNodeManager(NodeType.META);
        Collection<String> dataCenters = nodeManager.getDataCenters();
        if (dataCenters != null) {
            Collection<Key> keys = dataCenters.stream().
                    map(dataCenter -> new Key(KeyType.OBJ, DatumKey.class.getName(),
                            new DatumKey(fetchDataInfoId, dataCenter))).
                    collect(Collectors.toList());
    
            Map<Key, Value> values = null;
            values = sessionCacheService.getValues(keys);
    
            if (values != null) {
                values.forEach((key, value) -> {
                    if (value != null && value.getPayload() != null) {
                        map.put(((DatumKey) key.getEntityType()).getDataCenter(), (Datum) value.getPayload());
                    }
                });
            }
        }
        return map;
    }
    

    Session Server 会向 Data Server 发送 PublishDataRequest 请求

    3.2 PublishDataHandler

    在DataServer内部,PublishDataHandler 是用来处理 PublishDataRequest

    public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> {
        @Autowired
        private ForwardService                 forwardService;
    
        @Autowired
        private SessionServerConnectionFactory sessionServerConnectionFactory;
    
        @Autowired
        private DataChangeEventCenter          dataChangeEventCenter;
    
        @Autowired
        private DataServerConfig               dataServerConfig;
    
        @Autowired
        private DatumLeaseManager              datumLeaseManager;
    
        @Autowired
        private ThreadPoolExecutor             publishProcessorExecutor;
    
        @Override
        public Object doHandle(Channel channel, PublishDataRequest request) {
            Publisher publisher = Publisher.internPublisher(request.getPublisher());
            if (forwardService.needForward()) {
                CommonResponse response = new CommonResponse();
                response.setSuccess(false);
                response.setMessage("Request refused, Server status is not working");
                return response;
            }
    
            dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());
    
            if (publisher.getPublishType() != PublishType.TEMPORARY) {
                String connectId = WordCache.getInstance().getWordCache(
                    publisher.getSourceAddress().getAddressString());
                sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(),
                    connectId);
                // record the renew timestamp
                datumLeaseManager.renew(connectId);
            }
    
            return CommonResponse.buildSuccessResponse();
        }
    }
    

    3.3 DataChangeEventCenter

    在 DataChangeEventCenter 的 onChange 函数中,会进行投放

    public void onChange(Publisher publisher, String dataCenter) {
        int idx = hash(publisher.getDataInfoId());
        Datum datum = new Datum(publisher, dataCenter);
        if (publisher instanceof UnPublisher) {
            datum.setContainsUnPub(true);
        }
        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB, datum));
        } else {
            dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB_TEMP, datum));
        }
    }
    

    3.4 DataChangeEventQueue

    在DataChangeEventQueue之中,会调用 handleDatum 来处理。在这里对Datum进行存储。

    3.5 DataChangeHandler

    在 DataChangeHandler 之中,会提取ChangeData,然后进行Notify。

    public void start() {
        DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
        int queueCount = queues.length;
        Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
        Executor notifyExecutor = ExecutorFactory
                .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
        for (int idx = 0; idx < queueCount; idx++) {
            final DataChangeEventQueue dataChangeEventQueue = queues[idx];
            final String name = dataChangeEventQueue.getName();
            executor.execute(() -> {
                while (true) {
                        final ChangeData changeData = dataChangeEventQueue.take();
                        notifyExecutor.execute(new ChangeNotifier(changeData, name));
                }
            });
        }
    }
    

    具体如下:

                                               +
                               Session Server  |  Data Server
                                               |
                                               |
                                               |
                                               |
    +--------------------------+  PublishDataRequest   +--------------------+
    | DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler |
    +-----------+--------------+               |       +------+-------------+
                ^                              |              |
                | getValues                    |              |  onChange(Publisher)
                |                              |              v
                |                              |     +--------+--------------+
      +---------+----------+                   |     | DataChangeEventCenter |
      |sessionCacheService |                   |     +--------+--------------+
      +--------------------+                   |              |
                                               |              |  Datum
                                               |              |
                                               |              v
                                               |     +--------+-------------+
                                               |     | DataChangeEventQueue |
                                               |     +--------+-------------+
                                               |              |
                                               |              |
                                               |              | ChangeData
                                               |              v
                                               |      +-------+-----------+
                                               |      | DataChangeHandler |
                                               +      +-------------------+
    

    0x04 DataChangeHandler处理

    于是我们接着进行 DataChangeHandler 处理。即总述中提到的:DataChangeHandler 会把这个事件变更信息:

    1. 把这个事件变更信息变成Operator,放到AbstractAcceptorStore;
    2. 通过 ChangeNotifier 对外发布,通知其他节点进行数据同步;

    下面我们从第一部分 :把这个事件变更信息变成Operator,放到AbstractAcceptorStore 出发,进行讲解日志操作。

    即如图所示:

                                               +
                               Session Server  |  Data Server
                                               |
                                               |
                                               |
                                               +
    +--------------------------+  PublishDataRequest   +--------------------+
    | DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler |
    +-----------+--------------+               |       +------+-------------+
                ^                              |              |
                | getValues                    |              |  onChange(Publisher)
                |                              |              v
                |                              |     +--------+--------------+
      +---------+----------+                   |     | DataChangeEventCenter |
      |sessionCacheService |                   |     +--------+--------------+
      +--------------------+                   |              |
                                               |              |  Datum
                                               |              |
                                               |              v
                                               |     +--------+-------------+
                                               |     | DataChangeEventQueue |
                                               |     +--------+-------------+
                                               |              |
                                               |              |
                                               |              | ChangeData
                                               |              v
                                               |      +-------+-----------+
                                               |      | DataChangeHandler |
                                               |      +-------+-----------+
                                               |              |
                                               |              |
                                               |              v
                                               |      +-------+---------+
                                               |      |  ChangeNotifier |
                                               |      +-------+---------+
                                               |              |
                                               |              |
                                               |              v
                                               |   +----------+------------+
                                               |   | AbstractAcceptorStore |
                                               |   +-----------------------+
                                               +
    

    Acceptor的appendOperator谁来调用?在Notifier 里面有,比如:

    public class BackUpNotifier implements IDataChangeNotifier {
    
        @Autowired
        private SyncDataService syncDataService;
    
        @Override
        public void notify(Datum datum, Long lastVersion) {
            syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
                DataSourceTypeEnum.BACKUP));
        }
    }
    

    以及另一个:

    public class SnapshotBackUpNotifier implements IDataChangeNotifier {
    
        @Autowired
        private SyncDataService syncDataService;
    
        @Override
        public void notify(Datum datum, Long lastVersion) {
            syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion, datum,
                DataSourceTypeEnum.BACKUP));
        }
    }
    

    0x05 AbstractAcceptorStore存储

    AbstractAcceptorStore是日志存储,我们下面详细分析。

    5.1 Bean

    对于操作信息,提供了一个Bean来存储。

    @Bean
    public AcceptorStore localAcceptorStore() {
        return new LocalAcceptorStore();
    }
    

    5.2 StoreServiceFactory

    作用是在 storeServiceMap 中存放各种 AcceptorStore,目前只有LocalAcceptorStore 这一个。

    public class StoreServiceFactory implements ApplicationContextAware {
    
        private static Map<String/*supportType*/, AcceptorStore> storeServiceMap = new HashMap<>();
    
        /**
         * get AcceptorStore by storeType
         * @param storeType
         * @return
         */
        public static AcceptorStore getStoreService(String storeType) {
            return storeServiceMap.get(storeType);
        }
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            Map<String, AcceptorStore> map = applicationContext.getBeansOfType(AcceptorStore.class);
    
            map.forEach((key, value) -> storeServiceMap.put(value.getType(), value));
        }
    }
    

    5.3 AbstractAcceptorStore

    AbstractAcceptorStore 是存储的基本实现类,几个基本成员是。

    • acceptors :是一个矩阵,按照dataCenter,dataInfoId维度来分类,存储了此维度下的Acceptor;就是说,针对每一个dataCenter,dataInfoId的组合,都有一个Acceptor,用来存储这下面的Operator。

    • notifyAcceptorsCache :是一个矩阵,按照dataCente,dataInfoId维度来分类,缓存了此维度下需要进一步处理的Acceptor;

    • delayQueue :配合notifyAcceptorsCache使用,针对notifyAcceptorsCache的每一个新acceptor,系统会添加一个消息进入queue,这个queue等延时到了,就会取出,并且从notifyAcceptorsCache取出对应的新acceptor进行相应处理;

    按说应该是 cache 有东西,所以dequeue 时候就会取出来,但是如果这期间多放入了几个进入 Cache,原有cache 的 value 只是被替换而已,等时间到了,也会取出来

    notifyAcceptorsCache 也是按照 data center 来控制的,只有定期 removeCache。

    public abstract class AbstractAcceptorStore implements AcceptorStore {
    
        private static final int               DEFAULT_MAX_BUFFER_SIZE = 30;
    
        @Autowired
        protected IMetaServerService           metaServerService;
    
        @Autowired
        private Exchange                       boltExchange;
    
        @Autowired
        private DataServerConfig               dataServerConfig;
    
        @Autowired
        private DataServerConnectionFactory    dataServerConnectionFactory;
    
        @Autowired
        private DatumCache                     datumCache;
    
        private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> acceptors               = new ConcurrentHashMap<>();
    
        private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> notifyAcceptorsCache    = new ConcurrentHashMap<>();
    
        private DelayQueue<DelayItem<Acceptor>>     delayQueue 
    }
    

    具体如下图:

    +-----------------------------+                      +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
    |[AbstractAcceptorStore]      |                      |
    |                             |   +-> dataCenter +---+
    |                             |   |                  |
    |     acceptors  +--------------->+                  +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
    |                             |   |
    |     notifyAcceptorsCache    |   |                  +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
    |           +                 |   +-> dataCenter +-->+
    +-----------------------------+                      |
                |                                        +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
                |
                |
                |                                        +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
                |                     +-> dataCenter +-->+
                |                     |                  +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
                +-------------------->+
                                      |                  +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
                                      +-> dataCenter +---+
                                                         +--> dataInfoId +--->  Acceptor +--> Map<>ersion, Operator>
    
    

    手机如图:

    有一点需要说明,就是delayQueue 为何要延迟队列。这是由于SOFA的“秒级服务上下线通知“特性造成的

    因为要实现此特性,所以涉及到了一个连接敏感性问题,即在 SOFARegistry 里,所有 Client 都与 SessionServer 保持长连接,每条长连接都会有基于 bolt 的连接心跳,如果连接断开,Client 会马上重新建连,时刻保证 Client 与 SessionServer 之间有可靠的连接。

    因为强烈的连接敏感性,所以导致如果只是网络问题导致连接断开,实际的进程并没有宕机,那么 Client 会马上重连 SessionServer 并重新注册所有服务数据。这种大量的短暂的服务下线后又重新上线会给用户带来困扰和麻烦

    因此在 DataServer 内部实现了数据延迟合并的功能,就是这里的DelayQueue

    5.4 加入

    addOperator的基本逻辑是:

    • 从Operator的Datum中提取dataCenter和dataInfoId;
    • 从acceptors取出dataCenter对应的Map<dataInfoId, Acceptor> acceptorMap;
    • 从acceptorMap中提取dataInfoId对应的existAcceptor;
    • 如果新operator是SnapshotOperator类型,则清除之前的 opeator queue。
    • 否则加入新operator;
    • 使用putCache(existAcceptor);把目前的Acceptor加入Cache,定时任务会处理;

    在操作中,都是使用putIfAbsent,这样短期内若有多个同样value插入,则不会替换原有的value,这样 起到了归并作用

    @Override
    public void addOperator(Operator operator) {
    
        Datum datum = operator.getDatum();
        String dataCenter = datum.getDataCenter();
        String dataInfoId = datum.getDataInfoId();
        try {
            Map<String/*dataInfoId*/, Acceptor> acceptorMap = acceptors.get(dataCenter);
            if (acceptorMap == null) {
                Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
                acceptorMap = acceptors.putIfAbsent(dataCenter, newMap);
                if (acceptorMap == null) {
                    acceptorMap = newMap;
                }
            }
    
            Acceptor existAcceptor = acceptorMap.get(dataInfoId);
            if (existAcceptor == null) {
                Acceptor newAcceptor = new Acceptor(DEFAULT_MAX_BUFFER_SIZE, dataInfoId,
                    dataCenter, datumCache);
                existAcceptor = acceptorMap.putIfAbsent(dataInfoId, newAcceptor);
                if (existAcceptor == null) {
                    existAcceptor = newAcceptor;
                }
            }
    
            if (operator instanceof SnapshotOperator) {
                //snapshot: clear the queue, Make other data retrieve the latest memory data
                existAcceptor.clearBefore();
            } else {
                existAcceptor.appendOperator(operator);
            }
    
            //put cache
            putCache(existAcceptor);
        } 
    }
    

    putCache的作用是:

    • 从acceptor中提取dataCenter和dataInfoId;
    • 从notifyAcceptorsCache中取出dataCenter对应的Map<dataInfoId, Acceptor> acceptorMap;
    • 向acceptorMap中放入dataInfoId对应的acceptor;
    • 如果acceptorMap中之前没有对应的value,则把acceptor放入delayQueue;

    这里也使用putIfAbsent,这样短期内若有多个同样value插入,则不会替换原有的value,这样 起到了归并作用

    private void putCache(Acceptor acceptor) {
    
        String dataCenter = acceptor.getDataCenter();
        String dataInfoId = acceptor.getDataInfoId();
    
        try {
            Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
            if (acceptorMap == null) {
                Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
                acceptorMap = notifyAcceptorsCache.putIfAbsent(dataCenter, newMap);
                if (acceptorMap == null) {
                    acceptorMap = newMap;
                }
            }
            Acceptor existAcceptor = acceptorMap.putIfAbsent(dataInfoId, acceptor);
            if (existAcceptor == null) {
                addQueue(acceptor);
            }
        } 
    }
    

    5.5 使用

    具体消费是在定期任务中完成。消费日志的目的就是同步日志操作给其他 DataServer。

    5.5.1 Scheduler

    Scheduler类是定期任务,会启动两个线程池定期调用AcceptorStore的函数

    public class Scheduler {
        private final ScheduledExecutorService scheduler;
      
        public final ExecutorService           versionCheckExecutor;
    
        private final ThreadPoolExecutor       expireCheckExecutor;
    
        @Autowired
        private AcceptorStore                  localAcceptorStore;
    
       public Scheduler() {
            scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler"));
    
            expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
                new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));
    
            versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), new NamedThreadFactory(
                    "SyncDataScheduler-versionChangeCheck"));
    
        }
      
        public void startScheduler() {
            scheduler.schedule(
                    new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3,
                            TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()),
                    30, TimeUnit.SECONDS);
    
            versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());
    
        }
    }
    

    AbstractAcceptorStore中函数如下:

    5.5.2 changeDataCheck

    changeDataCheck 内部是一个while true,所以不需要再使用线程池。

    changeDataCheck绑定在delayQueue上,如果有新消息,则会取出Acceptor,也从notifyAcceptorsCache取出Acceptor,调用notifyChange(acceptor);进行处理 。

    @Override
    public void changeDataCheck() {
    
        while (true) {
            try {
                DelayItem<Acceptor> delayItem = delayQueue.take();
                Acceptor acceptor = delayItem.getItem();
                removeCache(acceptor); // compare and remove
            } catch (InterruptedException e) {
                break;
            } catch (Throwable e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
    
    }
    

    消费Cache用到的是removeCache。

    private void removeCache(Acceptor acceptor) {
        String dataCenter = acceptor.getDataCenter();
        String dataInfoId = acceptor.getDataInfoId();
        try {
            Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
            if (acceptorMap != null) {
                boolean result = acceptorMap.remove(dataInfoId, acceptor);
                if (result) {
                    //data change notify
                    notifyChange(acceptor);
                }
            }
        } 
    }