Flink-cdc更好的流式数据集成工具

一给  金牌会员 | 2024-10-31 07:23:19 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 856|帖子 856|积分 2568

What’s Flink-cdc?


Flink CDC 是基于Apache Flink的一种数据变更捕获技能,用于从数据源(如数据库)中捕获和处理数据的变更事件。CDC技能答应实时地捕获数据库中的增、删、改操纵,将这些变更事件转化为流式数据,并可以或许对这些事件进行实时处理和分析。
Flink CDC提供了与各种数据源集成的功能,包罗常见的关系型数据库(如MySQL、PostgreSQL、Oracle等)以及NoSQL数据库(如MongoDB、HBase等)。它通过监控数据库的日记或轮询方式来捕获数据变更,并将变更事件作为数据流发送到Flink的使掷中进行处理。
Flink CDC 深度集成并由 Apache Flink 驱动,提供以下核心功能:
✅ 端到端的数据集成框架
✅ 为数据集成的用户提供了易于构建作业的 API
✅ 支持在 Source 和 Sink 中处理多个表
✅ 整库同步
✅具备表结构变更自动同步的能力(Schema Evolution)
在使用者的角度,就是Flink-cdc可以简化流处理的流程:


  • 引入Flink-cdc之前流处理流程

  • 引入Flink-cdc之后后流处理流程

如上所示,在flink-cdc被引入后大大简化了流处理流程
Flink-cdc支持的链接及对应的版本

Pipeline Connectors

(截止2024-08-12)
Source Connectors

(截止2024-08-12)
Flink-cdc与Flink对应对影版本的关系


(截止2024-08-12)
flink-connector-mysql-cdc 实例分析

示例代码

demo代码:
  1. import com.ververica.cdc.connectors.mysql.source.MySqlSource;
  2. import com.ververica.cdc.connectors.mysql.table.StartupOptions;
  3. import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
  4. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  5. import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
  6. import org.apache.flink.streaming.api.environment.CheckpointConfig;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. public class MySqlSourceDemo {
  9.     public static void main(String[] args) throws Exception {
  10.         MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
  11.                 .hostname("mysql-server-host")
  12.                 .port(3306)
  13.                 .databaseList("mydb") // 设置捕获的数据库
  14.                 .tableList("mydb.products") // 设置捕获的表,如果需要同步整个数据库,请将 tableList 设置为 ".*".
  15. //                .tableList(".*") // 捕获整个数据库的表
  16. //                .tableList("^(?!mysql|information_schema|performance_schema).*") // 设置捕获的表,排除系统库
  17. //                .tableList("mydb.(?!products|orders).*") // 同步排除products和orders表之外的整个my_db库
  18.                 .username("flink-cdc")
  19.                 .password("xxx")
  20.                 .serverId("5400-5405")
  21.                 .deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串
  22.                 .serverTimeZone("Asia/Shanghai") // 设置时区
  23.                 .startupOptions(StartupOptions.initial())
  24.                 .scanNewlyAddedTableEnabled(true) // 启用扫描新添加的表功能
  25. //                .includeSchemaChanges(true) // 包括 schema 变更
  26.                 .build();
  27.         org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();
  28.         config.setString("rest.port", "8081");
  29. //        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(config); //本地环境,调试用
  30.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  31.         // 设置 3s 的 checkpoint 间隔
  32.         env.enableCheckpointing(3000);
  33.         env.setStateBackend(new HashMapStateBackend());
  34.         env.getCheckpointConfig().setCheckpointStorage("file:///tmp/ck");//本地文件系统
  35. //        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 1.14.0 版本开始支持
  36.         env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  37.         env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
  38.                 // 设置 source 节点的并行度为 4
  39.                 .setParallelism(5)
  40.                 .print()
  41.                 .setParallelism(1); // 设置 sink 节点并行度为 1
  42.         env.execute("Print MySQL Snapshot + Binlog");
  43.     }
  44. }
复制代码
maven依靠:
  1. <properties>
  2.         <maven.compiler.source>8</maven.compiler.source>
  3.         <maven.compiler.target>8</maven.compiler.target>
  4.         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  5.         <flink.version>1.14.5</flink.version>
  6.         <scala.binary.version>2.12</scala.binary.version>
  7.     </properties>
  8.     <dependencies>
  9.         <dependency>
  10.             <groupId>junit</groupId>
  11.             <artifactId>junit</artifactId>
  12.             <scope>test</scope>
  13.         </dependency>
  14.         <!-- 将 Apache Flink 的 Web 运行时模块添加到项目中 -->
  15.         <dependency>
  16.             <groupId>org.apache.flink</groupId>
  17.             <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
  18.             <version>${flink.version}</version>
  19.             <scope>provided</scope>
  20.         </dependency>
  21.         <dependency>
  22.             <groupId>org.apache.flink</groupId>
  23.             <artifactId>flink-clients_${scala.binary.version}</artifactId>
  24.             <version>${flink.version}</version>
  25.             <scope>provided</scope> <!--provided生命周期在test模式才可以运行,在main模式会找不到包-->
  26.         </dependency>
  27.         <dependency>
  28.             <groupId>com.ververica</groupId>
  29.             <artifactId>flink-connector-mysql-cdc</artifactId>
  30.             <version>2.3.0</version>
  31.             <scope>compile</scope>
  32.         </dependency>
  33.         <dependency>
  34.             <groupId>org.apache.flink</groupId>
  35.             <artifactId>flink-table-common</artifactId>
  36.             <version>${flink.version}</version>
  37.             <scope>compile</scope>
  38.         </dependency>
  39.         <dependency>
  40.             <groupId>org.slf4j</groupId>
  41.             <artifactId>slf4j-log4j12</artifactId>
  42.             <version>1.7.25</version>
  43.             <scope>provided</scope>
  44.         </dependency>
  45.     </dependencies>
复制代码
日记设置文件:
log4j.properties
  1. log4j.rootCategory=error,stdout
  2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  3. log4j.appender.stdout.target=System.out
  4. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  5. log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{1}:%L - %m%n
复制代码
启动standalone Flink级群

  1. # jobmanager
  2. docker run -d \
  3. --name flink-jm \
  4. --hostname flink-jm \
  5. -p 8082:8081 \
  6. --env FLINK_PROPERTIES="jobmanager.rpc.address: flink-jm" \
  7. --network flink-network-standalone \
  8. ponylee/flink:1.15.0-java8  \
  9. jobmanager
  10. # taskmanager
  11. docker run -d \
  12. --name flink-tm \
  13. --hostname flink-tm \
  14. --env FLINK_PROPERTIES="jobmanager.rpc.address: flink-jm" \
  15. --network flink-network-standalone \
  16. ponylee/flink:1.15.0-java8 \
  17. taskmanager \
  18. -Dtaskmanager.memory.process.size=1024m \
  19. -Dtaskmanager.numberOfTaskSlots=5 \
  20. -Drest.flamegraph.enabled=true
复制代码
分析阐明

为每个 Reader 设置差别的 Server id

每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 id,称为 Server id。 MySQL 服务器将使用此 id 来维护网络毗连和 binlog 位置。 因此,假如差别的作业共享雷同的 Server id, 则可能导致从错误的 binlog 位置读取数据。 因此,建议通过为每个 Reader 设置差别的 Server id , 假设 Source 并行度为 4,server id 设置必须:serverId(“5400-5405”),5405-5400=5 >= 4。来为 4 个 Source readers 中的每一个分配唯一的 Server id。
查看mysql链接发现
select * from information_schema.processlist where user = ‘flink-cdc’;
Flink-cdc对mysql的影响
正常情况下,Flink-cdc是No-lock Read,主库可以继续处理事务和查询,而不会导致主库历程阻塞,不会对主库产生直接影响。但是,在某些情况下数据同步的过程中可能会对主库产生一些间接影响,好比:网络、IO、CPU负载以及mysql的并发毗连数等资源斲丧。但这些对主库的开销影响相对较小(全量同步阶段可能比较耗能,但时间相对比较短)。

从上图mysql资源使用情况来看,flink-cdc对内存和CPU负载影响微乎极微,是No-lock Read,重要体现对网络和IO的资源斲丧。
断点续传

通过从checkpoint/savepoint 恢复,flink-cdc可以保证断点续传。


  • 从checkpoint/savepoint恢复,缩小同步范围,例如:从tableList(“mydb.products,mydb.orders”)或tableList(“.*”) 缩小到 tableList(“mydb.products”),应用更新见效。
  • 应用从checkpoint/savepoint恢复,扩大同步范围的部分不会见效,例如:从tableList(“mydb.products”) 到 tableList(“mydb.products,mydb.orders”)或tableList(“.*”),应用更新不见效见效。若想使动态加表见效,可以显示订定scanNewlyAddedTableEnabled(true) ,来启用扫描新添加的表功能。如没有特殊情况,建议在开发环境开启此设置。
flink-cdc包名变更

Flink CDC 项目 从 2.0.0 版本将 group id 从com.alibaba.ververica 改成 com.ververica, 自 3.1 版本从将 group id 从 com.ververica 改成 org.apache.flink。 这是为了让项目更加社区中立,让各个公司的开发者共建时更方便。所以在maven仓库找 2.x 的包时,路径是 /com/ververica;找3.1及以上版本的包时,路径是/org/apache/flink
参考:
flink-cdc
flink-cdc docs

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

一给

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表