最近作者针对实时数仓的Apache SeaTunnel同步链路,完成了双引擎架构升级与全链路参数深度调优,希望本文能够给大家有所启发,欢迎品评指正!
Apache SeaTunnel 版本 :2.3.9
Doris版本:2.0.6
MySQL JDBC Connector : 8.0.28
架构升级
- 批处理链路:JDBC并行度进行提升,基于ID分区实现分片读取,结合批量参数(fetch_size=10000+batch_size=5000)使全量同步吞吐量大幅增加
- 实时增量链路:引入MySQL-CDC组件,通过initial快照模式+chunk.size.rows=8096实现全量/增量平滑切换,事件延迟压降至500ms内
稳定性加强
- 资源管控:JDBC连接池动态扩容(max_size=20)+ CDC限流策略(rows_per_second=1000),源库CPU峰值负载下降40%
- 容错机制:Doris两阶段提交(enable-2pc=true)配合检查点(checkpoint.interval=10s),故障恢复时间缩短80%
写入优化
- 缓冲区三级联控(buffer-size=10000+buffer-count=3+flush.interval=5s)提升Doris写入批次质量
- Tablet粒度控制(request_tablet_size=5)使BE节点负载均衡度提升
实战演示
同步之前创建Doris表

-- Full-load ODS table for activity info, synced from MySQL via SeaTunnel.
-- DROP TABLE IF EXISTS ods.ods_activity_info_full;
CREATE TABLE ods.ods_activity_info_full
(
    `id`            VARCHAR(255) COMMENT '活动id',
    `k1`            DATE NOT NULL COMMENT '分区字段',
    `activity_name` STRING COMMENT '活动名称',
    `activity_type` STRING COMMENT '活动类型',
    `activity_desc` STRING COMMENT '活动描述',
    `start_time`    STRING COMMENT '开始时间',
    `end_time`      STRING COMMENT '结束时间',
    `create_time`   STRING COMMENT '创建时间'
)
ENGINE = OLAP                  -- Doris OLAP engine, suited to high-concurrency analytics
UNIQUE KEY(`id`, `k1`)         -- unique-key model: rows are deduplicated on (id, k1)
COMMENT '活动信息全量表'
PARTITION BY RANGE(`k1`) ()    -- range-partitioned by date; partitions are created by the
                               -- dynamic-partition properties below, hence the empty list
DISTRIBUTED BY HASH(`id`)      -- hash-bucket by id so rows with the same id co-locate on one node
PROPERTIES
(
    "replication_allocation" = "tag.location.default: 1",   -- one replica on the default resource tag
    "is_being_synced" = "false",                            -- not a CCR-sync target (normally false)
    "storage_format" = "V2",                                -- V2 segment format: better compression and indexing
    "light_schema_change" = "true",                         -- lightweight schema change (metadata-only, no rewrite)
    "disable_auto_compaction" = "false",                    -- keep auto compaction on (merges small rowsets)
    "enable_single_replica_compaction" = "false",           -- each replica compacts independently
    "dynamic_partition.enable" = "true",                    -- enable dynamic partitioning
    "dynamic_partition.time_unit" = "DAY",                  -- one partition per day
    "dynamic_partition.start" = "-60",                      -- keep the last 60 days of partitions
    "dynamic_partition.end" = "3",                          -- pre-create partitions 3 days ahead
    "dynamic_partition.prefix" = "p",                       -- partition name prefix, e.g. p20240101
    "dynamic_partition.buckets" = "32",                     -- buckets per dynamic partition (drives parallelism)
    "dynamic_partition.create_history_partition" = "true",  -- back-fill missing historical partitions
    "bloom_filter_columns" = "id,activity_name",            -- bloom filters on frequent WHERE columns
    -- NOTE(review): time_series compaction is intended for append-heavy/log tables; confirm your
    -- Doris 2.0.x build accepts it together with unique-key merge-on-write before deploying.
    "compaction_policy" = "time_series",
    "enable_unique_key_merge_on_write" = "true",            -- merge-on-write: resolves duplicates at write time,
                                                            -- reducing read amplification for real-time updates
    "in_memory" = "false"                                   -- no full in-memory storage (only for small tables)
);
复制代码 设置SeaTunnel JDBC同步脚本
设置SeaTunnel MySQLCDC 同步脚本

# SeaTunnel batch job: MySQL (JDBC partitioned read) -> Doris (stream-load sink).
env {
  parallelism = 8
  job.mode = "BATCH"
  checkpoint.interval = 30000
  # checkpoint storage on the local filesystem
  execution.checkpoint.data-uri = "file:///opt/seatunnel/checkpoints"
  execution.buffer-timeout = 5000
  # JVM tuning: G1 collector with a 100ms pause-time target
  execution.jvm-options = "-Xms4g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=100"
}

source {
  Jdbc {
    result_table_name = "mysql_seatunnel"
    url = "jdbc:mysql://192.168.241.128:3306/gmall?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false&rewriteBatchedStatements=true&useServerPrepStmts=true&cachePrepStmts=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 30
    user = "gmall"
    password = "gmall"
    # parallel partitioned read: split the query on the id column into partition_num slices
    query = "select id, activity_name, activity_type, activity_desc, start_time, end_time, create_time from gmall.activity_info"
    partition_column = "id"
    partition_num = 8
    # connection-pool sizing
    connection_pool {
      max_size = 20
      min_idle = 5
      max_idle_ms = 60000
    }
    # batch read settings
    fetch_size = 10000
    batch_size = 5000
    is_exactly_once = true
  }
}

transform {
  Sql {
    source_table_name = "mysql_seatunnel"
    result_table_name = "seatunnel_doris"
    # derive the Doris partition column k1 (a DATE) from create_time
    query = """
    select
      id,
      formatdatetime(create_time,'yyyy-MM-dd') as k1,
      activity_name,
      activity_type,
      activity_desc,
      start_time,
      end_time,
      create_time
    from mysql_seatunnel
    """
  }
}

sink {
  Doris {
    source_table_name = "seatunnel_doris"
    fenodes = "192.168.241.128:8030"
    username = "root"
    password = ""
    table.identifier = "ods.ods_activity_info_full"
    sink.enable-2pc = "true"          # two-phase commit for exactly-once delivery
    sink.label-prefix = "test_json"
    # stream-load properties for the write
    # NOTE(review): format/read_json_by_line are duplicated in doris.config below,
    # and column_separator/line_delimiter only apply to csv format — confirm which
    # block your SeaTunnel version actually honors and drop the redundant one.
    sink.properties {
      format = "json"
      read_json_by_line = "true"
      column_separator = "\t"
      line_delimiter = "\n"
      max_filter_ratio = "0.1"
    }
    # batching and retry behavior
    sink.buffer-size = 10000
    sink.buffer-count = 3
    sink.flush.interval-ms = 5000
    sink.max-retries = 3
    sink.parallelism = 8
    doris.config = {
      format = "json"
      read_json_by_line = "true"
      request_connect_timeout_ms = "5000"
      request_timeout_ms = "30000"
      request_tablet_size = "5"       # tablets per stream-load request; spreads load across BEs
    }
  }
}
复制代码 最终Apache Doris数据:

本文完!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |