    等你归去来: ES Series (3): An Analysis of the Network Communication Module

    Author: 等你归去来  Date: 2021-05-02 18:12

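      ES is a distributed search engine. Besides providing the necessary communication services to users, the nodes of a cluster must also stay in close contact with one another so that a correct result can be produced when it is needed. This inevitably involves many varied and demanding communication scenarios, so how to achieve high-performance communication is a question ES has to answer.

      Today we will use ES's transportService implementation as a window into how its high-performance communication module is built.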

     

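    1. Preface

      When it comes to high-performance network communication, I believe many people already understand the general idea, and anyone who has read some of my earlier articles will know the core principle. In short, it comes down to using I/O multiplexing so that a small number of threads can serve many connections and make full use of the available bandwidth.

      Narrowed down to the Java language, there is perhaps even less to talk about: NIO, Netty, Akka...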
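      To make the idea of I/O multiplexing concrete, here is a minimal sketch using only the JDK's NIO classes. It is not taken from ES or Netty. The class name SelectorSketch and the helper multiplex are hypothetical, and in-process pipes stand in for real sockets so that the example runs without a network. One thread and one Selector read from several channels:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class SelectorSketch {

    // Hypothetical helper: sends each message through its own in-process pipe,
    // then reads all of them back on ONE thread using ONE Selector.
    static List<String> multiplex(List<String> messages) {
        List<Pipe> pipes = new ArrayList<>();
        List<String> received = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            for (String message : messages) {
                Pipe pipe = Pipe.open();
                pipes.add(pipe);
                pipe.source().configureBlocking(false); // a Selector only accepts non-blocking channels
                pipe.source().register(selector, SelectionKey.OP_READ);
                pipe.sink().write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
            }
            ByteBuffer buffer = ByteBuffer.allocate(256);
            while (received.size() < messages.size()) {
                if (selector.select(1000) == 0) {
                    break; // nothing became readable within one second
                }
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    buffer.clear();
                    int n = ((Pipe.SourceChannel) key.channel()).read(buffer);
                    if (n > 0) {
                        buffer.flip();
                        received.add(StandardCharsets.UTF_8.decode(buffer).toString());
                        key.cancel(); // this sketch expects one short message per pipe
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (Pipe pipe : pipes) {
                try {
                    pipe.sink().close();
                    pipe.source().close();
                } catch (IOException ignored) {
                    // best-effort cleanup in a sketch
                }
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // The order of the result depends on which channel the selector reports first.
        System.out.println(multiplex(Arrays.asList("ping", "pong")));
    }
}
```

      Frameworks such as Netty wrap this select loop in event-loop threads and add buffer pooling and pipelines on top, which is why ES can delegate the low-level work to them.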

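      So the subject of this article is not that mysterious, nor does it need to be. We simply approach it from the angle of studying ES's implementation details in order to understand a few practical problems in depth. The only aim is to clear up confusion.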

      

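    2. Initialization of transportService

      Almost every module in ES is initialized when the service starts, which is natural. First, being a little slow at startup is acceptable. Second, a great deal of context is available at startup, which makes all kinds of initialization convenient. Third, problems are discovered early rather than after the service has been running for a long time and hits something unrecoverable.

      transportService is initialized when the Node is created.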

        // org.elasticsearch.node.Node#Node
        /**
         * Constructs a node
         *
         * @param initialEnvironment         the initial environment for this node, which will be added to by plugins
         * @param classpathPlugins           the plugins to be loaded from the classpath
         * @param forbidPrivateIndexSettings whether or not private index settings are forbidden when creating an index; this is used in the
         *                                   test framework for tests that rely on being able to set private settings
         */
        protected Node(final Environment initialEnvironment,
                       Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
            ...
            try {
                ...
                new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetadataUpgraders);
                final Transport transport = networkModule.getTransportSupplier().get();
                Set<String> taskHeaders = Stream.concat(
                    pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
                    Stream.of(Task.X_OPAQUE_ID)
                ).collect(Collectors.toSet());
                // create the transportService
                final TransportService transportService = newTransportService(settings, transport, threadPool,
                    networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
                final GatewayMetaState gatewayMetaState = new GatewayMetaState();
                ...
            } catch (IOException ex) {
                throw new ElasticsearchException("failed to bind service", ex);
            } finally {
                if (!success) {
                    IOUtils.closeWhileHandlingException(resourcesToClose);
                }
            }
        }

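      So to initialize transportService, the key is how newTransportService() does its work. Of course, calling this method has quite a few prerequisites, namely the initialization of its arguments. The important ones are the creation of the thread pool and the initialization of transport. We will skip the thread pool, since it is used in a great many places and does not belong in this discussion. That leaves the question of how transport is initialized.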

     

    2.1. Instantiating NetworkModule

      From the code above, we can see that obtaining the transport instance requires the networkModule first. How is that one initialized?

                // created directly with new inside the Node() constructor
                final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
                    threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
                    networkService, restController, clusterService.getClusterSettings());
        // org.elasticsearch.common.network.NetworkModule#NetworkModule
        /**
         * Creates a network module that custom networking classes can be plugged into.
         * @param settings The settings for the node
         * @param transportClient True if only transport classes should be allowed to be registered, false otherwise.
         */
        public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
                             BigArrays bigArrays,
                             PageCacheRecycler pageCacheRecycler,
                             CircuitBreakerService circuitBreakerService,
                             NamedWriteableRegistry namedWriteableRegistry,
                             NamedXContentRegistry xContentRegistry,
                             NetworkService networkService, HttpServerTransport.Dispatcher dispatcher,
                             ClusterSettings clusterSettings) {
            this.settings = settings;
            this.transportClient = transportClient;
            // there may be several plugins here, e.g. XPackPlugin, Netty4Plugin, Security, VotingOnlyNodePlugin
            for (NetworkPlugin plugin : plugins) {
                Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
                    pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher, clusterSettings);
                if (transportClient == false) {
                    for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
                        // register the entry in transportHttpFactories
                        registerHttpTransport(entry.getKey(), entry.getValue());
                    }
                }
                Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler,
                    circuitBreakerService, namedWriteableRegistry, networkService);
                for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
                    // register the entry in transportFactories
                    registerTransport(entry.getKey(), entry.getValue());
                }
                List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry,
                    threadPool.getThreadContext());
                for (TransportInterceptor interceptor : transportInterceptors) {
                    // register the interceptor in transportIntercetors
                    registerTransportInterceptor(interceptor);
                }
            }
        }

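      As we can see, the main job of NetworkModule is to register the relevant components with itself so they can be fetched later. The container may be a map or a list; all that matters is that it serves as a registry. For those interested, the registration methods are as follows: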

        private final Map<String, Supplier<Transport>> transportFactories = new HashMap<>();
        private final Map<String, Supplier<HttpServerTransport>> transportHttpFactories = new HashMap<>();
        private final List<TransportInterceptor> transportIntercetors = new ArrayList<>();
    
        /** Adds an http transport implementation that can be selected by setting {@link #HTTP_TYPE_KEY}. */
        // TODO: we need another name than "http transport"....so confusing with transportClient...
        private void registerHttpTransport(String key, Supplier<HttpServerTransport> factory) {
            if (transportClient) {
                throw new IllegalArgumentException("Cannot register http transport " + key + " for transport client");
            }
            if (transportHttpFactories.putIfAbsent(key, factory) != null) {
                throw new IllegalArgumentException("transport for name: " + key + " is already registered");
            }
        }
        /** Adds a transport implementation that can be selected by setting {@link #TRANSPORT_TYPE_KEY}. */
        private void registerTransport(String key, Supplier<Transport> factory) {
            if (transportFactories.putIfAbsent(key, factory) != null) {
                throw new IllegalArgumentException("transport for name: " + key + " is already registered");
            }
        }
    
        /**
         * Registers a new {@link TransportInterceptor}
         */
        private void registerTransportInterceptor(TransportInterceptor interceptor) {
            this.transportIntercetors.add(Objects.requireNonNull(interceptor, "interceptor must not be null"));
        }
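      The register-then-select pattern above can be reduced to a small standalone sketch. Everything in it is illustrative: TransportRegistrySketch is a hypothetical class, String stands in for the real Transport type, and select() is only a simplified assumption about how a lookup such as getTransportSupplier() might pick a factory by its configured name. It is not the actual ES code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified registry; String stands in for the real Transport type.
class TransportRegistrySketch {
    private final Map<String, Supplier<String>> factories = new HashMap<>();

    // Same idea as registerTransport(): the first registration wins, duplicates are rejected.
    void register(String key, Supplier<String> factory) {
        if (factories.putIfAbsent(key, factory) != null) {
            throw new IllegalArgumentException("transport for name: " + key + " is already registered");
        }
    }

    // Assumed lookup step: pick the factory whose key matches the configured type.
    Supplier<String> select(String configuredType) {
        Supplier<String> factory = factories.get(configuredType);
        if (factory == null) {
            throw new IllegalStateException("unsupported transport type [" + configuredType + "]");
        }
        return factory;
    }

    public static void main(String[] args) {
        TransportRegistrySketch registry = new TransportRegistrySketch();
        registry.register("netty4", () -> "netty4-transport");
        System.out.println(registry.select("netty4").get());
    }
}
```

      Using putIfAbsent makes the duplicate check and the insertion a single step, so two plugins that claim the same name fail fast at startup instead of silently overwriting each other.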

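      Of course, there is another important piece: the implementations of the methods in NetworkPlugin, because everything that gets registered comes from these methods. They also give our own plugins a convenient entry point. Let's first see which entry points ES exposes through NetworkPlugin: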

    // org.elasticsearch.plugins
    /**
     * Plugin for extending network and transport related classes
     */
    public interface NetworkPlugin {
    
        /**
         * Returns a list of {@link TransportInterceptor} instances that are used to intercept incoming and outgoing
         * transport (inter-node) requests. This must not return <code>null</code>
         *
         * @param namedWriteableRegistry registry of all named writeables registered
         * @param threadContext a {@link ThreadContext} of the current nodes or clients {@link ThreadPool} that can be used to set additional
         *                      headers in the interceptors
         */
        default List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry,
                                                                    ThreadContext threadContext) {
            return Collections.emptyList();
        }
    
        /**
         * Returns a map of {@link Transport} suppliers.
         * See {@link org.elasticsearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation.
         */
        default Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
                                                               CircuitBreakerService circuitBreakerService,
                                                               NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
            return Collections.emptyMap();
        }
    
        /**
         * Returns a map of {@link HttpServerTransport} suppliers.
         * See {@link org.elasticsearch.common.network.NetworkModule#HTTP_TYPE_SETTING} to configure a specific implementation.
         */
        default Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
                                                                             PageCacheRecycler pageCacheRecycler,
                                                                             CircuitBreakerService circuitBreakerService,
                                                                             NamedXContentRegistry xContentRegistry,
                                                                             NetworkService networkService,
                                                                             HttpServerTransport.Dispatcher dispatcher,
                                                                             ClusterSettings clusterSettings) {
            return Collections.emptyMap();
        }
    }

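      There are not many methods, just enough for the NetworkModule initialization shown earlier. All of them have default implementations, so a plugin that does not care about one of these extension points can simply ignore it.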
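      The effect of those default methods can be shown with a tiny sketch. The interface and class names here (ExtensionPointSketch, InterceptorOnlyPlugin, TransportOnlyPlugin, DefaultMethodSketch) are hypothetical, and plain strings stand in for the real transport and interceptor types. Each plugin overrides only the method it cares about, and the caller can still iterate over all plugins uniformly:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for an extension interface such as NetworkPlugin.
interface ExtensionPointSketch {
    default List<String> interceptors() { return Collections.emptyList(); }
    default List<String> transports() { return Collections.emptyList(); }
}

// Overrides only interceptors(); transports() falls back to the empty default.
class InterceptorOnlyPlugin implements ExtensionPointSketch {
    @Override
    public List<String> interceptors() { return Collections.singletonList("audit"); }
}

// Overrides only transports(); interceptors() falls back to the empty default.
class TransportOnlyPlugin implements ExtensionPointSketch {
    @Override
    public List<String> transports() { return Collections.singletonList("netty4"); }
}

class DefaultMethodSketch {
    // The caller never needs to know which plugin implements what.
    static List<String> collectTransports(List<ExtensionPointSketch> plugins) {
        List<String> all = new ArrayList<>();
        for (ExtensionPointSketch plugin : plugins) {
            all.addAll(plugin.transports());
        }
        return all;
    }

    static List<String> collectInterceptors(List<ExtensionPointSketch> plugins) {
        List<String> all = new ArrayList<>();
        for (ExtensionPointSketch plugin : plugins) {
            all.addAll(plugin.interceptors());
        }
        return all;
    }

    public static void main(String[] args) {
        List<ExtensionPointSketch> plugins = Arrays.asList(new InterceptorOnlyPlugin(), new TransportOnlyPlugin());
        System.out.println(collectTransports(plugins));
        System.out.println(collectInterceptors(plugins));
    }
}
```

      Returning empty collections rather than null is what lets the registration loops run without any null checks.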

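      Since we came here for ES's high-performance service, it is worth looking at the Netty implementation. The Netty plugin implements the two methods that supply transports and leaves the interceptor method at its default, since interceptors are something the business-processing framework provides when it needs them.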

        // org.elasticsearch.transport.Netty4Plugin#getTransports
        @Override
        public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
                                                              CircuitBreakerService circuitBreakerService,
                                                              NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
            return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
                networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, getSharedGroupFactory(settings)));
        }
    
        @Override
        public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
                                                                            PageCacheRecycler pageCacheRecycler,
                                                                            CircuitBreakerService circuitBreakerService,
                                                                            NamedXContentRegistry xContentRegistry,
                                                                            NetworkService networkService,
                                                                            HttpServerTransport.Dispatcher dispatcher,
                                                                            ClusterSettings clusterSettings) {
            return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
                () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher,
                    clusterSettings, getSharedGroupFactory(settings)));
        }

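      Nothing more to it: each method returns a supplier for a Netty-based service, kept in reserve until the instance is actually needed.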
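      One detail worth noticing is that the plugin returns a Supplier rather than a ready-made object, so nothing is constructed until someone calls get(). The following sketch illustrates that laziness. LazySupplierSketch and FakeTransport are hypothetical names, and the "netty4" key is used only as an example value:

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazySupplierSketch {
    // Counts how many FakeTransport objects have been constructed so far.
    static final AtomicInteger constructed = new AtomicInteger();

    // Hypothetical stand-in for an expensive transport implementation.
    static final class FakeTransport {
        FakeTransport() {
            constructed.incrementAndGet();
        }
    }

    // Mirrors the shape of getTransports(): a name mapped to a factory, not to an instance.
    static Map<String, Supplier<FakeTransport>> getTransports() {
        return Collections.singletonMap("netty4", FakeTransport::new);
    }

    public static void main(String[] args) {
        Map<String, Supplier<FakeTransport>> transports = getTransports();
        System.out.println("after registration: " + constructed.get());
        transports.get("netty4").get(); // the constructor runs only now
        System.out.println("after get(): " + constructed.get());
    }
}
```

      This is why several plugins can register transports cheaply at startup while only the selected one is ever built.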

     

    2.2. Instantiating transportService

      The previous section only covered the prerequisites; this section covers the actual initialization logic.

        // org.elasticsearch.node.Node#newTransportService
        protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool,
                                                       TransportInterceptor interceptor,
                                                       Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
                                                       ClusterSettings clusterSettings, Set<String> taskHeaders) {
            return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders);
        }

      Nothing extra here: TransportService is a complete concrete class, so we only need to look at its constructor.

        // org.elasticsearch.transport.TransportService#TransportService
        /**
         * Build the service.
         *
         * @param clusterSettings if non null, the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings
         *    updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}.
         */
        public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
                                Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
                                Set<String> taskHeaders) {
            // ClusterConnectionManager is important
            this(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders,
                new ClusterConnectionManager(settings, transport));
        }
        
        public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
                                Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
                                Set<String> taskHeaders, ConnectionManager connectionManager) {
    
            final boolean isTransportClient = TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey()));
    
            // If we are a transport client then we skip the check that the remote node has a compatible build hash
            this.requireCompatibleBuild = isTransportClient == false;
    
            // The only time we do not want to validate node connections is when this is a transport client using the simple node sampler
            this.validateConnections = isTransportClient == false || TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings);
            // keep references to the settings and service context
            this.transport = transport;
            transport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings));
            this.threadPool = threadPool;
            this.localNodeFactory = localNodeFactory;
            this.connectionManager = connectionManager;
            this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
            setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings));
            setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings));
            tracerLog = Loggers.getLogger(logger, ".tracer");
            // task manager
            taskManager = createTaskManager(settings, threadPool, taskHeaders);
            // obtain the interceptor
            this.interceptor = transportInterceptor;
            this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
            this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings);
            // remote cluster service management
            remoteClusterService =