当前位置 博文首页 > nacos统一配置中心源码解析

    nacos统一配置中心源码解析

    作者:努力工作的小码农 时间:2021-01-11 18:02

    配置文件想必大家都很熟悉,无论什么架构 都离不开配置,虽然spring boot已经大大简化了配置,但如果服务很多 环境也好几个,管理配置起来还是很麻烦,并且每次改完配置都需要重启服务,nacos config出现就解决了这些问题,它把配置统一放到服务进行管理,客户端这边进行有需要的获取,可以实时对配置进行修改和发布

    如何使用nacos config

    首先需要引入nacos config jar包

    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        <version>2.2.1.RELEASE</version>
    </dependency>

     在nacos控制台提前配置需要的配置文件

     

     配置文件格式支持text、json、xml、yaml、html、properties,注意spring boot启动支持的配置文件格式只能为yaml或properties格式,其它格式的配置文件需要后续我们自己写代码去获取

    我们来看db.properties也是就数据库配置

     

     data id就是对应配置文件id,group为分组,配置内容就是properties格式的

    再来看bootstrap.properties如何引用这个配置文件

    spring.application.name=nacos-config
    server.port=20200
    
    #命名空间
    spring.cloud.nacos.config.namespace=${nacos_register_namingspace:0ca74337-8f42-49c3-aec9-32f268a937c4}
    #组名
    spring.cloud.nacos.config.group=${spring.application.name}
    #文件格式
    spring.cloud.nacos.config.file-extension=properties
    #nacos server地址
    spring.cloud.nacos.config.server-addr=localhost:8848
    
    #加载配置文件
    spring.cloud.nacos.config.ext-config[0].data-id=nacos.properties
    spring.cloud.nacos.config.ext-config[1].data-id=db.properties
    spring.cloud.nacos.config.ext-config[2].data-id=mybatis-plus.properties

     

    注意 加载配置文件的分组名默认为DEFAULT_GROUP,如需指定分组 需要再指定

    spring.cloud.nacos.config.ext-config[0].data-id=nacos.properties
    spring.cloud.nacos.config.ext-config[0].group=${spring.cloud.nacos.config.group}
    #或者
    spring.cloud.nacos.config.ext-config[1].data-id=undertow.properties
    spring.cloud.nacos.config.ext-config[1].group=MY_DEFAULT

     

    在这里解释下namespace和group的概念,namespace可以用来解决不同环境的问题,group是来管理配置分组的,它们的关系如下图

     spring boot启动容器如何加载nacos config配置文件

     

     

    这个配置作用是spring在启动之间准备上下文时会启用这个配置 来导入nacos相关配置文件,为后续容器启动做准备

    来看NacosConfigBootstrapConfiguration这个配置类

     

     

    NacosConfigProperties:对应我们上面在bootstrap.properties中对应的配置信息

    NacosConfigManager: 持有NacosConfigProperties和ConfigService,ConfigService用来查询 发布配置的相关接口

    NacosPropertySourceLocator:它实现了PropertySourceLocator ,spring boot启动时调用PropertySourceLocator.locate(env)用来加载配置信息,下面来看相关源码

    /******************************************NacosPropertySourceLocator******************************************/
    public PropertySource<?> locate(Environment env) {
        ConfigService configService = this.nacosConfigProperties.configServiceInstance();
        if (null == configService) {
            log.warn("no instance of config service found, can't load config from nacos");
            return null;
        } else {
            long timeout = (long)this.nacosConfigProperties.getTimeout();
            this.nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, timeout);
            String name = this.nacosConfigProperties.getName();
            String dataIdPrefix = this.nacosConfigProperties.getPrefix();
            if (StringUtils.isEmpty(dataIdPrefix)) {
                dataIdPrefix = name;
            }
    
            if (StringUtils.isEmpty(dataIdPrefix)) {
                dataIdPrefix = env.getProperty("spring.application.name");
            }
    
            CompositePropertySource composite = new CompositePropertySource("NACOS");
            // 加载共享的配置文件 不同指定分组 默认DEFAULT_GROUP,对应配置spring.cloud.nacos.config.sharedDataids=shared_1.properties,shared_2.properties
            this.loadSharedConfiguration(composite);
            // 对应spring.cloud.nacos.config.ext-config[0].data-id=nacos.properties的配置
            this.loadExtConfiguration(composite);
            // 加载当前应用配置
            this.loadApplicationConfiguration(composite, dataIdPrefix, this.nacosConfigProperties, env);
            return composite;
        }
    }
    
    // 看一个加载实现即可 流程都差不多 具体实现在NacosPropertySourceBuilder.loadNacosData()方法完成
    /******************************************具体实现在NacosPropertySourceBuilder******************************************/
    private Properties loadNacosData(String dataId, String group, String fileExtension) {
            String data = null;
    
            try {
                // 向nacos server拉取配置文件
                data = this.configService.getConfig(dataId, group, this.timeout);
                if (!StringUtils.isEmpty(data)) {
                    log.info(String.format("Loading nacos data, dataId: '%s', group: '%s'", dataId, group));
                    // spring boot配置当然只支持properties和yaml文件格式
                    if (fileExtension.equalsIgnoreCase("properties")) {
                        Properties properties = new Properties();
                        properties.load(new StringReader(data));
                        return properties;
                    }
    
                    if (fileExtension.equalsIgnoreCase("yaml") || fileExtension.equalsIgnoreCase("yml")) {
                        YamlPropertiesFactoryBean yamlFactory = new YamlPropertiesFactoryBean();
                        yamlFactory.setResources(new Resource[]{new ByteArrayResource(data.getBytes())});
                        return yamlFactory.getObject();
                    }
                }
            } catch (NacosException var6) {
                log.error("get data from Nacos error,dataId:{}, ", dataId, var6);
            } catch (Exception var7) {
                log.error("parse data from Nacos error,dataId:{},data:{},", new Object[]{dataId, data, var7});
            }
    
            return EMPTY_PROPERTIES;
        }

    至此我们在nacos上配置的properties和yaml文件都载入到spring配置文件中来了,后面可通过context.Environment.getProperty(propertyName)来获取相关配置信息

    配置如何随spring boot加载进来我们说完了,接来下来看修改完配置后如何实时刷新

    nacos config动态刷新

     当nacos config更新后,根据配置中的refresh属性来判断是否刷新配置,配置如下

    spring.cloud.nacos.config.ext-config[0].refresh=true

    首先sprin.factories 配置了EnableAutoConfiguration=NacosConfigAutoConfiguration,NacosConfigAutoConfiguration配置类会注入一个NacosContextRefresher,它首先监听了ApplicationReadyEvent,然后注册一个nacos listener用来监听nacos config配置修改后发布一个spring refreshEvent用来刷新配置和应用

    public class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware
    
    public void onApplicationEvent(ApplicationReadyEvent event) {
        // 只注册一次
        if (this.ready.compareAndSet(false, true)) {
            this.registerNacosListenersForApplications();
        }
    }
        
    private void registerNacosListenersForApplications() {
        if (this.refreshProperties.isEnabled()) {
            Iterator var1 = NacosPropertySourceRepository.getAll().iterator();
            while(var1.hasNext()) {
                NacosPropertySource nacosPropertySource = (NacosPropertySource)var1.next();
                // 对应刚才所说的配置 需要配置文件是否需要刷新
                if (nacosPropertySource.isRefreshable()) {
                    String dataId = nacosPropertySource.getDataId();
                    // 注册nacos监听器
                    this.registerNacosListener(nacosPropertySource.getGroup(), dataId);
                }
            }
        }
    
    }
        
    private void registerNacosListener(final String group, final String dataId) {
        Listener listener = (Listener)this.listenerMap.computeIfAbsent(dataId, (i) -> {
            return new Listener() {
                public void receiveConfigInfo(String configInfo) {
                    NacosContextRefresher.refreshCountIncrement();
                    String md5 = "";
                    if (!StringUtils.isEmpty(configInfo)) {
                        try {
                            MessageDigest md = MessageDigest.getInstance("MD5");
                            md5 = (new BigInteger(1, md.digest(configInfo.getBytes("UTF-8")))).toString(16);
                        } catch (UnsupportedEncodingException | NoSuchAlgorithmException var4) {
                            NacosContextRefresher.log.warn("[Nacos] unable to get md5 for dataId: " + dataId, var4);
                        }
                    }
                    // 添加刷新记录
                    NacosContextRefresher.this.refreshHistory.add(dataId, md5);
                    // 发布一个spring refreshEvent事件 对应监听器为RefreshEventListener 该监听器会完成配置的更新应用
                    NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "Refresh Nacos config"));
                    if (NacosContextRefresher.log.isDebugEnabled()) {
                        NacosContextRefresher.log.debug("Refresh Nacos config group " + group + ",dataId" + dataId);
                    }
    
                }
                public Executor getExecutor() {
                    return null;
                }
            };
        });
    
        try {
            this.configService.addListener(dataId, group, listener);
        } catch (NacosException var5) {
            var5.printStackTrace();
        }
    
    }

    我们说完了nacos config动态刷新,那么肯定有对应的动态监听,nacos config会监听nacos server上配置的更新状态

    nacos config动态监听

    一般来说客户端和服务端数据交互无非就两种方式

    pull:客户端主动从服务器拉取数据

    push: 由服务端主动向客户端推送数据

    这两种模式优缺点各不一样,pull模式需要考虑的是什么时候向服务端拉取数据 可能会存在数据延迟问题,而push模式需要客户端和服务端维护一个长连接 如果客户端较多会给服务端造成压力 但它的实时性会更好

    nacos采用的是pull模式,但它作了优化 可以看做是pull+push,客户端会轮询向服务端发出一个长连接请求,这个长连接最多30s就会超时,服务端收到客户端的请求会先判断当前是否有配置更新,有则立即返回

    如果没有服务端会将这个请求拿住“hold”29.5s加入队列,最后0.5s再检测配置文件无论有没有更新都进行正常返回,但等待的29.5s期间有配置更新可以提前结束并返回,下面会在源码中讲解具体怎么处理的

    nacos client处理

    动态监听的发起是在ConfigService的实现类NacosConfigService的构造方法中,它是对外nacos config api接口,在之前加载配置文件和NacosContextRefresher构造方法中都会获取或创建

     

     

     

     这里都会先判断是否已经创建了ConfigServer,没有则实例化一个NacosConfigService,来看它的构造函数

    /***************************************** NacosConfigService *****************************************/
    public NacosConfigService(Properties properties) throws NacosException {
        String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
        if (StringUtils.isBlank(encodeTmp)) {
            encode = Constants.ENCODE;
        } else {
            encode = encodeTmp.trim();
        }
        initNamespace(properties);
        // 用来向nacos server发起请求的代理,这里用到了装饰模式
        agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
        agent.start();
        // 客户端的一个工作类,agent作为它的构造传参 可猜想到里面肯定会做一些远程调用
        worker = new ClientWorker(agent, configFilterChainManager, properties);
    }
    
    /***************************************** ClientWorker *****************************************/
    public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;
    
        // Initialize the timeout parameter
    
        init(properties);
        // 这个线程池只有一个核心线程 用来执行checkConfigInfo()方法
        executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
        // 其它需要执行线程的地方都交给这个线程池来处理
        executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
        
        // 执行一个调用checkConfigInfo()方法的周期性任务,每10ms执行一次,首次执行延迟1ms后执行
        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }

    NacosConfigService构造方法主要创建一个agent 它是用来向nacos server发出请求的,然后又创建了一个clientwoker,它的构造方法创建了两个线程池,第一个线程池只有一个核心线程,它会执行一个周期性任务只用来调用checkconfiginfo()方法,第二个线程是后续由需要执行线程的地方都交给它来执行,下面重点来看checkconfiginfo()方法

    public void checkConfigInfo() {
        // 分任务
        int listenerSize = cacheMap.get().size();
        // 向上取整为批数
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }
    AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(
    new HashMap<String, CacheData>());

     cacheMap:缓存着需要刷新的配置,它是在调用ConfigService 添加监听器方式时会放入,可以自定义监听配置刷新

    // 添加一个config监听器,用来监听dataId为ErrorCode,group为DEFAULT_GROUP的config
    configService.addListener("ErrorCode","DEFAULT_GROUP",new Listener() {
        @Override
        public Executor getExecutor() {
            return null;
        }
    
        @Override
        public void receiveConfigInfo(String s) { //当配置更新时会调用监听器该方法
            Map<String, Map<String, String>> map = JSON.parseObject(s, Map.class);
            // 根据自己的业务需要来处理
        }
    });

    这里采用了一个策略:将cacheMap中的数量以3000分一个组,分别创建一个LongPollingRunnable用来监听配置更新,这个LongPollingRunnable就是我们之前所说的长连接任务,来看这个长连接任务

    class LongPollingRunnable implements Runnable {
        private int taskId;
    
        public LongPollingRunnable(int taskId) {
            this.taskId = taskId;
        }
    
        @Override
        public void run() {
    
            List<CacheData> cacheDatas = new ArrayList<CacheData>();
            List<String> inInitializingCacheList = new ArrayList<String>();
            
    
    下一篇:没有了