当前位置 博文首页 > 等你归去来:ES系列(五):获取单条数据get处理过程实现

    等你归去来:ES系列(五):获取单条数据get处理过程实现

    作者:等你归去来 时间:2021-05-24 18:27

      前面讲的都是些比较大的东西,即框架层面的东西。今天咱们来个轻松点的,只讲一个点:如题,get单条记录的es查询实现。

     

    1. get语义说明

      get是用于搜索单条es的数据,是根据主键id查询数据方式。类比关系型数据库中的sql则相当于:

    select * from test where id = #{id};

      当然了,es中每个关键词,都有相当多的附加描述词汇。比如:指定输出字段,版本号。。。

     

    2. get的实现简要说明

      从语义上讲,get的结果至多只有一条记录。所以,虽然es是集群存储数据的,但此处都需要从某节点取得一条数据即可。所以,理论上,只要能够快速定位到数据在哪个es节点上,然后向其发起请求,即可获取到结果了。

      另外,对于使用主键id来进行查询数据,只要数据结构设计得当,应该会有非常高效的查询能力。

      所以,通过本功能的实现方式分析,我们可以简要理解es key的分布方式。

     

    3. get的具体实现

      get只是es语法中的一个小点,根据上一节我们分析,知道了如何可以找到处理get的处理器。此处,我们简单再回顾下:

        // org.elasticsearch.rest.RestController#dispatchRequest
        @Override
        public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
            try {
                // 尝试所有可能的处理器
                tryAllHandlers(request, channel, threadContext);
            } catch (Exception e) {
                try {
                    // 发生异常则响应异常信息
                    channel.sendResponse(new BytesRestResponse(channel, e));
                } catch (Exception inner) {
                    inner.addSuppressed(e);
                    logger.error(() ->
                        new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
                }
            }
        }
        // org.elasticsearch.rest.RestController#tryAllHandlers
        private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {
            // 读取 header 信息
            for (final RestHeaderDefinition restHeader : headersToCopy) {
                final String name = restHeader.getName();
                final List<String> headerValues = request.getAllHeaderValues(name);
                if (headerValues != null && headerValues.isEmpty() == false) {
                    final List<String> distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList());
                    if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) {
                        channel.sendResponse(
                            BytesRestResponse.
                                createSimpleErrorResponse(channel, BAD_REQUEST, "multiple values for single-valued header [" + name + "]."));
                        return;
                    } else {
                        threadContext.putHeader(name, String.join(",", distinctHeaderValues));
                    }
                }
            }
            // error_trace cannot be used when we disable detailed errors
            // we consume the error_trace parameter first to ensure that it is always consumed
            if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) {
                channel.sendResponse(
                        BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, "error traces in responses are disabled."));
                return;
            }
    
            final String rawPath = request.rawPath();
            final String uri = request.uri();
            final RestRequest.Method requestMethod;
            try {
                // Resolves the HTTP method and fails if the method is invalid
                requestMethod = request.method();
                // Loop through all possible handlers, attempting to dispatch the request
                // 获取可能的处理器,主要是有正则或者索引变量的存在,可能匹配多个处理器
                Iterator<MethodHandlers> allHandlers = getAllHandlers(request.params(), rawPath);
                while (allHandlers.hasNext()) {
                    final RestHandler handler;
                    // 一个处理器里支持多种请求方法
                    final MethodHandlers handlers = allHandlers.next();
                    if (handlers == null) {
                        handler = null;
                    } else {
                        handler = handlers.getHandler(requestMethod);
                    }
                    if (handler == null) {
                      // 未找到处理器不代表不能处理,有可能需要继续查找,如果确定不能处理,则直接响应客户端返回
                      if (handleNoHandlerFound(rawPath, requestMethod, uri, channel)) {
                          return;
                      }
                    } else {
                        // 找到了处理器,调用其方法
                        dispatchRequest(request, channel, handler);
                        return;
                    }
                }
            } catch (final IllegalArgumentException e) {
                handleUnsupportedHttpMethod(uri, null, channel, getValidHandlerMethodSet(rawPath), e);
                return;
            }
            // If request has not been handled, fallback to a bad request error.
            // 降级方法调用
            handleBadRequest(uri, requestMethod, channel);
        }
            // org.elasticsearch.rest.RestController#dispatchRequest
        private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler) throws Exception {
            final int contentLength = request.contentLength();
            if (contentLength > 0) {
                final XContentType xContentType = request.getXContentType();
                if (xContentType == null) {
                    sendContentTypeErrorMessage(request.getAllHeaderValues("Content-Type"), channel);
                    return;
                }
                if (handler.supportsContentStream() && xContentType != XContentType.JSON && xContentType != XContentType.SMILE) {
                    channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, RestStatus.NOT_ACCEPTABLE,
                        "Content-Type [" + xContentType + "] does not support stream parsing. Use JSON or SMILE instead"));
                    return;
                }
            }
            RestChannel responseChannel = channel;
            try {
                // 熔断判定
                if (handler.canTripCircuitBreaker()) {
                    inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
                } else {
                    inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
                }
                // iff we could reserve bytes for the request we need to send the response also over this channel
                responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
                // TODO: Count requests double in the circuit breaker if they need copying?
                if (handler.allowsUnsafeBuffers() == false) {
                    request.ensureSafeBuffers();
                }
                if (handler.allowSystemIndexAccessByDefault() == false && request.header(ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER) == null) {
                    // The ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER indicates that the request is coming from an Elastic product with a plan
                    // to move away from direct access to system indices, and thus deprecation warnings should not be emitted.
                    // This header is intended for internal use only.
                    client.threadPool().getThreadContext().putHeader(SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY, Boolean.FALSE.toString());
                }
                // 调用handler处理方法,该handler可能会被过滤器先执行
                handler.handleRequest(request, responseChannel, client);
            } catch (Exception e) {
                responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
            }
        }
        // org.elasticsearch.xpack.security.rest.SecurityRestFilter#handleRequest
        @Override
        public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
            if (licenseState.isSecurityEnabled() && request.method() != Method.OPTIONS) {
                // CORS - allow for preflight unauthenticated OPTIONS request
                if (extractClientCertificate) {
                    HttpChannel httpChannel = request.getHttpChannel();
                    SSLEngineUtils.extractClientCertificates(logger, threadContext, httpChannel);
                }
    
                final String requestUri = request.uri();
                authenticationService.authenticate(maybeWrapRestRequest(request), ActionListener.wrap(
                    authentication -> {
                        if (authentication == null) {
                            logger.trace("No authentication available for REST request [{}]", requestUri);
                        } else {
                            logger.trace("Authenticated REST request [{}] as {}", requestUri, authentication);
                        }
                        secondaryAuthenticator.authenticateAndAttachToContext(request, ActionListener.wrap(
                            secondaryAuthentication -> {
                                if (secondaryAuthentication != null) {
                                    logger.trace("Found secondary authentication {} in REST request [{}]", secondaryAuthentication, requestUri);
                                }
                                RemoteHostHeader.process(request, threadContext);
                                restHandler.handleRequest(request, channel, client);
                            },
                            e -> handleException("Secondary authentication", request, channel, e)));
                    }, e -> handleException("Authentication", request, channel, e)));
            } else {
                // 转发到下一处理器责任链
                restHandler.handleRequest(request, channel, client);
            }
        }
    View Code

     

      一般地,很多具体的处理器都会继承 BaseRestHandler, 即 handleRequest() 整体调用时序图如下:

    handleRequest() 整体调用时序图

     

     

     

      具体代码实现如下:

        // org.elasticsearch.rest.BaseRestHandler#handleRequest
        @Override
        public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
            // prepare the request for execution; has the side effect of touching the request parameters
            // 看起来叫准备请求,实际非常重要,它会组装后续的请求逻辑
            final RestChannelConsumer action = prepareRequest(request, client);
    
            // validate unconsumed params, but we must exclude params used to format the response
            // use a sorted set so the unconsumed parameters appear in a reliable sorted order
            final SortedSet<String> unconsumedParams =
                request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));
    
            // validate the non-response params
            if (!unconsumedParams.isEmpty()) {
                final Set<String> candidateParams = new HashSet<>();
                candidateParams.addAll(request.consumedParams());
                candidateParams.addAll(responseParams());
                throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
            }
    
            if (request.hasContent() && request.isContentConsumed() == false) {
                throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
            }
    
            usageCount.increment();
            // execute the action
            // 即此处仅为调用前面设置好的方法
            action.accept(channel);
        }

      而处理get的处理器,我们实际上可以通过在最初注册的时候,可以看到是 RestGetAction, 它实现的 prepareRequest() 体现了其处理方法。

        // org.elasticsearch.rest.action.document.RestGetAction#prepareRequest
        @Override
        public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
            GetRequest getRequest;
            if (request.hasParam("type")) {
                deprecationLogger.deprecate("get_with_types", TYPES_DEPRECATION_MESSAGE);
                getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));
            } else {
                getRequest = new GetRequest(request.param("index"), request.param("id"));
            }
    
            getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
            getRequest.routing(request.param("routing"));
            getRequest.preference(request.param("preference"));
            getRequest.realtime(request.paramAsBoolean("realtime", getRequest.realtime()));
            if (request.param("fields") != null) {
                throw new IllegalArgumentException("the parameter [fields] is no longer supported, " +
                    "please use [stored_fields] to retrieve stored fields or [_source] to load the field from _source");
            }
            final String fieldsParam = request.param("stored_fields");
            if (fieldsParam != null) {
                final String[] fields = Strings.splitStringByCommaToArray(fieldsParam);
                if (fields != null) {
                    getRequest.storedFields(fields);
                }
            }
    
            getRequest.version(RestActions.parseVersion(request));
            getRequest.versionType(VersionType.fromString(request.param("version_type"), getRequest.versionType()));
    
            getRequest.fetchSourceContext(FetchSourceContext.parseFromRestRequest(request));
            // 封装具体业务处理方法
            // 交由 NodeClient 处理
            return channel -> client.get(getRequest, new RestToXContentListener<GetResponse>(channel) {
                @Override
                protected RestStatus getStatus(final GetResponse response) {
                    return response.isExists() ? OK : NOT_FOUND;
                }
            });
        }
        // org.elasticsearch.search.fetch.subphase.FetchSourceContext#parseFromRestRequest
        public static FetchSourceContext parseFromRestRequest(RestRequest request) {
            Boolean fetchSource = null;
            String[] sourceExcludes = null;
            String[] sourceIncludes = null;
    
            String source = request.param("_source");
            if (source != null) {
                if (Booleans.isTrue(source)) {
                    fetchSource = true;
                } else if (Booleans.isFalse(source)) {
                    fetchSource = false;
                } 
    
    下一篇:没有了