王炸组合:Dolphinscheudler 3.1.*搭配SeaT unnel2.3.*高效完成异构数据数 ...

打印 上一主题 下一主题

主题 797|帖子 797|积分 2391


概述

本篇主要介绍怎样通过Dolphinscheduler海豚调度搭配Seatunnel完成异构数据源之间的数据同步功能,这个在大数据流批一体数仓建设的过程中是一个非常好的解决方案, 稳定高效,只要用上了你肯定爱不释手。
环境准备


  • dolphinscheduler集群 >= 3.1.5
  • dolphinscheduler3.1.5版本源码
  • Seatunnel集群 >= 2.3.3
没有安装好以上准备环境的童鞋,请先参考我的另外两篇文章完成基础环境搭建基于Seatunnel最新2.3.5版本分布式集群安装部署指南(小白版)及dolphinscheduler分布式集群部署指南(小白版)再回到章节继续。
配置文件修改

这里分析一下, 通过海豚调度配置的Seatunnel数据同步任务最后都会被分配到DS集群的某个Worker组大概某个worker节点上进行执行,所以你要保证你的DS集群的目标worker节点上也安装了Seatunnel服务。这很重要,因为实际dolphisncheduler中定义的seatunnel任务实例到最后都是需要调用worker节点上的seatunnel服务在本地执行seatunnel的任务启动命令来完成任务提交和运行。
Dolphinscheduler的配置文件修改

因为我们需要使用seatunnel完成数据集成,所以我们需要在dolphinscheduler的体系环境变量中将我们的Seatunnel的安装目录进行配置。
找到你的dolphinscheduler主节点的安装目录下的$DOLPHINSCHEDULER_HOME/bin/env/dolphinscheduler_env.sh。
设置SEATUNNEL_HOME的访问目录,将SEATUNNEL_HOME设置为你自己的SeaTunnel安装目录。
  1. export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/software/seatunnel-2.3.5}
复制代码
然后生存重启Dolphinscheduler集群即可完成配置修改同步到所有的api-server、master-server及worker-server节点。
Dolphinscheduler部分源码修改

为什么要修改Dolphinscheduler的源码?
因为我这里使用的Seatunnel的版本是2.3.5,使用的引擎不是Seatunnel的默认引擎, 用的是Spark引擎, Spark我用的版本是2.4.5, 所有我最后在命令执行的命令如下:
  1. $SEATUNNEL_HOME/bin/start-seatunnel-spark-2-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template
复制代码
假如我用的是Spark3.X的版本,我执行命令如下:
  1. $SEATUNNEL_HOME/bin/start-seatunnel-spark-3-connector-v2.sh --master local[4] --deploy-mode client --config /opt/software/seatunnel-2.3.5/config/app-config/v2.batch.config.template
复制代码
然而在Dolphinscheduler3.1.5版本的Seatunnel任务插件中,存在一些问题没办法兼容, 首先是前端,这里引擎只支持Spark和Flink,没有针对具体的版本进行兼容,没办法自由的选择使用Spark2、Spark3照旧FIink13、Flink15。

其次就是后端的代码。

找到EngineEnum类, 修改一下代码如下:
  1. public enum EngineEnum {
  2.     // FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink.sh"),
  3.     // SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark.sh");
  4.     FLINK("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-13-connector-v2.sh"),
  5.     FLINK15("${SEATUNNEL_HOME}/bin/start-seatunnel-flink-15-connector-v2.sh"),
  6.     SPARK("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-2-connector-v2.sh"),
  7.     SPARK3("${SEATUNNEL_HOME}/bin/start-seatunnel-spark-3-connector-v2.sh");
  8.     private String command;
  9.     EngineEnum(String command) {
  10.         this.command = command;
  11.     }
  12.     public String getCommand() {
  13.         return command;
  14.     }
  15. }
复制代码
这样修改完毕之后就OK了, 然后编译打包dolphinscheduler的源码。
更新Dolphinscheduler集群中的SeaTunnel任务插件

项目编译打包完成之后,找到dolphinscheduler-task-seatunnel工程下的target目录下的dolphinscheduler-task-seatunnel-3.1.5.jar包, 上传到你的dolphinscheduler集群的主节点。

然后将主节点上DS安装目录下的api-server/libs、master-server/libs、worker-server/libs、alert-server/libs目录(其实这里可以只替换woker-server/libs目录)下的dolphinscheduler-task-seatunnel-3.1.5.jar重命名为dolphinscheduler-task-seatunnel-3.1.5.jar.20240606(带上日期方便知道变动时间)。

然后将我们编译的dolphinscheduler-task-seatunnel-3.1.5.jar拷贝到这几个目录(api-server/libs、master-server/libs、worker-server/libs、alert-server/libs目录,确认一下是不是所有目录下都有这个dolphinscheduler-task-seatunnel-3.1.5.jar,没有的目录就直接略过)下。
然后使用主节点上的分发脚本,将api-server/libs、master-server/libs、worker-server/libs、alert-server/libs的修改同步到其他的DS节点上,分发完成之后,查抄一下分发是否成功。
最后就是重启我们的DS集群,通过以上步骤我们就完成了Dolphisncheduler中SeaTunnel插件的升级适配。
测试验证

我们通过dolphinscheduler的工作流定义页面定义一个Seatunnel数据同步的任务, 完成Oracle数据库表收罗到MySQL数据库的任务, 下面我们来操纵。
关于seatunnel任务配置脚本文件,官网的文档介绍如下:
Source输入源配置定义分析

这里我们的输入原始Oracle, 所以直接从Source中查找Oracle相干的配置怎样定义,官网给我们提供了不少任务示例,:
简朴任务示例
  1. # Defining the runtime environment
  2. env {
  3.   parallelism = 4
  4.   job.mode = "BATCH"
  5. }
  6. source{
  7.     Jdbc {
  8.         url = "jdbc:oracle:thin:@datasource01:1523:xe"
  9.         driver = "oracle.jdbc.OracleDriver"
  10.         user = "root"
  11.         password = "123456"
  12.         query = "SELECT * FROM TEST_TABLE"
  13.     }
  14. }
  15. transform {}
  16. sink {
  17.     Console {}
  18. }
复制代码
按分区列并行任务示例

并行读取你配置的分片字段和分片数据,假如你想读取整个表,可以这样做
  1. env {
  2.   parallelism = 4
  3.   job.mode = "BATCH"
  4. }
  5. source {
  6.     Jdbc {
  7.         url = "jdbc:oracle:thin:@datasource01:1523:xe"
  8.         driver = "oracle.jdbc.OracleDriver"
  9.         connection_check_timeout_sec = 100
  10.         user = "root"
  11.         password = "123456"
  12.         # 根据需要定义查询逻辑
  13.         query = "SELECT * FROM TEST_TABLE"
  14.         # 设置并行分片读取字段
  15.         partition_column = "ID"
  16.         # 分区切片数量
  17.         partition_num = 10
  18.         properties {
  19.         database.oracle.jdbc.timezoneAsRegion = "false"
  20.         }
  21.     }
  22. }
  23. sink {
  24.   Console {}
  25. }
复制代码
按主键或唯一索引并行任务示例

配置table_path会开启自动分割,可以配置split.*来调解分割策略
  1. env {
  2.   parallelism = 4
  3.   job.mode = "BATCH"
  4. }
  5. source {
  6.     Jdbc {
  7.         url = "jdbc:oracle:thin:@datasource01:1523:xe"
  8.         driver = "oracle.jdbc.OracleDriver"
  9.         connection_check_timeout_sec = 100
  10.         user = "root"
  11.         password = "123456"
  12.         table_path = "DA.SCHEMA1.TABLE1"
  13.         query = "select * from SCHEMA1.TABLE1"
  14.         split.size = 10000
  15.     }
  16. }
  17. sink {
  18.   Console {}
  19. }
复制代码
并行上下限任务示例

指定查询的上下限内的数据效率更高按照你配置的上下限来读取你的数据源效率更高
  1. source {
  2.     Jdbc {
  3.         url = "jdbc:oracle:thin:@datasource01:1523:xe"
  4.         driver = "oracle.jdbc.OracleDriver"
  5.         connection_check_timeout_sec = 100
  6.         user = "root"
  7.         password = "123456"
  8.         # Define query logic as required
  9.         query = "SELECT * FROM TEST_TABLE"
  10.         partition_column = "ID"
  11.         # Read start boundary
  12.         partition_lower_bound = 1
  13.         # Read end boundary
  14.         partition_upper_bound = 500
  15.         partition_num = 10
  16.     }
  17. }
复制代码
多表读取任务示例

配置table_list会开启自动分割,可以通过配置split.来调解分割策略*
  1. env {
  2.   job.mode = "BATCH"
  3.   parallelism = 4
  4. }
  5. source {
  6.   Jdbc {
  7.     url = "jdbc:oracle:thin:@datasource01:1523:xe"
  8.     driver = "oracle.jdbc.OracleDriver"
  9.     connection_check_timeout_sec = 100
  10.     user = "root"
  11.     password = "123456"
  12.     "table_list"=[
  13.         {
  14.             "table_path"="XE.TEST.USER_INFO"
  15.         },
  16.         {
  17.             "table_path"="XE.TEST.YOURTABLENAME"
  18.         }
  19.     ]
  20.     #where_condition= "where id > 100"
  21.     split.size = 10000
  22.     #split.even-distribution.factor.upper-bound = 100
  23.     #split.even-distribution.factor.lower-bound = 0.05
  24.     #split.sample-sharding.threshold = 1000
  25.     #split.inverse-sampling.rate = 1000
  26.   }
  27. }
  28. sink {
  29.   Console {}
  30. }
复制代码
Sink输出源配置定义分析

简朴任务示例

本示例定义了一个SeaTunnel同步任务,通过FakeSource自动天生数据并发送到JDBC Sink。FakeSource一共天生16行数据(row.num=16),每行有两个字段name(string类型)和age(int类型)。最终的目标表为test_table,表中同样会有16行数据。运行此作业之前,你需要在mysql中创建数据库test和表test_table。假如你还没有安装和部署SeaTunnel,你需要按照安装SeaTunnel中的分析安装并部署SeaTunnel。然后按照快速开始使用SeaTunnel引擎中的分析运行此作业。
  1. env {
  2.   parallelism = 1
  3.   job.mode = "BATCH"
  4. }
  5. source {
  6.   FakeSource {
  7.     parallelism = 1
  8.     result_table_name = "fake"
  9.     row.num = 16
  10.     schema = {
  11.       fields {
  12.         name = "string"
  13.         age = "int"
  14.       }
  15.     }
  16.   }
  17. }
  18. transform {}
  19. sink {
  20.     jdbc {
  21.         url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
  22.         driver = "com.mysql.cj.jdbc.Driver"
  23.         user = "root"
  24.         password = "123456"
  25.         query = "insert into test_table(name,age) values(?,?)"
  26.         }
  27. }
复制代码
天生输出SQL任务示例

本示例无需编写复杂的sql语句,您可以配置输出端数据库名称表名称来自动为您天生添加语句
  1. sink {
  2.     jdbc {
  3.         url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
  4.         driver = "com.mysql.cj.jdbc.Driver"
  5.         user = "root"
  6.         password = "123456"
  7.         # 根据数据库表名自动生成sql语句
  8.         generate_sink_sql = true
  9.         database = test
  10.         table = test_table
  11.     }
  12. }
复制代码
精确任务示例

对于需要精确写入场景,我们保证精确一次。
  1. sink {
  2.     jdbc {
  3.         url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
  4.         driver = "com.mysql.cj.jdbc.Driver"
  5.         max_retries = 0
  6.         user = "root"
  7.         password = "123456"
  8.         query = "insert into test_table(name,age) values(?,?)"
  9.         is_exactly_once = "true"
  10.         xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
  11.     }
  12. }
复制代码
CDC(变动数据捕捉)变乱

我们也支持CDC变动数据在这种情况下,您需要配置数据库,表和primary_keys。
  1. sink {
  2.     jdbc {
  3.         url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
  4.         driver = "com.mysql.cj.jdbc.Driver"
  5.         user = "root"
  6.         password = "123456"
  7.         generate_sink_sql = true
  8.         # You need to configure both database and table
  9.         database = test
  10.         table = sink_table
  11.         primary_keys = ["id","name"]
  12.         field_ide = UPPERCASE
  13.         schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
  14.         data_save_mode="APPEND_DATA"
  15.     }
  16. }
复制代码
完整测试脚本配置文件

下面给出本示例中完整的配置文件示例
  1. env {
  2.   parallelism = 4
  3.   job.mode = "BATCH"
  4. }
  5. source{
  6.     Jdbc {
  7.         url = "jdbc:oracle:thin:@192.168.11.101:15210:YLAPP"
  8.         driver = "oracle.jdbc.OracleDriver"
  9.         user = "appuser001"
  10.         password = "appuser001"
  11.         query = "SELECT * FROM YL_APP.MET_COM_ICDOPERATION_LS"
  12.     }
  13. }
  14. transform {}
  15. sink {
  16.     jdbc {
  17.         url = "jdbc:mysql://192.168.10.210:13306/yl-app-new?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
  18.         driver = "com.mysql.cj.jdbc.Driver"
  19.         user = "appuser001"
  20.         password = "appuser001"
  21.         generate_sink_sql = "true"
  22.         database = "hive"
  23.         table = "met_com_icdoperation_ls"
  24.         schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
  25.         data_save_mode="APPEND_DATA"
  26.     }
  27. }
复制代码

将上述脚本中的数据库配置信息修改成你的数据连接配置, 然后将脚本覆盖到上图脚本输入中, 生存工作流, 上线之后启动工作流。

到对应数据库验证
原来的Oracle数据库表

同步之后的MySQL数据库表

任务成功了, 数据也成功同步过来了, OK,测试通过!大家接下来可以在这个Demo的基础上进行更多的扩展和挖掘, 实战的多了, 你对于Dolphinscheduler和Seatunnel的架构和原理的理解就会越来越深入了,逐步你也可以通过扩展源码来升级和拓展这些优秀开源框架的功能了。创作不易,假如我的文章对你有帮助接待点赞,收藏,送你一朵小红花~~~
原文链接:https://blog.csdn.net/qq_41865652/article/details/140971419
本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

半亩花草

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表