当前位置 博文首页 > 词汇族:SpringCloud Alibaba Nacos注册中心源码浅析

    词汇族:SpringCloud Alibaba Nacos注册中心源码浅析

    作者:词汇族 时间:2021-01-25 16:56

    一、前置了解

    1.1 简介

    Nacos是一款阿里巴巴推出的一款微服务发现、配置管理框架。我们本次对将对它的服务注册发现功能进行简单源码分析。

    1.2 流程

    Nacos的分析分为两部分,一部分是我们的客户端(将自己注册到Nacos),另一部分是Nacos Server处理我们的注册请求等。

    1.3 要分析demo示例

    细节篇幅不多展示,大致如下

    1.3.1 客户端方面:

    引入了pom依赖

    <dependency>
       <groupId>com.alibaba.cloud</groupId>
       <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    

    并在application.yml配置好nacos地址(本地),我们的这个应用启动后会向Nacos服务端去注册。

    1.3.2 Nacos服务端方面

    我们从https://github.com/alibaba/nacos,即Nacos的官网github按tag拉下源码到本地。

    会有很多模块:address、api、client、cmdb、core、console等等。

    从console里的Nacos.java文件启动即可,它是个SpringBoot应用,启动后就可以处理注册等请求了。

    二、Nacos客户端源码流程

    2.1 自动配置触发逻辑入口

    打开客户端引入的依赖包的pom,只引入了spring-cloud-alibaba-nacos-discovery:

    SpringCloud系列都是通过spring.factories文件进行自动配置,我们打开spring-cloud-alibaba-nacos-discovery的spring.factories文件:

    去看看NacosDiscoveryAutoConfiguration这个名字的,名字可以看出它是和自动注册发现相关的配置类:

    @Configuration
    @EnableConfigurationProperties
    @ConditionalOnNacosDiscoveryEnabled
    @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
    @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
    		AutoServiceRegistrationAutoConfiguration.class })
    public class NacosDiscoveryAutoConfiguration {
    
    	@Bean
    	public NacosServiceRegistry nacosServiceRegistry(
    			NacosDiscoveryProperties nacosDiscoveryProperties) {
    		return new NacosServiceRegistry(nacosDiscoveryProperties);
    	}
    
    	@Bean
    	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
    	public NacosRegistration nacosRegistration(
    			NacosDiscoveryProperties nacosDiscoveryProperties,
    			ApplicationContext context) {
    		return new NacosRegistration(nacosDiscoveryProperties, context);
    	}
    
    	@Bean
    	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
    	public NacosAutoServiceRegistration nacosAutoServiceRegistration(
    			NacosServiceRegistry registry,
    			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
    			NacosRegistration registration) {
    		return new NacosAutoServiceRegistration(registry,
    				autoServiceRegistrationProperties, registration);
    	}
    }
    

    注册了三个Bean,各个Bean名字也是见名知义,上面两个是服务与nacos注册逻辑本身,最后一个Auto的才是自动配置相关的,应该是入口。

    打开NacosAutoServiceRegistration源码,会发现它的父类AbstractAutoServiceRegistration实现了ApplicationListener接口,一般很多框架都是通过监听spring事件机制然后开始运作各自的源码逻辑,打开ApplicationListener接口的重写方法看看:

    public abstract class AbstractAutoServiceRegistration<R extends Registration>
    		implements AutoServiceRegistration, ApplicationContextAware,
    		ApplicationListener<WebServerInitializedEvent> {
    
    	//略***
    
    	@Override
    	@SuppressWarnings("deprecation")
    	public void onApplicationEvent(WebServerInitializedEvent event) {
    		bind(event);
    	}
    

    注册入口应该就是这里,bind方法开始执行nacos自己的逻辑,bind方法:

    	public void bind(WebServerInitializedEvent event) {
    		ApplicationContext context = event.getApplicationContext();
    		//略
    		this.port.compareAndSet(0, event.getWebServer().getPort());
    		this.start();
    	}
    

    start:

    public void start() {
       //略
       if (!this.running.get()) {
          this.context.publishEvent(
                new InstancePreRegisteredEvent(this, getRegistration()));
          register();
          if (shouldRegisterManagement()) {
             registerManagement();
          }
          this.context.publishEvent(
                new InstanceRegisteredEvent<>(this, getConfiguration()));
          this.running.compareAndSet(false, true);
       }
    
    }
    

    这里就可以发现自动配置触发的注册方法了,register();,后续就是如何注册了!

    2.2 客户端注册逻辑 register()

    不断跟进刚刚的多个register()重名方法,会来到真正的register方法,如下:

    	public void register(Registration registration) {
    		//略
    		String serviceId = registration.getServiceId();
    		Instance instance = getNacosInstanceFromRegistration(registration);
    
    		try {
    			namingService.registerInstance(serviceId, instance);
    			//略
    		}
    		catch (Exception e) {
    			//略
    		}
    	}
    

    逻辑比较直接,主要是获取服务id(比如服务名啥的)+这个实例的具体信息(封装成Instance),最后通过namingService去注册,跟进注册:

        public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    		//判断是否是临时节点
            if (instance.isEphemeral()) {
                BeatInfo beatInfo = new BeatInfo();
                beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
                beatInfo.setIp(instance.getIp());
                beatInfo.setPort(instance.getPort());
                beatInfo.setCluster(instance.getClusterName());
                beatInfo.setWeight(instance.getWeight());
                beatInfo.setMetadata(instance.getMetadata());
                beatInfo.setScheduled(false);
                //略
                beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
            }
    
            serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
        }
    

    心跳机制

    其实这里可以看出如果不是临时节点是不需要发送心跳消息的,这里心跳机制是通过beatReactor.addBeatInfo里内部的一个定时任务去实现的,核心就是内部的:

                long result = serverProxy.sendBeat(beatInfo);
                long nextTime = result > 0 ? result : beatInfo.getPeriod();
                executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    

    通过线程池跑任务,定时访问Nacos服务端的/instance/beat接口,发送HTTP请求 表示自己活着

    继续看注册

    刚刚registerInstance里的

    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    

    继续跟进:

        public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    
            NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
                namespaceId, serviceName, instance);
    
            final Map<String, String> params = new HashMap<String, String>(9);
            params.put(CommonParams.NAMESPACE_ID, namespaceId);
            params.put(CommonParams.SERVICE_NAME, serviceName);
            params.put(CommonParams.GROUP_NAME, groupName);
            params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
            params.put("ip", instance.getIp());
            params.put("port", String.valueOf(instance.getPort()));
            params.put("weight", String.valueOf(instance.getWeight()));
            params.put("enable", String.valueOf(instance.isEnabled()));
            params.put("healthy", String.valueOf(instance.isHealthy()));
            params.put("ephemeral", String.valueOf(instance.isEphemeral()));
            params.put("metadata", JSON.toJSONString(instance.getMetadata()));
    
            reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
    
        }
    

    其实就是准备参数准备发http请求了哈,注册的接口地址是NACOS_URL_INSTANCE,也就是:/instance的post请求

    客户端注册总结:

    1.通过SpringCloud一贯使用的spring.factories文件进行自动配置

    2.自动配置类将自己注入IOC容器,并实现了ApplicationListener接口,在web容器初始化事件发布之后加载自己的逻辑

    3.加载注册逻辑,通过发送http请求到/instance接口将本身的信息发给Nacos服务端,以及心跳任务定时发送,告诉自己活着

    三、Nacos服务端处理注册

    上面有说到nacos客户端注册是通过发送http请求到/instance接口。我们看看/instance接口做了什么。Nacos服务端的controller源码如下:

    @RestController
    @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
    public class InstanceController {
        //...略
    
        @CanDistro
        @PostMapping
        public String register(HttpServletRequest request) throws Exception {
    
            String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
            String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    
            serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
            return "ok";
        }
    }
    

    跟进里面的serviceManager.registerInstance注册方法:

        public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    
            createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    
            Service service = getService(namespaceId, serviceName);
    
            if (service == null) {
                throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
            }
    
            addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
        }
    

    createEmptyService是要在放入instance实例(即注册的那个节点信息)之前确保service存在,不存在则创建一个,之后就可以通过getService取出来了。最后再通过addInstance继续注册

    看看createEmptyService是怎么创建的,什么结构?

    3.1 createEmptyService创建保证Service

    通过断点不断跟进createEmptyService方法源码,会来到ServiceManager.java的putService方法:

        public void putService(Service service) {
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                synchronized (putServiceLock) {
                    if (!serviceMap.containsKey(service.getNamespaceId())) {
                        serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
                    }
                }
            }
            serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
        }
    

    最后是放到到一个serviceMap的Map结构去了,如下:

    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
    

    双层Map,内部含义其实是:

    Map<namespace,Map<group:serviceNmae,Service>>//第一层key是namespace,第二层里才是name和service
    

    实际上放入map之后,还会把service初始化,调用init方法,内部会执行健康检查:

    1.某个实例超过15秒没收到心跳则把它的healthy属性设置为false

    2.继续超过30秒没收到心跳就会直接剔除这个实例

    3.2 addInstance注册

    回到前面的注册地方,最后保证了有Service之后继续走主逻辑,addInstance:

    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    

    跟进

        public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
    
            String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    
            Service service = getService(namespaceId, serviceName);
    
            synchronized (service) {
                List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
    
                Instances instances = new Instances();
                instances.setInstanceList(instanceList);
    
                consistencyService.put(key, instances);
            }
        }
    

    最后是执行consistencyService.put(key, instances);注册,这里会有两个实现DistroConsistencyServiceImpl和RaftConsistencyServiceImpl,分别对应着注册中心的AP实现和CP实现,一个基于内存优先可用性(A),一个基于磁盘优先一致性(C),是CAP理论里的取舍。CAP具体可看:https://baike.baidu.com/item/CAP原则/5712863?fr=aladdin

    四、Nacos服务端AP模式实现:DistroConsistencyServiceImpl

    Nacos的AP模式采用distro协议,Distro是阿里的自创协议,Distro 协议被定位为 临时数据的一致性协议

    继续看之前的源码,注册最后是来到:

    consistencyService.put(key, instances);
    

    跟进:

        @Override
        public void put(String key, Record value) throws NacosException {
            //1.将注册实例更新到内存注册表
            onPut(key, value);
            //2.同步实例信息到Nacos Server集群其它节点
            taskDispatcher.addTask(key);
        }
    

    如加的注释这样,分了两步实现

    4.1 onPut将注册实例更新到内存注册表

    跟进onPut源码:

        public void onPut(String key, Record value) {
    
            if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
                //封装数据节点保存
                Datum<Instances> datum = new Datum<>();
                datum.value = (Instances) value;
                datum.key = key;
                datum.timestamp.incrementAndGet();
                dataStore.put(key, datum);
            }
    
            if (!listeners.containsKey(key)) {
                return;
            }
    		//只要传key就拿到上面的节点去更新了
            notifier.addTask(key, ApplyAction.CHANGE);
        }
    

    这里也看到了最后notifier.addTask运用了生产者消费者的思想,里面是添加一个任务到阻塞队列中去,等着处理,因为这些操作本身不需要立即返回成功,对提升性能有很大帮助。

    传了ApplyAction.CHANGE类型,我们跟进notifier.addTask,会发现是在Notifier内部类里,它是多线程Runnable的实现类,逻辑都在run方法里,等着对应的线程调起执行。

    public class Notifier implements Runnable {
    		//略部分代码
            @Override
            public void run() {
    
                while (true) {
                    try {
                        //略部分代码
                        for (RecordListener listener : listeners.get(datumKey)) {
                            count++;
                            try {
                                if (action == ApplyAction.CHANGE) {
                                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                                    continue;
                                }
                                if (action == ApplyAction.DELETE) {
                                    listener.onDelete(datumKey);
                                    continue;
                                }
                            } catch (Throwable e) {
                                //略
                            }
                        }
    
                        //略
                    } catch (Throwable e) {
                        //略
                    }
                }
            }
        }
    

    判断是刚才我们传的ApplyAction.CHANGE会去执行listener.onChange,这里有多个实现,我们可以通过打断点进入的是com.alibaba.nacos.naming.core.Service类中

        public void onChange(String key, Instances value) throws Exception {
            //略
            updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
            recalculateChecksum();
        }
    

    核心就是updateIPs:

        public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
            Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
            for (String clusterName : clusterMap.keySet()) {
                ipMap.put(clusterName, new ArrayList<>());
            }
    
            for (Instance instance : instances) {
                try {
                    if (instance == null) {
                        Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                        continue;
                    }
    
                    if (StringUtils.isEmpty(instance.getClusterName())) {
                        instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
                    }
    
                    if (!clusterMap.containsKey(instance.getClusterName())) {
                        Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                            instance.getClusterName(), instance.toJSON());
                        Cluster cluster = new Cluster(instance.getClusterName(), this);
                        cluster.init();
                        getClusterMap().put(instance.getClusterName(), cluster);
                    }
    
                    List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
                    if (clusterIPs == null) {
                        clusterIPs = new LinkedList<>();
                        ipMap.put(instance.getClusterName(), clusterIPs);
                    }
    
                    clusterIPs.add(instance);
                } catch (Exception e) {
                    Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
                }
            }
    
            for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
                //make every ip mine
                List<Instance> entryIPs = entry.getValue();
                clusterMap.get(entry.getKey()).updateIPs(entryIPs, ephemeral);
            }
    
            setLastModifiedMillis(System.currentTimeMillis());
            getPushService().serviceChanged(this);
            StringBuilder stringBuilder = new StringBuilder();
    
            for (Instance instance : allIPs()) {
                stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(",");
            }
    
        }
    

    为了防止读写并发冲突,方法第一句直接创建了一个新的HashMap,然后去操作新的HashMap,操作完了之后再去替换老的Map数据,CopyOnWrite的思想。

    Eureka防止读写冲突用的是多级缓存结构,多级缓存定时同步,客户端感知及时性不如Nacos。

    最后还发布了服务变化事件

    4.2 同步实例信息到Nacos Server集群其它节点

    回到之前的代码,put方法中是taskDispatcher.addTask(key);进行同步信息到集群其它节点,跟进代码:

            public void addTask(String key) {
                queue.offer(key);
            }
    

    就是把节点的key加入到阻塞队列中了,等待之后执行,这是内部类TaskScheduler里的方法,看看整体:

    public class TaskScheduler implements Runnable {
    
            //略
    
            public void addTask(String key) {
                queue.offer(key);
            }
    
            @Override
            public void run() {
    
                List<String> keys = new ArrayList<>();
                while (true) {
    
                    try {
    
                        String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                            TimeUnit.MILLISECONDS);
    
                        if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                            Loggers.DISTRO.debug("got key: {}", key);
                        }
    
                        if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                            continue;
                        }
    
                        if (StringUtils.isBlank(key)) {
                            continue;
                        }
    
                        if (dataSize == 0) {
                            keys = new ArrayList<>();
                        }
    
                        keys.add(key);
                        dataSize++;
    
                        if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                            (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
    
                            for (Server member : dataSyncer.getServers()) {
                                if (NetUtils.localServer().equals(member.getKey())) {
                                    continue;
                                }
                                SyncTask syncTask = new SyncTask();
                                syncTask.setKeys(keys);
                                syncTask.setTargetServer(member.getKey());
    
                                if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                    Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                                }
    
                                dataSyncer.submit(syncTask, 0);
                            }
                            lastDispatchTime = System.currentTimeMillis();
                            dataSize = 0;
                        }
    
                    } catch (Exception e) {
                        Loggers.DISTRO.error("dispatch sync task failed.", e);
                    }
                }
            }
        }
    

    可以看到if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
    (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod())

    达到一定是数量或时间差,就开始提交批量发送同步任务。逻辑在同步类DataSyncer的run方法里,里面就是往/distro/datum接口发送数据同步。

    五、Nacos服务端CP模式实现:RaftConsistencyServiceImpl

    Nacos主要是AP模式,CP模式的RaftConsistencyServiceImpl具体就不展开了,这里只简单介绍一下大概实现方式:

    1.是阿里自己实现的CP模式的简单raft协议

    2.判断自己是Leader节点的话才执行逻辑,否则转发给Leader

    3.同步更新实例数据到磁盘,异步更新内存注册表

    4.用CountDownLatch实现,必须集群半数以上节点写入成功才返回客户端成功

    5.成功后调用/raft/datum/commit接口提交

    六、服务发现

    客户端通过调用/instance/list接口获取服务端map相关数据,并且会有个延时执行的定时任务去不断更新最新服务数据

    下一篇:没有了