
    闫玉林's Blog: Categorized Data Statistics with Elasticsearch Aggregations

    Posted: 2021-08-17 18:55

    HTTP Requests

    • Single-dimension aggregation: compute statistics grouped by one field
    GET /zwfw/_search
    {
      "size": 0, 
     "aggregations" : {
        "organDn.keyword_arr" : {
          "terms" : {
            "field" : "organDn.keyword",
            "size" : 100,
            "min_doc_count" : 1,
            "shard_min_doc_count" : 0,
            "show_term_doc_count_error" : false,
            "order" : [
              {
                "_count" : "desc"
              },
              {
                "_term" : "asc"
              }
            ]
          }
        }
      }
    }
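
    For reference, a response to the query above looks roughly like the following; field values are illustrative, not from a real index. Each bucket carries the term as `key` and the number of matching documents as `doc_count`:

```json
{
  "took": 3,
  "timed_out": false,
  "hits": { "total": 1200, "max_score": 0.0, "hits": [] },
  "aggregations": {
    "organDn.keyword_arr": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        { "key": "org-a", "doc_count": 800 },
        { "key": "org-b", "doc_count": 400 }
      ]
    }
  }
}
```

    Because `"size": 0` is set at the top level, `hits.hits` is empty and only the aggregation results are returned.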
    
    • Sub-aggregation: first aggregate by one dimension, then, within each bucket, aggregate by a second dimension
    GET /zwfw/_search
    {
      "size" : 0,
      "aggregations" : {
        "platformCode.keyword_arr" : {
          "terms" : {
            "field" : "platformCode.keyword",
            "size" : 20,
            "min_doc_count" : 1,
            "shard_min_doc_count" : 0,
            "show_term_doc_count_error" : false,
            "order" : [
              {
                "_count" : "desc"
              },
              {
                "_term" : "asc"
              }
            ]
          },
          "aggregations" : {
            "siteId_arr" : {
              "terms" : {
                "field" : "siteId",
                "size" : 100,
                "min_doc_count" : 1,
                "shard_min_doc_count" : 0,
                "show_term_doc_count_error" : false,
                "order" : [
                  {
                    "_count" : "desc"
                  },
                  {
                    "_term" : "asc"
                  }
                ]
              }
            }
          }
        }
      }
    }
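
    With a sub-aggregation, each parent bucket nests its own child buckets. A response to the query above has roughly this shape (values illustrative):

```json
{
  "aggregations": {
    "platformCode.keyword_arr": {
      "buckets": [
        {
          "key": "platform-1",
          "doc_count": 500,
          "siteId_arr": {
            "buckets": [
              { "key": "site-10", "doc_count": 300 },
              { "key": "site-11", "doc_count": 200 }
            ]
          }
        }
      ]
    }
  }
}
```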
    
    

    Requests via the Java Client

     @Override
        public Map<String, Object> aggregateByField(EsQueryVO queryVO) {
        Map<String, Object> result = new HashMap<>();
    
            SearchRequestBuilder builder = esQueryBuild(queryVO);
            TermsAggregationBuilder aggregation = AggregationBuilders.terms(queryVO.getAggregateKey()+"_arr")
                    .field(queryVO.getAggregateKey());
            if(queryVO.getAggregateOrder() != null){
                aggregation.order(Terms.Order.count(queryVO.getAggregateOrder()));
            }
            if(null != queryVO.getAggregateSize()){
                aggregation.size(queryVO.getAggregateSize());
            }else {
                aggregation.size(2000);
            }
            builder = builder.addAggregation(aggregation).setSize(0);
            logger.info("builder:\n{}",builder);
            SearchResponse response = builder.get();
            if(response != null){
                String bucketKey = queryVO.getAggregateKey()+"_arr";
                Terms terms = response.getAggregations().get(bucketKey);
            Map<String, Long> staticMap = new LinkedHashMap<>();
                for (Terms.Bucket entry1 : terms.getBuckets()) {
                    String key = entry1.getKey().toString();          // bucket key
                    Long docCount = entry1.getDocCount();            // Doc count
                    staticMap.put(key,docCount);
                }
                result.put(bucketKey,staticMap);
            }
    
            return result;
        }
    
        @Override
        public Map<String, Long> aggregateByField(EsQueryVO queryVO, String subField) {
        Map<String, Long> result = new HashMap<>();
    
        // if a sub-aggregation field is provided
            if(!StringUtils.isEmpty(subField)){
                SearchRequestBuilder builder = esQueryBuild(queryVO);
                TermsAggregationBuilder aggregation = AggregationBuilders.terms(queryVO.getAggregateKey()+"_arr")
                        .field(queryVO.getAggregateKey()).size(20);
                aggregation = aggregation.subAggregation(
                        AggregationBuilders.terms(subField+"_arr").field(subField).size(2000));
                if(queryVO.getAggregateOrder() != null){
                    aggregation.order(Terms.Order.count(queryVO.getAggregateOrder()));
                }
                if(null != queryVO.getAggregateSize()){
                    aggregation.size(queryVO.getAggregateSize());
                }else {
                    aggregation.size(2000);
                }
                builder = builder.addAggregation(aggregation).setSize(0);
                logger.info("builder:\n{}",builder);
                SearchResponse response = builder.get();
                if(response != null){
                    String bucketKey = queryVO.getAggregateKey()+"_arr";
                    Terms terms = response.getAggregations().get(bucketKey);
                    for (Terms.Bucket entry1 : terms.getBuckets()) {
                        String key = entry1.getKey().toString();          // bucket key
                        Aggregations subAgg = entry1.getAggregations();
                        if(subAgg.asList().size() > 0){
                            Terms terms2 = subAgg.get(subField+"_arr");
                            for (Terms.Bucket entry2 : terms2.getBuckets()) {
                                String key2 = entry2.getKey().toString();          // bucket key
                                Long docCount2 = entry2.getDocCount();            // Doc count
                                result.put(key + "_" + key2,docCount2);
                            }
                        }
                    }
                }
    
                return result;
            }else {
                Map<String, Object> aggs = aggregateByField(queryVO);
                if(aggs != null && aggs.size() > 0){
                    return (Map<String, Long>)aggs.get(queryVO.getAggregateKey()+"_arr");
                }
                return null;
            }
        }
    
    • The core step is constructing a TermsAggregationBuilder for the dimension to aggregate on, then attaching it to the search request via addAggregation
    TermsAggregationBuilder aggregation = AggregationBuilders.terms(searchQueryVO.getAggregateKey()+"_arr")
                    .field(searchQueryVO.getAggregateKey());
    aggregation.size(2000);
    
    • The raw response needs some post-processing before it is convenient to use.
    Terms terms = response.getAggregations().get(bucketKey);
    Map<String, Long> staticMap = new LinkedHashMap<>();
    for (Terms.Bucket entry1 : terms.getBuckets()) {
        String key = entry1.getKey().toString();   // bucket key
        Long docCount = entry1.getDocCount();      // doc count
        staticMap.put(key, docCount);
        logger.info("[{}] key [{}], doc_count [{}]", bucketKey, key, docCount);
    }
    result.put(bucketKey, staticMap);
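
    In the two-level case, aggregateByField(queryVO, subField) flattens parent and child bucket keys into a single map keyed by "parentKey_childKey". Stripped of the Elasticsearch client, the flattening logic can be sketched as follows (the bucket data here is hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AggFlattenSketch {

    // Flatten nested bucket counts into "parent_child" -> docCount entries,
    // mirroring the inner loops of aggregateByField(queryVO, subField).
    static Map<String, Long> flatten(Map<String, Map<String, Long>> nested) {
        Map<String, Long> flat = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, Long>> parent : nested.entrySet()) {
            for (Map.Entry<String, Long> child : parent.getValue().entrySet()) {
                flat.put(parent.getKey() + "_" + child.getKey(), child.getValue());
            }
        }
        return flat;
    }

    public static void main(String[] args) {
        // Hypothetical buckets: one platform containing two sites.
        Map<String, Map<String, Long>> nested = new LinkedHashMap<>();
        Map<String, Long> sites = new LinkedHashMap<>();
        sites.put("site-10", 300L);
        sites.put("site-11", 200L);
        nested.put("platform-1", sites);

        Map<String, Long> flat = flatten(nested);
        System.out.println(flat); // {platform-1_site-10=300, platform-1_site-11=200}
    }
}
```

    One caveat of this flat-key scheme: if bucket keys themselves contain underscores, the composite key cannot be split back unambiguously, so it is best suited for display and counting rather than round-tripping.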
    