立山 发表于 2024-8-28 18:12:21

Flink CDC使用数据库获取变动数据

Flink CDC整合SpringBoot获取变动数据

Flink CDC整合SpringBoot获取变动数据
1.Flink CDC介绍:
(1).概述
(2).Flink CDC Source
(3).版本支持
2.项目概述
(1).项目位置
(2).项目版本
(3).项目依赖
3.Flink CDC环境准备
(1).OceanBase数据库
(2).SQLServer数据库
4.Flink CDC Connector整合SpringBoot
(1).目次布局
(2).版本阐明
(3).项目设计思路
(4).OceanBase数据库
(5).SQLServer数据库
1.Flink CDC介绍:

(1).概述

CDC是Chanage Data Capture(数据变动捕获)的简称。其焦点原理就是监测并捕获数据库的变动(例如增删改),将这些变动按照发生序次捕获,将捕获到的数据,通过一定的数据转换和清洗大概干系业务的整合,写入目标数据库。
Flink CDC是一个流数据集成工具,旨在为用户提供更强大的API。它允许用户通过YAML优雅地形貌他们的ETL管道逻辑,并资助用户自动生成自定义的Flink使用符并提交作业。Flink CDC优先优化任务提交过程,并提供增强的功能,如模式演化,数据转换,完整的数据库同步和exactly-once语义, 与Apache Flink深度集成并由Apache Flink提供支持,Flink CDC提供:端到端数据集成框架, 为数据集成用户提供可轻松构建作业的API 和 Sink中对多表支持, 以及同步整个数据库, 模式演化的能力。
(2).Flink CDC Source

Flink CDC sources是Apache Flink 的一组源连接器,使用更改数据捕获(CDC)从差别数据库获取更改。一些CDC泉源集成Debezium作为捕获数据更改的引擎。此中支持的连接器包罗mysql-cdc, oceanbase-cdc, oracle-cdc, sqlserver-cdc等。
(3).版本支持

https://i-blog.csdnimg.cn/direct/d3b11890951e45fba3022e9f08c55532.png#pic_center
2.项目概述

(1).项目版本

JDK版本:JDK11
SpringBoot版本:2.6.6
SpringCloudAlibaba版本:2021.0.1.0
SpringCloud版本: 2021.0.1
FlinkCDC-Connector版本:3.0.1(部分兼容2.2.0版本)
(2).项目依赖

<dependencies>
      <!--*******Flink cdc-connector驱动*********-->
      <!--flinkcdc3.0.1与2.2.0版本共存,需要排除低版本的debezium-core和debezium-connector手动指定高版本-->
      <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-core</artifactId>
            <version>1.9.7.Final</version>
      </dependency>
      <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-oracle</artifactId>
            <version>1.9.7.Final</version>
      </dependency>
      <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-sqlserver</artifactId>
            <version>1.9.7.Final</version>
      </dependency>

<!--解决无法访问com.google.protobuf.GeneratedMessageV3 找不到com.google.protobuf.GeneratedMessageV-->
      <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.5.1</version>
      </dependency>

      <!--flink-sqlserver依赖-->
      <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-sqlserver-cdc</artifactId>
            <version>3.0.1</version>
      </dependency>
      <!--flink-oracle依赖-->
      <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-oracle-cdc</artifactId>
            <version>3.0.1</version>
      </dependency>
      <!--flink-oceanbase依赖-->
      <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-oceanbase-cdc</artifactId>
            <version>3.0.1</version>
      </dependency>
      <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>3.0.1</version>
      </dependency>

      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.12.0</version>
            <exclusions>
                <!-- Exclude the included Guava as it conflicts with the Flink shaded guava -->
                <exclusion>
                  <groupId>com.google.guava</groupId>
                  <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
      </dependency>

      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.12.0</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.0</version>
      </dependency>
      <!--高版本兼容低版本,需要排除依赖 flinkcdc3.0.1与2.2.0版本共存,需要排除低版本的debezium-core和debezium-connector手动指定高版本-->
      <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-sqlserver-cdc</artifactId>
            <version>2.2.0</version>
            <exclusions>
                <exclusion>
                  <groupId>io.debezium</groupId>
                  <artifactId>debezium-core</artifactId>
                </exclusion>
                <exclusion>
                  <groupId>io.debezium</groupId>
                  <artifactId>debezium-connector-sqlserver</artifactId>
                </exclusion>
            </exclusions>
      </dependency>
      <!--高版本兼容低版本,需要排除依赖 需要排除依赖 flinkcdc3.0.1与2.2.0版本共存,需要排除低版本的debezium-core和debezium-connector手动指定高版本-->
      <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-oracle-cdc</artifactId>
            <version>2.2.0</version>
            <exclusions>
                <exclusion>
                  <groupId>io.debezium</groupId>
                  <artifactId>debezium-core</artifactId>
                </exclusion>
                <exclusion>
                  <groupId>io.debezium</groupId>
                  <artifactId>debezium-connector-oracle</artifactId>
                </exclusion>
            </exclusions>
      </dependency>
      <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.13.6</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-guava</artifactId>
            <version>18.0-12.0</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>1.13.2</version>
      </dependency>
      <!--********************************数据库连接驱动********************************-->
      <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
      </dependency>
      <dependency>
            <groupId>com.oceanbase</groupId>
            <artifactId>oceanbase-client</artifactId>
            <version>2.4.3</version>
      </dependency>
      <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>sqljdbc4</artifactId>
            <version>4.0</version>
      </dependency>
      <dependency>
            <groupId>com.oracle.database.jdbc</groupId>
            <artifactId>ojdbc11</artifactId>
            <version>23.3.0.23.09</version>
      </dependency>
      <!-- ********************************Spring相关******************************** -->
      <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.3.2</version>
      </dependency>
      <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.2.16</version>
      </dependency>
      <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
      </dependency>
      <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
      </dependency>
    </dependencies>

3.Flink CDC环境准备

(1).OceanBase数据库

数据库版本:

5.7.25-OceanBase_CE-v4.3.1.0
署理版本:

oblogproxy-2.0.2-100000012024060321.el7.x86_64
步调:

(1).部署安装数据库

按照以上版本安装OceanBase数据库,具体使用请参考官网网址:https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000000818649
(2).安装署理服务oblogproxy

oblogproxy 是 OceanBase 数据库的增量日志署理服务。oblogproxy 支持实时增量链路接入和管理,方便应用接入 OceanBase 数据库的增量日志。同时支持在网络隔离时订阅增量日志。
安装oblogproxy-2.0.2署理,此中署理版本和OceanBase数据库的版本是有对应的,现在OceanBase4.3.1.0版本是最新的版本,对应oblogproxy署理版本是2.0.2,对应Flink CDC 版本是2.2.0及以上。
署理下载地址: https://obbusiness-private.oss-cn-shanghai.aliyuncs.com/download-center/opensource/oblogproxy/v2.0.2/oblogproxy-2.0.2-100000012024060321.el7.x86_64.rpm
下载完成后, 下面的使用均在 oblogproxy 项目目次中举行,oblogproxy 项目目次默以为 /usr/local/oblogproxy。oblogproxy 的配置文件默认放在 conf/conf.json。
需修改 conf/conf.json 的以下配置:(oblogproxy 需要配置用户的用户名和密码,用户必须是 OceanBase 的 sys 租户的用户才能连接。)
https://i-blog.csdnimg.cn/direct/14f8d9d40694418988a4053db4b00395.png#pic_center
此中加密的用户名和密码通过以下命令获取:
./bin/logproxy -x username
./bin/logproxy -x password
然后将结果分别保存到 conf/conf.json 文件的 ob_sys_username 和 ob_sys_password 配置项
https://i-blog.csdnimg.cn/direct/4ebc7afb4d6a4e68a66676520e35a2fd.png#pic_center
此中conf/conf.json中binlog mode选项默认false,不要设置为true,使用CDC模式下不要开启binlog模式,这样的话会启动报错。
(3).运行署理服务oblogproxy

先进入 oblogproxy 项目目次默以为 /usr/local/oblogproxy
cd /usr/local/oblogproxy
再通过以下命令启动服务。
./run.sh start
https://i-blog.csdnimg.cn/direct/80198c68636f4d15bd21533a80ab06e7.png#pic_center
可以通过日志目次查察启动状态:
cd /usr/local/oblogproxy/log
tail –f logproxy.log
https://i-blog.csdnimg.cn/direct/1ac2201ca94746bc908f68336e72cfa5.png#pic_center
(2).SQLServer数据库

数据库版本:

Microsoft SQL Server 2017 (RTM) - 14.0.1000.169 (X64)
Aug 22 2017 17:04:49
Copyright © 2017 Microsoft Corporation
Enterprise Edition (64-bit) on Windows 10 Enterprise 10.0 <X64> (Build 22621: ) (Hypervisor)
署理版本:

开启上述安装版本数据库下的SQLServerAgent署理服务,此服务是获取数据库变动信息的重要服务,不开启的话无法获取增量数据变动。
https://i-blog.csdnimg.cn/direct/47fe102840c448c5a32d69cafdec91fd.png#pic_center
步调:

(1).部署安装数据库

按照以上版本安装SQLServer数据库, FlinkCDC Connector最低要求SQLServer2017版本及以上,同时建议安装企业版,因为其他版本可能会有CPU内核的限定,这样倒霉于在线上大用户量并发的环境下获取数据变动,如果仅仅用于测试可以安装开辟版本。
https://i-blog.csdnimg.cn/direct/65b630b0ca374d70b6b9d02300bfafa0.png#pic_center
https://i-blog.csdnimg.cn/direct/c351db0d8ee2406daa9b98e7742ffd65.png#pic_center
https://i-blog.csdnimg.cn/direct/971a12ac52cf463db0693c39af6c7ce4.png#pic_center
https://i-blog.csdnimg.cn/direct/8ee4da3c53cc4fb1a37396ca837c8c21.png#pic_center
此处安装一定要选择混淆模式,而且设置sa用户的连接密码,企业级开辟以及FlinkCDC连接过程中也是需要提供数据库的用户和密码的,而且添加当前用户为SQLServer管理员。
https://i-blog.csdnimg.cn/direct/befa4354bb144653badd203ef08a6a76.png#pic_centerhttps://i-blog.csdnimg.cn/direct/4fdc4b0977bf4529bd9090571db8e0db.png#pic_centerhttps://i-blog.csdnimg.cn/direct/e1b59d7acaee4d958a1bce4835e49620.png#pic_center
(2).开启署理服务SQLServer Agent

https://i-blog.csdnimg.cn/direct/93b1470c833849d48c94cdc972f12776.png#pic_center
(3).开启CDC模式

对于要获取数据变动的数据库和表都要开启CDC模式
数据库开启CDC:
EXEC sys.sp_cdc_enable_db;
判定当前数据库(datagather)是否启动CDC成功:
SELECT is_cdc_enabled FROM sys.databases WHERE name = ‘datagather’;
https://i-blog.csdnimg.cn/direct/6e145569a1ba40409870e5ddd5095889.png#pic_center
开启当前数据库下表的CDC模式
EXEC sys.sp_cdc_enable_table
@source_schema = ‘dbo’,
@source_name = ‘a_a_test’,
@role_name = ‘cdc_role’;
– schema_name 是表所属的架构(schema)的名称。
– table_name 是要启用 CDC 跟踪的表的名称。
– cdc_role 是 CDC 使用的角色的名称。如果没有指定角色名称,系统将创建一个默认角色。
https://i-blog.csdnimg.cn/direct/f4061f69f6d94b5fbad90433b2849e7d.png#pic_center
启用 CDC 后,SQL Server 将自动跟踪启用了 CDC 的表上的数据更改,并将更改信息存储在 CDC 干系的表中,您可以使用这些信息举行数据更改追踪和同步。
– 查询在当前数据库下全部的表:
SELECT * FROM INFORMATION_SCHEMA.TABLES
https://i-blog.csdnimg.cn/direct/d2e283536e6845fcaf8f09b2d788482c.png#pic_center
如果有多个数据库和多个表,按照以上使用逐一举行开启工作。
4.Flink CDC Connector整合SpringBoot

(1).目次布局

https://i-blog.csdnimg.cn/direct/25934aa22d5c4a5ebf27ebb9de16838c.png#pic_center
(2).版本阐明

此中version1和version2分别对应代码的版本,version1使用了Flink CDC Connector2.2.0版本,变动数据是在序列化器中获取的,version2使用了FlinkCDC Connector3.0.1(现在最新版本),变动数据是在自定义Sink中获取的,RichSinkFunction 提供了更多的功能和灵活性。它允许用户访问 Flink 运行时的上下文信息,如状态管理、计时器和广播变量等。此外,RichSinkFunction 还可以处理异步 I/O 使用,进步数据输出的效率。
(3).项目设计思路

version1

该版本是原始版本,使用的Connector的版本均为2.2.0,针对差别范例的数据库,配置差别的启动监听器,在项目启动时候,通过读取配置文件flink.properties来获取差别数据库范例的CDC连接器的配置信息。
https://i-blog.csdnimg.cn/direct/be271c7189c94fbcb1ff87105d880ae2.png#pic_center
https://i-blog.csdnimg.cn/direct/f644201692814b22989a61b7ebd12d78.png#pic_center
启动完成后,在差别范例的序列化器中获取到变动数据,封装成ConcurrentHashMap,根据变动的数据以及信息,按照范例创建异步任务拼接出insert, delete, update,使用范例的SQL语句,对目标库举行增删改的使用,完成数据的变动和高效使用。
https://i-blog.csdnimg.cn/direct/65cdf7153e984c52a1a0e9c59ec60e7a.png#pic_center
version2

该版本是完善的版本,使用的Connector的版本均为3.0.1,针对差别范例的数据库,配置差别的启动监听器,在项目启动时候,通过读取配置文件flink.properties来获取差别数据库范例的CDC连接器的配置信息,自定义Sink而且封装统一的变动实体来吸取数据而不是map,来获取CDC过程获取到的变动信息,而不是直接在序列化器中,这样更加高效而且使得处理变动数据和Flink CDC Connector的序列化器。
https://i-blog.csdnimg.cn/direct/5974f69763624085a2cc69de092404da.png#pic_center
https://i-blog.csdnimg.cn/direct/6a538f8899224225a90343be7d7c5fb1.png#pic_center
https://i-blog.csdnimg.cn/direct/01cc17e6d6184a1f88574749e647e20c.png#pic_center
由于差别数据库支持的SQL语法差别,联合高内聚低耦合的设计思想以及后期维护和可扩展性,具体变动业务通过策略模式和工厂模式联合,实现对差别范例的数据库组装新增, 删除,修改的SQL语句,通过反射调用,大大进步了应用程序的扩展性和可维护性。
https://i-blog.csdnimg.cn/direct/2b8b60b6cb6242c09c03f1da8f87c5b2.png#pic_center
https://i-blog.csdnimg.cn/direct/8a53f280bd3b434ea9ad2787bb532421.png#pic_center
https://i-blog.csdnimg.cn/direct/32ae4628901d444baebd5f72bbd7f876.png#pic_center
差别数据库范例的实今世码如下所示:
(4).OceanBase数据库

现在验证OceanBase数据库的数据变动获取,把变动结果显现在目标数据库SQLServer中,在application.just配置文件中把flink.version1.oceanbase改为true。
https://i-blog.csdnimg.cn/direct/da45c6c228874d758315353647f80eb1.png#pic_center
https://i-blog.csdnimg.cn/direct/1ba0fc030a834150a75c8d6a84993c61.png#pic_center
由条件注解来控制@ConditionalOnExpression(“#{@environment.getProperty(‘flink.version1.oceanbase’) == ‘true’ && @environment.getProperty(‘flink.version2.oceanbase’) == ‘false’}”)
该条件注解的含义是flink.version1.oceanbase和flink.version2.oceanbase只有一个为true,另一个为false的时候才可以启动,这样的话每个范例数据库的CDC程序运行时只能同时存在一个。
https://i-blog.csdnimg.cn/direct/938ed9e3cf804fb195db0edd59a84360.png#pic_center
可以看到version1中的oceanbase范例的CDC连接器启动成功,现在往表中插入一条数据,我们看oceanbase cdc 的连接器连接数据库的信息
新增使用

我们在该库下的a_a_test表中添加一条数据:
https://i-blog.csdnimg.cn/direct/217c8242cb8e44db9a7a69c749c3f892.png#pic_center
在oceanbase的序列化器中可以看到添加的变动数据信息
https://i-blog.csdnimg.cn/direct/d6d392a8bdea46f6bde34d6ba6359878.png#pic_center
在我们的Runable接口中,可以看到针对目标数据库SQLServer,已经处理好了插入语句,语法是SQLServer的语法,如下图所示:
https://i-blog.csdnimg.cn/direct/f8873abbd0214b769b1e967e90c2d71c.png#pic_center
https://i-blog.csdnimg.cn/direct/924eb10f540544569df6af31cce2e192.png#pic_center
修改使用

https://i-blog.csdnimg.cn/direct/e648f5d90d60469c88455aa4b4f2e336.png#pic_center
https://i-blog.csdnimg.cn/direct/6543ba5796774bf88c037290f2c6984f.png#pic_centerhttps://i-blog.csdnimg.cn/direct/ae26aa5a48f7445bbc0614d154d36d95.png#pic_centerhttps://i-blog.csdnimg.cn/direct/3ff76f497db84bbbaedc6f59f6209f06.png#pic_center
删除使用

https://i-blog.csdnimg.cn/direct/5750718fb4d2428598a5b8192d837415.png#pic_center
https://i-blog.csdnimg.cn/direct/a78e08acc1054cff943ba7b211cc4a2e.png#pic_center
https://i-blog.csdnimg.cn/direct/e1b2faa732214bcea12201ca94363962.png#pic_center
https://i-blog.csdnimg.cn/direct/069fcb8cb4f44566875424105d7cfe5b.png#pic_center
version2和version1的结果一致,此处不在演示.
(5).SQLServer数据库

https://i-blog.csdnimg.cn/direct/b7c3680802d141e9a74e908e4f9548a7.png#pic_center
现在验证SQLServer数据库的数据变动获取,把变动结果显现在目标数据库SQLServer中,在application.just配置文件中把flink.version2.sqlserver改为true
https://i-blog.csdnimg.cn/direct/a9f973d10aff458c9698f3d340ff52f9.png#pic_center
https://i-blog.csdnimg.cn/direct/9ad2ca756c144897972e7bc707f5fa9a.png#pic_center
由条件注解来控制@ConditionalOnExpression(“#{@environment.getProperty(‘flink.version2.sqlserver’) == ‘true’ && @environment.getProperty(‘flink.version1.sqlserver’) == ‘false’}”)该条件注解的含义是flink.version1.sqlserver和flink.version2.sqlserver只有一个为true,另一个为false的时候才可以启动,这样的话每个范例数据库的CDC程序运行时只能同时存在一个
https://i-blog.csdnimg.cn/direct/227c78eef90040adae6a9073dd2db92e.png#pic_center
可以看到version2中的sqlserver范例的CDC连接器启动成功
SQLServer的验证和OceanBase一样,大家可以自行验证哦
以上是使用FlinkCDC整合SpringBoot获取变动数据的分享,希望可以给大家提供思路和资助

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Flink CDC使用数据库获取变动数据