IT评测·应用市场-qidao123.com技术社区

标题: 简化数据流:Apache SeaTunnel实现多表同步的高效指南 [打印本页]

作者: 我可以不吃啊    时间: 2024-7-23 14:50
标题: 简化数据流:Apache SeaTunnel实现多表同步的高效指南
Apache SeaTunnel除了单表之间的数据同步之外,也支持单表同步到多表,多表同步到单表,以及多表同步到多表,下面简朴举例说明如何实现这些功能。
单表 to 单表

一个source,一个sink。
从mysql同步到mysql,中心不做区分
  1. env {
  2.   # You can set flink configuration here
  3.   execution.parallelism = 2
  4.   job.mode = "BATCH"
  5. }
  6. source{
  7.     Jdbc {
  8.         url = "jdbc:mysql://127.0.0.1:3306/test"
  9.         driver = "com.mysql.cj.jdbc.Driver"
  10.         connection_check_timeout_sec = 100
  11.         user = "user"
  12.         password = "password"
  13.         query = "select * from base_region"
  14.     }
  15. }
  16. transform {
  17.     # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  18.     # please go to https://seatunnel.apache.org/docs/transform/sql
  19. }
  20. sink {
  21.   jdbc {
  22.         url = "jdbc:mysql://127.0.0.1:3306/dw"
  23.         driver = "com.mysql.cj.jdbc.Driver"
  24.         connection_check_timeout_sec = 100
  25.         user = "user"
  26.         password = "password"
  27.     query = "insert into base_region(id,region_name) values(?,?)"
  28.   }
  29. }
复制代码
执行任务
  1. ./bin/seatunnel.sh --config ./config/mysql2mysql_batch.conf
复制代码
单表 to 多表

一个source,多个sink。
从MySQL同步到MySQL,将一个用户表数据同步过去,中心通过2个sql组件分布将男性用户和女性用户分开,在sink阶段分别插入到差别的表:
  1. env {
  2.   execution.parallelism = 2
  3.   job.mode = "BATCH"
  4. }
  5. source {
  6.     Jdbc {
  7.         url = "jdbc:mysql://127.0.0.1:3306/test"
  8.         driver = "com.mysql.cj.jdbc.Driver"
  9.         connection_check_timeout_sec = 100
  10.         user = "user"
  11.         password = "password"
  12.         result_table_name="t_user"
  13.         query = "select * from t_user;"
  14.     }
  15. }
  16. transform {
  17.   Sql {
  18.     source_table_name = "t_user"
  19.     result_table_name = "t_user_nan"
  20.     query = "select id,name,birth,gender from t_user where gender ='男';"
  21.   }
  22.   Sql {
  23.     source_table_name = "t_user"
  24.     result_table_name = "t_user_nv"
  25.     query = "select id,name,birth,gender from t_user where gender ='女';"
  26.   }
  27. }
  28. sink {
  29.   jdbc {
  30.     url = "jdbc:mysql://127.0.0.1:3306/dw"
  31.     driver = "com.mysql.cj.jdbc.Driver"
  32.     connection_check_timeout_sec = 100
  33.     user = "user"
  34.     password = "password"
  35.     source_table_name = "t_user_nan"
  36.     query =  "insert into t_user_nan(id,name,birth,gender) values(?,?,?,?)"
  37.   }
  38.   jdbc {
  39.     url = "jdbc:mysql://127.0.0.1:3306/dw"
  40.     driver = "com.mysql.cj.jdbc.Driver"
  41.     connection_check_timeout_sec = 100
  42.     user = "user"
  43.     password = "password"
  44.     source_table_name = "t_user_nv"
  45.     query =  "insert into t_user_nv(id,name,birth,gender) values(?,?,?,?)"
  46.   }
  47. }
  48. ./bin/seatunnel.sh --config ./config/mysql2mysql_1n.conf
复制代码
多表 to 单表

多个source,一个sink。
如果有一张交换器使用情况表,一张路由器使用情况表,目标表是将这种数据合在一起的olap表。
表结构如下:
  1. -- dw 源表1
  2. CREATE TABLE IF NOT EXISTS ads_device_switch_performance (
  3.   `event_time` timestamp COMMENT '业务时间',
  4.   `device_id` VARCHAR(32) COMMENT '设备id',
  5.   `device_type` VARCHAR(32) COMMENT '设备类型',
  6.   `device_name` VARCHAR(128) COMMENT '设备名称',
  7.   `cpu_usage` INT COMMENT 'CPU使用率百分比'
  8. ) ;
  9. INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-15 14:25:11', '2001', '2', '交换器1', 49);
  10. INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-17 22:25:40', '2002', '1', '交换器2', 65);
  11. -- dw 源表2
  12. CREATE TABLE IF NOT EXISTS ads_device_router_performance (
  13.   `event_time` timestamp COMMENT '业务时间',
  14.   `device_id` VARCHAR(32) COMMENT '设备id',
  15.   `device_type` VARCHAR(32) COMMENT '设备类型',
  16.   `device_name` VARCHAR(128) COMMENT '设备名称',
  17.   `cpu_usage` INT COMMENT 'CPU使用率百分比'
  18. );
  19. INSERT INTO `ads_device_router_performance` VALUES ('2024-01-17 21:23:22', '1001', '1', '路由器1', 35);
  20. INSERT INTO `ads_device_router_performance` VALUES ('2024-01-16 17:23:53', '1002', '2', '路由器2', 46);
  21. -------------------------------------------------------------------------------
  22. -- olap 目标表
  23. CREATE TABLE `device_performance` (
  24.   `id` INT NOT NULL AUTO_INCREMENT COMMENT '表主键',
  25.   `event_time` VARCHAR(32) NOT NULL COMMENT '业务时间',
  26.   `device_id` VARCHAR(32) COMMENT '设备id',
  27.   `device_type` VARCHAR(32) COMMENT '设备类型',
  28.   `device_name` VARCHAR(128) NOT NULL COMMENT '设备名称',
  29.   `cpu_usage` FLOAT NOT NULL COMMENT 'CPU利用率单位是%',
  30.   `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  31.   `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
  32.   PRIMARY KEY (`id`)
  33. ) COMMENT='设备状态';
复制代码
将交换器数据和路由器数据一起同步到olap目标表,总结通过sql组件处理:
  1. env {
  2.     job.mode="BATCH"
  3.     job.name="device_performance"
  4. }
  5. source {
  6.     Jdbc {
  7.         url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
  8.         driver="com.mysql.cj.jdbc.Driver"
  9.         user = "user"
  10.         password = "password"
  11.         result_table_name="switch_src"
  12.         query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"
  13.     }
  14.     Jdbc {
  15.         url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
  16.         driver="com.mysql.cj.jdbc.Driver"
  17.         user = "user"
  18.         password = "password"
  19.         result_table_name="router_src"
  20.         query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"
  21.     }
  22. }
  23. transform {
  24.   Sql {
  25.     source_table_name = "switch_src"
  26.     result_table_name = "switch_dst"
  27.     query = "SELECT  event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time  FROM switch_src;"
  28.   }
  29.   Sql {
  30.     source_table_name = "router_src"
  31.     result_table_name = "router_dst"
  32.     query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"
  33.   }
  34. }
  35. sink {
  36.     Jdbc {
  37.         url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
  38.         driver="com.mysql.cj.jdbc.Driver"
  39.         user = "user"
  40.         password = "password"
  41.         source_table_name = "switch_dst"
  42.         query="INSERT INTO device_performance  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"
  43.       }
  44.     Jdbc {
  45.         url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
  46.         driver="com.mysql.cj.jdbc.Driver"
  47.         user = "user"
  48.         password = "password"
  49.         source_table_name = "router_dst"
  50.         query="INSERT INTO device_performance  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"
  51.        }
  52. }
复制代码
执行任务:
  1. ./bin/seatunnel.sh --config ./syn_job/mysql2mysql_n1_batch.conf
复制代码
作业成功!
多表 to 多表

多个source,多个sink。
将交换器使用情况数据和路由器使用情况数据分别同步到对应的目标表,中心sql组件处理
  1. env {
  2.     job.mode="BATCH"
  3.     job.name="device_performance"
  4. }
  5. source {
  6.     Jdbc {
  7.         url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
  8.         driver="com.mysql.cj.jdbc.Driver"
  9.         user = "user"
  10.         password = "password"
  11.         result_table_name="switch_src"
  12.         query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"
  13.     }
  14.     Jdbc {
  15.         url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
  16.         driver="com.mysql.cj.jdbc.Driver"
  17.         user = "user"
  18.         password = "password"
  19.         result_table_name="router_src"
  20.         query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"
  21.     }
  22. }
  23. transform {
  24.   Sql {
  25.     source_table_name = "switch_src"
  26.     result_table_name = "switch_dst"
  27.     query = "SELECT  event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time  FROM switch_src;"
  28.   }
  29.   Sql {
  30.     source_table_name = "router_src"
  31.     result_table_name = "router_dst"
  32.     query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"
  33.   }
  34. }
  35. sink {
  36.     Jdbc {
  37.         url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
  38.         driver="com.mysql.cj.jdbc.Driver"
  39.         user = "user"
  40.         password = "password"
  41.         source_table_name = "switch_dst"
  42.         query="INSERT INTO device_performance_switch  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"
  43.       }
  44.     Jdbc {
  45.         url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
  46.         driver="com.mysql.cj.jdbc.Driver"
  47.         user = "user"
  48.         password = "password"
  49.         source_table_name = "router_dst"
  50.         query="INSERT INTO device_performance_router  VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"
  51.        }
  52. }
复制代码
结语

综上所述,Apache SeaTunnel多表同步技能具有高效、实时、可靠和灵活的特点,在企业的数据同步领域发挥偏重要作用。借助Apache SeaTunnel多表同步功能,企业可以或许更好地实现差别体系和数据库之间数据的无缝流转,提升数据管理和使用的效率,为业务发展提供有力支持。渴望本文可以或许帮助读者更好地了解和应用Apache SeaTunnel多表同步,从而为企业数据同步带来更多大概性。
原文链接:https://blog.csdn.net/weixin_44586883/article/details/136049897
本文由 白鲸开源 提供发布支持!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/) Powered by Discuz! X3.4