目录
欢迎来到Flink CDC
焦点概念
数据管道(Data Pipeline)
数据源(Data Source)
数据接收器(Data Sink)
表ID(Table ID)
转换(Transform)
路由(Route)
毗连器(connectors)
管道毗连器(pipeline connectors)
支持毗连器
开发本身的毗连器
Flink源
Flink CDC 源
支持的毗连器
支持的Flink版本
特征
MySQL同步到MySQL
DataStream方式实现
必要的依靠pom.xml
预备工作
代码
测试
编辑
SQL方式实现
必要的依靠pom.xml
代码
测试
本文基于Flink CDC v2.4.2版本和Flink 1.17.1版本。
欢迎来到Flink CDC
Flink CDC是一个流数据集成工具,旨在为用户提供更强盛的API。它允许用户通过YAML优雅地形貌他们的ETL管道逻辑,并帮助用户主动天生自界说Flink算子和提交作业。Flink CDC优先优化使命提交过程,并提供加强的功能,如模式演化(schema evolution)、数据转换(data transformation)、全数据库同步(full database synchronization)和仅一次语义(exactly-once semantic)。
与Apache Flink深度集成并由其提供支持,Flink CDC提供:
✅端到端数据集成框架
✅基于数据集成API用户可轻松构建作业
✅源端/目标端中多表支持
✅整个数据库的同步
✅模式演化本领
焦点概念
数据管道(Data Pipeline)
由于Flink CDC中的事件(events)以管道( pipeline)方式从上游流向卑鄙,因此整个ETL使命被称为数据管道(Data Pipeline)。
数据源(Data Source)
数据源用于访问元数据(metadata)并从外部系统读取变动的数据(the changed data)。一个数据源可以同时从多个表中读取数据。
注意,这里的数据源并不是指的外部系统这个数据源,而是Flink中自身界说的数据源,Flink用这个数据源来从外部系统读取变动的数据。
数据接收器(Data Sink)
数据接收器用于应用模式更改(schema changes)并将更改数据写入外部系统。一个数据接收器可以同时写多个表。
表ID(Table ID)
在与外部系统毗连时,必要与外部系统的存储对象建立映射关系。必要唯一确定存储对象,这就是Table ID所指。为了与大多数外部系统兼容,表ID由一个3元组表示:(namespace, schemaName, tableName)。毗连器应该在表ID和外部系统中的存储对象之间建立映射。下表列出了不同数据系统表ID中的部分:
数据系统 | 表ID组成 | 例子 | Oracle/PostgreSQL | database, schema, table | mydb.default.orders | MySQL/Doris/StarRocks | database, table | mydb.orders | Kafka | topic | orders | 转换(Transform)
Transform模块帮助用户根据表中的数据列来删除和扩展数据列。别的,它还可以帮助用户在同步过程中过滤一些不必要的数据。
路由(Route)
路由指定匹配源表列表和映射到目标表的规则。最典型的场景是合并子数据库和子表,将多个上游源表路由到同一个目标表。
毗连器(connectors)
这里connector分了两个章节,必要阐明connector、souce、sink的区别。source和sink都可以称为connector。或者connector包括source和sink,由于汗青缘故原由,先是source,sink,后面使用connector对source和sink做了同一。
管道毗连器(pipeline connectors)
Flink CDC提供了几个源和接收器毗连器来与外部系统举行交互。通过将发布的jar添加到Flink CDC环境中,并在YAML管道界说中指定毗连器,您可以使用开箱即用的毗连器。
支持毗连器
毗连器支持的毗连器类型外部系统Apache DorisSink
- Apache Doris: 1.2.x, 2.x.x
KafkaSink
MySQLSource
- MySQL: 5.6, 5.7, 8.0.x
- RDS MySQL: 5.6, 5.7, 8.0.x
- PolarDB MySQL: 5.6, 5.7, 8.0.x
- Aurora MySQL: 5.6, 5.7, 8.0.x
- MariaDB: 10.x
- PolarDB X: 2.0.1
PaimonSink
StarRocksSink
开发本身的毗连器
如果提供的毗连器不能满意您的要求,您可以开发本身的毗连器,以使您的外部系统参与Flink CDC管道。检察Flink CDC api,相识怎样开发本身的毗连器。
Flink源
Flink CDC 源
Flink CDC源是Apache Flink的一组源毗连器(source connectors),使用变动数据捕获(CDC)从不同的数据库摄取更改。一些CDC源集成了Debezium作为捕获数据变化的引擎。以是它可以充实利用Debezium的本领。相识更多关于什么是Debezium。
debezium
支持的毗连器
毗连器数据库驱动mongodb-cdc
- MongoDB: 3.6, 4.x, 5.0, 6.0, 6.1
MongoDB Driver: 4.9.1mysql-cdc
- MySQL: 5.6, 5.7, 8.0.x
- RDS MySQL: 5.6, 5.7, 8.0.x
- PolarDB MySQL: 5.6, 5.7, 8.0.x
- Aurora MySQL: 5.6, 5.7, 8.0.x
- MariaDB: 10.x
- PolarDB X: 2.0.1
JDBC Driver: 8.0.28oceanbase-cdc
- OceanBase CE: 3.1.x, 4.x
- OceanBase EE: 2.x, 3.x, 4.x
OceanBase Driver: 2.4.xoracle-cdc
Oracle Driver: 19.3.0.0postgres-cdc
- PostgreSQL: 9.6, 10, 11, 12, 13, 14
JDBC Driver: 42.5.1sqlserver-cdc
- Sqlserver: 2012, 2014, 2016, 2017, 2019
JDBC Driver: 9.4.1.jre8tidb-cdc
- TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0
JDBC Driver: 8.0.27db2-cdc
Db2 Driver: 11.5.0.0vitess-cdc
MySql JDBC Driver: 8.0.26 支持的Flink版本
Flink CDC 版本Flink版本1.0.01.11.*1.1.01.11.*1.2.01.12.*1.3.01.12.*1.4.01.13.*2.0.*1.13.*2.1.*1.13.*2.2.*1.13.*, 1.14.*2.3.*1.13.*, 1.14.*, 1.15.*, 1.16.*2.4.*1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.*3.0.*1.14.*, 1.15.*, 1.16.*, 1.17.*, 1.18.* 特征
1、支持读取数据库快照,即使发生故障,也能以仅一次处理方式继续读取binlogs。
2、数据流API的CDC毗连器,用户可以在单个作业中消费多个数据库和表上的更改,而无需部署Debezium和Kafka。
3、用于表/SQL API的CDC毗连器,用户可以使用SQL DDL创建CDC源来监督单个表上的更改。
下表表现了毗连器(connector)的当前特性:
毗连器无锁读并行读仅一次读增量快照读mongodb-cdc✅✅✅✅mysql-cdc✅✅✅✅oracle-cdc✅✅✅✅postgres-cdc✅✅✅✅sqlserver-cdc✅✅✅✅oceanbase-cdc❌❌❌❌tidb-cdc✅❌✅❌db2-cdc✅✅✅✅vitess-cdc✅❌✅❌ MySQL同步到MySQL
DataStream方式实现
必要的依靠pom.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.leboop.www</groupId>
- <artifactId>flink</artifactId>
- <version>1.0-SNAPSHOT</version>
- <properties>
- <flink.version>1.17.1</flink.version>
- <scala.binary.version>2.12</scala.binary.version>
- <maven.compiler.source>1.8</maven.compiler.source>
- <maven.compiler.target>1.8</maven.compiler.target>
- </properties>
- <dependencies>
- <!-- flink客户端 -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <!-- Table API for Java -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <!-- flink cdc for mysql -->
- <dependency>
- <groupId>com.ververica</groupId>
- <artifactId>flink-connector-mysql-cdc</artifactId>
- <version>2.4.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-base</artifactId>
- <version>1.18.0</version>
- </dependency>
- <!-- json解析 -->
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.78</version> <!-- 请使用最新的版本号 -->
- </dependency>
- </dependencies>
- </project>
复制代码 如果缺少依靠,可能报错如下:
- Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/reader/RecordEmitter
- at com.leboop.cdc.MysqlCDCDemo.main(MysqlCDCDemo.java:17)
- Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter
- at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
- at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
- at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
- at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
- ... 1 more
复制代码 添加如下依靠即可:
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-base</artifactId>
- <version>1.18.0</version>
- </dependency>
复制代码 如果报错如下:
- Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
- at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:88)
- at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getPipelineExecutor(StreamExecutionEnvironment.java:2717)
- at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2194)
- at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2084)
- at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)
- at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2058)
- at com.leboop.cdc.MysqlCDCDemo.main(MysqlCDCDemo.java:62)
复制代码 添加如下依靠:
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>${flink.version}</version>
- </dependency>
复制代码 预备工作
本文仅仅为了演示,在windows当地安装了8.0.30版本的MySQL。如图:
预备两个数据库,分别作为本次案例的source和sink,如图:
建表语句分别如下:
- CREATE TABLE `human` (
- `id` bigint NOT NULL AUTO_INCREMENT,
- `name` varchar(100) DEFAULT NULL,
- `age` int DEFAULT NULL,
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
- CREATE TABLE `human_sink` (
- `id` bigint NOT NULL AUTO_INCREMENT,
- `name` varchar(100) DEFAULT NULL,
- `age` int DEFAULT NULL,
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
复制代码 MySQL CDC必要开启biglog日志,执行如下SQL检察biglog日志是否开启
- SHOW VARIABLES LIKE 'log_bin';
复制代码 如图:
Value为ON,表示开启。
代码
- package com.leboop.cdc;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.ververica.cdc.connectors.mysql.source.MySqlSource;
- import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.sql.PreparedStatement;
- /**
- * Description TODO.
- * Date 2024/7/28 15:48
- *
- * @author leb
- * @version 2.0
- */
- public class MysqlCDCDemo {
- public static void main(String[] args) throws Exception {
- // flink source,source类型为mysql
- MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
- .hostname("localhost")
- .port(80)
- .databaseList("cdc_demo")
- .tableList("cdc_demo.human")
- .username("root")
- .password("root")
- .deserializer(new JsonDebeziumDeserializationSchema())
- .serverId("1")
- .build();
- // 初始化环境.
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // enable checkpoint
- env.enableCheckpointing(3000);
- DataStreamSource<String> stringDataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
- // set 1 parallel source tasks
- .setParallelism(1);
- // 将数据打印到客户端.
- stringDataStreamSource
- .print().setParallelism(1); // use parallelism 1 for sink
- // 数据同步到mysql
- stringDataStreamSource.addSink(new RichSinkFunction<String>() {
- private Connection connection = null;
- private PreparedStatement preparedStatement = null;
- @Override
- public void open(Configuration parameters) throws Exception {
- if (connection == null) {
- Class.forName("com.mysql.cj.jdbc.Driver");//加载数据库驱动
- connection = DriverManager.getConnection("jdbc:mysql://localhost:80", "root", "root");//获取连接
- connection.setAutoCommit(false);//关闭自动提交
- }
- }
- @Override
- public void invoke(String value, Context context) throws Exception {
- JSONObject jsonObject = JSON.parseObject(value);
- String op = jsonObject.getString("op");
- if ("r".equals(op)) { // 首次全量
- System.out.println("执行清表操作");
- connection.prepareStatement("truncate table cdc_sink.human_sink").execute(); // 清空目标表数据
- JSONObject after = jsonObject.getJSONObject("after");
- Integer id = after.getInteger("id");
- String name = after.getString("name");
- Integer age = after.getInteger("age");
- preparedStatement = connection.prepareStatement("insert into cdc_sink.human_sink values (?,?,?)");
- preparedStatement.setInt(1, id);
- preparedStatement.setString(2, name);
- preparedStatement.setInt(3, age);
- preparedStatement.execute();
- connection.commit();//预处理完成后统一提交
- }else if("c".equals(op)) { // 新增.
- JSONObject after = jsonObject.getJSONObject("after");
- Integer id = after.getInteger("id");
- String name = after.getString("name");
- Integer age = after.getInteger("age");
- preparedStatement = connection.prepareStatement("insert into cdc_sink.human_sink values (?,?,?)");
- preparedStatement.setInt(1, id);
- preparedStatement.setString(2, name);
- preparedStatement.setInt(3, age);
- preparedStatement.execute();
- connection.commit();//预处理完成后统一提交
- }
- else if ("d".equals(op)) { // 删除
- JSONObject after = jsonObject.getJSONObject("before");
- Integer id = after.getInteger("id");
- preparedStatement = connection.prepareStatement("delete from cdc_sink.human_sink where id = ?");
- preparedStatement.setInt(1, id);
- preparedStatement.execute();
- connection.commit();//预处理完成后统一提交
- } else if ("u".equals(op)) { // 更新
- JSONObject after = jsonObject.getJSONObject("after");
- Integer id = after.getInteger("id");
- String name = after.getString("name");
- Integer age = after.getInteger("age");
- preparedStatement = connection.prepareStatement("update cdc_sink.human_sink set name = ?, age = ? where id = ?");
- preparedStatement.setString(1, name);
- preparedStatement.setInt(2, age);
- preparedStatement.setInt(3, id);
- preparedStatement.execute();
- connection.commit();//预处理完成后统一提交
- } else {
- System.out.println("不支持的操作op=" + op);
- }
- }
- @Override
- public void close() throws Exception {
- System.out.println("执行close方法");
- if (preparedStatement != null) {
- preparedStatement.close();
- }
- if (connection != null) {
- connection.close();
- }
- }
- });
- env.execute("Print MySQL Snapshot + Binlog");
- }
- }
复制代码 (1)Flink源
如下代码毗连了当地MySQL数据库cdc_demo。
- MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
- .hostname("localhost")
- .port(80)
- .databaseList("cdc_demo")
- .tableList("cdc_demo.human")
- .username("root")
- .password("root")
- .deserializer(new JsonDebeziumDeserializationSchema())
- .serverId("1")
- .build();
复制代码 new JsonDebeziumDeserializationSchema()将读取的MySQL binlog数据反序列为JSON字符串数据,后面通过控制台输出可以看到。
(2)server id
每个用于读取binlog的MySQL数据库客户端都应该有一个唯一的id,称为服务器id。MySQL服务器将使用此id来维护网络毗连和binlog位置。因此,如果不同的作业共享相同的服务器id,可能会导致从错误的binlog位置读取。因此,发起为每个阅读器设置不同的服务器id,例如,假设源并行度为4,那么我们可以使用'5401-5404',为4个源阅读器中的每一个分配唯一的服务器id。
(3)从MySQL源读取数据
- DataStreamSource<String> stringDataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
- // set 1 parallel source tasks
- .setParallelism(1);
复制代码 代码从MySQL源读取了数据,并设置读取并行度为1,如果这里并行度为4,则前面必要4个server id,例如"1-4"。
(3)将读取的MySQL数据打印到控制台
- // 将数据打印到客户端.
- stringDataStreamSource
- .print().setParallelism(1); // use parallelism 1 for sink
复制代码 这里仅仅为了检察Binglog日志读取后,转换成Json字符串是什么样的。下面展示了三条该字符串:
- {"before":null,"after":{"id":7,"name":"lisi","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722218198388,"transaction":null}
- {"before":{"id":6,"name":"zhangsan","age":12},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722218564000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":75954,"row":0,"thread":57,"query":null},"op":"d","ts_ms":1722218564587,"transaction":null}
- {"before":{"id":7,"name":"lisi","age":23},"after":{"id":7,"name":"lisi","age":10},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722218597000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":76582,"row":0,"thread":57,"query":null},"op":"u","ts_ms":1722218597551,"transaction":null}
复制代码 第一条Json数据格式化后如下:
- {
- "before": null,
- "after": {
- "id": 7,
- "name": "lisi",
- "age": 23
- },
- "source": {
- "version": "1.9.7.Final",
- "connector": "mysql",
- "name": "mysql_binlog_source",
- "ts_ms": 0,
- "snapshot": "false",
- "db": "cdc_demo",
- "sequence": null,
- "table": "human",
- "server_id": 0,
- "gtid": null,
- "file": "",
- "pos": 0,
- "row": 0,
- "thread": null,
- "query": null
- },
- "op": "r",
- "ts_ms": 1722218198388,
- "transaction": null
- }
复制代码 其中before表示操作前的数据,after表示操作后的数据。op表示操作类型,分为:
- "op": "d" 代表删除操作
- "op": "u" 代表更新操作
- "op": "c" 代表新增操作
- "op": "r" 代表全量读取,而不是来自 binlog 的增量读取
例如上面第一条为首次全量同步cdc_demo数据库human表Json格式的binglog数据,因此before为null,after为数据,op为r。类似地,第二条为更新数据;第三条数据为删除一条数据,其op值为d。
(4)sink
这里使用匿名内部类RichSinkFunction实现了MySQL sink。
测试
先向human表中插入2条数据,SQL如下:
- insert into cdc_demo.human(id,name,age) values(1,"zhangsan",12);
- insert into cdc_demo.human(id,name,age) values(2,"lisi",23);
复制代码 然后启动程序,输出日志如下:
- SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
- SLF4J: Defaulting to no-operation (NOP) logger implementation
- SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
- log4j:WARN No appenders could be found for logger (org.apache.flink.shaded.netty4.io.netty.util.internal.logging.InternalLoggerFactory).
- log4j:WARN Please initialize the log4j system properly.
- log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
- SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
- SLF4J: Defaulting to no-operation MDCAdapter implementation.
- SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
- {"before":null,"after":{"id":1,"name":"zhangsan","age":12},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722219401429,"transaction":null}
- {"before":null,"after":{"id":2,"name":"lisi","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722219401430,"transaction":null}
- 七月 29, 2024 10:16:42 上午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
- 信息: Connected to localhost:80 at LEBOOP-bin.000005/80097 (sid:1, cid:803)
- 执行清表操作
- 执行清表操作
复制代码 检察human_sink表,可以看到human表中的两条数据已经被同步:

接着执行如下更新、删除、新增SQL:
- update cdc_demo.human set age = 10 where id = 1;
- delete from cdc_demo.human where id = 2;
- insert into cdc_demo.human(id,name,age) values(3,"zhangsan",12);
复制代码 输出日志如下:
- {"before":{"id":1,"name":"zhangsan","age":12},"after":{"id":1,"name":"zhangsan","age":10},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81312,"row":0,"thread":57,"query":null},"op":"u","ts_ms":1722219563829,"transaction":null}
- {"before":{"id":2,"name":"lisi","age":23},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81647,"row":0,"thread":57,"query":null},"op":"d","ts_ms":1722219563849,"transaction":null}
- {"before":null,"after":{"id":3,"name":"zhangsan","age":12},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81954,"row":0,"thread":57,"query":null},"op":"c","ts_ms":1722219563872,"transaction":null}
复制代码 如图:

最终看到两张表数据保持划一,如图:
SQL方式实现
必要的依靠pom.xml
在DataStream方式上,还必要添加如下依靠:
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-jdbc</artifactId>
- <version>3.0.0-1.16</version>
- </dependency>
复制代码 如果报错如下:
- Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
- at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:534)
- at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:277)
- at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:95)
- at com.leboop.cdc.MysqlCDCSqlDemo.main(MysqlCDCSqlDemo.java:30)
复制代码 添加如下依靠:
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
复制代码 如果报错如下:
- Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.human_sink'.
- Table options are:
- 'connector'='jdbc'
- 'driver'='com.mysql.cj.jdbc.Driver'
- 'password'='******'
- 'table-name'='human_sink'
- 'url'='jdbc:mysql://localhost:80/cdc_sink'
- 'username'='root'
- at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:270)
- at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:459)
- at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:236)
- at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
- at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
- at scala.collection.Iterator.foreach(Iterator.scala:929)
- at scala.collection.Iterator.foreach$(Iterator.scala:929)
- at scala.collection.AbstractIterator.foreach(Iterator.scala:1406)
- at scala.collection.IterableLike.foreach(IterableLike.scala:71)
- at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
- at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
- at scala.collection.TraversableLike.map(TraversableLike.scala:234)
- at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
- at scala.collection.AbstractTraversable.map(Traversable.scala:104)
- at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
- at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1803)
- at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:881)
- at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:989)
- at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)
- at com.leboop.cdc.MysqlCDCSqlDemo.main(MysqlCDCSqlDemo.java:68)
- Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
- at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:736)
- at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
- at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
- ... 19 more
- Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
- Available factory identifiers are:
- blackhole
- datagen
- mysql-cdc
- print
- at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:546)
- at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:732)
- ... 21 more
复制代码 请添加如下依靠:
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-jdbc</artifactId>
- <version>3.0.0-1.16</version>
- </dependency>
复制代码 代码
- package com.leboop.cdc;
- import org.apache.flink.table.api.EnvironmentSettings;
- import org.apache.flink.table.api.TableEnvironment;
- import org.apache.flink.table.api.TableResult;
- /**
- * Description TODO.
- * Date 2024/7/28 15:48
- *
- * @author leb
- * @version 2.0
- */
- public class MysqlCDCSqlDemo {
- public static void main(String[] args) throws Exception {
- EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
- TableEnvironment tableEnv = TableEnvironment.create(settings);
- tableEnv.getConfig().getConfiguration().setLong("execution.checkpointing.interval", 3000L);
- // source
- TableResult createSourceTable = tableEnv.executeSql("CREATE TABLE flink_human ( \n" +
- "id BIGINT ,\n" +
- "name STRING ,\n" +
- "age INT ,\n" +
- "PRIMARY KEY (id) NOT ENFORCED \n" +
- ") WITH (\n" +
- " 'connector' = 'mysql-cdc',\n" +
- " 'hostname' = 'localhost',\n" +
- " 'port' = '80',\n" +
- " 'username' = 'root',\n" +
- " 'password' = 'root',\n" +
- " 'database-name' = 'cdc_demo',\n" +
- " 'table-name' = 'human') ");
- // 输出source表
- createSourceTable.print();
- System.out.println("创建源表结束");
- // sink
- TableResult createSinkTable = tableEnv.executeSql("CREATE TABLE flink_human_sink (" +
- "id BIGINT ," +
- "name STRING ," +
- "age INT ," +
- "PRIMARY KEY(id) NOT ENFORCED " +
- ") WITH (" +
- " 'connector' = 'jdbc'," +
- " 'url' = 'jdbc:mysql://localhost:80/cdc_sink', " +
- " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
- " 'username' = 'root'," +
- " 'password' = 'root'," +
- " 'table-name' = 'human_sink' )");
- createSinkTable.print();
- System.out.println("创建sink表结束");
- // 插入
- tableEnv.executeSql("insert into flink_human_sink select id,name,age from flink_human");
- System.out.println("插入sink表结束");
- }
- }
复制代码 (1)创建源表
如下代码创建了Flink中的源表,为什么说是Flink中呢?缘故原由是该代码将mysql中的human表映射为Flink中的flink_human表,后文代码中就可以使用flink_human表了,代码如下:
- // source
- TableResult createSourceTable = tableEnv.executeSql("CREATE TABLE flink_human ( \n" +
- "id BIGINT ,\n" +
- "name STRING ,\n" +
- "age INT ,\n" +
- "PRIMARY KEY (id) NOT ENFORCED \n" +
- ") WITH (\n" +
- " 'connector' = 'mysql-cdc',\n" +
- " 'hostname' = 'localhost',\n" +
- " 'port' = '80',\n" +
- " 'username' = 'root',\n" +
- " 'password' = 'root',\n" +
- " 'database-name' = 'cdc_demo',\n" +
- " 'table-name' = 'human') ");
复制代码 注意这里connector必须是mysql-cdc。
(2)创建目标表
代码如下:
- // sink
- TableResult createSinkTable = tableEnv.executeSql("CREATE TABLE flink_human_sink (" +
- "id BIGINT ," +
- "name STRING ," +
- "age INT ," +
- "PRIMARY KEY(id) NOT ENFORCED " +
- ") WITH (" +
- " 'connector' = 'jdbc'," +
- " 'url' = 'jdbc:mysql://localhost:80/cdc_sink', " +
- " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
- " 'username' = 'root'," +
- " 'password' = 'root'," +
- " 'table-name' = 'human_sink' )");
复制代码 这里connector的值必须是jdbc,即通过jdbc毗连器实现。
(3)同步数据
通过如下SQL即可以实现数据同步:
- tableEnv.executeSql("insert into flink_human_sink select id,name,age from flink_human");
复制代码 测试
与DataStream测试过程相同。
值得注意的是:对MySQL的insert、update、delete操作可以完成同步,但对有些操作并不能完成同步,例如truncate操作。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |