
    ES Series (4): Analysis of the HTTP Request Dispatch Framework

    Author: 等你归去来  Time: 2021-05-05 18:14

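      The previous article walked through how es implements its network communication module and gave a rough picture of how it works. To summarize: es builds its communication layer on the netty programming model, which leaves us with several important handlers: Netty4HttpPipeliningHandler/Netty4HttpRequestHandler/Netty4MessageChannelHandler...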

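      In fact, boilerplate of this kind does not deserve much attention, because it may matter little for understanding what a system actually does. (As an aside: isn't lucene, the core of es, the part that really deserves the effort?) Still, we can now go one step further and look at how es handles http requests, so that later study of each implementation detail at least has a foundation to build on.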

     

    1. Starting from the handler

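      Since we are dealing with http requests, we naturally start from the network entry point. And since netty is used, that entry point must surface as a handler, namely the Netty4HttpRequestHandler seen in the previous article ... Let's review it.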

            // org.elasticsearch.http.netty4.Netty4HttpServerTransport.HttpChannelHandler#HttpChannelHandler
            protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) {
                this.transport = transport;
                this.handlingSettings = handlingSettings;
                this.byteBufSizer =  new NettyByteBufSizer();
                this.requestCreator =  new Netty4HttpRequestCreator();
                this.requestHandler = new Netty4HttpRequestHandler(transport);
                this.responseCreator = new Netty4HttpResponseCreator();
            }
            // org.elasticsearch.http.netty4.Netty4HttpServerTransport.HttpChannelHandler#initChannel
            @Override
            protected void initChannel(Channel ch) throws Exception {
                Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
                // Quite a few handlers are configured here, naturally because the functionality is complex
                ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
                ch.pipeline().addLast("byte_buf_sizer", byteBufSizer);
                ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
                final HttpRequestDecoder decoder = new HttpRequestDecoder(
                    handlingSettings.getMaxInitialLineLength(),
                    handlingSettings.getMaxHeaderSize(),
                    handlingSettings.getMaxChunkSize());
                decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
                ch.pipeline().addLast("decoder", decoder);
                ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
                ch.pipeline().addLast("encoder", new HttpResponseEncoder());
                final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
                aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
                ch.pipeline().addLast("aggregator", aggregator);
                if (handlingSettings.isCompression()) {
                    ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
                }
                ch.pipeline().addLast("request_creator", requestCreator);
                ch.pipeline().addLast("response_creator", responseCreator);
                // The last two handlers, pipelining and handler, deal with the actual business logic
                ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
                // Netty4HttpRequestHandler
                ch.pipeline().addLast("handler", requestHandler);
                transport.serverAcceptedChannel(nettyHttpChannel);
            }

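      Setting pipelining aside, only the core processor, handler, remains. As we can see, handler is an instance of Netty4HttpRequestHandler, and its constructor takes a transport, which holds a good deal of context, including configuration and how requests are dispatched; the details need no elaboration here.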

      One member, the dispatcher, does deserve a mention, because it determines how subsequent requests are handled.

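      In fact, a dispatcher is created when the node is initialized. It is a RestController instance, and other components use it later when they register themselves, because request dispatch is controlled by RestController. It is initialized as follows: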

                // org.elasticsearch.node.Node#Node
                protected Node(final Environment initialEnvironment,
                       Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
                    ...
                    
                ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
                    settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
                    threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService, systemIndices);
                modules.add(actionModule);
                // restController is initialized in ActionModule and used by networkModule
                final RestController restController = actionModule.getRestController();
                final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
                    threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
                    networkService, restController, clusterService.getClusterSettings());
                Collection<UnaryOperator<Map<String, IndexTemplateMetadata>>> indexTemplateMetadataUpgraders =
                    pluginsService.filterPlugins(Plugin.class).stream()
                        .map(Plugin::getIndexTemplateMetadataUpgrader)
                        .collect(Collectors.toList());
                        
                    ...
    
                }
                
        // org.elasticsearch.action.ActionModule#ActionModule
        public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
                            IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
                            ThreadPool threadPool, List<ActionPlugin> actionPlugins, NodeClient nodeClient,
                            CircuitBreakerService circuitBreakerService, UsageService usageService, SystemIndices systemIndices) {
            this.transportClient = transportClient;
            this.settings = settings;
            this.indexNameExpressionResolver = indexNameExpressionResolver;
            this.indexScopedSettings = indexScopedSettings;
            this.clusterSettings = clusterSettings;
            this.settingsFilter = settingsFilter;
            this.actionPlugins = actionPlugins;
            this.threadPool = threadPool;
            actions = setupActions(actionPlugins);
            actionFilters = setupActionFilters(actionPlugins);
            autoCreateIndex = transportClient
                ? null
                : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices);
            destructiveOperations = new DestructiveOperations(settings, clusterSettings);
            Set<RestHeaderDefinition> headers = Stream.concat(
                actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
                Stream.of(new RestHeaderDefinition(Task.X_OPAQUE_ID, false))
            ).collect(Collectors.toSet());
            UnaryOperator<RestHandler> restWrapper = null;
            for (ActionPlugin plugin : actionPlugins) {
                UnaryOperator<RestHandler> newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext());
                if (newRestWrapper != null) {
                    logger.debug("Using REST wrapper from plugin " + plugin.getClass().getName());
                    if (restWrapper != null) {
                        throw new IllegalArgumentException("Cannot have more than one plugin implementing a REST wrapper");
                    }
                    restWrapper = newRestWrapper;
                }
            }
            mappingRequestValidators = new RequestValidators<>(
                actionPlugins.stream().flatMap(p -> p.mappingRequestValidators().stream()).collect(Collectors.toList()));
            indicesAliasesRequestRequestValidators = new RequestValidators<>(
                    actionPlugins.stream().flatMap(p -> p.indicesAliasesRequestValidators().stream()).collect(Collectors.toList()));
    
            if (transportClient) {
                restController = null;
            } else {
                // Instantiate restController directly
                restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
            }
        }
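
      The loop over actionPlugins in the constructor above allows at most one plugin to supply a REST handler wrapper. A minimal sketch of that selection rule follows. Handler, Plugin and select are hypothetical stand-ins, not Elasticsearch types, and the fallback to an identity wrapper when no plugin supplies one is an assumption made so the sketch is usable on its own.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the "at most one wrapper" rule; not Elasticsearch code.
final class SingleWrapperSelection {

    // Stand-in for a REST handler: takes a request line, returns a response.
    interface Handler {
        String handle(String request);
    }

    // Stand-in for a plugin: returns null when it offers no wrapper.
    interface Plugin {
        UnaryOperator<Handler> wrapper();
    }

    static UnaryOperator<Handler> select(List<Plugin> plugins) {
        UnaryOperator<Handler> chosen = null;
        for (Plugin plugin : plugins) {
            UnaryOperator<Handler> candidate = plugin.wrapper();
            if (candidate != null) {
                if (chosen != null) {
                    // A second wrapper is rejected rather than chained.
                    throw new IllegalArgumentException(
                        "Cannot have more than one plugin implementing a REST wrapper");
                }
                chosen = candidate;
            }
        }
        // Assumption: with no wrapper at all, handlers are used unchanged.
        return chosen != null ? chosen : UnaryOperator.identity();
    }

    public static void main(String[] args) {
        Plugin plain = () -> null;
        Plugin audit = () -> h -> req -> "audited:" + h.handle(req);
        Handler echo = req -> "echo:" + req;

        Handler wrapped = select(Arrays.asList(plain, audit)).apply(echo);
        System.out.println(wrapped.handle("GET /"));
    }
}
```

      Every handler registered later passes through the selected wrapper once, which is why a single, unambiguous wrapper is required up front.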

      Once the dispatcher has been instantiated, other components need to register with it. (Two components accept registrations: RestController and Transport$RequestHandlers)

      RequestHandlers addresses look like: internal:transport/handshake, cluster:admin/snapshot/status[nodes], indices:admin/close[s][p] ;  RestController addresses look like: /{index}/_doc/{id}, /_nodes/{nodeId}/{metrics}, /_cluster/allocation/explain, /{index}/_alias/{name}

     

    1.1. Registering RequestHandlers

      Let's first look at how RequestHandlers are registered. These registrations mostly serve communication inside the cluster:

        // org.elasticsearch.transport.TransportService#registerRequestHandler
        /**
         * Registers a new request handler
         *
         * @param action                The action the request handler is associated with
         * @param requestReader               The request class that will be used to construct new instances for streaming
         * @param executor              The executor the request handling will be executed on
         * @param forceExecution        Force execution on the executor queue and never reject it
         * @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached.
         * @param handler               The handler itself that implements the request handling
         */
        public <Request extends TransportRequest> void registerRequestHandler(String action,
                                                                              String executor, boolean forceExecution,
                                                                              boolean canTripCircuitBreaker,
                                                                              Writeable.Reader<Request> requestReader,
                                                                              TransportRequestHandler<Request> handler) {
            validateActionName(action);
            handler = interceptor.interceptHandler(action, executor, forceExecution, handler);
            RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
                action, requestReader, taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
            transport.registerRequestHandler(reg);
        }
        // org.elasticsearch.transport.Transport#registerRequestHandler
        /**
         * Registers a new request handler
         */
        default <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
            getRequestHandlers().registerHandler(reg);
        }
            
            // org.elasticsearch.transport.Transport.RequestHandlers#registerHandler
            synchronized <Request extends TransportRequest> void registerHandler(RequestHandlerRegistry<Request> reg) {
                if (requestHandlers.containsKey(reg.getAction())) {
                    throw new IllegalArgumentException("transport handlers for action " + reg.getAction() + " is already registered");
                }
                requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
            }
    
        // Registration example 1
        // org.elasticsearch.transport.TransportService#TransportService
        public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
                                Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
                                Set<String> taskHeaders, ConnectionManager connectionManager) {
            ...
            registerRequestHandler(
                HANDSHAKE_ACTION_NAME,
                ThreadPool.Names.SAME,
                false, false,
                HandshakeRequest::new,
                (request, channel, task) -> channel.sendResponse(
                    new HandshakeResponse(localNode.getVersion(), Build.CURRENT.hash(), localNode, clusterName)));
            ...
        }
        // Registration example 2
        // org.elasticsearch.repositories.VerifyNodeRepositoryAction#VerifyNodeRepositoryAction
        public VerifyNodeRepositoryAction(TransportService transportService, ClusterService clusterService,
                                          RepositoriesService repositoriesService) {
            this.transportService = transportService;
            this.clusterService = clusterService;
            this.repositoriesService = repositoriesService;
            transportService.registerRequestHandler(ACTION_NAME, ThreadPool.Names.SNAPSHOT, VerifyNodeRepositoryRequest::new,
                new VerifyNodeRepositoryRequestHandler());
        }
        // Registration example 3
        // org.elasticsearch.discovery.PeerFinder#PeerFinder
        public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
                          ConfiguredHostsResolver configuredHostsResolver) {
            this.settings = settings;
            findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
            requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings);
            this.transportService = transportService;
            this.transportAddressConnector = transportAddressConnector;
            this.configuredHostsResolver = configuredHostsResolver;
    
            transportService.registerRequestHandler(REQUEST_PEERS_ACTION_NAME, Names.GENERIC, false, false,
                PeersRequest::new,
                (request, channel, task) -> channel.sendResponse(handlePeersRequest(request)));
    
            transportService.registerRequestHandler(UnicastZenPing.ACTION_NAME, Names.GENERIC, false, false,
                UnicastZenPing.UnicastPingRequest::new, new Zen1UnicastPingRequestHandler());
        }

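      RequestHandlers registration as a whole relies on a synchronized method for thread safety on the write side, and on a copy-on-write style mechanism (each registration builds a new immutable map and swaps it in) to keep reads safe. In the end the handlers live in a plain map keyed by action name, so performance and safety are both taken care of.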
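
      The registration pattern described above can be sketched in a few lines. This is a simplified illustration, not the Elasticsearch class: CopyOnWriteRegistry, register and get are hypothetical names, the JDK's Collections.unmodifiableMap stands in for MapBuilder, and the volatile field is an assumption made here so that lock-free readers see the latest snapshot.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified copy-on-write registry; not the Elasticsearch class.
final class CopyOnWriteRegistry<H> {

    // Assumption: volatile, so readers always observe the most recently published map.
    private volatile Map<String, H> handlers = Collections.emptyMap();

    // Writers are serialized by the lock; each write publishes a fresh read-only copy.
    synchronized void register(String action, H handler) {
        if (handlers.containsKey(action)) {
            throw new IllegalArgumentException(
                "transport handlers for action " + action + " is already registered");
        }
        Map<String, H> copy = new HashMap<>(handlers);
        copy.put(action, handler);
        handlers = Collections.unmodifiableMap(copy);
    }

    // Readers take no lock: they only dereference the current snapshot.
    H get(String action) {
        return handlers.get(action);
    }

    public static void main(String[] args) {
        CopyOnWriteRegistry<Runnable> registry = new CopyOnWriteRegistry<>();
        registry.register("internal:transport/handshake",
            () -> System.out.println("handling handshake"));
        registry.get("internal:transport/handshake").run();
    }
}
```

      The trade-off is that every registration copies the whole map, which is acceptable when registrations happen a bounded number of times at startup and lookups happen on every request.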

     

    1.2. Registering with RestController

      RestController registration is slightly different. It mainly handles client requests such as: /{index}/_doc/{id}
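
      The registration code that follows stores handlers in a PathTrie keyed by path templates such as /{index}/_doc/{id}. To make the idea concrete, here is a heavily simplified sketch of template matching with a segment trie. SimplePathTrie and all of its members are hypothetical; the real PathTrie differs in many details (decoding, method handling, wildcard strategies), so read this only as an illustration of the data structure.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified path trie; not org.elasticsearch.common.path.PathTrie.
final class SimplePathTrie<T> {

    private static final class Node<T> {
        final Map<String, Node<T>> children = new HashMap<>();
        Node<T> wildcard;     // child reached through a "{name}" segment
        String wildcardName;  // simplification: the last inserted name wins
        T value;
    }

    private final Node<T> root = new Node<>();

    void insert(String template, T value) {
        Node<T> node = root;
        for (String segment : split(template)) {
            if (segment.startsWith("{") && segment.endsWith("}")) {
                if (node.wildcard == null) {
                    node.wildcard = new Node<>();
                }
                node.wildcardName = segment.substring(1, segment.length() - 1);
                node = node.wildcard;
            } else {
                node = node.children.computeIfAbsent(segment, k -> new Node<>());
            }
        }
        node.value = value;
    }

    // Returns the stored value, filling params with captured segments, or null if nothing matches.
    T retrieve(String path, Map<String, String> params) {
        return retrieve(root, split(path), 0, params);
    }

    private T retrieve(Node<T> node, String[] segments, int i, Map<String, String> params) {
        if (i == segments.length) {
            return node.value;
        }
        // Literal segments take priority over wildcards.
        Node<T> exact = node.children.get(segments[i]);
        if (exact != null) {
            T found = retrieve(exact, segments, i + 1, params);
            if (found != null) {
                return found;
            }
        }
        if (node.wildcard != null) {
            T found = retrieve(node.wildcard, segments, i + 1, params);
            if (found != null) {
                params.put(node.wildcardName, segments[i]);
                return found;
            }
        }
        return null;
    }

    private static String[] split(String path) {
        String trimmed = path.startsWith("/") ? path.substring(1) : path;
        return trimmed.isEmpty() ? new String[0] : trimmed.split("/");
    }

    public static void main(String[] args) {
        SimplePathTrie<String> trie = new SimplePathTrie<>();
        trie.insert("/{index}/_doc/{id}", "document handler");
        trie.insert("/_cluster/allocation/explain", "allocation explain handler");

        Map<String, String> params = new HashMap<>();
        System.out.println(trie.retrieve("/my-index/_doc/42", params) + " " + params);
    }
}
```

      Trying the literal child before the wildcard child lets a fixed path like /_cluster/allocation/explain win over a template whose first segment is {index}, while still falling back to the template when the literal branch dead-ends.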

        // org.elasticsearch.rest.RestController#registerHandler
        /**
         * Registers a REST handler with the controller. The REST handler declares the {@code method}
         * and {@code path} combinations.
         */
        public void registerHandler(final RestHandler restHandler) {
            restHandler.routes().forEach(route -> registerHandler(route.getMethod(), route.getPath(), restHandler));
            restHandler.deprecatedRoutes().forEach(route ->
                registerAsDeprecatedHandler(route.getMethod(), route.getPath(), restHandler, route.getDeprecationMessage()));
            restHandler.replacedRoutes().forEach(route -> registerWithDeprecatedHandler(route.getMethod(), route.getPath(),
                restHandler, route.getDeprecatedMethod(), route.getDeprecatedPath()));
        }
    
        /**
         * Registers a REST handler to be executed when one of the provided methods and path match the request.
         *
         * @param path Path to handle (e.g., "/{index}/{type}/_bulk")
         * @param handler The handler to actually execute
         * @param method GET, POST, etc.
         */
        protected void registerHandler(RestRequest.Method method, String path, RestHandler handler) {
            if (handler instanceof BaseRestHandler) {
                // If it is a BaseRestHandler, add it to usageService
                usageService.addRestHandler((BaseRestHandler) handler);
            }
            registerHandlerNoWrap(method, path, handlerWrapper.apply(handler));
        }
    
        private void registerHandlerNoWrap(RestRequest.Method method, String path, RestHandler maybeWrappedHandler) {
            // Here handlers = new PathTrie<>(RestUtils.REST_DECODER);
            handlers.insertOrUpdate(path, new MethodHandlers(path, maybeWrappedHandler, method),
                (mHandlers, newMHandler) -> mHandlers.addMethods(maybeWrappedHandler, method));
        }
        
        // org.elasticsearch.usage.UsageService#addRestHandler
        /**
         * Add a REST handler to this service.
         *
         * @param handler the {