ToB企服应用市场:ToB评测及商务社交产业平台

标题: Flink CDC基本概念以及MySQL同步到MySQL [打印本页]

作者: 刘俊凯    时间: 2024-9-20 13:17
标题: Flink CDC基本概念以及MySQL同步到MySQL
目录
欢迎来到Flink CDC
焦点概念
数据管道(Data Pipeline)
数据源(Data Source)
数据接收器(Data Sink)
表ID(Table ID)
转换(Transform)
路由(Route)
毗连器(connectors)
管道毗连器(pipeline connectors)
支持毗连器
开发本身的毗连器
Flink源
Flink CDC 源
支持的毗连器
 支持的Flink版本
特征
MySQL同步到MySQL
DataStream方式实现
必要的依靠pom.xml
预备工作
代码 
测试
​编辑
 SQL方式实现
必要的依靠pom.xml
代码
测试

        本文基于Flink CDC v2.4.2版本和Flink 1.17.1版本。
欢迎来到Flink CDC

        Flink CDC是一个流数据集成工具,旨在为用户提供更强盛的API。它允许用户通过YAML优雅地形貌他们的ETL管道逻辑,并帮助用户主动天生自界说Flink算子和提交作业。Flink CDC优先优化使命提交过程,并提供加强的功能,如模式演化(schema evolution)、数据转换(data transformation)、全数据库同步(full database synchronization)和仅一次语义(exactly-once semantic)。
        与Apache Flink深度集成并由其提供支持,Flink CDC提供:
✅端到端数据集成框架
✅基于数据集成API用户可轻松构建作业
✅源端/目标端中多表支持
✅整个数据库的同步
✅模式演化本领

焦点概念

数据管道(Data Pipeline

        由于Flink CDC中的事件(events)以管道( pipeline)方式从上游流向卑鄙,因此整个ETL使命被称为数据管道(Data Pipeline)。
数据源(Data Source

        数据源用于访问元数据(metadata)并从外部系统读取变动的数据(the changed data)。一个数据源可以同时从多个表中读取数据。
        注意,这里的数据源并不是指的外部系统这个数据源,而是Flink中自身界说的数据源,Flink用这个数据源来从外部系统读取变动的数据。
数据接收器(Data Sink

        数据接收器用于应用模式更改(schema changes)并将更改数据写入外部系统。一个数据接收器可以同时写多个表。
表ID(Table ID)

        在与外部系统毗连时,必要与外部系统的存储对象建立映射关系。必要唯一确定存储对象,这就是Table ID所指。为了与大多数外部系统兼容,表ID由一个3元组表示:(namespace, schemaName, tableName)。毗连器应该在表ID和外部系统中的存储对象之间建立映射。下表列出了不同数据系统表ID中的部分:
数据系统表ID组成例子
Oracle/PostgreSQLdatabase, schema, tablemydb.default.orders
MySQL/Doris/StarRocksdatabase, tablemydb.orders
Kafkatopicorders
转换(Transform

        Transform模块帮助用户根据表中的数据列来删除和扩展数据列。别的,它还可以帮助用户在同步过程中过滤一些不必要的数据。
路由(Route

        路由指定匹配源表列表和映射到目标表的规则。最典型的场景是合并子数据库和子表,将多个上游源表路由到同一个目标表。

毗连器(connectors)

        这里connector分了两个章节,必要阐明connector、souce、sink的区别。source和sink都可以称为connector。或者connector包括source和sink,由于汗青缘故原由,先是source,sink,后面使用connector对source和sink做了同一。
管道毗连器(pipeline connectors)

        Flink CDC提供了几个源和接收器毗连器来与外部系统举行交互。通过将发布的jar添加到Flink CDC环境中,并在YAML管道界说中指定毗连器,您可以使用开箱即用的毗连器。
支持毗连器

毗连器支持的毗连器类型外部系统Apache DorisSink     
KafkaSink     
MySQLSource     
PaimonSink     
StarRocksSink     
开发本身的毗连器

        如果提供的毗连器不能满意您的要求,您可以开发本身的毗连器,以使您的外部系统参与Flink CDC管道。检察Flink CDC api,相识怎样开发本身的毗连器。
Flink源

Flink CDC 源

        Flink CDC源是Apache Flink的一组源毗连器(source connectors),使用变动数据捕获(CDC)从不同的数据库摄取更改。一些CDC源集成了Debezium作为捕获数据变化的引擎。以是它可以充实利用Debezium的本领。相识更多关于什么是Debezium。
debezium

支持的毗连器

毗连器数据库驱动mongodb-cdc     
MongoDB Driver: 4.9.1mysql-cdc     
JDBC Driver: 8.0.28oceanbase-cdc     
OceanBase Driver: 2.4.xoracle-cdc     
Oracle Driver: 19.3.0.0postgres-cdc     
JDBC Driver: 42.5.1sqlserver-cdc     
JDBC Driver: 9.4.1.jre8tidb-cdc     
JDBC Driver: 8.0.27db2-cdc     
Db2 Driver: 11.5.0.0vitess-cdc     
MySql JDBC Driver: 8.0.26  支持的Flink版本

Flink CDC 版本Flink版本1.0.01.11.*1.1.01.11.*1.2.01.12.*1.3.01.12.*1.4.01.13.*2.0.*1.13.*2.1.*1.13.*2.2.*1.13.*, 1.14.*2.3.*1.13.*, 1.14.*, 1.15.*, 1.16.*2.4.*1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.*3.0.*1.14.*, 1.15.*, 1.16.*, 1.17.*, 1.18.* 特征

1、支持读取数据库快照,即使发生故障,也能以仅一次处理方式继续读取binlogs。
2、数据流API的CDC毗连器,用户可以在单个作业中消费多个数据库和表上的更改,而无需部署Debezium和Kafka。
3、用于表/SQL API的CDC毗连器,用户可以使用SQL DDL创建CDC源来监督单个表上的更改。
        下表表现了毗连器(connector)的当前特性:
毗连器无锁读并行读仅一次读增量快照读mongodb-cdc✅✅✅✅mysql-cdc✅✅✅✅oracle-cdc✅✅✅✅postgres-cdc✅✅✅✅sqlserver-cdc✅✅✅✅oceanbase-cdc❌❌❌❌tidb-cdc✅❌✅❌db2-cdc✅✅✅✅vitess-cdc✅❌✅❌
MySQL同步到MySQL

DataStream方式实现

必要的依靠pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <modelVersion>4.0.0</modelVersion>
  6.     <groupId>com.leboop.www</groupId>
  7.     <artifactId>flink</artifactId>
  8.     <version>1.0-SNAPSHOT</version>
  9.     <properties>
  10.         <flink.version>1.17.1</flink.version>
  11.         <scala.binary.version>2.12</scala.binary.version>
  12.         <maven.compiler.source>1.8</maven.compiler.source>
  13.         <maven.compiler.target>1.8</maven.compiler.target>
  14.     </properties>
  15.     <dependencies>
  16.         <!-- flink客户端 -->
  17.         <dependency>
  18.             <groupId>org.apache.flink</groupId>
  19.             <artifactId>flink-clients</artifactId>
  20.             <version>${flink.version}</version>
  21.         </dependency>
  22.         <!--  Table API for Java -->
  23.         <dependency>
  24.             <groupId>org.apache.flink</groupId>
  25.             <artifactId>flink-table-api-java</artifactId>
  26.             <version>${flink.version}</version>
  27.         </dependency>
  28.         <!-- flink cdc for mysql -->
  29.         <dependency>
  30.             <groupId>com.ververica</groupId>
  31.             <artifactId>flink-connector-mysql-cdc</artifactId>
  32.             <version>2.4.2</version>
  33.         </dependency>
  34.         <dependency>
  35.             <groupId>org.apache.flink</groupId>
  36.             <artifactId>flink-connector-base</artifactId>
  37.             <version>1.18.0</version>
  38.         </dependency>
  39.         <!-- json解析 -->
  40.         <dependency>
  41.             <groupId>com.alibaba</groupId>
  42.             <artifactId>fastjson</artifactId>
  43.             <version>1.2.78</version> <!-- 请使用最新的版本号 -->
  44.         </dependency>
  45.     </dependencies>
  46. </project>
复制代码
如果缺少依靠,可能报错如下:
  1. Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/reader/RecordEmitter
  2.     at com.leboop.cdc.MysqlCDCDemo.main(MysqlCDCDemo.java:17)
  3. Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter
  4.     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  5.     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  6.     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
  7.     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  8.     ... 1 more
复制代码
添加如下依靠即可:
  1. <dependency>
  2.     <groupId>org.apache.flink</groupId>
  3.     <artifactId>flink-connector-base</artifactId>
  4.     <version>1.18.0</version>
  5. </dependency>
复制代码
如果报错如下:
  1. Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
  2.     at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:88)
  3.     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getPipelineExecutor(StreamExecutionEnvironment.java:2717)
  4.     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2194)
  5.     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2084)
  6.     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)
  7.     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2058)
  8.     at com.leboop.cdc.MysqlCDCDemo.main(MysqlCDCDemo.java:62)
复制代码
添加如下依靠: 
  1. <dependency>
  2.     <groupId>org.apache.flink</groupId>
  3.     <artifactId>flink-clients</artifactId>
  4.     <version>${flink.version}</version>
  5. </dependency>
复制代码
预备工作

        本文仅仅为了演示,在windows当地安装了8.0.30版本的MySQL。如图:

预备两个数据库,分别作为本次案例的source和sink,如图: 

 建表语句分别如下:
  1. CREATE TABLE `human` (
  2.   `id` bigint NOT NULL AUTO_INCREMENT,
  3.   `name` varchar(100) DEFAULT NULL,
  4.   `age` int DEFAULT NULL,
  5.   PRIMARY KEY (`id`)
  6. ) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
  7. CREATE TABLE `human_sink` (
  8.   `id` bigint NOT NULL AUTO_INCREMENT,
  9.   `name` varchar(100) DEFAULT NULL,
  10.   `age` int DEFAULT NULL,
  11.   PRIMARY KEY (`id`)
  12. ) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
复制代码
 MySQL CDC必要开启biglog日志,执行如下SQL检察biglog日志是否开启
  1. SHOW VARIABLES LIKE 'log_bin';
复制代码
如图:

 Value为ON,表示开启。
代码 

  1. package com.leboop.cdc;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.ververica.cdc.connectors.mysql.source.MySqlSource;
  5. import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
  6. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  7. import org.apache.flink.configuration.Configuration;
  8. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  10. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  11. import java.sql.Connection;
  12. import java.sql.DriverManager;
  13. import java.sql.PreparedStatement;
  14. /**
  15. * Description TODO.
  16. * Date 2024/7/28 15:48
  17. *
  18. * @author leb
  19. * @version 2.0
  20. */
  21. public class MysqlCDCDemo {
  22.     public static void main(String[] args) throws Exception {
  23.         // flink source,source类型为mysql
  24.         MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
  25.                 .hostname("localhost")
  26.                 .port(80)
  27.                 .databaseList("cdc_demo")
  28.                 .tableList("cdc_demo.human")
  29.                 .username("root")
  30.                 .password("root")
  31.                 .deserializer(new JsonDebeziumDeserializationSchema())
  32.                 .serverId("1")
  33.                 .build();
  34.         // 初始化环境.
  35.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  36.         // enable checkpoint
  37.         env.enableCheckpointing(3000);
  38.         DataStreamSource<String> stringDataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
  39.                 // set 1 parallel source tasks
  40.                 .setParallelism(1);
  41.         // 将数据打印到客户端.
  42.         stringDataStreamSource
  43.                 .print().setParallelism(1); // use parallelism 1 for sink
  44.         // 数据同步到mysql
  45.         stringDataStreamSource.addSink(new RichSinkFunction<String>() {
  46.             private Connection connection = null;
  47.             private PreparedStatement preparedStatement = null;
  48.             @Override
  49.             public void open(Configuration parameters) throws Exception {
  50.                 if (connection == null) {
  51.                     Class.forName("com.mysql.cj.jdbc.Driver");//加载数据库驱动
  52.                     connection = DriverManager.getConnection("jdbc:mysql://localhost:80", "root", "root");//获取连接
  53.                     connection.setAutoCommit(false);//关闭自动提交
  54.                 }
  55.             }
  56.             @Override
  57.             public void invoke(String value, Context context) throws Exception {
  58.                 JSONObject jsonObject = JSON.parseObject(value);
  59.                 String op = jsonObject.getString("op");
  60.                 if ("r".equals(op)) { // 首次全量
  61.                     System.out.println("执行清表操作");
  62.                     connection.prepareStatement("truncate table cdc_sink.human_sink").execute(); // 清空目标表数据
  63.                     JSONObject after = jsonObject.getJSONObject("after");
  64.                     Integer id = after.getInteger("id");
  65.                     String name = after.getString("name");
  66.                     Integer age = after.getInteger("age");
  67.                     preparedStatement = connection.prepareStatement("insert into cdc_sink.human_sink values (?,?,?)");
  68.                     preparedStatement.setInt(1, id);
  69.                     preparedStatement.setString(2, name);
  70.                     preparedStatement.setInt(3, age);
  71.                     preparedStatement.execute();
  72.                     connection.commit();//预处理完成后统一提交
  73.                 }else if("c".equals(op)) { // 新增.
  74.                     JSONObject after = jsonObject.getJSONObject("after");
  75.                     Integer id = after.getInteger("id");
  76.                     String name = after.getString("name");
  77.                     Integer age = after.getInteger("age");
  78.                     preparedStatement = connection.prepareStatement("insert into cdc_sink.human_sink values (?,?,?)");
  79.                     preparedStatement.setInt(1, id);
  80.                     preparedStatement.setString(2, name);
  81.                     preparedStatement.setInt(3, age);
  82.                     preparedStatement.execute();
  83.                     connection.commit();//预处理完成后统一提交
  84.                 }
  85.                 else if ("d".equals(op)) { // 删除
  86.                     JSONObject after = jsonObject.getJSONObject("before");
  87.                     Integer id = after.getInteger("id");
  88.                     preparedStatement = connection.prepareStatement("delete from cdc_sink.human_sink where id = ?");
  89.                     preparedStatement.setInt(1, id);
  90.                     preparedStatement.execute();
  91.                     connection.commit();//预处理完成后统一提交
  92.                 } else if ("u".equals(op)) { // 更新
  93.                     JSONObject after = jsonObject.getJSONObject("after");
  94.                     Integer id = after.getInteger("id");
  95.                     String name = after.getString("name");
  96.                     Integer age = after.getInteger("age");
  97.                     preparedStatement = connection.prepareStatement("update cdc_sink.human_sink set name = ?, age = ? where id = ?");
  98.                     preparedStatement.setString(1, name);
  99.                     preparedStatement.setInt(2, age);
  100.                     preparedStatement.setInt(3, id);
  101.                     preparedStatement.execute();
  102.                     connection.commit();//预处理完成后统一提交
  103.                 } else {
  104.                     System.out.println("不支持的操作op=" + op);
  105.                 }
  106.             }
  107.             @Override
  108.             public void close() throws Exception {
  109.                 System.out.println("执行close方法");
  110.                 if (preparedStatement != null) {
  111.                     preparedStatement.close();
  112.                 }
  113.                 if (connection != null) {
  114.                     connection.close();
  115.                 }
  116.             }
  117.         });
  118.         env.execute("Print MySQL Snapshot + Binlog");
  119.     }
  120. }
复制代码
(1)Flink源
如下代码毗连了当地MySQL数据库cdc_demo。
  1.         MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
  2.                 .hostname("localhost")
  3.                 .port(80)
  4.                 .databaseList("cdc_demo")
  5.                 .tableList("cdc_demo.human")
  6.                 .username("root")
  7.                 .password("root")
  8.                 .deserializer(new JsonDebeziumDeserializationSchema())
  9.                 .serverId("1")
  10.                 .build();
复制代码
new JsonDebeziumDeserializationSchema()将读取的MySQL binlog数据反序列为JSON字符串数据,后面通过控制台输出可以看到。
(2)server id
        每个用于读取binlog的MySQL数据库客户端都应该有一个唯一的id,称为服务器id。MySQL服务器将使用此id来维护网络毗连和binlog位置。因此,如果不同的作业共享相同的服务器id,可能会导致从错误的binlog位置读取。因此,发起为每个阅读器设置不同的服务器id,例如,假设源并行度为4,那么我们可以使用'5401-5404',为4个源阅读器中的每一个分配唯一的服务器id。
(3)从MySQL源读取数据
  1.         DataStreamSource<String> stringDataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
  2.                 // set 1 parallel source tasks
  3.                 .setParallelism(1);
复制代码
代码从MySQL源读取了数据,并设置读取并行度为1,如果这里并行度为4,则前面必要4个server id,例如"1-4"。
(3)将读取的MySQL数据打印到控制台
  1.         // 将数据打印到客户端.
  2.         stringDataStreamSource
  3.                 .print().setParallelism(1); // use parallelism 1 for sink
复制代码
这里仅仅为了检察Binglog日志读取后,转换成Json字符串是什么样的。下面展示了三条该字符串:
  1. {"before":null,"after":{"id":7,"name":"lisi","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722218198388,"transaction":null}
  2. {"before":{"id":6,"name":"zhangsan","age":12},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722218564000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":75954,"row":0,"thread":57,"query":null},"op":"d","ts_ms":1722218564587,"transaction":null}
  3. {"before":{"id":7,"name":"lisi","age":23},"after":{"id":7,"name":"lisi","age":10},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722218597000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":76582,"row":0,"thread":57,"query":null},"op":"u","ts_ms":1722218597551,"transaction":null}
复制代码
第一条Json数据格式化后如下: 
  1. {
  2.   "before": null,
  3.   "after": {
  4.     "id": 7,
  5.     "name": "lisi",
  6.     "age": 23
  7.   },
  8.   "source": {
  9.     "version": "1.9.7.Final",
  10.     "connector": "mysql",
  11.     "name": "mysql_binlog_source",
  12.     "ts_ms": 0,
  13.     "snapshot": "false",
  14.     "db": "cdc_demo",
  15.     "sequence": null,
  16.     "table": "human",
  17.     "server_id": 0,
  18.     "gtid": null,
  19.     "file": "",
  20.     "pos": 0,
  21.     "row": 0,
  22.     "thread": null,
  23.     "query": null
  24.   },
  25.   "op": "r",
  26.   "ts_ms": 1722218198388,
  27.   "transaction": null
  28. }
复制代码
其中before表示操作前的数据,after表示操作后的数据。op表示操作类型,分为:

例如上面第一条为首次全量同步cdc_demo数据库human表Json格式的binglog数据,因此before为null,after为数据,op为r。类似地,第二条为更新数据;第三条数据为删除一条数据,其op值为d。
(4)sink
这里使用匿名内部类RichSinkFunction实现了MySQL sink。
测试

        先向human表中插入2条数据,SQL如下:
  1. insert into cdc_demo.human(id,name,age) values(1,"zhangsan",12);
  2. insert into cdc_demo.human(id,name,age) values(2,"lisi",23);
复制代码
然后启动程序,输出日志如下:
  1. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  2. SLF4J: Defaulting to no-operation (NOP) logger implementation
  3. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  4. log4j:WARN No appenders could be found for logger (org.apache.flink.shaded.netty4.io.netty.util.internal.logging.InternalLoggerFactory).
  5. log4j:WARN Please initialize the log4j system properly.
  6. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
  7. SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
  8. SLF4J: Defaulting to no-operation MDCAdapter implementation.
  9. SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
  10. {"before":null,"after":{"id":1,"name":"zhangsan","age":12},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722219401429,"transaction":null}
  11. {"before":null,"after":{"id":2,"name":"lisi","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722219401430,"transaction":null}
  12. 七月 29, 2024 10:16:42 上午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
  13. 信息: Connected to localhost:80 at LEBOOP-bin.000005/80097 (sid:1, cid:803)
  14. 执行清表操作
  15. 执行清表操作
复制代码
检察human_sink表,可以看到human表中的两条数据已经被同步:

接着执行如下更新、删除、新增SQL:
  1. update cdc_demo.human set age = 10 where id = 1;
  2. delete from cdc_demo.human where id = 2;
  3. insert into cdc_demo.human(id,name,age) values(3,"zhangsan",12);
复制代码
 输出日志如下:
  1. {"before":{"id":1,"name":"zhangsan","age":12},"after":{"id":1,"name":"zhangsan","age":10},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81312,"row":0,"thread":57,"query":null},"op":"u","ts_ms":1722219563829,"transaction":null}
  2. {"before":{"id":2,"name":"lisi","age":23},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81647,"row":0,"thread":57,"query":null},"op":"d","ts_ms":1722219563849,"transaction":null}
  3. {"before":null,"after":{"id":3,"name":"zhangsan","age":12},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81954,"row":0,"thread":57,"query":null},"op":"c","ts_ms":1722219563872,"transaction":null}
复制代码
如图:

最终看到两张表数据保持划一,如图: 


 SQL方式实现

必要的依靠pom.xml

        在DataStream方式上,还必要添加如下依靠:
  1.         <dependency>
  2.             <groupId>org.apache.flink</groupId>
  3.             <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
  4.             <version>${flink.version}</version>
  5.         </dependency>
  6.         <dependency>
  7.             <groupId>org.apache.flink</groupId>
  8.             <artifactId>flink-connector-jdbc</artifactId>
  9.             <version>3.0.0-1.16</version>
  10.         </dependency>
复制代码
如果报错如下:
  1. Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
  2.         at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:534)
  3.         at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:277)
  4.         at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:95)
  5.         at com.leboop.cdc.MysqlCDCSqlDemo.main(MysqlCDCSqlDemo.java:30)
复制代码
添加如下依靠: 
  1.         <dependency>
  2.             <groupId>org.apache.flink</groupId>
  3.             <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
  4.             <version>${flink.version}</version>
  5.         </dependency>
复制代码
如果报错如下: 
  1. Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.human_sink'.
  2. Table options are:
  3. 'connector'='jdbc'
  4. 'driver'='com.mysql.cj.jdbc.Driver'
  5. 'password'='******'
  6. 'table-name'='human_sink'
  7. 'url'='jdbc:mysql://localhost:80/cdc_sink'
  8. 'username'='root'
  9.         at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:270)
  10.         at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:459)
  11.         at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:236)
  12.         at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
  13.         at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
  14.         at scala.collection.Iterator.foreach(Iterator.scala:929)
  15.         at scala.collection.Iterator.foreach$(Iterator.scala:929)
  16.         at scala.collection.AbstractIterator.foreach(Iterator.scala:1406)
  17.         at scala.collection.IterableLike.foreach(IterableLike.scala:71)
  18.         at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
  19.         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  20.         at scala.collection.TraversableLike.map(TraversableLike.scala:234)
  21.         at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
  22.         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  23.         at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
  24.         at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1803)
  25.         at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:881)
  26.         at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:989)
  27.         at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)
  28.         at com.leboop.cdc.MysqlCDCSqlDemo.main(MysqlCDCSqlDemo.java:68)
  29. Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
  30.         at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:736)
  31.         at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
  32.         at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
  33.         ... 19 more
  34. Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
  35. Available factory identifiers are:
  36. blackhole
  37. datagen
  38. mysql-cdc
  39. print
  40.         at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:546)
  41.         at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:732)
  42.         ... 21 more
复制代码
请添加如下依靠: 
  1.         <dependency>
  2.             <groupId>org.apache.flink</groupId>
  3.             <artifactId>flink-connector-jdbc</artifactId>
  4.             <version>3.0.0-1.16</version>
  5.         </dependency>
复制代码
代码

  1. package com.leboop.cdc;
  2. import org.apache.flink.table.api.EnvironmentSettings;
  3. import org.apache.flink.table.api.TableEnvironment;
  4. import org.apache.flink.table.api.TableResult;
  5. /**
  6. * Description TODO.
  7. * Date 2024/7/28 15:48
  8. *
  9. * @author leb
  10. * @version 2.0
  11. */
  12. public class MysqlCDCSqlDemo {
  13.     public static void main(String[] args) throws Exception {
  14.         EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
  15.         TableEnvironment tableEnv = TableEnvironment.create(settings);
  16.         tableEnv.getConfig().getConfiguration().setLong("execution.checkpointing.interval", 3000L);
  17.         // source
  18.         TableResult createSourceTable = tableEnv.executeSql("CREATE TABLE flink_human ( \n" +
  19.                 "id BIGINT ,\n" +
  20.                 "name STRING ,\n" +
  21.                 "age INT ,\n" +
  22.                 "PRIMARY KEY (id) NOT ENFORCED \n" +
  23.                 ") WITH (\n" +
  24.                 " 'connector' = 'mysql-cdc',\n" +
  25.                 " 'hostname' = 'localhost',\n" +
  26.                 " 'port' = '80',\n" +
  27.                 " 'username' = 'root',\n" +
  28.                 " 'password' = 'root',\n" +
  29.                 " 'database-name' = 'cdc_demo',\n" +
  30.                 " 'table-name' = 'human') ");
  31.         // 输出source表
  32.         createSourceTable.print();
  33.         System.out.println("创建源表结束");
  34.         // sink
  35.         TableResult createSinkTable = tableEnv.executeSql("CREATE TABLE flink_human_sink (" +
  36.                 "id BIGINT ," +
  37.                 "name STRING ," +
  38.                 "age INT ," +
  39.                 "PRIMARY KEY(id) NOT ENFORCED " +
  40.                 ") WITH (" +
  41.                 " 'connector' = 'jdbc'," +
  42.                 " 'url' = 'jdbc:mysql://localhost:80/cdc_sink', " +
  43.                 " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
  44.                 " 'username' = 'root'," +
  45.                 " 'password' = 'root'," +
  46.                 " 'table-name' = 'human_sink' )");
  47.         createSinkTable.print();
  48.         System.out.println("创建sink表结束");
  49.         // 插入
  50.         tableEnv.executeSql("insert into flink_human_sink select id,name,age from flink_human");
  51.         System.out.println("插入sink表结束");
  52.     }
  53. }
复制代码
 (1)创建源表
        如下代码创建了Flink中的源表,为什么说是Flink中呢?缘故原由是该代码将mysql中的human表映射为Flink中的flink_human表,后文代码中就可以使用flink_human表了,代码如下:
  1.         // source
  2.         TableResult createSourceTable = tableEnv.executeSql("CREATE TABLE flink_human ( \n" +
  3.                 "id BIGINT ,\n" +
  4.                 "name STRING ,\n" +
  5.                 "age INT ,\n" +
  6.                 "PRIMARY KEY (id) NOT ENFORCED \n" +
  7.                 ") WITH (\n" +
  8.                 " 'connector' = 'mysql-cdc',\n" +
  9.                 " 'hostname' = 'localhost',\n" +
  10.                 " 'port' = '80',\n" +
  11.                 " 'username' = 'root',\n" +
  12.                 " 'password' = 'root',\n" +
  13.                 " 'database-name' = 'cdc_demo',\n" +
  14.                 " 'table-name' = 'human') ");
复制代码
注意这里connector必须是mysql-cdc。
(2)创建目标表
代码如下:
  1.         // sink
  2.         TableResult createSinkTable = tableEnv.executeSql("CREATE TABLE flink_human_sink (" +
  3.                 "id BIGINT ," +
  4.                 "name STRING ," +
  5.                 "age INT ," +
  6.                 "PRIMARY KEY(id) NOT ENFORCED " +
  7.                 ") WITH (" +
  8.                 " 'connector' = 'jdbc'," +
  9.                 " 'url' = 'jdbc:mysql://localhost:80/cdc_sink', " +
  10.                 " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
  11.                 " 'username' = 'root'," +
  12.                 " 'password' = 'root'," +
  13.                 " 'table-name' = 'human_sink' )");
复制代码
这里connector的值必须是jdbc,即通过jdbc毗连器实现。
(3)同步数据
        通过如下SQL即可以实现数据同步:
  1.         tableEnv.executeSql("insert into flink_human_sink select id,name,age from flink_human");
复制代码
测试

        与DataStream测试过程相同。

        值得注意的是:对MySQL的insert、update、delete操作可以完成同步,但对有些操作并不能完成同步,例如truncate操作。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4