Quickwit获取Kafka数据源消息

打印 上一主题 下一主题

主题 887|帖子 887|积分 2661

先容

Quickwit可以将数据从一个或多个源插入到索引中。创建索引后,可以利用CLI 命令quickwit source create添加源,支持的源有:file、kafka、kinesis、pulsar。
本章解说如何从Quickwit搜刮引擎中创建Kafka源和获取Kafka源主题数据流,注意从Kafka流中读取数据流中的每条消息都必须包含一个JSON 对象,现在官方只支持数据源JSON格式数据导入。
官方关于Kafka源文档阐明:Kafka | Quickwit
创建Quickwit索引

因此前已在docker服务主机上创建了包含quickwit服务的docker容器,本文不详述quickwit安装过程,可参见《Docker安装Quickwit搜刮引擎》。
官方示例YMAL配置
  1. #
  2. # Index config file for gh-archive dataset.
  3. #
  4. version: 0.8
  5. index_id: gh-archive
  6. doc_mapping:
  7.   field_mappings:
  8.     - name: id
  9.       type: text
  10.       tokenizer: raw
  11.     - name: type
  12.       type: text
  13.       fast: true
  14.       tokenizer: raw
  15.     - name: public
  16.       type: bool
  17.       fast: true
  18.     - name: payload
  19.       type: json
  20.       tokenizer: default
  21.     - name: org
  22.       type: json
  23.       tokenizer: default
  24.     - name: repo
  25.       type: json
  26.       tokenizer: default
  27.     - name: actor
  28.       type: json
  29.       tokenizer: default
  30.     - name: other
  31.       type: json
  32.       tokenizer: default
  33.     - name: created_at
  34.       type: datetime
  35.       fast: true
  36.       input_formats:
  37.         - rfc3339
  38.       fast_precision: seconds
  39.   timestamp_field: created_at
  40. indexing_settings:
  41.   commit_timeout_secs: 10
复制代码
也可直接下载官方示例yaml文件,并通过quickwit服务命令创建索引
  1. # Download GH Archive index config.
  2. wget -O gh-archive.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/index-config.yaml
  3. # Create index.
  4. ./quickwit index create --index-config gh-archive.yaml
复制代码
Kafka服务安装

可自行百度,非本文焦点,略过...

下载Kafka测试数据

测试数据来源于Quickwit官方,参见:Kafka | Quickwit
创建kafka主题,下载官方提供的json测试数据包,将数据推送到Kafka主题;
  1. # Create a topic named `gh-archive` with 3 partitions.
  2. bin/kafka-topics.sh --create --topic gh-archive --partitions 3 --bootstrap-server localhost:9092
  3. # Download a few GH Archive files.
  4. wget https://data.gharchive.org/2022-05-12-{10..15}.json.gz
  5. # Load the events into Kafka topic.
  6. gunzip -c 2022-05-12*.json.gz | \
  7. bin/kafka-console-producer.sh --topic gh-archive --bootstrap-server localhost:9092
复制代码
由于文件包太大,本地下载其中1天json数据包,实行命令如下
  1. curl https://data.gharchive.org/2022-05-12-10.json.gz -o 2022-05-12-10.json.gz
复制代码
创建Quickwit数据源

kafka数据源yaml配置示例
  1. #
  2. # Kafka source config file.
  3. #
  4. version: 0.8
  5. source_id: kafka-source
  6. source_type: kafka
  7. num_pipelines: 2
  8. params:
  9.   topic: gh-archive
  10.   client_params:
  11.     bootstrap.servers: localhost:9092
复制代码
也可直接下载官方提供的kafka-source.yaml文件,通过quickwit服务命令直接创建gh-archive索引对应的kafka源;
  1. # Download Kafka source config.
  2. wget https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/kafka-source.yaml
  3. # Create source.
  4. ./quickwit source create --index gh-archive --source-config kafka-source.yaml
复制代码
完成上述操纵后,Quickwit会创建索引器和搜刮器,索引器将会边接到Kafka源指定topic主题上,并从对应group组主题分区上获取数据,通过流式传输到Quickwit中;
Java推送到Kafka主题

基于Java发送消息到Kafka示例代码,工具类可参见《Kafka消息服务之Java工具类》,本章不作详细讲述,通过代码片断进行演示;
  1. //生产者发送消息
  2. KafkaUtils.KafkaStreamServer kafkaStreamServer =  KafkaUtils.bulidServer().createKafkaStreamServer("192.168.1.3", 9092);
  3. String topic = "gh-archive";
  4. int n = 0;
  5. List<String>lines = FileUtils.readLines(new File("D:\\test\\kafka\\2022-05-12-10.json"), "UTF-8");
  6. for (String line : lines) {
  7.     System.out.println("发送消息:" + line.substring(0,30) + " ...");
  8.     //向kafka队列发送数据
  9.     kafkaStreamServer.sendMsg(topic, line);
  10.     if (n > 0 && n / 1000  == 0) {
  11.         //线程休眠
  12.         TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(1, 200));
  13.     }
  14.     n ++;
  15. }
  16. //共累计推送到kafka数量:156040条
  17. System.out.println("共累计推送到kafka数量:" + n + "条");
  18. kafkaStreamServer.close();
复制代码
因官方测试数据比力大,本章节通过提前下载Json数据压缩包,只利用了其中一个Json压缩包:2022-05-12-10.json.gz
  1. curl https://data.gharchive.org/2022-05-12-10.json.gz -o 2022-05-12-10.json.gz
复制代码
Java创建Quickwit索引和Kafka数据源

Quickwit提供了丰富的REST API,因此支持通过HTTP请求创建、维护索引、索引查询以及数据源维护等;以下直接利用Java步伐通过Http演示示例;
QuickwitKafkaSourceTest.java
  1. package com.example;
  2. import org.junit.jupiter.api.Test;
  3. public class QuickwitKafkaSourceTest {
  4.     private final static String QUICKWIT_URL = "http://192.168.1.3:7280/";
  5.     /**
  6.      * 创建索引
  7.      * @throws Exception
  8.      */
  9.     @Test
  10.     public void createIndex() throws Exception {
  11.         String indexConf = """
  12.                 {
  13.                     "version": "0.8",
  14.                     "index_id": "gh-archive",
  15.                     "doc_mapping": {
  16.                         "field_mappings": [
  17.                             {
  18.                                 "name": "id",
  19.                                 "type": "text",
  20.                                 "tokenizer": "raw"
  21.                             },
  22.                             {
  23.                                 "name": "type",
  24.                                 "type": "text",
  25.                                 "fast": true,
  26.                                 "tokenizer": "raw"
  27.                             },
  28.                             {
  29.                                 "name": "public",
  30.                                 "type": "bool",
  31.                                 "fast": true
  32.                             },
  33.                             {
  34.                                 "name": "payload",
  35.                                 "type": "json",
  36.                                 "tokenizer": "default"
  37.                             },
  38.                             {
  39.                                 "name": "org",
  40.                                 "type": "json",
  41.                                 "tokenizer": "default"
  42.                             },
  43.                             {
  44.                                 "name": "repo",
  45.                                 "type": "json",
  46.                                 "tokenizer": "default"
  47.                             },
  48.                             {
  49.                                 "name": "actor",
  50.                                 "type": "json",
  51.                                 "tokenizer": "default"
  52.                             },
  53.                             {
  54.                                 "name": "other",
  55.                                 "type": "json",
  56.                                 "tokenizer": "default"
  57.                             },
  58.                             {
  59.                                 "name": "created_at",
  60.                                 "type": "datetime",
  61.                                 "fast": true,
  62.                                 "input_formats": ["rfc3339"],
  63.                                 "fast_precision": "seconds"
  64.                             }
  65.                         ],
  66.                         "timestamp_field": "created_at"
  67.                     },
  68.                     "indexing_settings": {
  69.                         "commit_timeout_secs": 10
  70.                     }
  71.                 }
  72.                 """;
  73.         System.out.println("响应:" + HttpUtils.build().httpPost(QUICKWIT_URL + "api/v1/indexes", indexConf));
  74.     }
  75.     /**
  76.      * 创建数据源, 源支持:(kafka,kinesis,file)
  77.      * @throws Exception
  78.      */
  79.     @Test
  80.     public void createSources() throws Exception {
  81.         String indexId = "gh-archive";
  82.         String jsonDate = """
  83.                 {
  84.                     "version": "0.8",
  85.                     "source_id": "kafka-source",
  86.                     "source_type": "kafka",
  87.                     "num_pipelines": 1,
  88.                     "input_format": "json",
  89.                     "params": {
  90.                         "topic": "gh-archive",
  91.                         "client_params": {
  92.                             "auto.offset.reset": "earliest",
  93.                             "bootstrap.servers": "192.168.1.5:9092"
  94.                         }
  95.                     }
  96.                 }
  97.                 """;
  98.         System.out.println("响应:" + HttpUtils.build().httpPost(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/sources", jsonDate));
  99.     }
  100.     /**
  101.      * 将索引切换到一个新的源上
  102.      * @throws Exception
  103.      */
  104.     @Test
  105.     public void indexesSourceToggle() throws Exception {
  106.         String indexId = "gh-archive";
  107.         String sourceId = "kafka-source";
  108.         System.out.println("响应:" + HttpUtils.build().http(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/sources/" + sourceId + "/toggle", "PUT", ""));
  109.     }
  110.    
  111.     /**
  112.      * 重置索引绑定的源
  113.      * @throws Exception
  114.      */
  115.     @Test
  116.     public void indexesSourceReset() throws Exception {
  117.         String indexId = "gh-archive";
  118.         String sourceId = "kafka-source";
  119.         System.out.println("响应:" + HttpUtils.build().http(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/sources/" + sourceId + "/reset-checkpoint", "PUT", ""));
  120.     }
  121.     /**
  122.      * 删除索引绑定的源
  123.      * @throws Exception
  124.      */
  125.     @Test
  126.     public void indexesSourceDelete() throws Exception {
  127.         String indexId = "gh-archive";
  128.         String sourceId = "my-kafka-source";
  129.         System.out.println("响应:" + HttpUtils.build().http(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/sources/" + sourceId, "DELETE", null));
  130.     }
  131.     /**
  132.      * 获取索引描述信息
  133.      * @throws Exception
  134.      */
  135.     @Test
  136.     public void getIndexesDescribe() throws Exception {
  137.         String indexId = "gh-archive";
  138.         System.out.println("响应:" + HttpUtils.build().httpGet(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/describe"));
  139.     }
  140.     /**
  141.      * 删除索引数据和所有元数据
  142.      * @throws Exception
  143.      */
  144.     @Test
  145.     public void deleteIndexes() throws Exception {
  146.         String indexId = "gh-archive";
  147.         System.out.println("响应:" + HttpUtils.build().http(QUICKWIT_URL + "api/v1/indexes/" + indexId , "DELETE", null));
  148.     }
  149. }
复制代码
HttpUtils.java
  1. package com.example;
  2. import java.net.URI;
  3. import java.net.http.HttpClient;
  4. import java.net.http.HttpRequest;
  5. import java.net.http.HttpResponse;
  6. import java.nio.charset.StandardCharsets;
  7. public class HttpUtils {
  8.     private HttpClient httpClient;
  9.     public static HttpUtils build(){
  10.         return new HttpUtils();
  11.     }
  12.     private HttpUtils() {
  13.         httpClient = HttpClient.newHttpClient();
  14.     }
  15.     public String httpPost(String url, String jsonData) throws Exception {
  16.         return http(url, "POST", jsonData);
  17.     }
  18.     public String httpGet(String url) throws Exception {
  19.         return http(url, "GET", null);
  20.     }
  21.     public String http(String url, String method, String jsonData) throws Exception {
  22.         System.out.println("请求:" + url);
  23.         HttpRequest request = null;
  24.         HttpRequest.Builder builder = HttpRequest.newBuilder()
  25.                 .uri(new URI( url))
  26.                 .header("Content-Type", "application/json; charset=UTF-8" )
  27.                 .header("Timeout", "5000");
  28.         if ("POST".equals(method)) {
  29.             request = builder.POST(HttpRequest.BodyPublishers.ofString(jsonData, StandardCharsets.UTF_8)).build();
  30.         } else if ("PUT".equals(method)) {
  31.             request = builder.PUT(HttpRequest.BodyPublishers.ofString(jsonData, StandardCharsets.UTF_8)).build();
  32.         } else if ("DELETE".equals(method)) {
  33.             request = builder.DELETE().build();
  34.         } else {
  35.             request = builder.GET().build();
  36.         }
  37.         HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
  38.         System.out.println("状态码:" + response.statusCode());
  39.         return response.body();
  40.     }
  41. }
复制代码
在Java开发工具中分别实行createIndex()和createSources()测试方法后,即可创建索引、绑定Kafka数据源;
检索Kafka数据

在推送Kafka数据后,通过Redpanda控制台工具(此工具本文不做详述,参见官方《redpanda-console-kafka-ui》)查看Kfaka主题巨细约438M;而且Json数据格式中有多层json嵌套;

在浏览器上输入Quickwit UI访问地点:http://127.0.0.1:7280,在Quickwit UI 》 Indexs 中查看已创建的gh-archive索引;

点击gh-archive索引,进入到SOURCES中查看已配Kafka源;

Quickwit索引器从Kafka源中获取数据流后写入到指定索引中,在Query editor功能中可以查询到已存储的索引数据;

参考:
REST API | Quickwit

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

缠丝猫

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表