使用SeaTunnel从InfluxDB同步数据到Doris

打印 上一主题 下一主题

主题 917|帖子 917|积分 2751

本文先容了如何使用SeaTunnel将数据从InfluxDB同步到Doris。通过SeaTunnel强盛的数据集成功能,用户可以高效地将存储于InfluxDB中的时间序列数据传输至Doris,便于数据的访问与分析。
版本信息:
SeaTunnel 2.3.3
InfluxDB 2.7.6
Doris 2.1.3 rc09
预备事项

SeaTunnel2.3.3的安装过程这里就省略了,可以参考官网文档。
SeaTunnel2.3.3安装好以后须要删掉两个连接用的jar包,不然后面同步数据库会报错:connector-hudi-2.3.3.jar和connector-datahub-2.3.3.jar.
须要增加的jar包:seatunnel-api-2.3.3.jar,seatunnel-transforms-v2-2.3.3.jar,mysql-connector-java-8.0.28.jar,jersey-client-1.19.4.jar,这四个jar包必须添加,不然无法同步数据运行同步脚本直接报错没有某个类。
InfluxDB 2.7.6 须要做的前提事项:下面这个步骤必须要做,不然查不到数据,
InfluxDB Studio-0.2.0(这个客户端工具有个好处,可以查看字段类型,方便同步文件中的字段类型的界说,其他的客户端似乎没有,也有可能是我没发现),可下载这个客户端进行连接查询数据。
Linux安装influxDB 2.7.6版本后,正常使用ip:8086 可访问influxdb UI,填写用户名、密码、org、buckets。
同步过程及踩坑点

SeaTunnel 2.3中集成InfluxDB配置用户名、密码后,执行同步任务总是报获取字段异常信息。
于是乎跟踪SeaTunnel代码,发现内部一直401权限认证失败。于是使用InfluxDB Studio数据库管理工具连接,输入ui页面相同的用户名,密码后一直报401权限认证不通过。通过查资料发现ui页面的用户名密码仅供ui页面使用,不能作为数据库自己访问的用户名密码。
使用iInfluxDB client客户端,查询权限influx v1 auth list结果为空。
使用命令分配权限
influx v1 auth create -o orgName --read-bucket bucketId --username=username,
或者:influx v1 auth create -o "构造名称" --write-bucket bucketId(桶id,不须要引号) --read-bucket bucketId(桶id,不须要引号) --username=账号 --password=密码
删除命令:influx v1 auth delete --id 'id编码'
删除命令中的id编码为influx v1 auth list命令查出来的ID,下图所示:

命令执行完成后需输入两次密码。InfluxDB Studio数据库管理工具再次使用此用户名密码登录成功,SeaTunnel同步成功。
同步数据配置文件:v1.batch.config_tmp.template:
  1. env {
  2.   execution.parallelism = 1
  3.   job.mode = "BATCH"
  4.   checkpoint.interval = 10000
  5. }
  6. source {
  7.   influxdb {
  8.     url = "http://X.X.X.X:8086"
  9.     token = "写自己的token" #可有可无
  10.     org = "自己的组织名称"
  11.     bucket = "自己的桶" #可有可无
  12.     database = "自己的桶"
  13.     username = "写在第四步自己新建的influxdb账号"
  14.     password = "写在第四步自己新建的influxdb密码"
  15.     epoch = "H" #这个有好几级,可以去官网查看
  16.     query_timeout_sec = 600
  17.     measurement = "prometheus_remote_write" #数据表
  18.     fields = ["node_cpu_seconds_total", "node_memory_MemTotal_bytes"] #可有可无,配置自己的字段
  19.     sql = """SELECT node_cpu_seconds_total as system_cpu_usage,cpu as process_occupy_physical_memory_size,job as create_dept,node_memory_MemTotal_bytes as process_read_written_file_system_total_bytes,node_memory_MemAvailable_bytes as process_open_file_describe_quantity,time as create_time FROM "prometheus_remote_write" where time > now() - 1h"""
  20.     where = " where time > now() - 1h"
  21.      #经过本人测试。上面的sql查询的字段必须经过重命名,或者doris建表的字段必须和influxdb2的字段完全一致,不然transform 中进行转换的时候就会成为空值,这个我还没研究明白为什么,研究明白了在补上说明,doris的表字段类型也必须和influxdb2中查询的字段类型一致,不然数据存不到doris中。schema 重定义的事influxdb2查到的字段和类型
  22.      schema {
  23.       fields {
  24.         #node_cpu_seconds_total = FLOAT
  25.         system_cpu_usage = FLOAT
  26.         process_occupy_physical_memory_size = INT
  27.         create_dept = STRING
  28.         process_read_written_file_system_total_bytes = FLOAT
  29.         process_open_file_describe_quantity = FLOAT
  30.         create_time = BIGINT
  31.       }
  32.     }
  33.   }
  34. }
  35. sink {
  36.   Doris {
  37.     fenodes = "X.X.X.X:8030"
  38.     username = "账号"
  39.     password = "密码"
  40.     table.identifier = "sbyw_data_acquisition.sbyw_application_process_type_tmp"
  41.     sink.label-prefix = "test-cdc"
  42.     sink.enable-2pc = "true"
  43.     sink.enable-delete = "true"
  44.     sink.max-retries = 3
  45.     batch_size = 10000
  46.     result_table_name = "sbyw_application_process_type_tmp"
  47.     doris.config {
  48.       format = "json"
  49.       read_json_by_line = "true"
  50.     }
  51.   }
  52. }
  53. transform {
  54.   FieldMapper {
  55.     source_table_name = "prometheus_remote_write"
  56.     result_table_name = "sbyw_application_process_type_tmp"
  57.     field_mapper = {
  58.         #node_cpu_seconds_total = system_cpu_usage
  59.         system_cpu_usage = system_cpu_usage
  60.         process_occupy_physical_memory_size = process_occupy_physical_memory_size
  61.         process_read_written_file_system_total_bytes = process_read_written_file_system_total_bytes
  62.         process_open_file_describe_quantity = process_open_file_describe_quantity
  63.         create_time = create_time
  64.         create_dept = create_dept
  65.     }
  66.   }
  67. }
复制代码
写好同步数据脚本文件运行同步命令:./bin/seatunnel.sh -c ./config/v1.batch.config_tmp.template
下面是我Doris的测试表:

下面是InfluxDB Studio-0.2.0客户端查到 InfluxDB 2.7.6的数据:

InfluxDB 2.7.6有个坑点,它支持sql查询,但不完全支持,它只支持常规的简朴查询,例如下图中的查询就可以查询,但是如下图所示,可能会有人说我后面没加group by,经过测试是不行的,即使加上group by也是无法执行,那是由于官方压根不支持的这种查询。

但是下图这样是可以的,InfluxDB 2官方就是这样计划的,聚合查询无法和单字段进行同步查询。

末了是运行结果:

同步到Doris的数据:

原文链接:https://blog.csdn.net/2401_84562349/article/details/140919192
本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

反转基因福娃

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表