马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
目次
一、前言
二、常用的数据同步解决方案
2.1 为什么需要数据同步
2.2 常用的数据同步方案
2.2.1 Debezium
2.2.2 DataX
2.2.3 Canal
2.2.4 Sqoop
2.2.5 Kettle
2.2.6 Flink CDC
三、Flink CDC介绍
3.1 Flink CDC 概述
3.1.1 Flink CDC 工作原理
3.2 Flink CDC数据同步上风
3.3 Flink CDC 适用范围
四、Java集成Flink CDC同步mysql数据
4.1 组件版本选择
4.2 数据准备
4.3 导入组件依赖
4.4 Java代码实现过程
4.4.1 自界说反序列化器
4.4.2 自界说Sink输出
4.4.3 启动任务类
4.4.4 结果测试
五、与springboot整合过程
5.1 自界说监听类
5.2 结果测试
六、写在文末
一、前言
在微服务开辟模式下,体系的数据源每每不是一个单一的来源,在实际项目中,每每是多种异构数据源的组合,好比核心业务数据在mysql,日志分析数据在hbase,clickhouse,es等,但是不同的数据源需要配合完成某一类业务的时候,就涉及到数据的整合或数据同步问题,尤其是数据同步的场景可以说是非常常见的,那么有哪些解决方案呢?
二、常用的数据同步解决方案
2.1 为什么需要数据同步
当体系发展到肯定阶段,尤其是体系的规模越来越大,业务体量也不停扩大的时候,一个体系可能会用到多种数据存储中间件,而不再是单纯的mysql,pgsql等,甚至一个体系中一个或多个微服务无法再满足业务的需求,而需要单独做数据存储的服务,在类似的场景下,很难制止新的服务需要从原有的微服务中抽取数据,或定期做数据的同步处理,诸如此类的场景另有很多。
2.2 常用的数据同步方案
下图枚举了几种主流的用于解决数据同步场景的方案
关于图中几种技术,做如下简单的介绍,便于做技术选型作为对比
2.2.1 Debezium
Debezium是国外⽤户常⽤的CDC组件,单机对于分布式来说,在数据读取本领的拓展上,没有分布式的更具有上风,在大数据众多的分布式框架中(Hive、Hudi等)Flink CDC 的架构可以或许很好地接入这些框架。
2.2.2 DataX
DataX无法支持增量同步。如果一张Mysql表每天增量的数据是不同天的数据,并且没有办法确定它的产生时间,那么如何将数据同步到数仓是一个值得考虑的问题。DataX支持全表同步,也支持sql查询的方式导入导出,全量同步肯定是不可取的,sql查询的方式没有可以确定增量数据的字段的话也不是一个好的增量数据同步方案。
2.2.3 Canal
Canal是用java开辟的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。Canal主要支持了MySQL的Binlog解析,将增量数据写入中间件中(比方kafka,Rocket MQ等),但是无法同步汗青数据,由于无法获取到binlog的变更。
2.2.4 Sqoop
Sqoop主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql...)间举行数据的传递。Sqoop将导入或导出命令翻译成mapreduce程序来实现,这样的毛病就是Sqoop只能做批量导入,遵照事务的一致性,Mapreduce任务成功则同步成功,失败则全部同步失败。
2.2.5 Kettle
Kettle是一款开源的数据集成工具,主要用于数据抽取、转换和加载(ETL)。它提供了图形化的界面,使得用户可以通过拖拽组件的方式来筹划和执行数据集成任务。
- Kettle 被广泛应用于数据仓库构建、数据迁移、数据清洗等多种场景。
- 虽然 Kettle 在处理中小型数据集时表现良好,但在处理大规模数据集时可能会碰到性能瓶颈,尤其是在没有举行优化的情况下。
- Kettle 在运行时可能会消耗较多的内存和 CPU 资源,特殊是当处理复杂的转换任务时。
虽然 Kettle 的根本操作相对简单,但对于高级功能和复杂任务的筹划,用户仍需投入肯定的时间和精力来学习和掌握。
2.2.6 Flink CDC
Flink CDC 根本都弥补了以上框架的不足,将数据库的全量和增量数据一体化地同步到消息队列和数据仓库中;也可以用于实时数据集成,将数据库数据实时入湖入仓;无需像其他的CDC工具一样需要在服务器上举行部署,淘汰了维护成本,链路更少;完善套接Flink程序,CDC获取到的数据流直接对接Flink举行数据加工处理,一套代码即可完成对数据的抽取转换和写出,既可以利用flink的DataStream API完成编码,也可以利用较为上层的FlinkSQL API举行操作。
三、Flink CDC介绍
3.1 Flink CDC 概述
Flink CDC(Change Data Capture)是指利用 Apache Flink 流处理框架来捕获数据库中的变更记载(即数据变更日志)。这种方式答应开辟者实时监控并处理数据库表中的更改事件,好比插入、更新或删除操作。通过这种方式,可以实现数据库之间或者数据库与外部体系的实时数据同步。
3.1.1 Flink CDC 工作原理
Flink CDC 主要依赖于数据库提供的日志或者事务记载,如 MySQL 的 Binlog、PostgreSQL 的 WAL(Write-Ahead Logs)等。Flink CDC 利用专门的连接器(Connector)读取这些日志文件,并将其转换成 Flink 可以处理的数据流。之后,这些数据流可以被进一步处理,如过滤、聚合、转换等,最终输出到目标体系,如另一个数据库、消息队列或其他存储体系。
3.2 Flink CDC数据同步上风
Flink CDC(Change Data Capture)是一种用于捕获数据库变更事件的技术,它可以或许实实际时的数据同步和流式处理。利用 Apache Flink CDC 举行数据同步具有以下几个主要上风:
- 实时性:Flink CDC 可以或许捕捉到数据库中的任何更改(如插入、更新、删除等),并立刻将这些更改作为事件流发送出去,实现了真正的实时数据同步。
- 高性能:Flink 是一个高度可扩展的流处理框架,其内置的优化机制使得纵然在处理大规模数据流时也能保持高性能。Flink CDC 利用了这些优化技术来包管数据同步的高效执行。
- 易用性:Flink 提供了丰富的 API 和预构建的连接器(Connectors),使得集成数据库变得简单。利用 Flink CDC 可以通过配置文件或简单的编程接口轻松设置数据捕获任务。
- 灵活性:除了根本的 CDC 功能之外,Flink 还支持各种复杂的流处理操作,如窗口计算、状态管理等。这意味着不但可以捕获变更数据,还可以在数据同步过程中加入各种数据处理逻辑。
- 兼容性广泛:Flink CDC 支持多种数据库体系,包括 MySQL、PostgreSQL、Oracle、SQL Server 等,提供了广泛的兼容性和选择空间。
- 容错本领:Flink 内置了强大的状态管理和查抄点机制,纵然在发生故障的情况下也能包管数据的一致性和准确性。这对于需要高可靠性的数据同步场景非常紧张。
- 无侵入性:Flink CDC 通常是无侵入性的,不需要对源数据库举行任何修改或添加额外的触发器等组件,淘汰了对现有体系的干扰。
- 支持多种部署方式:无论是部署在本地集群还是云端,Flink 都能很好地支持,这为不同规模的企业提供了灵活的选择。
- 社区支持:Apache Flink 拥有一个活泼的开源社区,这不但意味着有丰富的文档和教程可供参考,也意味着碰到问题时可以快速获得资助和支持。
Flink CDC 在数据同步方面提供了一个强大且灵活的解决方案,适合那些需要高性能、实时数据处理的应用场景。
3.3 Flink CDC 适用范围
Flink CDC(Change Data Capture)连接器是 Apache Flink 社区为 Flink 提供的一种用于捕获数据库变更事件的工具。它答应用户从关系型数据库中实时捕获表的数据变更,并将这些变更事件转化为流式数据,以便举行实时处理,好比你需要将mysql的数据同步到另一个mysql数据库,就需要利用mysql连接器,如果需要同步mongodb的数据,则需要利用mongodb的连接器。截止到Flink CDC 2.2 为止,支持的连接器:
支Flink CDC 持的Flink版本,在实际利用的时候需要根据版本的对照举行选择:
四、Java集成Flink CDC同步mysql数据
4.1 组件版本选择
网上很多关于Flink CDC的版本都是1.13左右的,这个在当前JDK比较新的版本下已经出现了较多的不兼容,本文以JDK17为基础版本举行分析
编号 | 组件名称 | 版本 | 1 | JDK
| 17
| 2 | springboot
| 3.2.2
| 3 | mysql
| 8.0.23
| 4 | flink cdc
| 1.17.0
|
4.2 数据准备
找一个可以用的mysql数据库,在下面创建两张表,一张tb_role,另一张tb_role_copy,仅表名不一样
- CREATE TABLE `tb_role` (
- `id` varchar(32) NOT NULL COMMENT '主键',
- `role_code` varchar(32) NOT NULL COMMENT '版本号',
- `role_name` varchar(32) DEFAULT NULL COMMENT '角色名称',
- PRIMARY KEY (`id`) USING BTREE
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='tb角色表';
复制代码
4.3 导入组件依赖
pom中添加如下依赖
- <!-- Flink CDC 1.17.0版本,与springboot3.0进行整合使用的版本-->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>1.17.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-java</artifactId>
- <version>1.17.1</version>
- </dependency>
- <dependency>
- <groupId>com.ververica</groupId>
- <artifactId>flink-connector-mysql-cdc</artifactId>
- <version>2.4.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-base</artifactId>
- <version>1.18.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_2.12</artifactId>
- <version>1.17.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-jdbc</artifactId>
- <version>3.0.0-1.16</version>
- </dependency>
复制代码 4.4 Java代码实现过程
案例需求场景:通过Flink CDC ,监听tb_role表数据变化,写入tb_role_copy
4.4.1 自界说反序列化器
反序列化器的目的是为了解析flink cdc监听到mysql表数据变化的日志,以json的情势举行解析,方便对日志中的关键参数举行处理
- package com.congge.flink.blog;
- import com.alibaba.fastjson.JSONObject;
- import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
- import io.debezium.data.Envelope;
- import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.util.Collector;
- import org.apache.kafka.connect.data.Field;
- import org.apache.kafka.connect.data.Schema;
- import org.apache.kafka.connect.data.Struct;
- import org.apache.kafka.connect.source.SourceRecord;
- import java.util.List;
- public class CustomerSchema implements DebeziumDeserializationSchema<String> {
- /**
- * 封装的数据格式
- * {
- * "database":"",
- * "tableName":"",
- * "before":{"id":"","tm_name":""....},
- * "after":{"id":"","tm_name":""....},
- * "type":"c u d"
- * //"ts":156456135615
- * }
- */
- @Override
- public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
- //1.创建JSON对象用于存储最终数据
- JSONObject result = new JSONObject();
- //2.获取库名&表名
- String topic = sourceRecord.topic();
- String[] fields = topic.split("\\.");
- String database = fields[1];
- String tableName = fields[2];
- Struct value = (Struct) sourceRecord.value();
- //3.获取"before"数据
- Struct before = value.getStruct("before");
- JSONObject beforeJson = new JSONObject();
- if (before != null) {
- Schema beforeSchema = before.schema();
- List<Field> beforeFields = beforeSchema.fields();
- for (Field field : beforeFields) {
- Object beforeValue = before.get(field);
- beforeJson.put(field.name(), beforeValue);
- }
- }
- //4.获取"after"数据
- Struct after = value.getStruct("after");
- JSONObject afterJson = new JSONObject();
- if (after != null) {
- Schema afterSchema = after.schema();
- List<Field> afterFields = afterSchema.fields();
- for (Field field : afterFields) {
- Object afterValue = after.get(field);
- afterJson.put(field.name(), afterValue);
- }
- }
- //5.获取操作类型 CREATE UPDATE DELETE
- Envelope.Operation operation = Envelope.operationFor(sourceRecord);
- String type = operation.toString().toLowerCase();
- if ("create".equals(type)) {
- type = "insert";
- }
- //6.将字段写入JSON对象
- result.put("database", database);
- result.put("tableName", tableName);
- result.put("before", beforeJson);
- result.put("after", afterJson);
- result.put("type", type);
- //7.输出数据
- collector.collect(result.toJSONString());
- }
- @Override
- public TypeInformation<String> getProducedType() {
- return BasicTypeInfo.STRING_TYPE_INFO;
- }
- }
复制代码
4.4.2 自界说Sink输出
Sink即为Flink CDC的输出连接器,即监听到源表数据变化并经过处理后最终写到那边,以mysql为例,我们在监听到tb_role表数据变化后,同步到tb_role_copy中去
- package org.dromara.sync.flink.v2;
- import com.alibaba.fastjson.JSONObject;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
- import org.dromara.common.core.utils.SpringUtils;
- import org.dromara.sync.config.SyncDsTargetDbConfig;
- import java.io.Serial;
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.sql.PreparedStatement;
- /**
- * 自定义一个mysql的Sink输出器,将监听到的变化数据写到指定的数据库下面的表中
- * @author zcy
- */
- @Slf4j
- public class MyJdbcSink extends RichSinkFunction<String> {
- @Serial
- public static final long serialVersionUID = 3153039337754737517L;
- // 提前声明连接和预编译语句
- private Connection connection = null;
- private PreparedStatement insertStmt = null;
- private PreparedStatement updateStmt = null;
- private PreparedStatement preparedStatement = null;
- @Override
- public void open(Configuration parameters) throws Exception {
- if(connection ==null){
- Class.forName("com.mysql.cj.jdbc.Driver");//加载数据库驱动
- connection = DriverManager.getConnection("jdbc:mysql://IP:3306/db", "root", "123");
- connection.setAutoCommit(false);//关闭自动提交
- }
- }
- @Override
- public void invoke(String value, Context context) throws Exception {
- JSONObject jsonObject = JSONObject.parseObject(value);
- log.info("监听到数据变化,准备执行sql的变更,参数 jsonObject :【{}】",jsonObject);
- String type = jsonObject.getString("type");
- String tableName = "tb_role_copy";
- String database = jsonObject.getString("database");
- if(type.equals("insert")){
- JSONObject after = (JSONObject)jsonObject.get("after");
- Integer id = after.getInteger("id");
- String roleCode = after.getString("role_code");
- String roleName = after.getString("role_name");
- String sql = String.format("insert into %s.%s values (?,?,?)", database, tableName);
- insertStmt = connection.prepareStatement(sql);
- insertStmt.setInt(1, Integer.valueOf(id));
- insertStmt.setString(2, roleCode);
- insertStmt.setString(3, roleName);
- insertStmt.execute();
- connection.commit();
- } else if(type.equals("update")){
- JSONObject after = jsonObject.getJSONObject("after");
- Integer id = after.getInteger("id");
- String roleCode = after.getString("role_code");
- String roleName = after.getString("role_name");
- String sql = String.format("update %s.%s set role_code = ?, role_name = ? where id = ?", database, tableName);
- updateStmt = connection.prepareStatement(sql);
- updateStmt.setString(1, roleCode);
- updateStmt.setString(2, roleName);
- updateStmt.setInt(3, id);
- updateStmt.execute();
- connection.commit();
- } else if(type.equals("delete")){
- JSONObject after = jsonObject.getJSONObject("before");
- Integer id = after.getInteger("id");
- String sql = String.format("delete from %s.%s where id = ?", database, tableName);
- preparedStatement = connection.prepareStatement(sql);
- preparedStatement.setInt(1, id);
- preparedStatement.execute();
- connection.commit();
- }
- }
- @Override
- public void close() throws Exception {
- if(insertStmt != null){
- insertStmt.close();
- }
- if(updateStmt != null){
- updateStmt.close();
- }
- if(preparedStatement != null){
- preparedStatement.close();
- }
- if(connection != null){
- connection.close();
- }
- }
- }
复制代码 4.4.3 启动任务类
本例先以main程序运行,在实际举行线上部署利用时,可以打成jar包或整合springboot举行启动即可
- package org.dromara.sync.flink.mock;
- import com.ververica.cdc.connectors.mysql.source.MySqlSource;
- import com.ververica.cdc.connectors.mysql.table.StartupOptions;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.dromara.sync.flink.MyJdbcSchema;
- import org.dromara.sync.flink.v2.MyJdbcSink;
- public class FlinkCdcMainTest {
- public static void main(String[] args) throws Exception{
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
- .hostname("IP")
- .port(3306)
- .username("root")
- .password("123456")
- .databaseList("db") // 监听的数据库列表
- .tableList("db.tb_role") // 监听的表列表
- .deserializer(new MyJdbcSchema())
- .startupOptions(StartupOptions.latest())
- .serverTimeZone("UTC")
- .build();
- DataStream<String> stream = env.fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
- // 输出到控制台
- // stream.print();
- stream.addSink(new MyJdbcSink());
- env.execute("Flink CDC Job");
- }
- }
复制代码 4.4.4 结果测试
运行上面的代码,通过控制台可以看到任务已经运行起来了,监听并等候数据源数据变更
测试之前,确保两张表数据是一致的
此时为tb_role表增长一条数据,很快控制台可以监听并输出相关的日志
而后,tb_role_copy表同步新增了一条数据
五、与springboot整合过程
在实际项目中,通常会联合springboot项目整合利用,参考下面的利用步骤
5.1 自界说监听类
可以直接基于启动类改造,也可以新增一个类,实现ApplicationRunner接口,重写里面的run方法
- 不难发现,run方法里面的代码逻辑便是从上述main方法运行任务里面拷贝过来的;
- package org.dromara.sync.flink.v2;
- import com.ververica.cdc.connectors.mysql.source.MySqlSource;
- import com.ververica.cdc.connectors.mysql.table.StartupOptions;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.dromara.sync.config.SyncDsSourceDbConfig;
- import org.dromara.sync.flink.MyJdbcSchema;
- import org.springframework.boot.ApplicationArguments;
- import org.springframework.boot.ApplicationRunner;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- /**
- * 启动加载,将该类作为一个监听任务的进程启动并执行
- * @author Evans
- */
- @Component
- public class MysqlDsListener implements ApplicationRunner {
- @Override
- public void run(ApplicationArguments args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
- .hostname("IP")
- .port(13306)
- .username("root")
- .password("123456")
- .databaseList("db")
- .tableList("db.tb_role")
- .deserializer(new MyJdbcSchema())
- .startupOptions(StartupOptions.latest())
- .serverTimeZone("UTC")
- .build();
- DataStream<String> stream = env.fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
- // 输出到控制台
- // stream.print();
- stream.addSink(new MyJdbcSink());
- env.execute("Flink CDC Job");
- }
- }
复制代码 5.2 结果测试
启动工程之后,相称于是通过flink cdc启动了一个用于监听数据变更的背景进程
然后我们再在数据库tb_role表增长一条数据,控制台可以看到输出了相关的日志
此时再查抄数据表,可以发现tb_role_copy表新增了一条一样的数据
六、写在文末
本文通过较大的篇幅具体介绍了Flink CDC相关的技术,末了通过一个实际案例演示了利用Flink CDC同步mysql表数据的示例,盼望对看到的同学有用,本篇到此结束感谢观看。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |