Introduction
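Quickwit can ingest data from one or more sources into an index. Once an index has been created, sources can be added with the CLI command quickwit source create. Supported source types include file, kafka, kinesis and pulsar.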
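This chapter explains how to create a Kafka source in the Quickwit search engine and ingest the data stream of a Kafka topic. Note that every message read from the Kafka stream must contain a JSON object; the setup described here only imports JSON-formatted data.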
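Because each Kafka message has to carry one JSON object, a cheap pre-flight check on the producer side can drop obviously malformed lines before they reach the topic. The sketch below is only a heuristic (the class JsonLineFilter is a hypothetical name, not part of Quickwit or Kafka): it checks that a trimmed line starts with { and ends with }, and it is not a substitute for a real JSON parser.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: rough filter for "one JSON object per message"
class JsonLineFilter {

    // Heuristic only: true if the trimmed line is delimited by { and }
    static boolean looksLikeJsonObject(String line) {
        if (line == null) {
            return false;
        }
        String trimmed = line.trim();
        return trimmed.length() >= 2 && trimmed.startsWith("{") && trimmed.endsWith("}");
    }

    // Keep only the lines that pass the heuristic, preserving their order
    static List<String> keepJsonObjects(List<String> lines) {
        return lines.stream()
                .filter(JsonLineFilter::looksLikeJsonObject)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = List.of("{\"id\":\"1\",\"type\":\"PushEvent\"}", "", "[1,2,3]", "not json");
        System.out.println(keepJsonObjects(lines));
    }
}
```

Arrays, empty lines and plain text are rejected; a line such as {"a":1 with trailing garbage} would still pass, which is why this is only a first filter.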
Official documentation for the Kafka source: Kafka | Quickwit
Creating the Quickwit index
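A Docker container running the Quickwit service had already been created on the Docker host, so the Quickwit installation is not described here; see the article "Installing the Quickwit Search Engine with Docker".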
Official example YAML configuration
```yaml
#
# Index config file for gh-archive dataset.
#
version: 0.8
index_id: gh-archive
doc_mapping:
  field_mappings:
    - name: id
      type: text
      tokenizer: raw
    - name: type
      type: text
      fast: true
      tokenizer: raw
    - name: public
      type: bool
      fast: true
    - name: payload
      type: json
      tokenizer: default
    - name: org
      type: json
      tokenizer: default
    - name: repo
      type: json
      tokenizer: default
    - name: actor
      type: json
      tokenizer: default
    - name: other
      type: json
      tokenizer: default
    - name: created_at
      type: datetime
      fast: true
      input_formats:
        - rfc3339
      fast_precision: seconds
  timestamp_field: created_at
indexing_settings:
  commit_timeout_secs: 10
```
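The config declares created_at as the timestamp field, parsed with the rfc3339 input format and kept in the fast field with seconds precision. As an illustration only (this is not Quickwit's own code, and CreatedAtPrecision is a hypothetical name), the Java sketch below parses a GH Archive-style created_at value with java.time and truncates it to whole seconds, which mirrors the idea behind fast_precision: seconds.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper illustrating RFC 3339 parsing and seconds precision
class CreatedAtPrecision {

    // Parse an RFC 3339 timestamp such as "2022-05-12T10:00:00Z"
    static Instant parseRfc3339(String value) {
        return OffsetDateTime.parse(value).toInstant();
    }

    // Drop sub-second digits, as a seconds-precision fast field would
    static long toEpochSeconds(String value) {
        return parseRfc3339(value).truncatedTo(ChronoUnit.SECONDS).getEpochSecond();
    }

    public static void main(String[] args) {
        System.out.println(toEpochSeconds("2022-05-12T10:00:00.789Z"));
        System.out.println(toEpochSeconds("2022-05-12T18:00:00+08:00"));
    }
}
```

Both calls in main denote the same instant, one in UTC and one with a +08:00 offset, so they print the same epoch second.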
Alternatively, download the official example YAML file and create the index with the quickwit command:
```bash
# Download GH Archive index config.
wget -O gh-archive.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/index-config.yaml
# Create index.
./quickwit index create --index-config gh-archive.yaml
```
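The CLI call can also be replaced by a plain HTTP request. A minimal sketch, assuming a Quickwit node at http://127.0.0.1:7280 and that POST /api/v1/indexes accepts the YAML file with the content type application/yaml (as in the curl examples of the Quickwit tutorials; check the REST API reference for your version). CreateIndexFromYaml is a hypothetical name; building the request is kept separate from sending it so the request can be inspected without a running server.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

// Hypothetical helper: REST equivalent of "quickwit index create --index-config <file>"
class CreateIndexFromYaml {

    // Build a POST request that uploads the index config file as YAML
    static HttpRequest buildRequest(String quickwitBaseUrl, Path yamlConfig) throws IOException {
        return HttpRequest.newBuilder()
                .uri(URI.create(quickwitBaseUrl + "/api/v1/indexes"))
                .header("Content-Type", "application/yaml")
                .POST(HttpRequest.BodyPublishers.ofFile(yamlConfig))
                .build();
    }

    // Send the request and return "<status> <body>"
    static String send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + " " + response.body();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = buildRequest("http://127.0.0.1:7280", Path.of("gh-archive.yaml"));
        System.out.println(send(request));
    }
}
```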
Installing Kafka
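Installing Kafka is not the focus of this article and is skipped here; please look up an installation guide on your own.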
Downloading the Kafka test data
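The test data comes from the official Quickwit tutorial; see Kafka | Quickwit.
Create a Kafka topic, download the JSON test data provided by the tutorial, and push the data to the topic: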
```bash
# Create a topic named `gh-archive` with 3 partitions.
bin/kafka-topics.sh --create --topic gh-archive --partitions 3 --bootstrap-server localhost:9092
# Download a few GH Archive files.
wget https://data.gharchive.org/2022-05-12-{10..15}.json.gz
# Load the events into Kafka topic.
gunzip -c 2022-05-12*.json.gz | \
  bin/kafka-console-producer.sh --topic gh-archive --bootstrap-server localhost:9092
```
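gunzip -c turns the archives into a stream of lines and kafka-console-producer.sh sends each line as one message, so every GH Archive event (one JSON object per line) becomes one Kafka message. The sketch below reproduces only the decompress-and-split part in Java with GZIPInputStream; GzipLineReader is a hypothetical name, empty lines are skipped by assumption, and the actual send is left to a Consumer<String> callback that you would wire to your Kafka producer.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;

// Hypothetical helper: "gunzip -c file | one message per line" in Java
class GzipLineReader {

    // Stream a .json.gz file line by line and pass each non-empty line to the sink.
    // Returns the number of lines passed on.
    static long forEachLine(Path gzFile, Consumer<String> sink) throws IOException {
        long count = 0;
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(gzFile)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.isEmpty()) {
                    sink.accept(line);
                    count++;
                }
            }
        }
        return count;
    }
}
```

Streaming the file this way avoids loading a whole hourly archive into memory, unlike reading all lines into a list first.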
Because the archives are large, only one hourly JSON archive was downloaded locally, using the following command:
```bash
curl https://data.gharchive.org/2022-05-12-10.json.gz -o 2022-05-12-10.json.gz
```
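GH Archive publishes one file per hour, and the bash brace expansion 2022-05-12-{10..15}.json.gz used earlier expands to six hourly URLs. A small Java sketch that builds the same list, for example to choose a different subset of hours; GhArchiveUrls is a hypothetical name and the hour is appended as a plain integer, exactly as in the URLs above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: Java equivalent of the bash brace expansion {from..to}
class GhArchiveUrls {

    private static final String BASE = "https://data.gharchive.org/";

    // Build one URL per hour, inclusive on both ends
    static List<String> hourlyUrls(String date, int fromHour, int toHour) {
        List<String> urls = new ArrayList<>();
        for (int hour = fromHour; hour <= toHour; hour++) {
            urls.add(BASE + date + "-" + hour + ".json.gz");
        }
        return urls;
    }

    public static void main(String[] args) {
        hourlyUrls("2022-05-12", 10, 15).forEach(System.out::println);
    }
}
```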
Creating the Quickwit source
Example YAML configuration of a Kafka source:
```yaml
#
# Kafka source config file.
#
version: 0.8
source_id: kafka-source
source_type: kafka
num_pipelines: 2
params:
  topic: gh-archive
  client_params:
    bootstrap.servers: localhost:9092
```
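The same source definition can be sent as JSON to the REST endpoint POST api/v1/indexes/<index id>/sources. A hedged sketch that renders the YAML fields above into JSON from parameters; KafkaSourceConfig is a hypothetical name, and values are inserted without JSON escaping, so only pass plain identifiers and host:port strings. Note that bootstrap.servers is resolved by the Quickwit node, so it must be an address reachable from there.

```java
// Hypothetical helper: JSON form of the Kafka source YAML above
class KafkaSourceConfig {

    // Values are inserted verbatim (no escaping): they must not contain quotes or backslashes
    static String toJson(String sourceId, String topic, String bootstrapServers, int numPipelines) {
        return String.format(
                "{\"version\":\"0.8\",\"source_id\":\"%s\",\"source_type\":\"kafka\","
                        + "\"num_pipelines\":%d,\"params\":{\"topic\":\"%s\","
                        + "\"client_params\":{\"bootstrap.servers\":\"%s\"}}}",
                sourceId, numPipelines, topic, bootstrapServers);
    }

    public static void main(String[] args) {
        System.out.println(toJson("kafka-source", "gh-archive", "localhost:9092", 2));
    }
}
```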
Alternatively, download the official kafka-source.yaml file and create the Kafka source for the gh-archive index directly with the quickwit command:
```bash
# Download Kafka source config.
wget https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/kafka-source.yaml
# Create source.
./quickwit source create --index gh-archive --source-config kafka-source.yaml
```
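After these steps, the Quickwit indexer connects to the topic configured in the Kafka source, reads messages from the topic's partitions, and streams them into Quickwit, where they are indexed and become searchable.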
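Pushing data to the Kafka topic from Java
The following snippet sends messages to Kafka from Java. The utility class it uses is described in the article "Kafka Message Service: A Java Utility Class" and is not covered in detail here; the snippet only demonstrates how it is used.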
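```java
// Imports needed by the enclosing class. KafkaUtils is the author's own helper class;
// FileUtils comes from Apache Commons IO, RandomUtils from Apache Commons Lang 3.
import java.io.File;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomUtils;

// Producer: each line of the file (one JSON event) is sent as one Kafka message
KafkaUtils.KafkaStreamServer kafkaStreamServer = KafkaUtils.bulidServer().createKafkaStreamServer("192.168.1.3", 9092);
String topic = "gh-archive";
int n = 0;
// Read the decompressed archive; each line holds one JSON object
List<String> lines = FileUtils.readLines(new File("D:\\test\\kafka\\2022-05-12-10.json"), "UTF-8");
for (String line : lines) {
    // Print a short preview, guarding against lines shorter than 30 characters
    System.out.println("Sending message: " + line.substring(0, Math.min(30, line.length())) + " ...");
    // Send the line to the Kafka topic
    kafkaStreamServer.sendMsg(topic, line);
    // Pause for a random 1 to 199 ms after every 1000 messages
    if (n > 0 && n % 1000 == 0) {
        TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(1, 200));
    }
    n++;
}
// Total pushed to Kafka: 156040 messages
System.out.println("Total messages pushed to Kafka: " + n);
kafkaStreamServer.close();
```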
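The loop throttles the producer by sleeping a random 1 to 199 ms after every 1000 messages. A stdlib-only sketch of the same pacing idea that does not depend on the author's KafkaUtils: the send is abstracted as a Consumer<String> (which you would wire to kafkaStreamServer.sendMsg or any other producer), and the pause is passed in as a callback so the logic can be tested without sleeping. PacedSender is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

// Hypothetical helper: send lines with a short random pause after each full batch
class PacedSender {

    // Send every line through `send`; after each `batchSize` messages call `pause`
    // with a random delay in [1, 200) ms. Returns the number of messages sent.
    static int sendAll(List<String> lines, int batchSize, Consumer<String> send, IntConsumer pause) {
        int n = 0;
        for (String line : lines) {
            send.accept(line);
            n++;
            if (n % batchSize == 0) {
                pause.accept(ThreadLocalRandom.current().nextInt(1, 200));
            }
        }
        return n;
    }

    // Default pause: sleep for the given number of milliseconds
    static void sleepMillis(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In production the call would look like sendAll(lines, 1000, line -> producer.send(line), PacedSender::sleepMillis), where producer.send stands for whatever send method your Kafka client exposes.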
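Because the official test data set is large, this section uses only one pre-downloaded archive, 2022-05-12-10.json.gz, fetched with the curl command shown earlier. The Java code reads the decompressed file 2022-05-12-10.json, so unpack the archive first, for example with gunzip.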
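Creating the Quickwit index and Kafka source from Java
Quickwit provides a comprehensive REST API, so indexes can be created, maintained and searched, and their sources managed, over HTTP. The following Java program demonstrates this with plain HTTP requests.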
QuickwitKafkaSourceTest.java
```java
package com.example;

import org.junit.jupiter.api.Test;

public class QuickwitKafkaSourceTest {

    // Base URL of the Quickwit REST API
    private static final String QUICKWIT_URL = "http://192.168.1.3:7280/";

    /**
     * Create the gh-archive index (same mapping as the YAML example).
     * @throws Exception on request errors
     */
    @Test
    public void createIndex() throws Exception {
        String indexConf = """
                {
                  "version": "0.8",
                  "index_id": "gh-archive",
                  "doc_mapping": {
                    "field_mappings": [
                      {"name": "id", "type": "text", "tokenizer": "raw"},
                      {"name": "type", "type": "text", "fast": true, "tokenizer": "raw"},
                      {"name": "public", "type": "bool", "fast": true},
                      {"name": "payload", "type": "json", "tokenizer": "default"},
                      {"name": "org", "type": "json", "tokenizer": "default"},
                      {"name": "repo", "type": "json", "tokenizer": "default"},
                      {"name": "actor", "type": "json", "tokenizer": "default"},
                      {"name": "other", "type": "json", "tokenizer": "default"},
                      {
                        "name": "created_at",
                        "type": "datetime",
                        "fast": true,
                        "input_formats": ["rfc3339"],
                        "fast_precision": "seconds"
                      }
                    ],
                    "timestamp_field": "created_at"
                  },
                  "indexing_settings": {
                    "commit_timeout_secs": 10
                  }
                }
                """;
        System.out.println("Response: " + HttpUtils.build().httpPost(QUICKWIT_URL + "api/v1/indexes", indexConf));
    }

    /**
     * Create a source for the index; source types include kafka, kinesis, pulsar and file.
     * @throws Exception on request errors
     */
    @Test
    public void createSources() throws Exception {
        String indexId = "gh-archive";
        // bootstrap.servers must be the Kafka broker the producer writes to, reachable from the Quickwit node
        String jsonData = """
                {
                  "version": "0.8",
                  "source_id": "kafka-source",
                  "source_type": "kafka",
                  "num_pipelines": 1,
                  "input_format": "json",
                  "params": {
                    "topic": "gh-archive",
                    "client_params": {
                      "auto.offset.reset": "earliest",
                      "bootstrap.servers": "192.168.1.5:9092"
                    }
                  }
                }
                """;
        System.out.println("Response: " + HttpUtils.build().httpPost(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/sources", jsonData));
    }

    /**
     * Enable or disable a source of the index.
     * @throws Exception on request errors
     */
    @Test
    public void indexesSourceToggle() throws Exception {
        String indexId = "gh-archive";
        String sourceId = "kafka-source";
        // The toggle endpoint expects a JSON body; use false to disable the source
        String body = "{\"enable\": true}";
        System.out.println("Response: " + HttpUtils.build().http(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/sources/" + sourceId + "/toggle", "PUT", body));
    }

    /**
     * Reset the checkpoint (stored read positions) of the index's source.
     * @throws Exception on request errors
     */
    @Test
    public void indexesSourceReset() throws Exception {
        String indexId = "gh-archive";
        String sourceId = "kafka-source";
        System.out.println("Response: " + HttpUtils.build().http(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/sources/" + sourceId + "/reset-checkpoint", "PUT", ""));
    }

    /**
     * Delete the source bound to the index.
     * @throws Exception on request errors
     */
    @Test
    public void indexesSourceDelete() throws Exception {
        String indexId = "gh-archive";
        String sourceId = "kafka-source";
        System.out.println("Response: " + HttpUtils.build().http(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/sources/" + sourceId, "DELETE", null));
    }

    /**
     * Get the index description.
     * @throws Exception on request errors
     */
    @Test
    public void getIndexesDescribe() throws Exception {
        String indexId = "gh-archive";
        System.out.println("Response: " + HttpUtils.build().httpGet(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/describe"));
    }

    /**
     * Delete the index data and all of its metadata.
     * @throws Exception on request errors
     */
    @Test
    public void deleteIndexes() throws Exception {
        String indexId = "gh-archive";
        System.out.println("Response: " + HttpUtils.build().http(QUICKWIT_URL + "api/v1/indexes/" + indexId, "DELETE", null));
    }
}
```
HttpUtils.java
```java
package com.example;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

public class HttpUtils {

    private final HttpClient httpClient;

    public static HttpUtils build() {
        return new HttpUtils();
    }

    private HttpUtils() {
        httpClient = HttpClient.newHttpClient();
    }

    public String httpPost(String url, String jsonData) throws Exception {
        return http(url, "POST", jsonData);
    }

    public String httpGet(String url) throws Exception {
        return http(url, "GET", null);
    }

    public String http(String url, String method, String jsonData) throws Exception {
        System.out.println("Request: " + url);
        // A null body is sent as an empty body instead of causing a NullPointerException
        HttpRequest.BodyPublisher body = jsonData == null
                ? HttpRequest.BodyPublishers.noBody()
                : HttpRequest.BodyPublishers.ofString(jsonData, StandardCharsets.UTF_8);
        HttpRequest.Builder builder = HttpRequest.newBuilder()
                .uri(new URI(url))
                .header("Content-Type", "application/json; charset=UTF-8")
                // Client-side request timeout of 5 seconds
                .timeout(Duration.ofSeconds(5));
        HttpRequest request;
        if ("POST".equals(method)) {
            request = builder.POST(body).build();
        } else if ("PUT".equals(method)) {
            request = builder.PUT(body).build();
        } else if ("DELETE".equals(method)) {
            request = builder.DELETE().build();
        } else {
            request = builder.GET().build();
        }
        HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("Status code: " + response.statusCode());
        return response.body();
    }
}
```
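HttpUtils prints the status code but returns the body even when Quickwit answers with an error status (for example when creating an index that already exists), so a failed call can look like a success in the test output. A possible sketch, not part of the original utility, that turns non-2xx responses into exceptions; QuickwitResponses and QuickwitHttpException are hypothetical names.

```java
// Hypothetical helper: fail fast on non-2xx HTTP responses
class QuickwitResponses {

    // Exception carrying the HTTP status and the error body returned by the server
    static class QuickwitHttpException extends RuntimeException {
        final int status;

        QuickwitHttpException(int status, String body) {
            super("HTTP " + status + ": " + body);
            this.status = status;
        }
    }

    // Return the body for 2xx responses, otherwise throw with status and body
    static String checkStatus(int status, String body) {
        if (status < 200 || status >= 300) {
            throw new QuickwitHttpException(status, body);
        }
        return body;
    }
}
```

Inside HttpUtils.http, the last line could then become return QuickwitResponses.checkStatus(response.statusCode(), response.body()); so a failing test method actually fails instead of printing an error body.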
Run the createIndex() and createSources() test methods from your Java IDE to create the index and bind the Kafka source to it.
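Indexing is asynchronous: documents become searchable only once a split is published, which here happens roughly every commit_timeout_secs (10 seconds). One way to follow progress from Java is to call the describe endpoint used in getIndexesDescribe() and read the published document count. A minimal sketch, assuming the JSON response contains a num_published_docs field (as shown in the Quickwit REST API reference for the describe endpoint); it uses a regular expression instead of a JSON library to stay dependency-free, and DescribeParser is a hypothetical name.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extract the published document count from a describe response
class DescribeParser {

    private static final Pattern PUBLISHED_DOCS =
            Pattern.compile("\"num_published_docs\"\\s*:\\s*(\\d+)");

    // Return num_published_docs, or empty if the field is absent
    static OptionalLong publishedDocs(String describeJson) {
        Matcher m = PUBLISHED_DOCS.matcher(describeJson);
        return m.find() ? OptionalLong.of(Long.parseLong(m.group(1))) : OptionalLong.empty();
    }
}
```

Polling HttpUtils.build().httpGet(QUICKWIT_URL + "api/v1/indexes/gh-archive/describe") every few seconds and passing the body to publishedDocs shows the count growing until it stops changing, which suggests the topic has been fully consumed.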
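Searching the Kafka data
After the data has been pushed to Kafka, the Redpanda Console tool (not covered in detail here; see the official redpanda-console-kafka-ui guide) shows that the Kafka topic is about 438 MB and that the JSON events contain several levels of nested JSON objects.
Open the Quickwit UI at http://127.0.0.1:7280 in a browser and go to Quickwit UI > Indexes to see the gh-archive index that was created.
Click the gh-archive index and open the SOURCES tab to see the configured Kafka source.
The Quickwit indexer pulls the data stream from the Kafka source and writes it into the index; the stored documents can then be searched in the Query editor.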
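Besides the Query editor, the same data can be queried over the REST search endpoint GET api/v1/<index id>/search, whose query parameter takes the Quickwit query language and whose max_hits parameter limits the number of hits returned. Fields nested inside json-typed fields such as actor or repo are addressed with dotted paths. A minimal sketch that builds such a URL; SearchUrl is a hypothetical name and the query values (including the login octocat) are purely illustrative.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: build a Quickwit search URL with an encoded query string
class SearchUrl {

    // <baseUrl>api/v1/<indexId>/search?query=...&max_hits=...
    static String build(String baseUrl, String indexId, String query, int maxHits) {
        return baseUrl + "api/v1/" + indexId + "/search?query="
                + URLEncoder.encode(query, StandardCharsets.UTF_8)
                + "&max_hits=" + maxHits;
    }

    public static void main(String[] args) {
        System.out.println(build("http://127.0.0.1:7280/", "gh-archive",
                "type:PushEvent AND actor.login:octocat", 10));
    }
}
```

The resulting URL can be passed to HttpUtils.build().httpGet(...); encoding the query matters because colons and spaces are part of the query syntax.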
References:
REST API | Quickwit