等你归去来: ES Series (4): An Analysis of the HTTP Request Dispatch Framework
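The previous post walked through how ES implements its network communication module, so we now have a rough idea of how it works. To recap: ES builds its communication foundation on the netty programming paradigm, which ultimately gives us several important handlers: Netty4HttpPipeliningHandler / Netty4HttpRequestHandler / Netty4MessageChannelHandler ...
In fact, paradigm-level plumbing like this does not deserve too much attention, because it may not matter much for understanding what a system actually does. (I digress, but isn't Lucene, the core of ES, what really deserves the effort?) For now, though, we can spend some time on how ES handles HTTP requests, so that later studies of individual implementation details at least have a foundation to build on.
Since we are dealing with HTTP requests, we naturally start from the network entry point. And because netty is used, that entry point has to show up as a handler, namely the Netty4HttpRequestHandler seen in the previous post. Let's review it.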
// org.elasticsearch.http.netty4.Netty4HttpServerTransport.HttpChannelHandler#HttpChannelHandler
protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) {
    this.transport = transport;
    this.handlingSettings = handlingSettings;
    this.byteBufSizer = new NettyByteBufSizer();
    this.requestCreator = new Netty4HttpRequestCreator();
    this.requestHandler = new Netty4HttpRequestHandler(transport);
    this.responseCreator = new Netty4HttpResponseCreator();
}

// org.elasticsearch.http.netty4.Netty4HttpServerTransport.HttpChannelHandler#initChannel
@Override
protected void initChannel(Channel ch) throws Exception {
    Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
    // Quite a few handlers are configured here, naturally because the pipeline has a lot to do
    ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
    ch.pipeline().addLast("byte_buf_sizer", byteBufSizer);
    ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
    final HttpRequestDecoder decoder = new HttpRequestDecoder(
        handlingSettings.getMaxInitialLineLength(),
        handlingSettings.getMaxHeaderSize(),
        handlingSettings.getMaxChunkSize());
    decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
    ch.pipeline().addLast("decoder", decoder);
    ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
    ch.pipeline().addLast("encoder", new HttpResponseEncoder());
    final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
    aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
    ch.pipeline().addLast("aggregator", aggregator);
    if (handlingSettings.isCompression()) {
        ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
    }
    ch.pipeline().addLast("request_creator", requestCreator);
    ch.pipeline().addLast("response_creator", responseCreator);
    // The last two handlers, "pipelining" and "handler", do the real business processing
    ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
    // Netty4HttpRequestHandler
    ch.pipeline().addLast("handler", requestHandler);
    transport.serverAcceptedChannel(nettyHttpChannel);
}
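Setting pipelining aside, only the core processor registered as "handler" remains. It is an instance of Netty4HttpRequestHandler, and its constructor takes a transport, which holds a good deal of context, including configuration and how requests get dispatched; the details need no further comment here.
One member worth calling out, though, is the dispatcher, because it determines how subsequent requests are handled.
In fact, a dispatcher is created when the node is initialized. It is a RestController instance, and it is used later when other components register themselves, because request dispatch is controlled by RestController. It is initialized as follows: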
// org.elasticsearch.node.Node#Node
protected Node(final Environment initialEnvironment,
               Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
    ...
    ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
        settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
        threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService, systemIndices);
    modules.add(actionModule);

    // restController is initialized inside ActionModule and used by networkModule
    final RestController restController = actionModule.getRestController();
    final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
        threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
        networkService, restController, clusterService.getClusterSettings());
    Collection<UnaryOperator<Map<String, IndexTemplateMetadata>>> indexTemplateMetadataUpgraders =
        pluginsService.filterPlugins(Plugin.class).stream()
            .map(Plugin::getIndexTemplateMetadataUpgrader)
            .collect(Collectors.toList());
    ...
}

// org.elasticsearch.action.ActionModule#ActionModule
public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
                    IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
                    ThreadPool threadPool, List<ActionPlugin> actionPlugins, NodeClient nodeClient,
                    CircuitBreakerService circuitBreakerService, UsageService usageService, SystemIndices systemIndices) {
    this.transportClient = transportClient;
    this.settings = settings;
    this.indexNameExpressionResolver = indexNameExpressionResolver;
    this.indexScopedSettings = indexScopedSettings;
    this.clusterSettings = clusterSettings;
    this.settingsFilter = settingsFilter;
    this.actionPlugins = actionPlugins;
    this.threadPool = threadPool;
    actions = setupActions(actionPlugins);
    actionFilters = setupActionFilters(actionPlugins);
    autoCreateIndex = transportClient
        ? null
        : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices);
    destructiveOperations = new DestructiveOperations(settings, clusterSettings);
    Set<RestHeaderDefinition> headers = Stream.concat(
        actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
        Stream.of(new RestHeaderDefinition(Task.X_OPAQUE_ID, false))
    ).collect(Collectors.toSet());
    UnaryOperator<RestHandler> restWrapper = null;
    for (ActionPlugin plugin : actionPlugins) {
        UnaryOperator<RestHandler> newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext());
        if (newRestWrapper != null) {
            logger.debug("Using REST wrapper from plugin " + plugin.getClass().getName());
            if (restWrapper != null) {
                throw new IllegalArgumentException("Cannot have more than one plugin implementing a REST wrapper");
            }
            restWrapper = newRestWrapper;
        }
    }
    mappingRequestValidators = new RequestValidators<>(
        actionPlugins.stream().flatMap(p -> p.mappingRequestValidators().stream()).collect(Collectors.toList()));
    indicesAliasesRequestRequestValidators = new RequestValidators<>(
        actionPlugins.stream().flatMap(p -> p.indicesAliasesRequestValidators().stream()).collect(Collectors.toList()));

    if (transportClient) {
        restController = null;
    } else {
        // Instantiate restController directly
        restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
    }
}
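Once the dispatcher has been instantiated, it needs to accept registrations from other components. (Two components accept registrations: RestController and Transport$RequestHandlers.)
RequestHandlers addresses look like: internal:transport/handshake, cluster:admin/snapshot/status[nodes], indices:admin/close[s][p]; RestController addresses look like: /{index}/_doc/{id}, /_nodes/{nodeId}/{metrics}, /_cluster/allocation/explain, /{index}/_alias/{name}
Let's first look at how RequestHandlers are registered; these registrations are mostly used for communication inside the cluster: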
// org.elasticsearch.transport.TransportService#registerRequestHandler
/**
 * Registers a new request handler
 *
 * @param action                The action the request handler is associated with
 * @param requestReader         The request class that will be used to construct new instances for streaming
 * @param executor              The executor the request handling will be executed on
 * @param forceExecution        Force execution on the executor queue and never reject it
 * @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached.
 * @param handler               The handler itself that implements the request handling
 */
public <Request extends TransportRequest> void registerRequestHandler(String action,
                                                                      String executor, boolean forceExecution,
                                                                      boolean canTripCircuitBreaker,
                                                                      Writeable.Reader<Request> requestReader,
                                                                      TransportRequestHandler<Request> handler) {
    validateActionName(action);
    handler = interceptor.interceptHandler(action, executor, forceExecution, handler);
    RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
        action, requestReader, taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
    transport.registerRequestHandler(reg);
}

// org.elasticsearch.transport.Transport#registerRequestHandler
/**
 * Registers a new request handler
 */
default <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
    getRequestHandlers().registerHandler(reg);
}

// org.elasticsearch.transport.Transport.RequestHandlers#registerHandler
synchronized <Request extends TransportRequest> void registerHandler(RequestHandlerRegistry<Request> reg) {
    if (requestHandlers.containsKey(reg.getAction())) {
        throw new IllegalArgumentException("transport handlers for action " + reg.getAction() + " is already registered");
    }
    requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
}

// Registration example 1
// org.elasticsearch.transport.TransportService#TransportService
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
                        Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
                        Set<String> taskHeaders, ConnectionManager connectionManager) {
    ...
    registerRequestHandler(
        HANDSHAKE_ACTION_NAME,
        ThreadPool.Names.SAME,
        false, false,
        HandshakeRequest::new,
        (request, channel, task) -> channel.sendResponse(
            new HandshakeResponse(localNode.getVersion(), Build.CURRENT.hash(), localNode, clusterName)));
    ...
}

// Registration example 2
// org.elasticsearch.repositories.VerifyNodeRepositoryAction#VerifyNodeRepositoryAction
public VerifyNodeRepositoryAction(TransportService transportService, ClusterService clusterService,
                                  RepositoriesService repositoriesService) {
    this.transportService = transportService;
    this.clusterService = clusterService;
    this.repositoriesService = repositoriesService;
    transportService.registerRequestHandler(ACTION_NAME, ThreadPool.Names.SNAPSHOT, VerifyNodeRepositoryRequest::new,
        new VerifyNodeRepositoryRequestHandler());
}

// Registration example 3
// org.elasticsearch.discovery.PeerFinder#PeerFinder
public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
                  ConfiguredHostsResolver configuredHostsResolver) {
    this.settings = settings;
    findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
    requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings);
    this.transportService = transportService;
    this.transportAddressConnector = transportAddressConnector;
    this.configuredHostsResolver = configuredHostsResolver;

    transportService.registerRequestHandler(REQUEST_PEERS_ACTION_NAME, Names.GENERIC, false, false,
        PeersRequest::new,
        (request, channel, task) -> channel.sendResponse(handlePeersRequest(request)));

    transportService.registerRequestHandler(UnicastZenPing.ACTION_NAME, Names.GENERIC, false, false,
        UnicastZenPing.UnicastPingRequest::new, new Zen1UnicastPingRequestHandler());
}
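Registration of RequestHandlers takes a synchronized lock to keep writes thread-safe and uses a copy-on-write-like mechanism to keep reads safe: every registration builds a new map and publishes it as an immutable one. In the end the handlers live in a simple hash map keyed by action name, which balances performance and safety.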
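The copy-on-write idea described above can be sketched in a few lines. This is a minimal illustration and not the ES implementation: the class name CopyOnWriteRegistry and its methods are hypothetical, plain JDK collections stand in for MapBuilder, and marking the field volatile is an assumption about how the new map gets published to reader threads.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for a handler registry; not an ES class. */
final class CopyOnWriteRegistry<H> {
    // Assumption: volatile publishes each new map to readers. The map itself is
    // never mutated after publication, so lookups need no lock.
    private volatile Map<String, H> handlers = Collections.emptyMap();

    /** Registers a handler under an action name; duplicate names are rejected. */
    synchronized void register(String action, H handler) {
        if (handlers.containsKey(action)) {
            throw new IllegalArgumentException("handler for action " + action + " is already registered");
        }
        // Copy the current map, add the new entry, then swap in an unmodifiable view.
        Map<String, H> copy = new HashMap<>(handlers);
        copy.put(action, handler);
        handlers = Collections.unmodifiableMap(copy);
    }

    /** Lock-free lookup; returns null when no handler is registered for the action. */
    H get(String action) {
        return handlers.get(action);
    }
}
```

Writes pay for a full copy of the map, which is acceptable here because registrations happen rarely (mostly at startup) while lookups happen on every request.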
Registration with RestController is slightly different; it mainly serves client requests such as /{index}/_doc/{id}
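Unlike action names, which are looked up by exact match, REST paths such as /{index}/_doc/{id} contain placeholders that have to be matched segment by segment. The sketch below shows that idea only. It is a linear matcher over a single template, not the PathTrie that RestController uses, and the class PathTemplate and its match method are hypothetical names introduced for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical single-template matcher; a simplification, not the ES PathTrie. */
final class PathTemplate {
    private final String[] segments;

    PathTemplate(String template) {
        this.segments = split(template);
    }

    private static String[] split(String path) {
        // Drop the leading slash so that "/a/b" yields ["a", "b"].
        String trimmed = path.startsWith("/") ? path.substring(1) : path;
        return trimmed.isEmpty() ? new String[0] : trimmed.split("/");
    }

    /** Returns the captured parameters, or null if the path does not match. */
    Map<String, String> match(String path) {
        String[] parts = split(path);
        if (parts.length != segments.length) {
            return null;
        }
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < segments.length; i++) {
            String seg = segments[i];
            if (seg.startsWith("{") && seg.endsWith("}")) {
                // A placeholder segment matches anything and captures the value.
                params.put(seg.substring(1, seg.length() - 1), parts[i]);
            } else if (!seg.equals(parts[i])) {
                // A literal segment must match exactly.
                return null;
            }
        }
        return params;
    }
}
```

For example, matching "/my-index/_doc/1" against the template "/{index}/_doc/{id}" captures index and id, while "/_cluster/allocation/explain" does not match that template because its second segment is not "_doc".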
// org.elasticsearch.rest.RestController#registerHandler
/**
 * Registers a REST handler with the controller. The REST handler declares the {@code method}
 * and {@code path} combinations.
 */
public void registerHandler(final RestHandler restHandler) {
    restHandler.routes().forEach(route -> registerHandler(route.getMethod(), route.getPath(), restHandler));
    restHandler.deprecatedRoutes().forEach(route ->
        registerAsDeprecatedHandler(route.getMethod(), route.getPath(), restHandler, route.getDeprecationMessage()));
    restHandler.replacedRoutes().forEach(route -> registerWithDeprecatedHandler(route.getMethod(), route.getPath(),
        restHandler, route.getDeprecatedMethod(), route.getDeprecatedPath()));
}

/**
 * Registers a REST handler to be executed when one of the provided methods and path match the request.
 *
 * @param path Path to handle (e.g., "/{index}/{type}/_bulk")
 * @param handler The handler to actually execute
 * @param method GET, POST, etc.
 */
protected void registerHandler(RestRequest.Method method, String path, RestHandler handler) {
    if (handler instanceof BaseRestHandler) {
        // A BaseRestHandler is also added to usageService
        usageService.addRestHandler((BaseRestHandler) handler);
    }
    registerHandlerNoWrap(method, path, handlerWrapper.apply(handler));
}

private void registerHandlerNoWrap(RestRequest.Method method, String path, RestHandler maybeWrappedHandler) {
    // Here handlers = new PathTrie<>(RestUtils.REST_DECODER);
    handlers.insertOrUpdate(path, new MethodHandlers(path, maybeWrappedHandler, method),
        (mHandlers, newMHandler) -> mHandlers.addMethods(maybeWrappedHandler, method));
}

// org.elasticsearch.usage.UsageService#addRestHandler
/**
 * Add a REST handler to this service.
 *
 * @param handler the {