Elasticsearch Java High Level Client [7.17] 使用

打印 上一主题 下一主题

主题 977|帖子 977|积分 2935

es 的 HighLevelClient存在es源代码的引用,结合springboot使用时,会存在es版本的冲突,这里纪录下解决冲突和使用方式(es已经不发起使用这个了)。

留意es服务端的版本必要与client的版本对齐,否则返回数据大概会解析失败!!!

文档地址:Java High Level REST Client | Java REST Client [7.17] | Elastic
1、起首创建个java springboot项目
源码地址:https://github.com/a66245753/es-7-high-level-client.git
pom依赖文件如下,在 dependencyManagement 里解决springboot引起的版本冲突-
  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2.   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3.   <modelVersion>4.0.0</modelVersion>
  4.   <parent>
  5.     <groupId>org.springframework.boot</groupId>
  6.     <artifactId>spring-boot-starter-parent</artifactId>
  7.     <version>2.6.4</version>
  8.     <relativePath/> <!-- lookup parent from repository -->
  9.   </parent>
  10.   <groupId>com.david</groupId>
  11.   <artifactId>es-7-high-level-client</artifactId>
  12.   <version>1.0-SNAPSHOT</version>
  13.   <packaging>jar</packaging>
  14.   <name>es-7-high-level-client</name>
  15.   <properties>
  16.     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  17.     <elastic.version>7.17.28</elastic.version>
  18.   </properties>
  19.   <dependencyManagement>
  20.     <dependencies>
  21.       <!--规定版本,解决springboot引起的版本冲突-->
  22.       <dependency>
  23.         <groupId>org.elasticsearch.client</groupId>
  24.         <artifactId>elasticsearch-rest-client</artifactId>
  25.         <version>${elastic.version}</version>
  26.       </dependency>
  27.       <dependency>
  28.         <groupId>org.elasticsearch.client</groupId>
  29.         <artifactId>elasticsearch-rest-high-level-client</artifactId>
  30.         <version>${elastic.version}</version>
  31.       </dependency>
  32.       <dependency>
  33.         <groupId>org.elasticsearch</groupId>
  34.         <artifactId>elasticsearch</artifactId>
  35.         <version>${elastic.version}</version>
  36.       </dependency>
  37.     </dependencies>
  38.   </dependencyManagement>
  39.   <dependencies>
  40.     <dependency>
  41.       <groupId>org.springframework.boot</groupId>
  42.       <artifactId>spring-boot-starter-web</artifactId>
  43.     </dependency>
  44.     <dependency>
  45.       <groupId>org.springframework.boot</groupId>
  46.       <artifactId>spring-boot-starter-actuator</artifactId>
  47.     </dependency>
  48.     <dependency>
  49.       <groupId>org.elasticsearch.client</groupId>
  50.       <artifactId>elasticsearch-rest-high-level-client</artifactId>
  51.     </dependency>
  52.     <dependency>
  53.       <groupId>com.baomidou</groupId>
  54.       <artifactId>mybatis-plus-boot-starter</artifactId>
  55.       <version>3.4.1</version>
  56.     </dependency>
  57.     <dependency>
  58.       <groupId>mysql</groupId>
  59.       <artifactId>mysql-connector-java</artifactId>
  60.       <scope>runtime</scope>
  61.     </dependency>
  62.     <!-- Lombok 依赖 -->
  63.     <dependency>
  64.       <groupId>org.projectlombok</groupId>
  65.       <artifactId>lombok</artifactId>
  66.       <version>1.18.22</version>
  67.       <scope>provided</scope>
  68.     </dependency>
  69.   </dependencies>
  70. </project>
复制代码
2、配置es client为java bean
  1. @Configuration
  2. public class EsClientConfig {
  3.     @Bean
  4.     public RestHighLevelClient restHighLevelClient()
  5.     {
  6.         return new RestHighLevelClient(
  7.                 RestClient.builder(
  8.                         new HttpHost("localhost", 9200, "http")
  9.                 )
  10.         );
  11.     }
  12. }
复制代码
2.1 es id查询数据
  1. /**
  2.      * id查询
  3.      *
  4.      * @return
  5.      * @throws IOException
  6.      */
  7.     @PostMapping("/get")
  8.     public ResponseEntity<Object> get() throws IOException {
  9.         GetRequest getRequest = new GetRequest("ssp_ad_union_log_202403");
  10.         getRequest.id("fcBZZZUBJ3krEJ13KbOG");
  11.         GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
  12.         return new ResponseEntity<>(response.getSourceAsMap(), HttpStatus.OK);
  13.     }
复制代码
2.2 es 多个id查询数据
  1. /**
  2.      * ids查询
  3.      *
  4.      * @return
  5.      * @throws IOException
  6.      */
  7.     @PostMapping("/getBatch")
  8.     public ResponseEntity<Object> getBatch() throws IOException {
  9.         MultiGetRequest multiGetRequest = new MultiGetRequest();
  10.         multiGetRequest.add("ssp_ad_union_log_202403","fcBZZZUBJ3krEJ13KbOG");
  11.         multiGetRequest.add("ssp_ad_union_log_202403","VsBZZZUBJ3krEJ13KbOG");
  12.         MultiGetResponse multiGetItemResponses = restHighLevelClient.multiGet(multiGetRequest, RequestOptions.DEFAULT);
  13.         return new ResponseEntity<>(multiGetItemResponses.getResponses(), HttpStatus.OK);
  14.     }
复制代码
2.3 search 分页
  1. /**
  2.      * 分页查询
  3.      *
  4.      * @param request
  5.      * @return
  6.      * @throws IOException
  7.      */
  8.     @PostMapping("/search")
  9.     public ResponseEntity<EsSearchResult> search(@RequestBody EsSearchRequest request) throws IOException {
  10.         SearchRequest searchRequest = new SearchRequest(request.getIndex());
  11.         // 构建搜索请求
  12.         SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
  13.         sourceBuilder.trackTotalHits(true);
  14.         sourceBuilder.from((request.getPageIndex() - 1) * request.getPageSize());
  15.         sourceBuilder.size(request.getPageSize());
  16.         searchRequest.source(sourceBuilder);
  17.         BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
  18.         if (request.getEqualsParams() != null && !request.getEqualsParams().isEmpty()) {
  19.             for (Map.Entry<String, Object> entry : request.getEqualsParams().entrySet()) {
  20.                 queryBuilder.filter(QueryBuilders.termQuery(entry.getKey(), entry.getValue()));
  21.             }
  22.         }
  23.         if (request.getLikeParams() != null && !request.getLikeParams().isEmpty()) {
  24.             for (Map.Entry<String, String> entry : request.getLikeParams().entrySet()) {
  25.                 queryBuilder.must(QueryBuilders.matchQuery(entry.getKey(), entry.getValue()));
  26.             }
  27.         }
  28.         if (request.getRangeParams() != null && !request.getRangeParams().isEmpty()) {
  29.             for (EsRangeParams rangeParam : request.getRangeParams()) {
  30.                 RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery(rangeParam.getField());
  31.                 if (rangeParam.getGte() != null) {
  32.                     rangeQuery.gte(rangeParam.getGte());
  33.                 }
  34.                 if (rangeParam.getLte() != null) {
  35.                     rangeQuery.lte(rangeParam.getLte());
  36.                 }
  37.                 queryBuilder.filter(rangeQuery);
  38.             }
  39.         }
  40.         sourceBuilder.query(queryBuilder);
  41.         // 打印查询语句,可以放到kibana中执行并分析性能
  42.         System.out.println(searchRequest.source().toString());
  43.         // 执行搜索
  44.         SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  45.         EsSearchResult result = new EsSearchResult();
  46.         result.setPageIndex(request.getPageIndex());
  47.         result.setPageSize(request.getPageSize());
  48.         result.setTotal(searchResponse.getHits().getTotalHits().value);
  49.         result.setList(Arrays.stream(searchResponse.getHits().getHits()).map(SearchHit::getSourceAsMap).collect(Collectors.toList()));
  50.         return new ResponseEntity<>(result, HttpStatus.OK);
  51.     }
复制代码
2.4 aggs 聚合统计
  1. /**
  2.      * 聚合统计
  3.      *
  4.      * @param request
  5.      * @return
  6.      * @throws IOException
  7.      */
  8.     @PostMapping("/aggs")
  9.     public ResponseEntity<List<Object>> aggs(@RequestBody EsSearchRequest request) throws IOException {
  10.         // 创建 SearchSourceBuilder 实例
  11.         SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  12.         // 设置 track_total_hits
  13.         searchSourceBuilder.trackTotalHits(true);
  14.         // 设置分页参数
  15.         searchSourceBuilder.from(0);
  16.         searchSourceBuilder.size(1);
  17.         // 构建 bool 查询
  18.         BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
  19.                 .filter(QueryBuilders.termQuery("provinceName", "浙江"));
  20.         // 添加查询到 SearchSourceBuilder
  21.         searchSourceBuilder.query(boolQuery);
  22.         // 构建聚合
  23.         TermsAggregationBuilder cityGroup = AggregationBuilders.terms("city_group")
  24.                 .field("cityName").size(10)
  25.                 .subAggregation(AggregationBuilders.terms("network_group").field("network").size(10))
  26.                 .subAggregation(AggregationBuilders.terms("phoneBrand_group").field("phoneBrandName").size(10))
  27.                 .subAggregation(AggregationBuilders.terms("sdkVersion_group").field("sdkVersion").size(10))
  28.                 .subAggregation(AggregationBuilders.terms("platform_group").field("platformName").size(10))
  29.                 .subAggregation(AggregationBuilders.terms("req_group").field("bizType").size(10))
  30.                 .subAggregation(
  31.                         AggregationBuilders.filter("ecpm_group", QueryBuilders.termQuery("bizType", 2))
  32.                                 .subAggregation(AggregationBuilders.avg("avg_ecpm").field("ecpm"))
  33.                 );
  34.         // 添加聚合到 SearchSourceBuilder
  35.         searchSourceBuilder.aggregation(cityGroup);
  36.         // 创建 SearchRequest 并指定索引名称
  37.         SearchRequest searchRequest = new SearchRequest("ssp_ad_union_log_202403");
  38.         searchRequest.source(searchSourceBuilder);
  39.         // 执行搜索请求
  40.         SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  41.         List<Object> result = new ArrayList<>(searchResponse.getAggregations().asList());
  42.         for (Aggregation aggregation : searchResponse.getAggregations().asList()) {
  43.             String json = aggregation.toString();
  44. //            String json = OBJECT_MAPPER.writeValueAsString(aggregation);
  45. //            Map<String, Object> map = new HashMap<>();
  46. //            BeanUtils.copyProperties(aggregation, map);
  47.             result.add(json);
  48.         }
  49.         return new ResponseEntity<>(result, HttpStatus.OK);
  50.     }
复制代码
2.5 新增单条数据
  1. /**
  2.      * 通过json新增
  3.      *
  4.      * @return
  5.      * @throws IOException
  6.      */
  7.     @RequestMapping(path = "add")
  8.     public ResponseEntity<IndexResponse> add() throws IOException {
  9.         IndexRequest indexRequest = new IndexRequest("ssp_ad_union_log_202403");
  10. //        indexRequest.id("1234567890");
  11.         indexRequest.opType(DocWriteRequest.OpType.CREATE);
  12.         indexRequest.source("{\n" +
  13.                 "          "id": 27731976,\n" +
  14.                 "          "reqId": "d63e0377-639a-4ac4-96a7-677d507e627e",\n" +
  15.                 "          "device": "android",\n" +
  16.                 "          "platform": 3,\n" +
  17.                 "          "platformName": "快手",\n" +
  18.                 "          "clientType": 1,\n" +
  19.                 "          "myAppId": "300001",\n" +
  20.                 "          "deviceId": "d63e0377-639a-4ac4-96a7-677d507e627e",\n" +
  21.                 "          "adSiteGroupId": 100000055,\n" +
  22.                 "          "adSiteId": "6827003034",\n" +
  23.                 "          "packagePath": "com.jihuomiao.app",\n" +
  24.                 "          "ecpm": 10700,\n" +
  25.                 "          "location": null,\n" +
  26.                 "          "ip": null,\n" +
  27.                 "          "cityId": 422800,\n" +
  28.                 "          "areaId": 422822,\n" +
  29.                 "          "cityName": "恩施",\n" +
  30.                 "          "areaName": "建始",\n" +
  31.                 "          "provinceId": 420000,\n" +
  32.                 "          "provinceName": "湖北",\n" +
  33.                 "          "phoneBrand": "OPPO",\n" +
  34.                 "          "phoneBrandName": "oppo",\n" +
  35.                 "          "phoneModel": null,\n" +
  36.                 "          "idfa": null,\n" +
  37.                 "          "bizType": 1,\n" +
  38.                 "          "sdkVersion": "1.0.2",\n" +
  39.                 "          "network": "5g",\n" +
  40.                 "          "logTime": "2025-03-03 15:31:10",\n" +
  41.                 "          "createdAt": "2025-03-03 15:31:10"\n" +
  42.                 "        }", XContentType.JSON);
  43.         IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
  44.         return new ResponseEntity<>(indexResponse, HttpStatus.OK);
  45.     }
复制代码
2.6 bulk 批量新增数据
  1. @RequestMapping(path = "addBatch")
  2.     public ResponseEntity<BulkResponse> addBatch() throws IOException {
  3.         // 创建 BulkRequest
  4.         BulkRequest bulkRequest = new BulkRequest();
  5.         // 添加多个
  6.         bulkRequest.add(new IndexRequest("ssp_ad_union_log_202403")
  7. //                .id("12345678909")
  8.                 .source("{ "field1": "value1", "field2": "value2" }", XContentType.JSON));
  9.         bulkRequest.add(new IndexRequest("my_index")
  10. //                .id("12345678987654")
  11.                 .source("{ "field1": "value2", "field2": "value2" }", XContentType.JSON));
  12.         // 执行批量创建操作
  13.         BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  14.         // 检查是否有错误
  15.         if (bulkResponse.hasFailures()) {
  16.             System.err.println("Bulk operation had failures: " + bulkResponse.buildFailureMessage());
  17.         } else {
  18.             System.out.println("All documents created successfully.");
  19.         }
  20.         return new ResponseEntity<>(bulkResponse, HttpStatus.OK);
  21.     }
复制代码
es 7 high Level client 主要的集成问题还是在与springboot的版本冲突,这点解决掉就可以了,官方文档里都能找到使用说明

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

刘俊凯

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表