一、ElasticSearchUtils
- package com.wssnail.elasticsearch.util;
- import co.elastic.clients.elasticsearch.ElasticsearchClient;
- import co.elastic.clients.elasticsearch._types.FieldValue;
- import co.elastic.clients.elasticsearch._types.Refresh;
- import co.elastic.clients.elasticsearch._types.aggregations.*;
- import co.elastic.clients.elasticsearch._types.query_dsl.MatchAllQuery;
- import co.elastic.clients.elasticsearch._types.query_dsl.MatchQuery;
- import co.elastic.clients.elasticsearch._types.query_dsl.Query;
- import co.elastic.clients.elasticsearch._types.query_dsl.TermsQuery;
- import co.elastic.clients.elasticsearch.core.*;
- import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
- import co.elastic.clients.elasticsearch.core.search.Hit;
- import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
- import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
- import co.elastic.clients.elasticsearch.indices.ElasticsearchIndicesClient;
- import co.elastic.clients.elasticsearch.indices.ExistsRequest;
- import co.elastic.clients.transport.endpoints.BooleanResponse;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.wssnail.elasticsearch.constants.MetricConstant;
- import lombok.extern.slf4j.Slf4j;
- import java.io.IOException;
- import java.util.*;
- /**
- * @Author: 熟透的蜗牛
- * @CreateTime: 2023-01-05 16:47
- * @Description: TODO
- * @Version: 1.0
- */
- @Slf4j
- public class ElasticSearchUtils {
- private ElasticsearchClient elasticsearchClient;
- private static final Integer MAX_PAGE_SIZE = 1000;
- public ElasticSearchUtils(ElasticsearchClient elasticsearchClient) {
- this.elasticsearchClient = elasticsearchClient;
- }
- /*
- * @description:创建索引
- * @author: 熟透的蜗牛
- * @date: 2023/1/5 16:58
- * @param index
- * @return: boolean
- **/
- public boolean createIndex(String index) {
- try {
- if (isIndexExist(index)) {
- log.error("Index is exits!");
- return false;
- }
- //1.创建索引请求
- CreateIndexResponse indexResponse = elasticsearchClient.indices().create(c -> c.index(index));
- return indexResponse.acknowledged();
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>createIndex>>>id>>>>>>{}", e);
- }
- return false;
- }
- /*
- * @description:判断索引是否存在
- * @author: 熟透的蜗牛
- * @date: 2023/1/5 16:59
- * @param index
- * @return: boolean
- **/
- public boolean isIndexExist(String index) {
- try {
- ElasticsearchIndicesClient indices = elasticsearchClient.indices();
- BooleanResponse response = indices.exists(ExistsRequest.of(x -> x.index(index)));
- return response.value();
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>isIndexExist>>>id>>>>>>{}", e);
- }
- return false;
- }
- /*
- * @description:删除索引
- * @author: 熟透的蜗牛
- * @date: 2023/1/5 17:00
- * @param null
- * @return: null
- **/
- public boolean deleteIndex(String index) {
- try {
- if (!isIndexExist(index)) {
- log.error("Index is not exits!");
- return false;
- }
- DeleteIndexResponse response = elasticsearchClient.indices().delete(x -> x.index(index));
- return response.acknowledged();
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>deleteIndex>>>id>>>>>>{}", e);
- }
- return false;
- }
- /*
- * @description:更新数据,如果没有数据则新增
- * @author: 熟透的蜗牛
- * @date: 2023/1/5 17:01
- * @param object 对象
- * @param index 索引名称
- * @param id 数据id
- * @return: java.lang.String
- **/
- public String submitDocument(Object object, String index, String id) {
- if (null == id) {
- return addDocument(object, index);
- }
- if (this.existsById(index, id)) {
- return this.updateDocumentByIdNoRealTime(object, index, id);
- } else {
- return addDocument(object, index, id);
- }
- }
- /*
- * @description:新增数据
- * @author: 熟透的蜗牛
- * @date: 2023/1/5 17:03
- * @param object
- * @param index
- * @param id 数据id,为空时自动生成
- * @return: java.lang.String
- **/
- public String addDocument(Object object, String index, String id) {
- try {
- if (null == id) {
- return addDocument(object, index);
- }
- if (this.existsById(index, id)) {
- return this.updateDocumentByIdNoRealTime(object, index, id);
- }
- IndexResponse indexResponse = elasticsearchClient.index(IndexRequest.of(x -> x.id(id).document(object).index(index)));
- log.info("添加数据成功{}", indexResponse.result().jsonValue());
- return id;
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>addDocument>>>id>>>>>>{}", e);
- }
- return null;
- }
- /*
- * @description:新增数据,id用UUID生成
- * @author: 熟透的蜗牛
- * @date: 2023/1/5 17:05
- * @param object
- * @param index
- * @return: java.lang.String
- **/
- public String addDocument(Object object, String index) {
- try {
- return addDocument(object, index, UUID.randomUUID().toString().replaceAll("-", "").toUpperCase());
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>addDocument>>>>>>{}", e);
- }
- return null;
- }
- /*
- * @description:根据id判断文档是否存在
- * @author: 熟透的蜗牛
- * @date: 2023/1/5 17:07
- * @param index
- * @param id
- * @return: boolean
- **/
- public boolean existsById(String index, String id) {
- try {
- BooleanResponse response = elasticsearchClient.existsSource(ExistsSourceRequest.of(builder -> builder.index(index).id(id)));
- return response.value();
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>existsById>>>>>>{}", e);
- }
- return false;
- }
- /*
- * @description:通过id删除数据
- * @author: 熟透的蜗牛
- * @date: 2023/1/5 17:08
- * @param index
- * @param id
- * @return: java.lang.String
- **/
- public String deleteDocumentById(String index, String id) {
- try {
- DeleteResponse delete = elasticsearchClient.delete(DeleteRequest.of(builder -> builder.id(id).index(index)));
- log.info("删除数据成功 >>>>>>>>id为{}", delete.id());
- return delete.id();
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>deleteDocumentById>>>>>>{}", e);
- }
- return null;
- }
- /*
- * @description:通过id修改数据
- * @author: 熟透的蜗牛
- * @date: 2023/1/5 17:09
- * @param null
- * @return: null
- **/
- public String updateDocumentById(Object object, String index, String id) {
- try {
- UpdateRequest<Object, Object> updateRequest = new UpdateRequest.Builder<>().doc(object).index(index).id(id).build();
- UpdateResponse<Object> updateResponse = elasticsearchClient.update(updateRequest, object.getClass());
- log.info("索引为: {}, id为: {},updateResponseID:{}, 实时更新数据成功", index, id, updateResponse.id());
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>updateDocumentById>>>>>>{}", e);
- }
- return null;
- }
- /*
- * @description:通过id实时更新数据
- * @author: 熟透的蜗牛
- * @date: 2023/1/5 17:11
- * @param object
- * @param index
- * @param id
- * @return: java.lang.String
- **/
- public String updateDocumentByIdNoRealTime(Object object, String index, String id) {
- try {
- UpdateResponse<?> updateResponse = elasticsearchClient.update(x -> x.id(id).index(index).doc(object)
- .refresh(Refresh.WaitFor).timeout(t -> t.time("1s")), object.getClass());
- //执行更新请求
- log.info("索引为: {}, id为: {},updateResponseID:{}, 实时更新数据成功", index, id, updateResponse.id());
- return updateResponse.id();
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>updateDocumentByIdNoRealTime>>>>>>{}", e);
- }
- return null;
- }
- /**
- * @param index
- * @param id
- * @param includeFields 指定返回字段
- * @param excludeFields 指定排除的字段
- * @description: 根据id查询指定数据
- * @return: java.util.Map
- * @author 熟透的蜗牛
- * @date: 2025/1/19 17:39
- */
- public Map searchDocumentById(String index, String id, List<String> includeFields, List<String> excludeFields) {
- try {
- GetResponse<Map> response = elasticsearchClient.get(builder -> builder.index(index).id(id).sourceIncludes(includeFields).sourceExcludes(excludeFields), Map.class);
- return response.source();
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>查询失败searchDocumentById>>>>>>{}", e);
- }
- return new HashMap();
- }
- /**
- * @param index
- * @param objects
- * @description:批量插入,单词不能超20w
- * @return: boolean
- * @author 熟透的蜗牛
- * @date: 2025/1/19 18:07
- */
- public boolean bulkPost(String index, List<Map> objects) {
- try {
- BulkRequest.Builder br = new BulkRequest.Builder();
- objects.forEach(list -> {
- br.operations(op -> op
- .index(idx -> idx.index(index).document(list.get("data")).id((String) list.get("id"))));
- });
- BulkResponse result = elasticsearchClient.bulk(br.build());
- if (result.errors()) {
- log.info(">>>>>>>>>>>>>>>批量插入有误");
- for (BulkResponseItem item : result.items()) {
- if (item.error() != null) {
- log.error(item.error().reason());
- }
- }
- return result.errors();
- }
- return true;
- } catch (Exception e) {
- log.error(">>>>>>异常信息{}", e);
- return false;
- }
- }
- /**
- * @param index
- * @param highlightField 需要高亮的字段
- * @param start
- * @param pageSize
- * @param query
- * @description: 高亮查询
- * @return: java.util.List<java.lang.Object>
- * @author 熟透的蜗牛
- * @date: 2025/1/19 20:26
- */
- public List<Hit<Map>> searchHighLightFieldDocument(String index,
- String[] highlightField, Integer start, Integer pageSize, Query... query) {
- List<Hit<Map>> hits = new ArrayList<>();
- try {
- SearchRequest.Builder searchBuilder = new SearchRequest.Builder().index(index);
- for (int i = 0; i < highlightField.length; i++) {
- int finalI = i;
- searchBuilder.highlight(builder -> builder.preTags("<span style='color:red'>")
- .postTags("</span>").fields(highlightField[finalI], h -> h));
- }
- for (Query q : query) {
- if (q._kind().equals(Query.Kind.Bool)) {
- searchBuilder.query(bb -> bb.bool(b -> b.must(q)));
- }
- if (q._kind().equals(Query.Kind.Range)) {
- q.range();
- }
- if (q._kind().equals(Query.Kind.Term)) {
- q.term();
- }
- if (q._kind().equals(Query.Kind.MultiMatch)) {
- q.multiMatch();
- }
- searchBuilder.query(q);
- }
- searchBuilder.size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize);
- searchBuilder.from((start - 1) * pageSize);
- SearchResponse searchResponse = elasticsearchClient.search(searchBuilder.build(), Map.class);
- log.info(">>>>>>>>>>>>>searchResponse{}", searchResponse);
- if (searchResponse.hits().total().value() > 0) {
- hits = searchResponse.hits().hits();
- }
- } catch (IOException e) {
- log.info(">>>>>>>>>>>>>>>searchListDocument>>>>>>{}", e);
- }
- return hits;
- }
- /**
- * @param index
- * @param field 需要查询的字段
- * @param value 查询字段的值
- * @param beanClass
- * @param start
- * @param pageSize
- * @description:按条件精确查询
- * @return: java.util.List<T>
- * @author 熟透的蜗牛
- * @date: 2025/1/21 1:11
- */
- public <T> List<T> termQuery(String index, String field, String value, Class<T> beanClass, Integer start, Integer pageSize) {
- List<T> list = new ArrayList<>();
- try {
- Query query = MatchQuery.of(m -> m
- .field(field)
- .query(value)
- )._toQuery();
- SearchResponse<T> response = elasticsearchClient.search(s -> s
- .index(index)
- .query(q -> q
- .bool(b -> b
- .must(query)
- )
- ).from((start - 1) * pageSize).size(pageSize),
- beanClass
- );
- this.queryEsData(beanClass, list, response);
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>termQuery>>>>>>{}", e);
- }
- return list;
- }
- /*
- * @description:自定义查询
- * @author: 熟透的蜗牛
- * @date: 2023/1/9 15:49
- * @param index
- * @param beanClass
- * @param query 查询条件
- * @return: java.util.List<T>
- **/
- public <T> List<T> selfDefine(String index, Class<T> beanClass, Query query) {
- List<T> list = new ArrayList<>();
- try {
- SearchResponse<T> response = elasticsearchClient.search(q -> q.index(index).query(query), beanClass);
- this.queryEsData(beanClass, list, response);
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>termQuery>>>>>>{}", e);
- }
- return list;
- }
- /**
- * @param indexName
- * @param query
- * @description: 查询文档数量
- * @return: long
- * @author 熟透的蜗牛
- * @date: 2025/1/21 2:11
- */
- public <T> long count(String indexName, Query query) {
- try {
- CountResponse response = elasticsearchClient.count(q -> q.index(indexName).query(query));
- return response.count();
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>count>>>>>>{}", e);
- }
- return 0;
- }
- /*
- * @description:多值匹配
- * @author: 熟透的蜗牛
- * @date: 2023/1/9 14:39
- * @param index
- * @param field 字段
- * @param dataArgs 数值
- * @param beanClass 转化实体类
- * @param start
- * @param pageSize
- * @return: java.util.List<T>
- **/
- public <T> List<T> termsQuery(String index, String field, List<FieldValue> fieldValues, Class<T> beanClass, Integer start, Integer pageSize) {
- // 查询的数据列表
- List<T> list = new ArrayList<>();
- try {
- TermsQuery termsQuery = TermsQuery.of(terms -> terms.field(field).terms(value -> value.value(fieldValues)));
- SearchResponse<T> response = elasticsearchClient.search(builder ->
- builder.index(index).query(terms -> terms.terms(termsQuery))
- .from((start - 1) * pageSize)
- .size(pageSize)
- .trackTotalHits(f -> f.enabled(true))
- , beanClass);
- queryEsData(beanClass, list, response);
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>termsQuery>>>>>>{}", e);
- }
- return list;
- }
- /**
- * @param index
- * @param beanClass
- * @param start
- * @param pageSize
- * @description: match_all查询
- * @return: java.util.List<T>
- * @author 熟透的蜗牛
- * @date: 2025/1/22 0:10
- */
- public <T> List<T> matchAllQuery(String index, Class<T> beanClass, int start, int pageSize) {
- // 查询的数据列表
- List<T> list = new ArrayList<>();
- try {
- SearchResponse<T> searchResponse = elasticsearchClient.search(
- q -> q.index(index)
- .query(matchAll -> matchAll
- .matchAll(all -> all))
- .from((start - 1) * pageSize)
- .size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize)
- .trackTotalHits(f -> f.enabled(true)),
- beanClass);
- queryEsData(beanClass, list, searchResponse);
- } catch (IOException e) {
- log.info(">>>>>>>>>>>>>>>matchAllQuery>>>>>>{}", e);
- }
- return list;
- }
- /**
- * @param index
- * @param beanClass
- * @param field
- * @param value
- * @description:词组查询
- * @return: java.util.List<T>
- * @author 熟透的蜗牛
- * @date: 2025/1/22 0:08
- */
- public <T> List<T> matchPhraseQuery(String index, Class<T> beanClass, String field, Object value, int start, int pageSize) {
- List<T> list = new ArrayList<>();
- try {
- SearchResponse<T> response = elasticsearchClient.search(q ->
- q.index(index).query(
- pp -> pp.matchPhrase(
- p -> p.field(field)
- .query(String.valueOf(value))))
- .from((start - 1) * pageSize)
- .size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize)
- .trackTotalHits(f -> f.enabled(true)),
- beanClass);
- queryEsData(beanClass, list, response);
- } catch (IOException e) {
- log.info(">>>>>>>>>>>>>>>matchPhraseQuery>>>>>>{}", e);
- }
- return list;
- }
- /*
- * @description:内容在多字段查询
- * @author: 熟透的蜗牛
- * @date: 2023/1/9 15:27
- * @param index
- * @param beanClass
- * @param fields
- * @param text
- * @param start 起始位置
- * @param pageSize 页容量
- * @return: java.util.List<T>
- **/
- public <T> List<T> matchMultiQuery(String index, Class<T> beanClass, String[] fields, Object text, Integer start, Integer pageSize) {
- // 查询的数据列表
- List<T> list = new ArrayList<>();
- try {
- SearchResponse<T> response = elasticsearchClient.search(q -> q.index(index)
- .query(multiMatch ->
- multiMatch.multiMatch(m ->
- m.fields(Arrays.asList(fields))
- .query(text.toString())))
- .from((start - 1) * pageSize)
- .size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize)
- .trackTotalHits(f -> f.enabled(true)),
- beanClass);
- queryEsData(beanClass, list, response);
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>matchMultiQuery>>>>>>{}", e);
- }
- return list;
- }
- /*
- * @description: 通配符查询(wildcard):会对查询条件进行分词。还可以使用通配符 ?(任意单个字符) 和 * (0个或多个字符)
- * *:表示多个字符(0个或多个字符)
- * ?:表示单个字符
- * @author: 熟透的蜗牛
- * @date: 2023/1/6 15:02
- * @param index
- * @param beanClass
- * @param field
- * @param text
- * @return: java.util.List<T>
- **/
- public <T> List<T> wildcardQuery(String index, Class<T> beanClass, String field, String text, Integer start, Integer pageSize) {
- // 查询的数据列表
- List<T> list = new ArrayList<>();
- try {
- SearchResponse<T> response = elasticsearchClient.search(q -> q.index(index)
- .query(wildcard -> wildcard.wildcard(w ->
- w.field(field)
- .value(text.toString())))
- .from((start - 1) * pageSize)
- .size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize)
- .trackTotalHits(f -> f.enabled(true)),
- beanClass);
- queryEsData(beanClass, list, response);
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>wildcardQuery>>>>>>{}", e);
- }
- return list;
- }
- /**
- * @param index
- * @param beanClass
- * @param field
- * @param text
- * @param start
- * @param pageSize
- * @param prefixLength 指定匹配的前缀长度,前面多少个字符需要完全匹配
- * @param maxExpansion 限制查询扩展的最大数量。用于控制在查询中允许的最大变体数量
- * @description: 模糊匹配
- * @return: java.util.List<T>
- * @author 熟透的蜗牛
- * @date: 2025/1/22 1:02
- */
- public <T> List<T> fuzzyQuery(String index, Class<T> beanClass, String field, String text, Integer start,
- Integer pageSize, Integer prefixLength, Integer maxExpansion) {
- // 查询的数据列表
- List<T> list = new ArrayList<>();
- try {
- SearchResponse<T> response = elasticsearchClient.search(q -> q.index(index)
- .query(fuzzy ->
- fuzzy.fuzzy(f ->
- f.field(field)
- .value(text.toString())
- .fuzziness("auto")
- .prefixLength(prefixLength)
- .maxExpansions(maxExpansion)))
- .from((start - 1) * pageSize)
- .size(pageSize > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : pageSize)
- .trackTotalHits(f -> f.enabled(true)),
- beanClass);
- queryEsData(beanClass, list, response);
- } catch (IOException e) {
- log.info(">>>>>>>>>>>>>>>fuzzyQuery>>>>>>{}", e);
- }
- return list;
- }
- /*
- * @description:聚合查询
- * @author: 熟透的蜗牛
- * @date: 2023/1/6 15:34
- * @param index
- * @param type 常用的操作有:avg:求平均、max:最大值、min:最小值、sum:求和
- * @param field
- **/
- public Double metricQuery(String index, String type, String field) {
- try {
- Query query = MatchAllQuery.of(q -> q)._toQuery();
- SearchRequest.Builder searchBuilder = new SearchRequest.Builder().index(index).query(query);
- if (MetricConstant.MAX.equals(type)) {
- Aggregation max = AggregationBuilders.max(m -> m.field(field));
- searchBuilder.aggregations("max", max);
- }
- if (MetricConstant.MIN.equals(type)) {
- Aggregation min = AggregationBuilders.min(m -> m.field(field));
- searchBuilder.aggregations("min", min);
- }
- if (MetricConstant.AVG.equals(type)) {
- Aggregation avg = AggregationBuilders.avg(a -> a.field(field));
- searchBuilder.aggregations("avg", avg);
- }
- if (MetricConstant.SUM.equals(type)) {
- Aggregation sum = AggregationBuilders.sum(s -> s.field(field));
- searchBuilder.aggregations("sum", sum);
- }
- SearchResponse searchResponse = elasticsearchClient.search(searchBuilder.build(), Map.class);
- Map aggregations = searchResponse.aggregations();
- if (MetricConstant.MAX.equals(type)) {
- Aggregate maxAggregate = (Aggregate) aggregations.get(MetricConstant.MAX);
- MaxAggregate max = (MaxAggregate) maxAggregate._get();
- return max.value();
- }
- if (MetricConstant.MIN.equals(type)) {
- Aggregate minAggregate = (Aggregate) aggregations.get(MetricConstant.MIN);
- MinAggregate min = (MinAggregate) minAggregate._get();
- return min.value();
- }
- if (MetricConstant.AVG.equals(type)) {
- Aggregate avgAggregate = (Aggregate) aggregations.get(MetricConstant.AVG);
- AvgAggregate avg = (AvgAggregate) avgAggregate._get();
- return avg.value();
- }
- if (MetricConstant.SUM.equals(type)) {
- Aggregate sumAggregate = (Aggregate) aggregations.get(MetricConstant.SUM);
- SumAggregate sum = (SumAggregate) sumAggregate._get();
- return sum.value();
- }
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>metricQuery>>>>>>{}", e);
- }
- return null;
- }
- /**
- * @param index
- * @param bucketField 分组的字段,字段类型为keyword,bucketField分组别名
- * @description: 分组聚合查询,返回map集合
- * @return: java.util.List<java.util.HashMap < java.lang.String, java.lang.Object>>
- * @author 熟透的蜗牛
- * @date: 2025/1/22 14:42
- */
- public List<Map<String, Object>> bucketQuery(String index, String bucketField, String bucketFieldAlias) {
- List<Map<String, Object>> resultMapList = new ArrayList<>();
- try {
- SearchResponse<Void> response = elasticsearchClient.search(q -> q.index(index).query(query -> query.matchAll(all -> all))
- .aggregations(bucketFieldAlias, agg -> agg.terms(t -> t.field(bucketField))),
- Void.class);
- Map<String, Aggregate> aggregationsMap = response.aggregations();
- Aggregate aggregate = aggregationsMap.get(bucketFieldAlias);
- StringTermsAggregate stringTermsAggregate = (StringTermsAggregate) aggregate._get();
- Buckets<StringTermsBucket> buckets = stringTermsAggregate.buckets();
- List<StringTermsBucket> list = (List) buckets._get();
- HashMap<String, Object> resulMap = new HashMap<>();
- int i = 0;
- for (StringTermsBucket st : list) {
- i++;
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(st.key()._toJsonString(), st.docCount());
- resulMap.put(bucketFieldAlias + i, jsonObject);
- resultMapList.add(resulMap);
- }
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>bucketQuery>>>>>>{}", e);
- }
- return resultMapList;
- }
- /**
- * @param index
- * @param bucketField
- * @param bucketFieldAlias 分组的别名
- * @param avgFiled
- * @param avgFiledAlias 平均值的别名
- * @description: 分组后的聚合
- * @author 熟透的蜗牛
- * @date: 2025/1/22 15:20
- */
- public List<Map<String, Object>> subBucketQuery(String index, String bucketField, String bucketFieldAlias, String avgFiled, String avgFiledAlias) {
- List<Map<String, Object>> resultMapList = new ArrayList<>();
- try {
- SearchResponse<Void> response = elasticsearchClient.search(q -> q.index(index).query(query -> query.matchAll(all -> all))
- .aggregations(bucketFieldAlias, agg -> agg.terms(t -> t.field(bucketField))
- .aggregations(avgFiledAlias, avg -> avg.avg(a -> a.field(avgFiled))))
- , Void.class);
- Map<String, Aggregate> aggregations = response.aggregations();
- log.info(">>>>>>>>>>>>aggregations{}", aggregations);
- Aggregate aggregate = aggregations.get(bucketFieldAlias);
- StringTermsAggregate stringTermsAggregate = (StringTermsAggregate) aggregate._get();
- List<StringTermsBucket> list = (List<StringTermsBucket>) stringTermsAggregate.buckets()._get();
- int i = 0;
- for (StringTermsBucket st : list) {
- HashMap<String, Object> resultMap = new HashMap<>();
- JSONObject jsonObject = new JSONObject();
- Aggregate avgAggregate = st.aggregations().get(avgFiledAlias); //从聚合中取平均值
- AvgAggregate avg = (AvgAggregate) avgAggregate._get();
- jsonObject.put(st.key()._toJsonString(), st.docCount());
- jsonObject.put(avgFiledAlias, avg.value());
- resultMap.put(bucketFieldAlias + i, jsonObject);
- resultMapList.add(resultMap);
- }
- } catch (Exception e) {
- log.info(">>>>>>>>>>>>>>>subBucketQuery>>>>>>{}", e);
- }
- return resultMapList;
- }
- /**
- * @param beanClass
- * @param list
- * @param searchResponse
- * @description: 封装返回数据
- * @return: void
- * @author 熟透的蜗牛
- * @date: 2025/1/15 23:58
- */
- private <T> void queryEsData(Class<T> beanClass, List<T> list, SearchResponse<T> searchResponse)
- throws IOException {
- if (searchResponse.hits().total().value() > 0) {
- List<Hit<T>> hitList = searchResponse.hits().hits();
- for (Hit hit : hitList) {
- // 将 JSON 转换成对象
- T bean = JSON.parseObject(JSON.toJSONString(hit.source()), beanClass);
- list.add(bean);
- }
- }
- }
- }
二、测试代码
- package com.snail.system.controller;
- import co.elastic.clients.elasticsearch._types.FieldValue;
- import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
- import co.elastic.clients.elasticsearch._types.query_dsl.MultiMatchQuery;
- import co.elastic.clients.elasticsearch._types.query_dsl.Query;
- import co.elastic.clients.elasticsearch._types.query_dsl.TermQuery;
- import co.elastic.clients.elasticsearch.core.search.Hit;
- import co.elastic.clients.json.JsonData;
- import com.alibaba.fastjson2.JSON;
- import com.alibaba.fastjson2.JSONObject;
- import com.snail.manager.common.utils.ValidateCodeUtils;
- import com.snail.system.service.user.User;
- import com.wssnail.elasticsearch.util.ElasticSearchUtils;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.util.CollectionUtils;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
- import java.util.*;
- /**
- * @author 熟透的蜗牛
- * @version 1.0
- * @description: TODO
- * @date 2025/1/16 20:20
- */
- @RestController
- @Slf4j
- public class DemoController {
- @Autowired
- private ElasticSearchUtils elasticSearchUtils;
- @GetMapping("/demo")
- public String test(int type) {
- switch (type) {
- case 1:
- log.info("create......{}", elasticSearchUtils.createIndex("user_index"));
- break;
- case 2:
- log.info("delete......{}", elasticSearchUtils.deleteIndex("user_index"));
- break;
- case 3:
- log.info("isIndexExist......{}", elasticSearchUtils.isIndexExist("user_index"));
- break;
- case 4:
- for (int i = 0; i < 1000; i++) {
- User user = new User();
- user.setId("10000" + i);
- user.setName("tom" + i + ValidateCodeUtils.generateLetter(6));
- user.setAge((int) (Math.random() * 100));
- if (i < 50) {
- user.setAddress("北京市东城区1号路");
- user.setBirthday(new Date("1990/03/01 00:00:00"));
- }
- if (i > 50 && i < 100) {
- user.setAddress("北京市西城区88号路");
- user.setBirthday(new Date("1998/03/01 00:00:00"));
- }
- if (i > 100 && i < 200) {
- user.setAddress("北京市朝阳区青年路100号");
- user.setBirthday(new Date("2000/03/01 00:00:00"));
- }
- if (i > 200 && i < 500) {
- user.setAddress("天津市和平区青年路100号");
- user.setBirthday(new Date("2005/08/01 00:00:00"));
- }
- if (i > 500 && i < 800) {
- user.setAddress("天津市河西区青年路88号");
- }
- if (i > 800) {
- user.setAddress("石家庄市桥西区青年路100号");
- }
- String id = elasticSearchUtils.submitDocument(user, "user_index", user.getId());
- log.info("id:{}", id);
- }
- break;
- case 5:
- log.info("是否存在数据>>>>{}", elasticSearchUtils.existsById("user_index", "1000035"));
- break;
- case 6:
- log.info("根据地删除数据>>>>{}", elasticSearchUtils.deleteDocumentById("user_index", "1000036"));
- break;
- case 7:
- User user = new User("tom356666666", 20, "上海市闸北区霞飞路66号");
- log.info("根据id修改数据>>>>>{}", elasticSearchUtils.updateDocumentById(user, "user_index", "1000035"));
- break;
- case 8:
- log.info("时实修改数据>>>>>>>{}", elasticSearchUtils.updateDocumentByIdNoRealTime(User.builder().name("66666666").build(), "user_index", "1000037"));
- break;
- case 9:
- log.info("根据id查询数据>>>>>{}", JSON.to(User.class, elasticSearchUtils.searchDocumentById("user_index", "1000035", new ArrayList<>(), new ArrayList<>())));
- break;
- case 10:
- log.info("根据id查询数据返回指定字段>>>>>{}", JSON.to(User.class, elasticSearchUtils.searchDocumentById("user_index", "1000035", Arrays.asList("name", "address"), new ArrayList<>())));
- break;
- case 11:
- //批量插入
- ArrayList<Map> mapList = new ArrayList<>();
- for (int i = 0; i < 100; i++) {
- HashMap<Object, Object> userMap = new HashMap<>();
- User user1 = new User();
- user1.setId("1000000" + i);
- user1.setName(ValidateCodeUtils.generateLetter(10));
- user1.setAge((int) (Math.random() * 100));
- user1.setAddress("南京市淮河区夫子庙35号");
- user1.setBirthday(new Date("2010/10/01 00:00:00"));
- userMap.put("data", user1);
- userMap.put("id", user1.getId());
- mapList.add(userMap);
- }
- log.info(">>>>>>>>>>>>>>>批量插入结果{}", elasticSearchUtils.bulkPost("user_index", mapList));
- break;
- case 12:
- // Query termQuery = TermQuery.of(t -> t.field("address").value("北京"))._toQuery();
- // Query rangeQuery = RangeQuery.of(age -> age.field("age").gte(JsonData.of(60)))._toQuery();
- Query multiQuery = MultiMatchQuery.of(m -> m.fields("address").query("南京"))._toQuery();
- String[] highlightField = {"address"};
- List<Hit<Map>> hits = elasticSearchUtils.searchHighLightFieldDocument("user_index", highlightField, 1, 20, multiQuery);
- List<User> result = new ArrayList<>();
- for (Hit<Map> hit : hits) {
- Map source = hit.source();
- User userResult = JSONObject.parseObject(JSONObject.toJSONString(source), User.class);
- Map<String, List<String>> highlight = hit.highlight();
- userResult.setAddress(CollectionUtils.isEmpty(highlight.get("address")) ? userResult.getAddress() : highlight.get("address").get(0));
- userResult.setAge(CollectionUtils.isEmpty(highlight.get("age")) ? userResult.getAge() : Integer.valueOf(highlight.get("age").get(0)));
- result.add(userResult);
- }
- log.info(">>>>>>>>>>>>>>>>>数据结果{}", result);
- break;
- case 13:
- log.info(">>>>>>>>精确查询结果{}", elasticSearchUtils.termQuery("user_index", "address", "北京市东城区1号路", User.class, 1, 30));
- break;
- case 14:
- //自定义查询
- Query boolQuery = BoolQuery.of(b -> b.must(name -> name.term(n -> n.field("name.keyword").value("tom38ictwvq")))
- .must(m -> m.range(age -> age.field("age").gte(JsonData.of(60)).lte(JsonData.of(80))))
- .should(s -> s.term(address -> address.field("address").value("南京市淮河区夫子庙35号"))))._toQuery();
- log.info(">>>>>>>>>>自定义查询{}", elasticSearchUtils.selfDefine("user_index", User.class, boolQuery));
- break;
- case 15:
- //查询文档数量
- Query ageQuery = TermQuery.of(t -> t.field("age").value("63"))._toQuery();
- log.info(">>>>>>>>文档数量{}", elasticSearchUtils.count("user_index", ageQuery));
- break;
- case 16:
- List<FieldValue> list = new ArrayList<>();
- list.add(FieldValue.of("tom58dhmceA"));
- list.add(FieldValue.of("tom38ictwvq"));
- log.info(">>>>>>>多值查询{}", elasticSearchUtils.termsQuery("user_index", "name.keyword", list, User.class, 1, 20));
- break;
- case 17:
- log.info(">>>>>>>>>匹配所有{}", elasticSearchUtils.matchAllQuery("user_index", User.class, 1, 20));
- break;
- case 18:
- log.info(">>>>>>>>>phrase匹配{}", elasticSearchUtils.matchPhraseQuery("user_index", User.class, "address", "南京", 1, 20));
- break;
- case 19:
- log.info(">>>>>>>>>多字段匹配{}", elasticSearchUtils.matchMultiQuery("user_index", User.class, new String[]{"address", "age"}, "88", 1, 20));
- break;
- case 20:
- log.info("wildcardQuery>>>>>>{}", elasticSearchUtils.wildcardQuery("user_index", User.class, "name", "tom*", 1, 20));
- break;
- case 21:
- log.info("fuzzyQuery>>>>>>{}", elasticSearchUtils.fuzzyQuery("user_index", User.class, "name", "tom38ictvq", 1, 20, 5, 5));
- break;
- case 22:
- log.info("avg>>>>>>{}", elasticSearchUtils.metricQuery("user_index", "avg", "age"));
- log.info("max>>>>>>{}", elasticSearchUtils.metricQuery("user_index", "max", "age"));
- log.info("min>>>>>>{}", elasticSearchUtils.metricQuery("user_index", "min", "age"));
- log.info("sum>>>>>>{}", elasticSearchUtils.metricQuery("user_index", "sum", "age"));
- break;
- case 23:
- log.info("聚合分组查询>>>>>>>{}", elasticSearchUtils.bucketQuery("user_index", "address.keyword", "address"));
- break;
- case 24:
- log.info(">>>>>>分组后的聚合查询>>{}", elasticSearchUtils.subBucketQuery("user_index", "address.keyword", "address", "age", "age_avg"));
- break;
- default:
- break;
- }
- return "success";
- }
- }
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。