Flink CDC整合SpringBoot获取变动数据
Flink CDC整合SpringBoot获取变动数据
1.Flink CDC介绍:
(1).概述
(2).Flink CDC Source
(3).版本支持
2.项目概述
(1).项目位置
(2).项目版本
(3).项目依赖
3.Flink CDC环境准备
(1).OceanBase数据库
(2).SQLServer数据库
4.Flink CDC Connector整合SpringBoot
(1).目次布局
(2).版本阐明
(3).项目设计思路
(4).OceanBase数据库
(5).SQLServer数据库
1.Flink CDC介绍:
(1).概述
CDC是Chanage Data Capture(数据变动捕获)的简称。其焦点原理就是监测并捕获数据库的变动(例如增删改),将这些变动按照发生序次捕获,将捕获到的数据,通过一定的数据转换和清洗大概干系业务的整合,写入目标数据库。
Flink CDC是一个流数据集成工具,旨在为用户提供更强大的API。它允许用户通过YAML优雅地形貌他们的ETL管道逻辑,并资助用户自动生成自定义的Flink使用符并提交作业。Flink CDC优先优化任务提交过程,并提供增强的功能,如模式演化,数据转换,完整的数据库同步和exactly-once语义, 与Apache Flink深度集成并由Apache Flink提供支持,Flink CDC提供:端到端数据集成框架, 为数据集成用户提供可轻松构建作业的API 和 Sink中对多表支持, 以及同步整个数据库, 模式演化的能力。
(2).Flink CDC Source
Flink CDC sources是Apache Flink 的一组源连接器,使用更改数据捕获(CDC)从差别数据库获取更改。一些CDC泉源集成Debezium作为捕获数据更改的引擎。此中支持的连接器包罗mysql-cdc, oceanbase-cdc, oracle-cdc, sqlserver-cdc等。
(3).版本支持
2.项目概述
(1).项目版本
JDK版本:JDK11
SpringBoot版本:2.6.6
SpringCloudAlibaba版本:2021.0.1.0
SpringCloud版本: 2021.0.1
FlinkCDC-Connector版本:3.0.1(部分兼容2.2.0版本)
(2).项目依赖
3.Flink CDC环境准备
(1).OceanBase数据库
数据库版本:
5.7.25-OceanBase_CE-v4.3.1.0
署理版本:
oblogproxy-2.0.2-100000012024060321.el7.x86_64
步调:
(1).部署安装数据库
按照以上版本安装OceanBase数据库,具体使用请参考官网网址:https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000000818649
(2).安装署理服务oblogproxy
oblogproxy 是 OceanBase 数据库的增量日志署理服务。oblogproxy 支持实时增量链路接入和管理,方便应用接入 OceanBase 数据库的增量日志。同时支持在网络隔离时订阅增量日志。
安装oblogproxy-2.0.2署理,此中署理版本和OceanBase数据库的版本是有对应的,现在OceanBase4.3.1.0版本是最新的版本,对应oblogproxy署理版本是2.0.2,对应Flink CDC 版本是2.2.0及以上。
署理下载地址: https://obbusiness-private.oss-cn-shanghai.aliyuncs.com/download-center/opensource/oblogproxy/v2.0.2/oblogproxy-2.0.2-100000012024060321.el7.x86_64.rpm
下载完成后, 下面的使用均在 oblogproxy 项目目次中举行,oblogproxy 项目目次默以为 /usr/local/oblogproxy。oblogproxy 的配置文件默认放在 conf/conf.json。
需修改 conf/conf.json 的以下配置:(oblogproxy 需要配置用户的用户名和密码,用户必须是 OceanBase 的 sys 租户的用户才能连接。)
此中加密的用户名和密码通过以下命令获取:
./bin/logproxy -x username
./bin/logproxy -x password
然后将结果分别保存到 conf/conf.json 文件的 ob_sys_username 和 ob_sys_password 配置项
此中conf/conf.json中binlog mode选项默认false,不要设置为true,使用CDC模式下不要开启binlog模式,这样的话会启动报错。
(3).运行署理服务oblogproxy
先进入 oblogproxy 项目目次默以为 /usr/local/oblogproxy
cd /usr/local/oblogproxy
再通过以下命令启动服务。
./run.sh start
可以通过日志目次查察启动状态:
cd /usr/local/oblogproxy/log
tail –f logproxy.log
(2).SQLServer数据库
数据库版本:
Microsoft SQL Server 2017 (RTM) - 14.0.1000.169 (X64)
Aug 22 2017 17:04:49
Copyright © 2017 Microsoft Corporation
Enterprise Edition (64-bit) on Windows 10 Enterprise 10.0 <X64> (Build 22621: ) (Hypervisor)
署理版本:
开启上述安装版本数据库下的SQLServerAgent署理服务,此服务是获取数据库变动信息的重要服务,不开启的话无法获取增量数据变动。
步调:
(1).部署安装数据库
按照以上版本安装SQLServer数据库, FlinkCDC Connector最低要求SQLServer2017版本及以上,同时建议安装企业版,因为其他版本可能会有CPU内核的限定,这样倒霉于在线上大用户量并发的环境下获取数据变动,如果仅仅用于测试可以安装开辟版本。
此处安装一定要选择混淆模式,而且设置sa用户的连接密码,企业级开辟以及FlinkCDC连接过程中也是需要提供数据库的用户和密码的,而且添加当前用户为SQLServer管理员。
(2).开启署理服务SQLServer Agent
(3).开启CDC模式
对于要获取数据变动的数据库和表都要开启CDC模式
数据库开启CDC:
EXEC sys.sp_cdc_enable_db;
判定当前数据库(datagather)是否启动CDC成功:
SELECT is_cdc_enabled FROM sys.databases WHERE name = ‘datagather’;
开启当前数据库下表的CDC模式
EXEC sys.sp_cdc_enable_table
@source_schema = ‘dbo’,
@source_name = ‘a_a_test’,
@role_name = ‘cdc_role’;
– schema_name 是表所属的架构(schema)的名称。
– table_name 是要启用 CDC 跟踪的表的名称。
– cdc_role 是 CDC 使用的角色的名称。如果没有指定角色名称,系统将创建一个默认角色。
启用 CDC 后,SQL Server 将自动跟踪启用了 CDC 的表上的数据更改,并将更改信息存储在 CDC 干系的表中,您可以使用这些信息举行数据更改追踪和同步。
– 查询在当前数据库下全部的表:
SELECT * FROM INFORMATION_SCHEMA.TABLES
如果有多个数据库和多个表,按照以上使用逐一举行开启工作。
4.Flink CDC Connector整合SpringBoot
(1).目次布局
(2).版本阐明
此中version1和version2分别对应代码的版本,version1使用了Flink CDC Connector2.2.0版本,变动数据是在序列化器中获取的,version2使用了FlinkCDC Connector3.0.1(现在最新版本),变动数据是在自定义Sink中获取的,RichSinkFunction 提供了更多的功能和灵活性。它允许用户访问 Flink 运行时的上下文信息,如状态管理、计时器和广播变量等。此外,RichSinkFunction 还可以处理异步 I/O 使用,进步数据输出的效率。
(3).项目设计思路
version1
该版本是原始版本,使用的Connector的版本均为2.2.0,针对差别范例的数据库,配置差别的启动监听器,在项目启动时候,通过读取配置文件flink.properties来获取差别数据库范例的CDC连接器的配置信息。
启动完成后,在差别范例的序列化器中获取到变动数据,封装成ConcurrentHashMap,根据变动的数据以及信息,按照范例创建异步任务拼接出insert, delete, update,使用范例的SQL语句,对目标库举行增删改的使用,完成数据的变动和高效使用。
version2
该版本是完善的版本,使用的Connector的版本均为3.0.1,针对差别范例的数据库,配置差别的启动监听器,在项目启动时候,通过读取配置文件flink.properties来获取差别数据库范例的CDC连接器的配置信息,自定义Sink而且封装统一的变动实体来吸取数据而不是map,来获取CDC过程获取到的变动信息,而不是直接在序列化器中,这样更加高效而且使得处理变动数据和Flink CDC Connector的序列化器。
由于差别数据库支持的SQL语法差别,联合高内聚低耦合的设计思想以及后期维护和可扩展性,具体变动业务通过策略模式和工厂模式联合,实现对差别范例的数据库组装新增, 删除,修改的SQL语句,通过反射调用,大大进步了应用程序的扩展性和可维护性。
差别数据库范例的实今世码如下所示:
(4).OceanBase数据库
现在验证OceanBase数据库的数据变动获取,把变动结果显现在目标数据库SQLServer中,在application.just配置文件中把flink.version1.oceanbase改为true。
由条件注解来控制@ConditionalOnExpression(“#{@environment.getProperty(‘flink.version1.oceanbase’) == ‘true’ && @environment.getProperty(‘flink.version2.oceanbase’) == ‘false’}”)
该条件注解的含义是flink.version1.oceanbase和flink.version2.oceanbase只有一个为true,另一个为false的时候才可以启动,这样的话每个范例数据库的CDC程序运行时只能同时存在一个。
可以看到version1中的oceanbase范例的CDC连接器启动成功,现在往表中插入一条数据,我们看oceanbase cdc 的连接器连接数据库的信息
新增使用
我们在该库下的a_a_test表中添加一条数据:
在oceanbase的序列化器中可以看到添加的变动数据信息
在我们的Runable接口中,可以看到针对目标数据库SQLServer,已经处理好了插入语句,语法是SQLServer的语法,如下图所示:
修改使用
删除使用
version2和version1的结果一致,此处不在演示.
(5).SQLServer数据库
现在验证SQLServer数据库的数据变动获取,把变动结果显现在目标数据库SQLServer中,在application.just配置文件中把flink.version2.sqlserver改为true
由条件注解来控制@ConditionalOnExpression(“#{@environment.getProperty(‘flink.version2.sqlserver’) == ‘true’ && @environment.getProperty(‘flink.version1.sqlserver’) == ‘false’}”)该条件注解的含义是flink.version1.sqlserver和flink.version2.sqlserver只有一个为true,另一个为false的时候才可以启动,这样的话每个范例数据库的CDC程序运行时只能同时存在一个
可以看到version2中的sqlserver范例的CDC连接器启动成功
SQLServer的验证和OceanBase一样,大家可以自行验证哦
以上是使用FlinkCDC整合SpringBoot获取变动数据的分享,希望可以给大家提供思路和资助
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |