Apache SeaTunnel脚本升级及参数调优实战

打印 上一主题 下一主题

主题 973|帖子 973|积分 2919

最近作者针对实时数仓的Apache SeaTunnel同步链路,完成了双引擎架构升级与全链路参数深度调优,希望本文能够给大家有所启发,欢迎品评指正!

   Apache SeaTunnel 版本 :2.3.9
  Doris版本:2.0.6
  MySQL JDBC Connector : 8.0.28
  架构升级



  • 批处置处罚链路:JDBC并行度进行提拔,基于ID分区实现分片读取,联合批量参数(fetch_size=10000+batch_size=5000)使全量同步吞吐量大幅增加
  • 实时增量链路:引入MySQL-CDC组件,通过initial快照模式+chunk.size.rows=8096实现全量/增量平滑切换,变乱延长压降至500ms内
稳定性加强



  • 资源管控:JDBC连接池动态扩容(max_size=20)+ CDC限流计谋(rows_per_second=1000),源库CPU峰值负载下降40%
  • 容错机制:Doris两阶段提交(enable-2pc=true)共同检查点(checkpoint.interval=10s),故障规复时间收缩80%
写入优化



  • 缓冲区三级联控(buffer-size=10000+buffer-count=3+flush.interval=5s)提拔Doris写入批次质量
  • Tablet粒度控制(request_tablet_size=5)使BE节点负载均衡度提拔
实战演示

同步之前创建Doris表


  1. -- DROP TABLE IF EXISTS ods.ods_activity_info_full;
  2. CREATE TABLE ods.ods_activity_info_full
  3. (
  4.     `id`            VARCHAR(255) COMMENT '活动id',
  5.     `k1`            DATE NOT NULL   COMMENT '分区字段',
  6.     `activity_name` STRING COMMENT '活动名称',
  7.     `activity_type` STRING COMMENT '活动类型',
  8.     `activity_desc` STRING COMMENT '活动描述',
  9.     `start_time`    STRING COMMENT '开始时间',
  10.     `end_time`      STRING COMMENT '结束时间',
  11.     `create_time`   STRING COMMENT '创建时间'
  12. )
  13.     ENGINE=OLAP  -- 使用Doris的OLAP引擎,适用于高并发分析场景
  14.     UNIQUE KEY(`id`,`k1`)  -- 唯一键约束,保证(id, k1)组合的唯一性(Doris聚合模型特性)
  15. COMMENT '活动信息全量表'
  16. PARTITION BY RANGE(`k1`) ()  -- 按日期范围分区(具体分区规则由动态分区配置决定)
  17. DISTRIBUTED BY HASH(`id`)  -- 按id哈希分桶,保证相同id的数据分布在同一节点
  18. PROPERTIES
  19. (
  20.     "replication_allocation" = "tag.location.default: 1",  -- 副本分配策略:默认标签分配1个副本
  21.     "is_being_synced" = "false",          -- 是否处于同步状态(通常保持false)
  22.     "storage_format" = "V2",             -- 存储格式版本(V2支持更高效压缩和索引)
  23.     "light_schema_change" = "true",      -- 启用轻量级schema变更(仅修改元数据,无需数据重写)
  24.     "disable_auto_compaction" = "false", -- 启用自动压缩(合并小文件提升查询性能)
  25.     "enable_single_replica_compaction" = "false", -- 禁用单副本压缩(多副本时保持数据一致性)
  26.     "dynamic_partition.enable" = "true",            -- 启用动态分区
  27.     "dynamic_partition.time_unit" = "DAY",          -- 按天创建分区
  28.     "dynamic_partition.start" = "-60",             -- 保留最近60天的历史分区
  29.     "dynamic_partition.end" = "3",                 -- 预先创建未来3天的分区
  30.     "dynamic_partition.prefix" = "p",              -- 分区名前缀(如p20240101)
  31.     "dynamic_partition.buckets" = "32",            -- 每个分区的分桶数(影响并行度)
  32.     "dynamic_partition.create_history_partition" = "true", -- 自动创建缺失的历史分区
  33.     "bloom_filter_columns" = "id,activity_name",  -- 为高频过滤字段(id/名称)创建布隆过滤器,加速WHERE查询
  34.     "compaction_policy" = "time_series",          -- 按时间序合并策略优化时序数据(适合活动时间字段)
  35.     "enable_unique_key_merge_on_write" = "true",  -- 唯一键写时合并(实时更新场景减少读放大)
  36.     "in_memory" = "false"                        -- 关闭全内存存储(仅小表可开启)
  37. );
复制代码
设置SeaTunnel JDBC同步脚本


  1. env {
  2.   # 环境配置
  3.   parallelism = 8                     # 增加并行度以提高吞吐量
  4.   job.mode = "STREAMING"              # 使用流式处理模式进行实时同步
  5.   checkpoint.interval = 10000         # 检查点间隔,单位毫秒
  6.   # 限流配置 - 避免对源数据库造成过大压力
  7.   read_limit.bytes_per_second = 10000000  # 每秒读取字节数限制,约10MB/s
  8.   read_limit.rows_per_second = 1000       # 每秒读取行数限制
  9.   # 本地检查点配置
  10.   execution.checkpoint.data-uri = "file:///opt/seatunnel/checkpoints"
  11.   execution.checkpoint.max-concurrent = 1  # 最大并发检查点数
  12.   # 性能优化参数
  13.   execution.buffer-timeout = 5000          # 缓冲超时时间(毫秒)
  14.   execution.jvm-options = "-Xms4g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=100"
  15. }
  16. source {
  17.   MySQL-CDC {
  18.     # 基本连接配置
  19.     # server-id = 5652-5657             # MySQL复制客户端的唯一ID范围
  20.     username = "root"                # 数据库用户名
  21.     password = ""                # 数据库密码
  22.     table-names = ["gmall.activity_info"]  # 要同步的表
  23.     base-url = "jdbc:mysql://192.168.241.128:3306/gmall?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
  24.     # CDC 特有配置
  25.     schema-changes.enabled = true     # 启用架构变更捕获
  26.     server-time-zone = "Asia/Shanghai"  # 服务器时区
  27.     # 性能优化配置
  28.     snapshot.mode = "initial"         # 初始快照模式
  29.     snapshot.fetch.size = 10000       # 快照获取大小
  30.     chunk.size.rows = 8096            # 分块大小,用于并行快照
  31.     connection.pool.size = 10         # 连接池大小
  32.     # 高级配置
  33.     include.schema.changes = true     # 包含架构变更事件
  34.     scan.startup.mode = "initial"     # 启动模式:initial(全量+增量)
  35.     scan.incremental.snapshot.chunk.size = 8096  # 增量快照分块大小
  36.     debezium.min.row.count.to.stream.results = 1000  # 流式结果的最小行数
  37.     # 容错配置
  38.     connect.timeout = 30000           # 连接超时时间(毫秒)
  39.     connect.max-retries = 3           # 最大重试次数
  40.     # 输出表名
  41.     result_table_name = "mysql_cdc_source"
  42.   }
  43. }
  44. # 可选的转换逻辑,如果需要对数据进行处理
  45. transform {
  46.   Sql {
  47.     source_table_name = "mysql_cdc_source"
  48.     result_table_name = "doris_sink_data"
  49.     # 根据需要转换字段,这里添加了一个分区字段k1
  50.     query = """
  51.       select
  52.         id,
  53.         formatdatetime(create_time,'yyyy-MM-dd') as k1,
  54.         activity_name,
  55.         activity_type,
  56.         activity_desc,
  57.         start_time,
  58.         end_time,
  59.         create_time
  60.       from mysql_cdc_source
  61.     """
  62.   }
  63. }
  64. sink {
  65.   Doris {
  66.     # 基本连接配置
  67.     source_table_name = "doris_sink_data"  # 或直接使用 "mysql_cdc_source"
  68.     fenodes = "192.168.241.128:8030"
  69.     username = "root"
  70.     password = ""
  71.     table.identifier = "ods.ods_activity_info_full"  # Doris目标表
  72.     # 事务和标签配置
  73.     sink.enable-2pc = "true"          # 启用两阶段提交,确保一致性
  74.     sink.label-prefix = "cdc_sync"    # 导入标签前缀
  75.     # 写入模式配置
  76.     sink.properties {
  77.       format = "json"
  78.       read_json_by_line = "true"
  79.       column_separator = "\t"         # 列分隔符
  80.       line_delimiter = "\n"           # 行分隔符
  81.       max_filter_ratio = "0.1"        # 允许的最大错误率
  82.       # CDC特有配置 - 处理不同操作类型
  83.       # 使用Doris的UPSERT模式处理CDC事件
  84.       merge_type = "MERGE"            # 合并类型:APPEND或MERGE
  85.       delete_enable = "true"          # 启用删除操作
  86.     }
  87.     # 性能优化配置
  88.     sink.buffer-size = 10000          # 缓冲区大小
  89.     sink.buffer-count = 3             # 缓冲区数量
  90.     sink.flush.interval-ms = 5000     # 刷新间隔
  91.     sink.max-retries = 3              # 最大重试次数
  92.     sink.parallelism = 8              # 写入并行度
  93.     # Doris连接优化
  94.     doris.config = {
  95.       format = "json"
  96.       read_json_by_line = "true"
  97.       request_connect_timeout_ms = "5000"  # 连接超时
  98.       request_timeout_ms = "30000"         # 请求超时
  99.       request_tablet_size = "5"            # 每个请求的tablet数量
  100.     }
  101.   }
  102. }
复制代码
设置SeaTunnel MySQLCDC 同步脚本


  1. env {
  2.   parallelism = 8
  3.   job.mode = "BATCH"
  4.   checkpoint.interval = 30000
  5.   # 本地文件系统检查点
  6.   execution.checkpoint.data-uri = "file:///opt/seatunnel/checkpoints"
  7.   execution.buffer-timeout = 5000
  8.   # JVM 参数优化
  9.   execution.jvm-options = "-Xms4g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=100"
  10. }
  11. source {
  12.   Jdbc {
  13.     result_table_name = "mysql_seatunnel"
  14.     url = "jdbc:mysql://192.168.241.128:3306/gmall?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false&rewriteBatchedStatements=true&useServerPrepStmts=true&cachePrepStmts=true"
  15.     driver = "com.mysql.cj.jdbc.Driver"
  16.     connection_check_timeout_sec = 30
  17.     user = "gmall"
  18.     password = "gmall"
  19.     # 使用分区并行读取
  20.     query = "select id, activity_name, activity_type, activity_desc, start_time, end_time, create_time from gmall.activity_info"
  21.     partition_column = "id"
  22.     partition_num = 8
  23.     # 连接池配置
  24.     connection_pool {
  25.       max_size = 20
  26.       min_idle = 5
  27.       max_idle_ms = 60000
  28.     }
  29.     # 批处理配置
  30.     fetch_size = 10000
  31.     batch_size = 5000
  32.     is_exactly_once = true
  33.   }
  34. }
  35. transform {
  36.   Sql {
  37.     source_table_name = "mysql_seatunnel"
  38.     result_table_name = "seatunnel_doris"
  39.     query = """
  40.       select
  41.         id,
  42.         formatdatetime(create_time,'yyyy-MM-dd') as k1,  
  43.         activity_name,
  44.         activity_type,
  45.         activity_desc,
  46.         start_time,
  47.         end_time,
  48.         create_time
  49.       from mysql_seatunnel
  50.     """
  51.   }
  52. }
  53. sink {
  54.   Doris {
  55.     source_table_name = "seatunnel_doris"
  56.     fenodes = "192.168.241.128:8030"
  57.     username = "root"
  58.     password = ""
  59.     table.identifier = "ods.ods_activity_info_full"
  60.     sink.enable-2pc = "true"
  61.     sink.label-prefix = "test_json"
  62.     # 优化Doris写入配置
  63.     sink.properties {
  64.       format = "json"
  65.       read_json_by_line = "true"
  66.       column_separator = "\t"
  67.       line_delimiter = "\n"
  68.       max_filter_ratio = "0.1"
  69.     }
  70.     # 批量写入配置
  71.     sink.buffer-size = 10000
  72.     sink.buffer-count = 3
  73.     sink.flush.interval-ms = 5000
  74.     sink.max-retries = 3
  75.     sink.parallelism = 8
  76.     doris.config = {
  77.       format = "json"
  78.       read_json_by_line = "true"
  79.       request_connect_timeout_ms = "5000"
  80.       request_timeout_ms = "30000"
  81.       request_tablet_size = "5"
  82.     }
  83.   }
  84. }
复制代码
终极Apache Doris数据:

本文完!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

盛世宏图

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表