使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例

打印 上一主题 下一主题

主题 507|帖子 507|积分 1521


版本说明:
SeaTunnel:apache-seatunnel-2.3.2-SNAPHOT
引擎说明:
Flink:1.16.2
Zeta:官方自带
前言

近些时间,我们正好接手一个数据集成项目,数据上游方是给我们投递到Kafka,我们一开始的技术选型是SpringBoot+Flink对上游数据进行加工处理(下文简称:方案一),由于测试不到位,后来到线上,发现数据写入效率完全不符合预期。后来将目光转到开源项目SeaTunnel上面,发现Source支持Kafka,于是开始研究测试,开发环境测试了500w+数据,发现效率在10000/s左右。果断放弃方案一,采取SeaTunnel对数据进行集成加工(下文简称:方案二)。在SeaTunnel研究的过程中,总结了两种方法,方法二相较于方法一,可以实现全场景使用,无需担心字段值里面各种意想不到的字符对数据落地造成错位现象的发生。
对比


在方案二的基础上又衍生出两种方法

所以,在经过长时间的探索和我们线上验证得出结论,建议使用方案二的方法二。
好了,我们进入正文,主篇幅主要介绍方案二中的两种方法,让大家主观的感受SeaTunnel的神奇。
方案一 Springboot+Flink实现Kafka 复杂JSON的解析

网上案例很多,在此不做过多介绍。
方案二 SeaTunnel实现Kafka复杂JSON的解析

在开始介绍之前,我们看一下我们上游方投递Kafka的Json样例数据(对部分数据进行了敏感处理),如下:
  1.     "magic": "a***G",
  2.     "type": "D***",
  3.     "headers": null,
  4.     "messageSchemaId": null,
  5.     "messageSchema": null,
  6.     "message": {
  7.         "data": {
  8.             "LSH": "187eb13****l0214723",
  9.             "NSRSBH": "9134****XERG56",
  10.             "QMYC": "01*****135468",
  11.             "QMZ": "1Zr*****UYGy%2F5bOWtrh",
  12.             "QM_SJ": "2023-05-05 16:42:10.000000",
  13.             "YX_BZ": "Y",
  14.             "ZGHQ_BZ": "Y",
  15.             "ZGHQ_SJ": "2023-06-26 16:57:17.000000",
  16.             "SKSSQ": 202304,
  17.             "SWJG_DM": "",
  18.             "SWRY_DM": "00",
  19.             "CZSJ": "2023-05-05 16:42:10.000000",
  20.             "YNSRSBH": "9134****XERG56",
  21.             "SJTBSJ": "2023-06-26 19:29:59.000",
  22.             "SJCZBS": "I"
  23.         },
  24.         "beforeData": null,
  25.         "headers": {
  26.             "operation": "INSERT",
  27.             "changeSequence": "12440977",
  28.             "timestamp": "2023-06-26T19:29:59.673000",
  29.             "streamPosition": "00****3.16",
  30.             "transactionId": "000***0006B0002",
  31.             "changeMask": "0FFF***FF",
  32.             "columnMask": "0F***FFFF",
  33.             "transactionEventCounter": 1,
  34.             "transactionLastEvent": false
  35.         }
  36.     }
  37. }
复制代码
方法一、不通过UDF函数实现

存在问题:字段值存在分隔符,例如‘,’  则数据在落地的时候会发生错位现象。
该方法主要使用官网 transform-v2的各种转换插件进行实现,主要用到的插件有 Replace、Split以及Sql实现
ST脚本:(ybjc_qrqm.conf)
  1. env {
  2.   execution.parallelism = 100
  3.   job.mode = "STREAMING"
  4.   job.name = "kafka2mysql_ybjc"
  5.   execution.checkpoint.interval = 60000
  6. }
  7. source {
  8.   Kafka {
  9.     result_table_name = "DZFP_***_QRQM1"
  10.     topic = "DZFP_***_QRQM"
  11.     bootstrap.servers = "centos1:19092,centos2:19092,centos3:19092"
  12.     schema = {
  13.       fields {
  14.         message =  {
  15.             data = {
  16.                 LSH = "string",
  17.                 NSRSBH =  "string",
  18.                 QMYC =  "string",
  19.                 QMZ =  "string",
  20.                 QM_SJ =  "string",
  21.                 YX_BZ =  "string",
  22.                 ZGHQ_BZ =  "string",
  23.                 ZGHQ_SJ =  "string",
  24.                 SKSSQ =  "string",
  25.                 SWJG_DM = "string",
  26.                 SWRY_DM = "string",
  27.                 CZSJ = "string",
  28.                 YNSRSBH = "string",
  29.                 SJTBSJ = "string",
  30.                 SJCZBS = "string"
  31.             }
  32.         }
  33.       }  
  34.     }
  35.           start_mode = "earliest"
  36.     #start_mode.offsets = {
  37.     #     0 = 0
  38.     #     1 = 0
  39.     #     2 = 0
  40.     #}       
  41.     kafka.config = {
  42.       auto.offset.reset = "earliest"
  43.       enable.auto.commit = "true"
  44.             # max.poll.interval.ms = 30000000
  45.             max.partition.fetch.bytes = "5242880"
  46.             session.timeout.ms = "30000"
  47.       max.poll.records = "100000"
  48.     }
  49.   }
  50. }
  51. transform {
  52. Replace {
  53.     source_table_name = "DZFP_***_QRQM1"
  54.     result_table_name = "DZFP_***_QRQM2"
  55.     replace_field = "message"
  56.     pattern = "[["
  57.     replacement = ""
  58.     #is_regex = true
  59.           #replace_first = true
  60.   }
  61.   Replace {
  62.     source_table_name = "DZFP_***_QRQM2"
  63.     result_table_name = "DZFP_***_QRQM3"
  64.     replace_field = "message"
  65.     pattern = "]]"
  66.     replacement = ""
  67.     #is_regex = true
  68.           #replace_first = true
  69.   }
  70.   
  71.   Split {
  72.     source_table_name = "DZFP_***_QRQM3"
  73.     result_table_name = "DZFP_***_QRQM4"
  74.     # 存在问题: 如果字段值存在分隔符 separator,则数据会错位
  75.     separator = ","
  76.     split_field = "message"
  77.     # 你的第一个字段包含在zwf5里面,,前五个占位符是固定的。
  78.     output_fields = [zwf1,zwf2,zwf3,zwf4,zwf5,nsrsbh,qmyc,qmz,qm_sj,yx_bz,zghq_bz,zghq_sj,skssq,swjg_dm ,swry_dm ,czsj ,ynsrsbh ,sjtbsj ,sjczbs]
  79.   }
  80.   
  81.   sql{
  82.    source_table_name = "DZFP_***_QRQM4"
  83.    query = "select replace(zwf5 ,'fields=[','') as lsh,nsrsbh,trim(qmyc) as qmyc,qmz,qm_sj,yx_bz, zghq_bz,zghq_sj,skssq,swjg_dm ,swry_dm ,czsj ,ynsrsbh ,sjtbsj ,replace(sjczbs,']}]}','') as sjczbs  from DZFP_DZDZ_QRPT_YWRZ_QRQM4 where skssq <> ' null'"
  84.    result_table_name = "DZFP_***_QRQM5"
  85.   }
  86.   
  87. }
  88. sink {
  89.         Console {
  90.                 source_table_name = "DZFP_***_QRQM5"
  91.         }
  92.        
  93.     jdbc {
  94.             source_table_name = "DZFP_***_QRQM5"
  95.         url = "jdbc:mysql://localhost:3306/dbname?serverTimezone=GMT%2b8"
  96.         driver = "com.mysql.cj.jdbc.Driver"
  97.         user = "user"
  98.         password = "pwd"
  99.         batch_size = 200000
  100.         database = "dbname"
  101.         table = "tablename"
  102.                                 generate_sink_sql = true
  103.         primary_keys = ["nsrsbh","skssq"]
  104.     }
  105. }
复制代码
正常写入数据是可以写入了。
写入成功如下:
● kafka源数据:

● tidb目标数据:

现在我们模拟给kafka发送一条数据,其中,SJTBSJ字段我在中间设置一个, 是逗号。
原始值:2023-06-26 19:29:59.000 更改之后的值2023-06-26 19:29:59.0,00
  1. 往topic生产一条数据命令
  2. kafka-console-producer.sh --topic DZFP_***_QRQM --broker-list centos1:19092,centos2:19092,centos3:19092
复制代码
发送如下:
  1.     "magic": "a***G",
  2.     "type": "D***",
  3.     "headers": null,
  4.     "messageSchemaId": null,
  5.     "messageSchema": null,
  6.     "message": {
  7.         "data": {
  8.             "LSH": "187eb13****l0214723",
  9.             "NSRSBH": "9134****XERG56",
  10.             "QMYC": "01*****135468",
  11.             "QMZ": "1Zr*****UYGy%2F5bOWtrh",
  12.             "QM_SJ": "2023-05-05 16:42:10.000000",
  13.             "YX_BZ": "Y",
  14.             "ZGHQ_BZ": "Y",
  15.             "ZGHQ_SJ": "2023-06-26 16:57:17.000000",
  16.             "SKSSQ": 202304,
  17.             "SWJG_DM": "",
  18.             "SWRY_DM": "00",
  19.             "CZSJ": "2023-05-05 16:42:10.000000",
  20.             "YNSRSBH": "9134****XERG56",
  21.             "SJTBSJ": "2023-06-26 19:29:59.0,00",
  22.             "SJCZBS": "I"
  23.         },
  24.         "beforeData": null,
  25.         "headers": {
  26.             "operation": "INSERT",
  27.             "changeSequence": "12440977",
  28.             "timestamp": "2023-06-26T19:29:59.673000",
  29.             "streamPosition": "00****3.16",
  30.             "transactionId": "000***0006B0002",
  31.             "changeMask": "0FFF***FF",
  32.             "columnMask": "0F***FFFF",
  33.             "transactionEventCounter": 1,
  34.             "transactionLastEvent": false
  35.         }
  36.     }
  37. }
复制代码
写入之后,发现数据错位了。

结论:其实这个问题线上还是能遇到的,比如地址字段里面含有逗号,备注信息里面含有逗号等等,这种现象是不可避免的,所以此种方案直接pass。对数据危害性极大!可以处理简单的数据,当做一种思路。
方法二:通过UDF函数实现

该方法通过UDF函数扩展(https://seatunnel.apache.org/docs/2.3.2/transform-v2/sql-udf)的方式,实现嵌套kafka source json源数据的解析。可以大大简化ST脚本的配置
ST脚本:(ybjc_qrqm_yh.conf)
  1. env {
  2.     execution.parallelism = 5
  3.     job.mode = "STREAMING"
  4.     job.name = "kafka2mysql_ybjc_yh"
  5.     execution.checkpoint.interval = 60000
  6. }
  7. source {
  8.     Kafka {
  9.         result_table_name = "DZFP_***_QRQM1"
  10.         topic = "DZFP_***_QRQM"
  11.         bootstrap.servers = "centos1:19092,centos2:19092,centos3:19092"
  12.         schema = {
  13.         fields {
  14.              message =  {
  15.                 data = "map<string,string>"
  16.              }
  17.         }  
  18.         }
  19.         start_mode = "earliest"
  20.         #start_mode.offsets = {
  21.         #     0 = 0
  22.         #     1 = 0
  23.         #     2 = 0
  24.         #}       
  25.         kafka.config = {
  26.         auto.offset.reset = "earliest"
  27.         enable.auto.commit = "true"
  28.         # max.poll.interval.ms = 30000000
  29.         max.partition.fetch.bytes = "5242880"
  30.         session.timeout.ms = "30000"
  31.         max.poll.records = "100000"
  32.         }
  33.     }
  34. }
  35. transform {
  36.     sql{
  37.         source_table_name = "DZFP_***_QRQM1"
  38.         result_table_name = "DZFP_***_QRQM2"
  39.         # 这里的qdmx就是我自定义的UDF函数,具体实现下文详细讲解。。。
  40.         query = "select qdmx(message,'lsh') as lsh,qdmx(message,'nsrsbh') as nsrsbh,qdmx(message,'qmyc') as qmyc,qdmx(message,'qmz') as qmz,qdmx(message,'qm_sj') as qm_sj,qdmx(message,'yx_bz') as yx_bz,qdmx(message,'zghq_bz') as zghq_bz,qdmx(message,'zghq_sj') as zghq_sj,qdmx(message,'skssq') as skssq,qdmx(message,'swjg_dm') as swjg_dm,qdmx(message,'swry_dm') as swry_dm,qdmx(message,'czsj') as czsj,qdmx(message,'ynsrsbh') as ynsrsbh, qdmx(message,'sjtbsj') as sjtbsj,qdmx(message,'sjczbs') as sjczbs  from  DZFP_DZDZ_QRPT_YWRZ_QRQM1"
  41.     }
  42. }
  43.    
  44. sink {
  45.     Console {
  46.         source_table_name = "DZFP_***_QRQM2"
  47.     }
  48.     jdbc {
  49.                   source_table_name = "DZFP_***_QRQM2"
  50.                   url = "jdbc:mysql://localhost:3306/dbname?serverTimezone=GMT%2b8"
  51.                   driver = "com.mysql.cj.jdbc.Driver"
  52.                   user = "user"
  53.                   password = "pwd"
  54.                   batch_size = 200000
  55.                   database = "dbname"
  56.                   table = "tablename"
  57.                   generate_sink_sql = true
  58.                   primary_keys = ["nsrsbh","skssq"]
  59.         }
  60. }
复制代码
执行脚本:查看结果,发现并没有错位,还在原来的字段(sjtbsj)上面。
这种方法,是通过key获取value值。不会出现方法一中的按照逗号分割出现数据错位现象。

具体UDF函数编写如下。
maven引入如下:
  1. <dependencies>
  2.   <dependency>
  3.     <groupId>org.apache.seatunnel</groupId>
  4.     <artifactId>seatunnel-transforms-v2</artifactId>
  5.     <version>2.3.2</version>
  6.     <scope>provided</scope>
  7.   </dependency>
  8.   <dependency>
  9.     <groupId>org.apache.seatunnel</groupId>
  10.     <artifactId>seatunnel-api</artifactId>
  11.     <version>2.3.2</version>
  12.   </dependency>
  13.   <dependency>
  14.     <groupId>cn.hutool</groupId>
  15.     <artifactId>hutool-all</artifactId>
  16.     <version>5.8.20</version>
  17.   </dependency>
  18.   <dependency>
  19.     <groupId>com.google.auto.service</groupId>
  20.     <artifactId>auto-service-annotations</artifactId>
  21.     <version>1.1.1</version>
  22.     <optional>true</optional>
  23.     <scope>compile</scope>
  24.   </dependency>
  25.   <dependency>
  26.     <groupId>com.google.auto.service</groupId>
  27.     <artifactId>auto-service</artifactId>
  28.     <version>1.1.1</version>
  29.     <optional>true</optional>
  30.     <scope>compile</scope>
  31.   </dependency>
  32. </dependencies>  
复制代码
UDF具体实现java代码如下:
[code]package org.seatunnel.sqlUDF;import cn.hutool.core.util.StrUtil;import cn.hutool.json.JSONObject;import cn.hutool.json.JSONUtil;import com.google.auto.service.AutoService;import org.apache.seatunnel.api.table.type.BasicType;import org.apache.seatunnel.api.table.type.SeaTunnelDataType;import org.apache.seatunnel.transform.sql.zeta.ZetaUDF;import java.util.HashMap;import java.util.List;import java.util.Map;@AutoService(ZetaUDF.class)public class QdmxUDF implements ZetaUDF {    @Override    public String functionName() {        return "QDMX";    }    @Override    public SeaTunnelDataType resultType(List

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

徐锦洪

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表