当前位置 博文首页 > 努力的小雨:Nacos(二)源码分析Nacos服务端注册示例流程

    努力的小雨:Nacos(二)源码分析Nacos服务端注册示例流程

    作者:努力的小雨 时间:2021-01-18 20:04

    nacos源码分析注册流程

      上回我们讲解了客户端配置好nacos后,是如何进行注册到服务器的,那我们今天来讲解一下服务器端接收到注册实例请求后会做怎么样的处理。

      首先还是把博主画的源码分析图例发一下,让大家对整个流程有一个大概的理解:图示流程地址:https://www.processon.com/view/link/5f7e895be0b34d0711f65178

      大家先把nacos服务器端的源码下载下来。在自己本地运行一下,nacos的git地址:https://github.com/alibaba/naco

      下载好后,我们还是看一下nacos的官方文档:nacos的各种请求地址url:https://nacos.io/zh-cn/docs/open-api.html,

      看源码之前大家应该先了解一下nacos主要具有哪些功能,这样我们看源码的时候才可以顺藤摸瓜,不会被源码绕晕,我们今天主要看nacos最主要的核心功能:

      分析-服务发现和服务运行状况检查:Nacos使服务易于注册自己并通过DNS或HTTP接口发现其他服务。Nacos还提供服务的实时运行状况检查,以防止向不正常的主机或服务实例发送请求。

      进入正题,我们上节看到了nacos客户端,也就是我们的微服务启动时会进行注册调用服务器的url地址:

     

     1 com.alibaba.nacos.naming.controllers.InstanceController
     2 /**
     3      * Register new instance.
     4      *
     5      * @param request http request
     6      * @return 'ok' if success
     7      * @throws Exception any error during register
     8      */
     9     @CanDistro
    10     @PostMapping
    11     @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    12     public String register(HttpServletRequest request) throws Exception {
    13         
    14         final String namespaceId = WebUtils
    15                 .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    16         final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    17         NamingUtils.checkServiceNameFormat(serviceName);
    18         //创建服务实例对象,只是单个对象
    19         final Instance instance = parseInstance(request);
    20         //开始注册实例
    21         serviceManager.registerInstance(namespaceId, serviceName, instance);
    22         return "ok";
    23     }

     

      这个方法只是单纯的实例对象包装了客户端发送过来的请求信息,具体是哪些参数,可以看看上一节。

     1 private Instance parseInstance(HttpServletRequest request) throws Exception {
     2         
     3         String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
     4         String app = WebUtils.optional(request, "app", "DEFAULT");
     5         Instance instance = getIpAddress(request);
     6         instance.setApp(app);
     7         instance.setServiceName(serviceName);
     8         // Generate simple instance id first. This value would be updated according to
     9         // INSTANCE_ID_GENERATOR.
    10         instance.setInstanceId(instance.generateInstanceId());
    11         instance.setLastBeat(System.currentTimeMillis());
    12         String metadata = WebUtils.optional(request, "metadata", StringUtils.EMPTY);
    13         if (StringUtils.isNotEmpty(metadata)) {
    14             instance.setMetadata(UtilsAndCommons.parseMetadata(metadata));
    15         }
    16         
    17         instance.validate();
    18         
    19         return instance;
    20     }

      我们主要来看看下面的开始注册服务器实例的方法:

     1 public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
     2         //创建服务
     3         createEmptyService(namespaceId, serviceName, instance.isEphemeral());
     4         //还是一样从map中获取service
     5         Service service = getService(namespaceId, serviceName);
     6         
     7         if (service == null) {
     8             throw new NacosException(NacosException.INVALID_PARAM,
     9                     "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    10         }
    11         //此时服务还是个空壳,没有任何服务器实例,这一步才加入进来
    12         addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    13     }
     1 
    //createEmptyService最后调用的是这里
    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) 2 throws NacosException { 3 //查找是否存在服务,有则返回,无则创建 4 Service service = getService(namespaceId, serviceName); 5 if (service == null) { 6 7 Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); 8 service = new Service(); 9 service.setName(serviceName); 10 service.setNamespaceId(namespaceId); 11 service.setGroupName(NamingUtils.getGroupName(serviceName)); 12 // now validate the service. if failed, exception will be thrown 13 service.setLastModifiedMillis(System.currentTimeMillis()); 14 service.recalculateChecksum(); 15 if (cluster != null) { 16 cluster.setService(service); 17 service.getClusterMap().put(cluster.getName(), cluster); 18 } 19 service.validate(); 20 //主要看这个方法: 21 putServiceAndInit(service); 22 if (!local) { 23 addOrReplaceService(service); 24 } 25 } 26 }
     1 //这里需要注意这个方法,便于知道nacos是以什么形式存储服务的
     2      public Service getService(String namespaceId, String serviceName) {
     3      //serviceMap是一个map,这就知道了nacos官方介绍中数据模型中的意思了
     4      /**
     5      * Map(namespace, Map(group::serviceName, Service)).
     6      */
     7    //全局是这样定义的:private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
     8         if (serviceMap.get(namespaceId) == null) {
     9             return null;
    10         }
    11         return chooseServiceMap(namespaceId).get(serviceName);
    12     }
     1 private void putServiceAndInit(Service service) throws NacosException {
     2         //把新创建的服务放到map中
     3         putService(service);
     4         service.init();
     5         //添加监听,记住这个,后面有用
     6         consistencyService
     7                 .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
     8         consistencyService
     9                 .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    10         Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    11     }
    1 public void init() {
    2         //这个时候开始启动心跳检测的定时任务,可以自行看一下这个任务做了哪些事情:发现超时则置为不健康状态,并调用接口进行删除服务
    3         HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    4         for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
    5             entry.getValue().setService(this);
    6             entry.getValue().init();
    7         }
    8     }
     1 public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
     2             throws NacosException {
     3         
     4         String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
     5         
     6         Service service = getService(namespaceId, serviceName);
     7         
     8         synchronized (service) {
     9         //获取所有实例
    10             List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
    11             
    12             Instances instances = new Instances();
    13             instances.setInstanceList(instanceList);
    14             //注意这里默认的我们的实例都是临时的,都是存储在内存当中,所以找service实现类的时候找DistroConsistencyServiceImpl
    15             consistencyService.put(key, instances);
    16         }
    17     }
     1 //addIpAddresses最后会调用该方法
     2     public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
     3             throws NacosException {
     4         
     5         Datum datum = consistencyService
     6                 .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
     7         
     8         List<Instance> currentIPs = service.allIPs(ephemeral);
     9         Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    10         Set<String> currentInstanceIds = Sets.newHashSet();
    11         
    12         for (Instance instance : currentIPs) {
    13             currentInstances.put(instance.toIpAddr(), instance);
    14             currentInstanceIds.add(instance.getInstanceId());
    15         }
    16         
    17         Map<String, Instance> instanceMap;
    18         if (datum != null && null != datum.value) {
    19             instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    20         } else {
    21             instanceMap = new HashMap<>(ips.length);
    22         }
    23         
    24         for (Instance instance : ips) {
    25             if (!service.getClusterMap().containsKey(instance.getClusterName())) {
    26                 Cluster cluster = new Cluster(instance.getClusterName(), service);
    27                 //跟service一样,健康检查定时任务
    28                 cluster.init();
    29                 service.getClusterMap().put(instance.getClusterName(), cluster);
    30                 Loggers.SRV_LOG
    31                         .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
    32                                 instance.getClusterName(), instance.toJson());
    33             }
    34             //我们这次进来的action是add,并不是remove,所以不走这里
    35             if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
    36                 instanceMap.remove(instance.getDatumKey());
    37             } else {
    38                 //最后就是实例放到map中,以ip+port等信息为key,value为当前实例
    39                 instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
    40                 instanceMap.put(instance.getDatumKey(), instance);
    41             }
    42             
    43         }
    44         
    45         if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
    46             throw new IllegalArgumentException(
    47                     "ip list can not be empty, service: " + service.getName() + ", ip list: " +