当前位置 博文首页 > [从源码学设计]蚂蚁金服SOFARegistry 之 服务注册和操作日志
SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。
本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。
本文为第十四篇,介绍SOFARegistry服务上线和操作日志。上文是从Session Server角度,本文从 Data Server 角度介绍。
我们首先回顾总体业务流程,这部分属于数据分片。
回顾下“一次服务注册过程”的服务数据在内部流转过程。
因为篇幅所限,上文讨论的是前两点,本文介绍第三,第四点。
当服务上线时,会计算新增服务的 dataInfoId Hash 值,从而对该服务进行分片,最后寻找最近的一个节点,存储到相应的节点上。
DataServer 服务在启动时添加了 publishDataProcessor 来处理相应的服务发布者数据发布请求,该 publishDataProcessor 就是 PublishDataHandler。当有新的服务发布者上线,DataServer 的 PublishDataHandler 将会被触发。
该 Handler 首先会判断当前节点的状态,若是非工作状态则返回请求失败。若是工作状态,则触发数据变化事件中心 DataChangeEventCenter 的 onChange 方法。
DataChangeEventQueue 中维护着一个 DataChangeEventQueue 队列数组,数组中的每个元素是一个事件队列。当上文中的 onChange 方法被触发时,会计算该变化服务的 dataInfoId 的 Hash 值,从而进一步确定出该服务注册数据所在的队列编号,进而把该变化的数据封装成一个数据变化对象,传入到队列中。
DataChangeEventQueue#start 方法在 DataChangeEventCenter 初始化的时候被一个新的线程调用,该方法会源源不断地从队列中获取新增事件,并且进行分发。新增数据会由此添加进节点内,实现分片。
与此同时,DataChangeHandler 会把这个事件变更信息通过 ChangeNotifier 对外发布,通知其他节点进行数据同步。
这里需要首先讲解几个相关数据结构。
Publisher是数据发布者信息。
public class Publisher extends BaseInfo {
private List<ServerDataBox> dataList;
private PublishType publishType = PublishType.NORMAL;
}
是从SOFARegistry本身出发而汇集的数据发布者信息,里面核心是 :
和
<租户 instanceId>构成,例如在 SOFARPC 的场景下,一个 dataInfoId 通常是
com.alipay.sofa.rpc.example.HelloService#@#SOFA#@#00001`,其中SOFA 是 group 名称,00001 是租户 id。group 和 instance 主要是方便对服务数据做逻辑上的切分,使不同 group 和 instance 的服务数据在逻辑上完全独立。模型里有 group 和 instanceId 字段,但这里不额外列出来,读者只要理解 dataInfoId 的含义即可;具体代码如下:
public class Datum implements Serializable {
private String dataInfoId;
private String dataCenter;
private String dataId;
private String instanceId;
private String group;
private Map<String/*registerId*/, Publisher> pubMap = new ConcurrentHashMap<>();
private long version;
private boolean containsUnPub = false;
}
DatumCache 是最新的Datum。
public class DatumCache {
@Autowired
private DatumStorage localDatumStorage;
}
具体存储是在LocalDatumStorage中完成。
public class LocalDatumStorage implements DatumStorage {
/**
* row: dataCenter
* column: dataInfoId
* value: datum
*/
protected final Map<String, Map<String, Datum>> DATUM_MAP = new ConcurrentHashMap<>();
/**
* all datum index
*
* row: ip:port
* column: registerId
* value: publisher
*/
protected final Map<String, Map<String, Publisher>> ALL_CONNECT_ID_INDEX = new ConcurrentHashMap<>();
@Autowired
private DataServerConfig dataServerConfig;
}
Operator 是每一步Datum对应的操作。
public class Operator {
private Long version;
private Long sourceVersion;
private Datum datum;
private DataSourceTypeEnum sourceType;
}
记录了所有的Datum操作。其中:
public class Acceptor {
private final String dataInfoId;
private final String dataCenter;
private int maxBufferSize;
static final int DEFAULT_DURATION_SECS = 30;
private final Deque<Long/*version*/> logOperatorsOrder = new ConcurrentLinkedDeque<>();
private Map<Long/*version*/, Operator> logOperators = new ConcurrentHashMap<>();
private final DatumCache datumCache;
}
总结下这几个数据结构的联系:
我们先回顾下 Datum 的来龙去脉。
首先,我们讲讲Session Server 内部如何获取Datum
在 Session Server 内部,Datum存储在 SessionCacheService 之中。
比如在 DataChangeFetchCloudTask 内部,可以这样获取 Datum。
private Map<String, Datum> getDatumsCache() {
Map<String, Datum> map = new HashMap<>();
NodeManager nodeManager = NodeManagerFactory.getNodeManager(NodeType.META);
Collection<String> dataCenters = nodeManager.getDataCenters();
if (dataCenters != null) {
Collection<Key> keys = dataCenters.stream().
map(dataCenter -> new Key(KeyType.OBJ, DatumKey.class.getName(),
new DatumKey(fetchDataInfoId, dataCenter))).
collect(Collectors.toList());
Map<Key, Value> values = null;
values = sessionCacheService.getValues(keys);
if (values != null) {
values.forEach((key, value) -> {
if (value != null && value.getPayload() != null) {
map.put(((DatumKey) key.getEntityType()).getDataCenter(), (Datum) value.getPayload());
}
});
}
}
return map;
}
Session Server 会向 Data Server 发送 PublishDataRequest 请求。
在DataServer内部,PublishDataHandler 是用来处理 PublishDataRequest。
public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> {
@Autowired
private ForwardService forwardService;
@Autowired
private SessionServerConnectionFactory sessionServerConnectionFactory;
@Autowired
private DataChangeEventCenter dataChangeEventCenter;
@Autowired
private DataServerConfig dataServerConfig;
@Autowired
private DatumLeaseManager datumLeaseManager;
@Autowired
private ThreadPoolExecutor publishProcessorExecutor;
@Override
public Object doHandle(Channel channel, PublishDataRequest request) {
Publisher publisher = Publisher.internPublisher(request.getPublisher());
if (forwardService.needForward()) {
CommonResponse response = new CommonResponse();
response.setSuccess(false);
response.setMessage("Request refused, Server status is not working");
return response;
}
dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());
if (publisher.getPublishType() != PublishType.TEMPORARY) {
String connectId = WordCache.getInstance().getWordCache(
publisher.getSourceAddress().getAddressString());
sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(),
connectId);
// record the renew timestamp
datumLeaseManager.renew(connectId);
}
return CommonResponse.buildSuccessResponse();
}
}
在 DataChangeEventCenter 的 onChange 函数中,会进行投放。
public void onChange(Publisher publisher, String dataCenter) {
int idx = hash(publisher.getDataInfoId());
Datum datum = new Datum(publisher, dataCenter);
if (publisher instanceof UnPublisher) {
datum.setContainsUnPub(true);
}
if (publisher.getPublishType() != PublishType.TEMPORARY) {
dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
DataSourceTypeEnum.PUB, datum));
} else {
dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
DataSourceTypeEnum.PUB_TEMP, datum));
}
}
在DataChangeEventQueue之中,会调用 handleDatum 来处理。在这里对Datum进行存储。
在 DataChangeHandler 之中,会提取ChangeData,然后进行Notify。
public void start() {
DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
int queueCount = queues.length;
Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
Executor notifyExecutor = ExecutorFactory
.newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
for (int idx = 0; idx < queueCount; idx++) {
final DataChangeEventQueue dataChangeEventQueue = queues[idx];
final String name = dataChangeEventQueue.getName();
executor.execute(() -> {
while (true) {
final ChangeData changeData = dataChangeEventQueue.take();
notifyExecutor.execute(new ChangeNotifier(changeData, name));
}
});
}
}
具体如下:
+
Session Server | Data Server
|
|
|
|
+--------------------------+ PublishDataRequest +--------------------+
| DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler |
+-----------+--------------+ | +------+-------------+
^ | |
| getValues | | onChange(Publisher)
| | v
| | +--------+--------------+
+---------+----------+ | | DataChangeEventCenter |
|sessionCacheService | | +--------+--------------+
+--------------------+ | |
| | Datum
| |
| v
| +--------+-------------+
| | DataChangeEventQueue |
| +--------+-------------+
| |
| |
| | ChangeData
| v
| +-------+-----------+
| | DataChangeHandler |
+ +-------------------+
于是我们接着进行 DataChangeHandler 处理。即总述中提到的:DataChangeHandler 会把这个事件变更信息:
下面我们从第一部分 :把这个事件变更信息变成Operator,放到AbstractAcceptorStore 出发,进行讲解日志操作。
即如图所示:
+
Session Server | Data Server
|
|
|
+
+--------------------------+ PublishDataRequest +--------------------+
| DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler |
+-----------+--------------+ | +------+-------------+
^ | |
| getValues | | onChange(Publisher)
| | v
| | +--------+--------------+
+---------+----------+ | | DataChangeEventCenter |
|sessionCacheService | | +--------+--------------+
+--------------------+ | |
| | Datum
| |
| v
| +--------+-------------+
| | DataChangeEventQueue |
| +--------+-------------+
| |
| |
| | ChangeData
| v
| +-------+-----------+
| | DataChangeHandler |
| +-------+-----------+
| |
| |
| v
| +-------+---------+
| | ChangeNotifier |
| +-------+---------+
| |
| |
| v
| +----------+------------+
| | AbstractAcceptorStore |
| +-----------------------+
+
Acceptor的appendOperator谁来调用?在Notifier 里面有,比如:
public class BackUpNotifier implements IDataChangeNotifier {
@Autowired
private SyncDataService syncDataService;
@Override
public void notify(Datum datum, Long lastVersion) {
syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
DataSourceTypeEnum.BACKUP));
}
}
以及另一个:
public class SnapshotBackUpNotifier implements IDataChangeNotifier {
@Autowired
private SyncDataService syncDataService;
@Override
public void notify(Datum datum, Long lastVersion) {
syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion, datum,
DataSourceTypeEnum.BACKUP));
}
}
AbstractAcceptorStore是日志存储,我们下面详细分析。
对于操作信息,提供了一个Bean来存储。
@Bean
public AcceptorStore localAcceptorStore() {
return new LocalAcceptorStore();
}
作用是在 storeServiceMap 中存放各种 AcceptorStore,目前只有LocalAcceptorStore 这一个。
public class StoreServiceFactory implements ApplicationContextAware {
private static Map<String/*supportType*/, AcceptorStore> storeServiceMap = new HashMap<>();
/**
* get AcceptorStore by storeType
* @param storeType
* @return
*/
public static AcceptorStore getStoreService(String storeType) {
return storeServiceMap.get(storeType);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, AcceptorStore> map = applicationContext.getBeansOfType(AcceptorStore.class);
map.forEach((key, value) -> storeServiceMap.put(value.getType(), value));
}
}
AbstractAcceptorStore 是存储的基本实现类,几个基本成员是。
acceptors :是一个矩阵,按照dataCenter,dataInfoId维度来分类,存储了此维度下的Acceptor;就是说,针对每一个dataCenter,dataInfoId的组合,都有一个Acceptor,用来存储这下面的Operator。
notifyAcceptorsCache :是一个矩阵,按照dataCente,dataInfoId维度来分类,缓存了此维度下需要进一步处理的Acceptor;
delayQueue :配合notifyAcceptorsCache使用,针对notifyAcceptorsCache的每一个新acceptor,系统会添加一个消息进入queue,这个queue等延时到了,就会取出,并且从notifyAcceptorsCache取出对应的新acceptor进行相应处理;
按说应该是 cache 有东西,所以dequeue 时候就会取出来,但是如果这期间多放入了几个进入 Cache,原有cache 的 value 只是被替换而已,等时间到了,也会取出来。
notifyAcceptorsCache 也是按照 data center 来控制的,只有定期 removeCache。
public abstract class AbstractAcceptorStore implements AcceptorStore {
private static final int DEFAULT_MAX_BUFFER_SIZE = 30;
@Autowired
protected IMetaServerService metaServerService;
@Autowired
private Exchange boltExchange;
@Autowired
private DataServerConfig dataServerConfig;
@Autowired
private DataServerConnectionFactory dataServerConnectionFactory;
@Autowired
private DatumCache datumCache;
private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> acceptors = new ConcurrentHashMap<>();
private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> notifyAcceptorsCache = new ConcurrentHashMap<>();
private DelayQueue<DelayItem<Acceptor>> delayQueue
}
具体如下图:
+-----------------------------+ +--> dataInfoId +---> Acceptor +--> Map<>ersion, Operator>
|[AbstractAcceptorStore] | |
| | +-> dataCenter +---+
| | | |
| acceptors +--------------->+ +--> dataInfoId +---> Acceptor +--> Map<>ersion, Operator>
| | |
| notifyAcceptorsCache | | +--> dataInfoId +---> Acceptor +--> Map<>ersion, Operator>
| + | +-> dataCenter +-->+
+-----------------------------+ |
| +--> dataInfoId +---> Acceptor +--> Map<>ersion, Operator>
|
|
| +--> dataInfoId +---> Acceptor +--> Map<>ersion, Operator>
| +-> dataCenter +-->+
| | +--> dataInfoId +---> Acceptor +--> Map<>ersion, Operator>
+-------------------->+
| +--> dataInfoId +---> Acceptor +--> Map<>ersion, Operator>
+-> dataCenter +---+
+--> dataInfoId +---> Acceptor +--> Map<>ersion, Operator>
手机如图:
有一点需要说明,就是delayQueue 为何要延迟队列。这是由于SOFA的“秒级服务上下线通知“特性造成的。
因为要实现此特性,所以涉及到了一个连接敏感性问题,即在 SOFARegistry 里,所有 Client 都与 SessionServer 保持长连接,每条长连接都会有基于 bolt 的连接心跳,如果连接断开,Client 会马上重新建连,时刻保证 Client 与 SessionServer 之间有可靠的连接。
因为强烈的连接敏感性,所以导致如果只是网络问题导致连接断开,实际的进程并没有宕机,那么 Client 会马上重连 SessionServer 并重新注册所有服务数据。这种大量的短暂的服务下线后又重新上线会给用户带来困扰和麻烦。
因此在 DataServer 内部实现了数据延迟合并的功能,就是这里的DelayQueue。
addOperator的基本逻辑是:
在操作中,都是使用putIfAbsent,这样短期内若有多个同样value插入,则不会替换原有的value,这样 起到了归并作用。
@Override
public void addOperator(Operator operator) {
Datum datum = operator.getDatum();
String dataCenter = datum.getDataCenter();
String dataInfoId = datum.getDataInfoId();
try {
Map<String/*dataInfoId*/, Acceptor> acceptorMap = acceptors.get(dataCenter);
if (acceptorMap == null) {
Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
acceptorMap = acceptors.putIfAbsent(dataCenter, newMap);
if (acceptorMap == null) {
acceptorMap = newMap;
}
}
Acceptor existAcceptor = acceptorMap.get(dataInfoId);
if (existAcceptor == null) {
Acceptor newAcceptor = new Acceptor(DEFAULT_MAX_BUFFER_SIZE, dataInfoId,
dataCenter, datumCache);
existAcceptor = acceptorMap.putIfAbsent(dataInfoId, newAcceptor);
if (existAcceptor == null) {
existAcceptor = newAcceptor;
}
}
if (operator instanceof SnapshotOperator) {
//snapshot: clear the queue, Make other data retrieve the latest memory data
existAcceptor.clearBefore();
} else {
existAcceptor.appendOperator(operator);
}
//put cache
putCache(existAcceptor);
}
}
putCache的作用是:
这里也使用putIfAbsent,这样短期内若有多个同样value插入,则不会替换原有的value,这样 起到了归并作用。
private void putCache(Acceptor acceptor) {
String dataCenter = acceptor.getDataCenter();
String dataInfoId = acceptor.getDataInfoId();
try {
Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
if (acceptorMap == null) {
Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
acceptorMap = notifyAcceptorsCache.putIfAbsent(dataCenter, newMap);
if (acceptorMap == null) {
acceptorMap = newMap;
}
}
Acceptor existAcceptor = acceptorMap.putIfAbsent(dataInfoId, acceptor);
if (existAcceptor == null) {
addQueue(acceptor);
}
}
}
具体消费是在定期任务中完成。消费日志的目的就是同步日志操作给其他 DataServer。
Scheduler类是定期任务,会启动两个线程池定期调用AcceptorStore的函数。
public class Scheduler {
private final ScheduledExecutorService scheduler;
public final ExecutorService versionCheckExecutor;
private final ThreadPoolExecutor expireCheckExecutor;
@Autowired
private AcceptorStore localAcceptorStore;
public Scheduler() {
scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler"));
expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));
versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new NamedThreadFactory(
"SyncDataScheduler-versionChangeCheck"));
}
public void startScheduler() {
scheduler.schedule(
new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3,
TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()),
30, TimeUnit.SECONDS);
versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());
}
}
AbstractAcceptorStore中函数如下:
changeDataCheck 内部是一个while true,所以不需要再使用线程池。
changeDataCheck绑定在delayQueue上,如果有新消息,则会取出Acceptor,也从notifyAcceptorsCache取出Acceptor,调用notifyChange(acceptor);进行处理 。
@Override
public void changeDataCheck() {
while (true) {
try {
DelayItem<Acceptor> delayItem = delayQueue.take();
Acceptor acceptor = delayItem.getItem();
removeCache(acceptor); // compare and remove
} catch (InterruptedException e) {
break;
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
}
消费Cache用到的是removeCache。
private void removeCache(Acceptor acceptor) {
String dataCenter = acceptor.getDataCenter();
String dataInfoId = acceptor.getDataInfoId();
try {
Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
if (acceptorMap != null) {
boolean result = acceptorMap.remove(dataInfoId, acceptor);
if (result) {
//data change notify
notifyChange(acceptor);
}
}
}
}