当前位置 博文首页 > 等你归去来:ES系列(五):获取单条数据get处理过程实现
前面讲的都是些比较大的东西,即框架层面的东西。今天咱们来个轻松点的,只讲一个点:如题,get单条记录的es查询实现。
get用于获取es中的单条数据,是根据主键id查询数据的方式。类比关系型数据库,相当于如下sql:
select * from test where id = #{id};
当然了,es中每个关键词,都有相当多的附加描述词汇。比如:指定输出字段,版本号。。。
从语义上讲,get的结果至多只有一条记录。所以,虽然es是集群存储数据的,但此处只需要从某个节点取得一条数据即可。所以,理论上,只要能够快速定位到数据在哪个es节点上,然后向其发起请求,即可获取到结果了。
另外,对于使用主键id来进行查询数据,只要数据结构设计得当,应该会有非常高效的查询能力。
所以,通过本功能的实现方式分析,我们可以简要理解es key的分布方式。
get只是es语法中的一个小点,根据上一节的分析,我们已经知道如何找到处理get的处理器。此处,我们简单再回顾下:
// org.elasticsearch.rest.RestController#dispatchRequest
// Public entry point for a REST request: delegates to tryAllHandlers and turns any exception
// into an error response on the same channel.
@Override
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
    try {
        // Try every handler that could serve this request
        tryAllHandlers(request, channel, threadContext);
    } catch (Exception e) {
        try {
            // On failure, answer the client with the exception details
            channel.sendResponse(new BytesRestResponse(channel, e));
        } catch (Exception inner) {
            // Sending the failure response failed as well: keep the original error as suppressed and log
            inner.addSuppressed(e);
            logger.error(() -> new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
        }
    }
}

// org.elasticsearch.rest.RestController#tryAllHandlers
// Copies the configured headers into the thread context, then walks the candidate handlers for the
// request path and dispatches to the first one that supports the request's HTTP method.
private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {
    // Read the header information
    for (final RestHeaderDefinition restHeader : headersToCopy) {
        final String name = restHeader.getName();
        final List<String> headerValues = request.getAllHeaderValues(name);
        if (headerValues != null && headerValues.isEmpty() == false) {
            final List<String> distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList());
            if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) {
                // A single-valued header arrived with several distinct values: reject the request
                channel.sendResponse(
                    BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, "multiple values for single-valued header [" + name + "]."));
                return;
            } else {
                threadContext.putHeader(name, String.join(",", distinctHeaderValues));
            }
        }
    }
    // error_trace cannot be used when we disable detailed errors
    // we consume the error_trace parameter first to ensure that it is always consumed
    if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) {
        channel.sendResponse(
            BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, "error traces in responses are disabled."));
        return;
    }
    final String rawPath = request.rawPath();
    final String uri = request.uri();
    final RestRequest.Method requestMethod;
    try {
        // Resolves the HTTP method and fails if the method is invalid
        requestMethod = request.method();
        // Loop through all possible handlers, attempting to dispatch the request
        // Fetch the candidate handlers; per the article author, wildcard or index-variable path
        // segments mean that more than one handler may match
        Iterator<MethodHandlers> allHandlers = getAllHandlers(request.params(), rawPath);
        while (allHandlers.hasNext()) {
            final RestHandler handler;
            // One MethodHandlers entry covers several HTTP methods
            final MethodHandlers handlers = allHandlers.next();
            if (handlers == null) {
                handler = null;
            } else {
                handler = handlers.getHandler(requestMethod);
            }
            if (handler == null) {
                // No handler here does not end the search: stop only when handleNoHandlerFound
                // returns true (presumably after it has responded to the client), otherwise keep looking
                if (handleNoHandlerFound(rawPath, requestMethod, uri, channel)) {
                    return;
                }
            } else {
                // Found a handler: invoke it
                dispatchRequest(request, channel, handler);
                return;
            }
        }
    } catch (final IllegalArgumentException e) {
        handleUnsupportedHttpMethod(uri, null, channel, getValidHandlerMethodSet(rawPath), e);
        return;
    }
    // If request has not been handled, fallback to a bad request error.
    // Fallback call
    handleBadRequest(uri, requestMethod, channel);
}

// org.elasticsearch.rest.RestController#dispatchRequest
// Dispatches the request to one concrete handler: validates the content type when a body is present,
// accounts the body size with the in-flight-requests circuit breaker, then calls the handler.
private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler) throws Exception {
    final int contentLength = request.contentLength();
    if (contentLength > 0) {
        final XContentType xContentType = request.getXContentType();
        if (xContentType == null) {
            sendContentTypeErrorMessage(request.getAllHeaderValues("Content-Type"), channel);
            return;
        }
        if (handler.supportsContentStream() && xContentType != XContentType.JSON && xContentType != XContentType.SMILE) {
            channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, RestStatus.NOT_ACCEPTABLE, "Content-Type [" + xContentType + "] does not support stream parsing. Use JSON or SMILE instead"));
            return;
        }
    }
    RestChannel responseChannel = channel;
    try {
        // Circuit-breaker check
        if (handler.canTripCircuitBreaker()) {
            inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
        } else {
            inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
        }
        // iff we could reserve bytes for the request we need to send the response also over this channel
        responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
        // TODO: Count requests double in the circuit breaker if they need copying?
        if (handler.allowsUnsafeBuffers() == false) {
            request.ensureSafeBuffers();
        }
        if (handler.allowSystemIndexAccessByDefault() == false && request.header(ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER) == null) {
            // The ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER indicates that the request is coming from an Elastic product with a plan
            // to move away from direct access to system indices, and thus deprecation warnings should not be emitted.
            // This header is intended for internal use only.
            client.threadPool().getThreadContext().putHeader(SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY, Boolean.FALSE.toString());
        }
        // Invoke the handler; per the article author, a filter may wrap the handler and run first
        handler.handleRequest(request, responseChannel, client);
    } catch (Exception e) {
        // Report the failure on whichever channel is current (the wrapped one once bytes were reserved)
        responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
    }
}

// org.elasticsearch.xpack.security.rest.SecurityRestFilter#handleRequest
// Security filter in front of the wrapped restHandler: when security is enabled (and the method is
// not OPTIONS) it authenticates, runs secondary authentication, and only then delegates.
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
    if (licenseState.isSecurityEnabled() && request.method() != Method.OPTIONS) {
        // CORS - allow for preflight unauthenticated OPTIONS request
        if (extractClientCertificate) {
            HttpChannel httpChannel = request.getHttpChannel();
            SSLEngineUtils.extractClientCertificates(logger, threadContext, httpChannel);
        }
        final String requestUri = request.uri();
        authenticationService.authenticate(maybeWrapRestRequest(request), ActionListener.wrap(
            authentication -> {
                if (authentication == null) {
                    logger.trace("No authentication available for REST request [{}]", requestUri);
                } else {
                    logger.trace("Authenticated REST request [{}] as {}", requestUri, authentication);
                }
                secondaryAuthenticator.authenticateAndAttachToContext(request, ActionListener.wrap(
                    secondaryAuthentication -> {
                        if (secondaryAuthentication != null) {
                            logger.trace("Found secondary authentication {} in REST request [{}]", secondaryAuthentication, requestUri);
                        }
                        RemoteHostHeader.process(request, threadContext);
                        // Both authentication steps completed: hand over to the wrapped handler
                        restHandler.handleRequest(request, channel, client);
                    },
                    e -> handleException("Secondary authentication", request, channel, e)));
            },
            e -> handleException("Authentication", request, channel, e)));
    } else {
        // Forward to the next handler in the chain
        restHandler.handleRequest(request, channel, client);
    }
}
一般地,很多具体的处理器都会继承 BaseRestHandler,由它统一实现 handleRequest()。整体调用时序图如下:
具体代码实现如下:
// org.elasticsearch.rest.BaseRestHandler#handleRequest @Override public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception { // prepare the request for execution; has the side effect of touching the request parameters // 看起来叫准备请求,实际非常重要,它会组装后续的请求逻辑 final RestChannelConsumer action = prepareRequest(request, client); // validate unconsumed params, but we must exclude params used to format the response // use a sorted set so the unconsumed parameters appear in a reliable sorted order final SortedSet<String> unconsumedParams = request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new)); // validate the non-response params if (!unconsumedParams.isEmpty()) { final Set<String> candidateParams = new HashSet<>(); candidateParams.addAll(request.consumedParams()); candidateParams.addAll(responseParams()); throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter")); } if (request.hasContent() && request.isContentConsumed() == false) { throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body"); } usageCount.increment(); // execute the action // 即此处仅为调用前面设置好的方法 action.accept(channel); }
而处理get的处理器,从最初注册处理器的地方可以看到,是 RestGetAction,它实现的 prepareRequest() 体现了其处理方法。
// org.elasticsearch.rest.action.document.RestGetAction#prepareRequest
// Translates the REST parameters of a GET-document call into a GetRequest and returns the action
// that executes it through the NodeClient.
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    GetRequest getRequest;
    if (request.hasParam("type")) {
        // Typed form is still accepted, but a deprecation is logged
        deprecationLogger.deprecate("get_with_types", TYPES_DEPRECATION_MESSAGE);
        getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));
    } else {
        getRequest = new GetRequest(request.param("index"), request.param("id"));
    }
    // Optional parameters; refresh and realtime fall back to the request's current values
    getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
    getRequest.routing(request.param("routing"));
    getRequest.preference(request.param("preference"));
    getRequest.realtime(request.paramAsBoolean("realtime", getRequest.realtime()));
    // The [fields] parameter is rejected outright
    if (request.param("fields") != null) {
        throw new IllegalArgumentException("the parameter [fields] is no longer supported, " + "please use [stored_fields] to retrieve stored fields or [_source] to load the field from _source");
    }
    // stored_fields is a comma-separated list
    final String fieldsParam = request.param("stored_fields");
    if (fieldsParam != null) {
        final String[] fields = Strings.splitStringByCommaToArray(fieldsParam);
        if (fields != null) {
            getRequest.storedFields(fields);
        }
    }
    getRequest.version(RestActions.parseVersion(request));
    getRequest.versionType(VersionType.fromString(request.param("version_type"), getRequest.versionType()));
    getRequest.fetchSourceContext(FetchSourceContext.parseFromRestRequest(request));
    // Wrap the actual work in the returned action
    // and hand it to the NodeClient
    return channel -> client.get(getRequest, new RestToXContentListener<GetResponse>(channel) {
        // Status is OK when the document exists, NOT_FOUND otherwise
        @Override
        protected RestStatus getStatus(final GetResponse response) {
            return response.isExists() ? OK : NOT_FOUND;
        }
    });
}

// org.elasticsearch.search.fetch.subphase.FetchSourceContext#parseFromRestRequest
public static FetchSourceContext parseFromRestRequest(RestRequest request) {
    Boolean fetchSource = null;
    String[] sourceExcludes = null;
    String[] sourceIncludes = null;
    String source = request.param("_source");
    if (source != null) {
        if (Booleans.isTrue(source)) {
            fetchSource = true;
        } else if (Booleans.isFalse(source)) {
            fetchSource = false;
        }