ToB企服应用市场:ToB评测及商务社交产业平台

标题: 使用 bend-ingest-kafka 将数据流实时导入到 Databend [打印本页]

作者: 万万哇    时间: 2024-7-26 03:12
标题: 使用 bend-ingest-kafka 将数据流实时导入到 Databend
作者:韩山杰
  Databend Cloud 研发工程师
  https://github.com/hantmac
  


Databend是一个开源、高性能、低成本易于扩展的新一代云数据堆栈。bend-ingest-kafka 是一个专为 Databend 计划的实时数据导入工具,它允许用户从 Apache Kafka 直接将数据流导入到 Databend 中,实现数据的实时分析和处理惩罚。
为什么选择bend-ingest-kafka?


环境准备

在使用 bend-ingest-kafka 之前,必要确保以下环境已经搭建好:

快速开始

Step 1: 安装 bend-ingest-kafka

可以从 Databend 的官方 GitHub 堆栈 release 页面 下载对应 OS 架构的 bend-ingest-kafka 的可实行二进制文件,大概直接实行下令安装最新版本。
  1. go install  github.com/databendcloud/bend-ingest-kafka@latest
复制代码
Step 2: 设置 bend-ingest-kafka

设置文件通常包罗 Kafka 的连接以及设置信息、Databend 的连接信息以及数据转换的逻辑。以下是一个简单的设置示例:
  1. {
  2.   "kafkaBootstrapServers": "localhost:9092",
  3.   "kafkaTopic": "ingest_test",
  4.   "KafkaConsumerGroup": "test",
  5.   "mockData": "",
  6.   "isJsonTransform": false,
  7.   "databendDSN": "https://cloudapp:password@tn3ftqihs--medium-p8at.gw.aws-us-east-2.default.databend.com:443",
  8.   "databendTable": "default.kfk_test",
  9.   "batchSize": 10,
  10.   "batchMaxInterval": 5,
  11.   "dataFormat": "json",
  12.   "workers": 1,
  13.   "copyPurge": false,
  14.   "copyForce": false,
  15.   "disableVariantCheck": true,
  16.   "minBytes": 1024,
  17.   "maxBytes": 1048576,
  18.   "maxWait": 10,
  19.   "useReplaceMode": false,
  20.   "userStage": "~"
  21. }
复制代码
具体的设置参数可以参考 Parameter References,这里对几个比较重要的参数睁开解释。

Step 3: 启动数据导入

这里使用 raw-data 模式作演示。
Kafka 的 Json 数据示例为:

  1. {"i64": 10,"u64": 30,"f64": 20,"s": "hao","s2": "hello","a16":[1],"a8":[2],"d": "2011-03-06","t": "2016-04-04 11:30:00"}
复制代码
模拟 kafka 生产数据

可以使用下面的脚本快速生成 kafka json 数据:
  1. from confluent_kafka import Producer
  2. # 创建一个Producer实例
  3. p = Producer({'bootstrap.servers': 'localhost:9092'})
  4. for i in range(1000000):
  5.     json_data = '{"i64": 10,"u64": 30,"f64": 20,"s": "hao","s2": "hello","a16":[1],"a8":[2],"d": "2011-03-06","t": "2016-04-04 11:30:00"}'
  6.     p.produce('ingest_test', json_data)
  7.     print(i)
  8.     p.flush()
复制代码
使用设置文件启动 bend-ingest-kafka

默认读取 ./config/conf.json 设置文件,开始将 Kafka 中的数据导入到 Databend。
  1. ./bend-ingest-kafka
复制代码
启动后可以看到 log 和 metrics:


到 Databend 中可以查询到已经同步的数据:

 由于 raw_data 和 record_metadata 的字段格式都是 JSON ,所以可以很灵活地做一些数据分析:
  1. select record_metadata['partition'] p,
  2.                 min(record_metadata['offset']::bigint) o1,
  3.         max(record_metadata['offset']::bigint) o2,
  4.         o2-o1+1 sub_count,
  5.         count(distinct record_metadata['offset']) distinct_cnt,
  6.         count(1) cnt
  7. from default.kfk_test
  8. group by p
  9. order by p;
复制代码


高级特性


结语

bend-ingest-kafka 作为一个强大的工具,为 Databend 用户提供了从 Kafka 实时导入数据的能力。通过本文的介绍,用户应该可以或许快速上手并利用这个工具来实现实时数据处理惩罚的需求。
关于 Databend

Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓办理方案,打造新一代开源 Data Cloud。





欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4