前言
上文介绍了ES的各种查询,本文介绍如何在ES进行MySQL中的分组和聚合操作,并实现MySQL和ES之间的数据自动同步;
一、分组聚合
在ES中对于聚合查询,主要分为2大类:指标(Metric)聚合 与 桶(Bucket)聚合。
- 指标聚合:max、min、sum等,作用等同于Mysql中的相关聚合函数。
- 桶聚合:group by,作用等同于Mysql中根据哪1个字段进行分组
注意,我们不能对text类型的字段进行分组,因为text会进行分词,导致无法进行分组。
1.指标聚合(聚合函数)
指标聚合相当于MySQL中聚合函数,统计品牌为万豪的最贵酒店价格- GET /hotel/_search
- {
- "query": {
- "term": {
- "brand": {
- "value": "万豪"
- }
- }
- },
- "size": 0,
- "aggs": {
- "最贵的": {
- "max": {
- "field": "price"
- }
- },
- "最便宜的": {
- "min": {
- "field": "price"
- }
- }
- }
- }
复制代码
2.桶聚合(分组)
桶聚合相当于MySQL中的分组,统计品牌为万豪的酒店有哪些星级;- GET /hotel/_search
- {
- "size": 0,
- "query": {
- "term": {
- "brand": {
- "value": "万豪"
- }
- }
- },
- "aggs": {
- "按星级名称分组": {
- "terms": {
- "field": "specs",
- "size": 20
- }
- }
- }
-
- }
复制代码 对数据库中所有数据,按照星级和品牌分组;- GET /hotel/_search
- {
- "size": 0,
- "aggs": {
- "按品牌分组": {
- "terms": {
- "field": "brand",
- "size": 20
- }
- },
- "按星级分组": {
- "terms": {
- "field": "specs",
- "size": 20
- }
- }
- }
- }
复制代码 3.总结
在ES中1次请求,可以写多个聚合函数;
4.功能实现
根据搜索条件筛选之后,再根据品牌进行分组;
4.1.Kibana查询
根据搜索条件对品牌进行数据分组- GET hotel/_search
- {
- "size": 0,
- "query": {
- "query_string": {
- "fields": ["name","synopsis","area","address"],
- "query": "三亚 OR 商务"
- }
- },
- "aggs": {
- "hotel_brands": {
- "terms": {
- "field": "brand",
- "size": 100
- }
- }
- }
- }
复制代码 4.2.JavaAPI查询
- @Override
- public Map<String, Object> searchBrandGroupQuery(Integer current, Integer size, Map<String, Object> searchParam) {
- //设置查询请求头
- SearchRequest searchRequest = new SearchRequest("hotel");
- //设置查询请求体
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- //设置查询方式
- if (!StringUtils.isEmpty(searchParam.get("condition"))) {
- QueryBuilder queryBuilder = QueryBuilders.queryStringQuery(searchParam.get("condition").toString())
- .field("name")
- .field("synopsis")
- .field("area")
- .field("address")
- .defaultOperator(Operator.OR);
- searchSourceBuilder.query(queryBuilder);
- }
- //设置按品牌分组
- AggregationBuilder aggregationBuilder = AggregationBuilders.terms("brand_groups")
- .size(200)
- .field("brand");
- searchSourceBuilder.aggregation(aggregationBuilder);
- //设置分页
- searchSourceBuilder.from((current - 1) * size);
- searchSourceBuilder.size(size);
- searchRequest.source(searchSourceBuilder);
- try {
- SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
- SearchHits hits = searchResponse.getHits();
- long totalHits = hits.getTotalHits().value;
- ArrayList<String> groupNameList = new ArrayList<>();
- //获取并处理聚合查询结果
- Terms brandGroups = searchResponse.getAggregations().get("brand_groups");
- for (Terms.Bucket bucket : brandGroups.getBuckets()) {
- String key = (String) bucket.getKey();
- groupNameList.add(key);
- }
- Map<String, Object> map = new HashMap<>();
- // map.put("list", list);
- map.put("totalResultSize", totalHits);
- map.put("current", current);
- //设置总页数
- map.put("totalPage", (totalHits + size - 1) / size);
- //设置品牌分组列表
- map.put("brandList", groupNameList);
- return map;
- } catch (IOException e) {
- e.printStackTrace();
- }
- return null;
- }
复制代码 HotelServiceImpl.java
5.分组和聚合一起使用
通常情况我们统计数据时,会先进行分组,然后再在分组的基础上进行聚合操作;
根据用户输入的日期,统计某品牌下所有酒店销量。 对于该功能的实现,需要进行多层聚合。
- 对日期时间段范围查询
- 根据品牌进行分组查询
- 对分组查询结果进行sum聚合
5.1.Kibana查询
在桶聚合中嵌套指标聚合,就等于MySQL会先进行分组操作,然后再在数据分组的基础上,进行聚合函数统计;- GET hotel/_search
- {
- "size": 0,
- "query": {
- "range": {
- "createTime": {
- "gte": "2015-01-01",
- "lte": "2015-12-31"
- }
- }
- },
- "aggs": {
- "根据品牌分组": {
- "terms": {
- "field": "brand",
- "size": 100
- },
- "aggs": {
- "该品牌总销量": {
- "sum": {
- "field": "salesVolume"
- }
- },
- "该品牌销量平均值": {
- "avg": {
- "field": "salesVolume"
- }
- }
- }
- }
- }
- }
复制代码 5.2.JavaAPI查询
[code]public List searchDateHistogram(Map searchParam) { //定义结果集 List result = new ArrayList(); //设置查询 SearchRequest searchRequest = new SearchRequest("hotel"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); //todo 自定义日期时间段范围查询 RangeQueryBuilder queryBuilder = QueryBuilders.rangeQuery("createTime") .gte(searchParam.get("minTime")) .lte(searchParam.get("maxTime")) .format("yyyy-MM-dd"); searchSourceBuilder.query(queryBuilder); //todo 聚合查询设置 TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("hotel_brand").field("brand").size(100); //构建二级聚合 SumAggregationBuilder secondAggregation = AggregationBuilders.sum("hotel_salesVolume").field("salesVolume"); aggregationBuilder.subAggregation(secondAggregation); searchSourceBuilder.aggregation(aggregationBuilder); searchRequest.source(searchSourceBuilder); try { SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); //todo 获取聚合结果并处理 Aggregations aggregations = searchResponse.getAggregations(); Map aggregationMap = aggregations.asMap(); Terms terms = (Terms) aggregationMap.get("hotel_brand"); List |