IT评测·应用市场-qidao123.com

标题: Flink CDC使用数据库获取变动数据 [打印本页]

作者: 立山    时间: 2024-8-28 18:12
标题: Flink CDC使用数据库获取变动数据
Flink CDC整合SpringBoot获取变动数据

Flink CDC整合SpringBoot获取变动数据
1.Flink CDC介绍:
(1).概述
(2).Flink CDC Source
(3).版本支持
2.项目概述
(1).项目位置
(2).项目版本
(3).项目依赖
3.Flink CDC环境准备
(1).OceanBase数据库
(2).SQLServer数据库
4.Flink CDC Connector整合SpringBoot
(1).目次布局
(2).版本阐明
(3).项目设计思路
(4).OceanBase数据库
(5).SQLServer数据库
1.Flink CDC介绍:

(1).概述

CDC是Chanage Data Capture(数据变动捕获)的简称。其焦点原理就是监测并捕获数据库的变动(例如增删改),将这些变动按照发生序次捕获,将捕获到的数据,通过一定的数据转换和清洗大概干系业务的整合,写入目标数据库。
Flink CDC是一个流数据集成工具,旨在为用户提供更强大的API。它允许用户通过YAML优雅地形貌他们的ETL管道逻辑,并资助用户自动生成自定义的Flink使用符并提交作业。Flink CDC优先优化任务提交过程,并提供增强的功能,如模式演化,数据转换,完整的数据库同步和exactly-once语义, 与Apache Flink深度集成并由Apache Flink提供支持,Flink CDC提供:端到端数据集成框架, 为数据集成用户提供可轻松构建作业的API 和 Sink中对多表支持, 以及同步整个数据库, 模式演化的能力。
(2).Flink CDC Source

Flink CDC sources是Apache Flink 的一组源连接器,使用更改数据捕获(CDC)从差别数据库获取更改。一些CDC泉源集成Debezium作为捕获数据更改的引擎。此中支持的连接器包罗mysql-cdc, oceanbase-cdc, oracle-cdc, sqlserver-cdc等。
(3).版本支持


2.项目概述

(1).项目版本

JDK版本:JDK11
SpringBoot版本:2.6.6
SpringCloudAlibaba版本:2021.0.1.0
SpringCloud版本: 2021.0.1
FlinkCDC-Connector版本:3.0.1(部分兼容2.2.0版本)
(2).项目依赖

  1. <dependencies>
  2.         <!--*******Flink cdc-connector驱动*********-->
  3.         <!--flinkcdc3.0.1与2.2.0版本共存,需要排除低版本的debezium-core和debezium-connector手动指定高版本-->
  4.         <dependency>
  5.             <groupId>io.debezium</groupId>
  6.             <artifactId>debezium-core</artifactId>
  7.             <version>1.9.7.Final</version>
  8.         </dependency>
  9.         <dependency>
  10.             <groupId>io.debezium</groupId>
  11.             <artifactId>debezium-connector-oracle</artifactId>
  12.             <version>1.9.7.Final</version>
  13.         </dependency>
  14.         <dependency>
  15.             <groupId>io.debezium</groupId>
  16.             <artifactId>debezium-connector-sqlserver</artifactId>
  17.             <version>1.9.7.Final</version>
  18.         </dependency>
  19. <!--解决无法访问com.google.protobuf.GeneratedMessageV3 找不到com.google.protobuf.GeneratedMessageV-->
  20.         <dependency>
  21.             <groupId>com.google.protobuf</groupId>
  22.             <artifactId>protobuf-java</artifactId>
  23.             <version>3.5.1</version>
  24.         </dependency>
  25.         <!--  flink-sqlserver依赖  -->
  26.         <dependency>
  27.             <groupId>com.ververica</groupId>
  28.             <artifactId>flink-connector-sqlserver-cdc</artifactId>
  29.             <version>3.0.1</version>
  30.         </dependency>
  31.         <!--  flink-oracle依赖  -->
  32.         <dependency>
  33.             <groupId>com.ververica</groupId>
  34.             <artifactId>flink-connector-oracle-cdc</artifactId>
  35.             <version>3.0.1</version>
  36.         </dependency>
  37.         <!--  flink-oceanbase依赖  -->
  38.         <dependency>
  39.             <groupId>com.ververica</groupId>
  40.             <artifactId>flink-connector-oceanbase-cdc</artifactId>
  41.             <version>3.0.1</version>
  42.         </dependency>
  43.         <dependency>
  44.             <groupId>com.ververica</groupId>
  45.             <artifactId>flink-connector-mysql-cdc</artifactId>
  46.             <version>3.0.1</version>
  47.         </dependency>
  48.         <dependency>
  49.             <groupId>org.apache.flink</groupId>
  50.             <artifactId>flink-streaming-java_2.12</artifactId>
  51.             <version>1.12.0</version>
  52.             <exclusions>
  53.                 <!-- Exclude the included Guava as it conflicts with the Flink shaded guava -->
  54.                 <exclusion>
  55.                     <groupId>com.google.guava</groupId>
  56.                     <artifactId>guava</artifactId>
  57.                 </exclusion>
  58.             </exclusions>
  59.         </dependency>
  60.         <dependency>
  61.             <groupId>org.apache.flink</groupId>
  62.             <artifactId>flink-java</artifactId>
  63.             <version>1.12.0</version>
  64.         </dependency>
  65.         <dependency>
  66.             <groupId>org.apache.flink</groupId>
  67.             <artifactId>flink-clients_2.12</artifactId>
  68.             <version>1.12.0</version>
  69.         </dependency>
  70.         <!--  高版本兼容低版本,需要排除依赖 flinkcdc3.0.1与2.2.0版本共存,需要排除低版本的debezium-core和debezium-connector手动指定高版本-->
  71.         <dependency>
  72.             <groupId>com.ververica</groupId>
  73.             <artifactId>flink-connector-sqlserver-cdc</artifactId>
  74.             <version>2.2.0</version>
  75.             <exclusions>
  76.                 <exclusion>
  77.                     <groupId>io.debezium</groupId>
  78.                     <artifactId>debezium-core</artifactId>
  79.                 </exclusion>
  80.                 <exclusion>
  81.                     <groupId>io.debezium</groupId>
  82.                     <artifactId>debezium-connector-sqlserver</artifactId>
  83.                 </exclusion>
  84.             </exclusions>
  85.         </dependency>
  86.         <!--  高版本兼容低版本,需要排除依赖 需要排除依赖 flinkcdc3.0.1与2.2.0版本共存,需要排除低版本的debezium-core和debezium-connector手动指定高版本-->
  87.         <dependency>
  88.             <groupId>com.ververica</groupId>
  89.             <artifactId>flink-connector-oracle-cdc</artifactId>
  90.             <version>2.2.0</version>
  91.             <exclusions>
  92.                 <exclusion>
  93.                     <groupId>io.debezium</groupId>
  94.                     <artifactId>debezium-core</artifactId>
  95.                 </exclusion>
  96.                 <exclusion>
  97.                     <groupId>io.debezium</groupId>
  98.                     <artifactId>debezium-connector-oracle</artifactId>
  99.                 </exclusion>
  100.             </exclusions>
  101.         </dependency>
  102.         <dependency>
  103.             <groupId>org.apache.hadoop</groupId>
  104.             <artifactId>hadoop-client</artifactId>
  105.             <version>3.1.3</version>
  106.         </dependency>
  107.         <dependency>
  108.             <groupId>org.apache.flink</groupId>
  109.             <artifactId>flink-table-planner-blink_2.12</artifactId>
  110.             <version>1.13.6</version>
  111.         </dependency>
  112.         <dependency>
  113.             <groupId>org.apache.flink</groupId>
  114.             <artifactId>flink-shaded-guava</artifactId>
  115.             <version>18.0-12.0</version>
  116.         </dependency>
  117.         <dependency>
  118.             <groupId>org.apache.flink</groupId>
  119.             <artifactId>flink-runtime-web_2.12</artifactId>
  120.             <version>1.13.2</version>
  121.         </dependency>
  122.         <!--********************************数据库连接驱动********************************-->
  123.         <dependency>
  124.             <groupId>mysql</groupId>
  125.             <artifactId>mysql-connector-java</artifactId>
  126.         </dependency>
  127.         <dependency>
  128.             <groupId>com.oceanbase</groupId>
  129.             <artifactId>oceanbase-client</artifactId>
  130.             <version>2.4.3</version>
  131.         </dependency>
  132.         <dependency>
  133.             <groupId>com.microsoft.sqlserver</groupId>
  134.             <artifactId>sqljdbc4</artifactId>
  135.             <version>4.0</version>
  136.         </dependency>
  137.         <dependency>
  138.             <groupId>com.oracle.database.jdbc</groupId>
  139.             <artifactId>ojdbc11</artifactId>
  140.             <version>23.3.0.23.09</version>
  141.         </dependency>
  142.         <!-- ********************************Spring相关******************************** -->
  143.         <dependency>
  144.             <groupId>org.mybatis.spring.boot</groupId>
  145.             <artifactId>mybatis-spring-boot-starter</artifactId>
  146.             <version>1.3.2</version>
  147.         </dependency>
  148.         <dependency>
  149.             <groupId>com.alibaba</groupId>
  150.             <artifactId>druid-spring-boot-starter</artifactId>
  151.             <version>1.2.16</version>
  152.         </dependency>
  153.         <dependency>
  154.             <groupId>org.springframework.boot</groupId>
  155.             <artifactId>spring-boot-starter-jdbc</artifactId>
  156.         </dependency>
  157.         <dependency>
  158.             <groupId>org.springframework.boot</groupId>
  159.             <artifactId>spring-boot-starter-validation</artifactId>
  160.         </dependency>
  161.     </dependencies>
复制代码
3.Flink CDC环境准备

(1).OceanBase数据库

数据库版本:

5.7.25-OceanBase_CE-v4.3.1.0
署理版本:

oblogproxy-2.0.2-100000012024060321.el7.x86_64
步调:

(1).部署安装数据库

按照以上版本安装OceanBase数据库,具体使用请参考官网网址:https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000000818649
(2).安装署理服务oblogproxy

oblogproxy 是 OceanBase 数据库的增量日志署理服务。oblogproxy 支持实时增量链路接入和管理,方便应用接入 OceanBase 数据库的增量日志。同时支持在网络隔离时订阅增量日志。
安装oblogproxy-2.0.2署理,此中署理版本和OceanBase数据库的版本是有对应的,现在OceanBase4.3.1.0版本是最新的版本,对应oblogproxy署理版本是2.0.2,对应Flink CDC 版本是2.2.0及以上。
署理下载地址: https://obbusiness-private.oss-cn-shanghai.aliyuncs.com/download-center/opensource/oblogproxy/v2.0.2/oblogproxy-2.0.2-100000012024060321.el7.x86_64.rpm
下载完成后, 下面的使用均在 oblogproxy 项目目次中举行,oblogproxy 项目目次默以为 /usr/local/oblogproxy。oblogproxy 的配置文件默认放在 conf/conf.json。
需修改 conf/conf.json 的以下配置:(oblogproxy 需要配置用户的用户名和密码,用户必须是 OceanBase 的 sys 租户的用户才能连接。)

此中加密的用户名和密码通过以下命令获取:
./bin/logproxy -x username
./bin/logproxy -x password
然后将结果分别保存到 conf/conf.json 文件的 ob_sys_username 和 ob_sys_password 配置项

此中conf/conf.json中binlog mode选项默认false,不要设置为true,使用CDC模式下不要开启binlog模式,这样的话会启动报错。
(3).运行署理服务oblogproxy

先进入 oblogproxy 项目目次默以为 /usr/local/oblogproxy
cd /usr/local/oblogproxy
再通过以下命令启动服务。
./run.sh start

可以通过日志目次查察启动状态:
cd /usr/local/oblogproxy/log
tail –f logproxy.log

(2).SQLServer数据库

数据库版本:

Microsoft SQL Server 2017 (RTM) - 14.0.1000.169 (X64)
Aug 22 2017 17:04:49
Copyright © 2017 Microsoft Corporation
Enterprise Edition (64-bit) on Windows 10 Enterprise 10.0 <X64> (Build 22621: ) (Hypervisor)
署理版本:

开启上述安装版本数据库下的SQLServerAgent署理服务,此服务是获取数据库变动信息的重要服务,不开启的话无法获取增量数据变动。

步调:

(1).部署安装数据库

按照以上版本安装SQLServer数据库, FlinkCDC Connector最低要求SQLServer2017版本及以上,同时建议安装企业版,因为其他版本可能会有CPU内核的限定,这样倒霉于在线上大用户量并发的环境下获取数据变动,如果仅仅用于测试可以安装开辟版本。




此处安装一定要选择混淆模式,而且设置sa用户的连接密码,企业级开辟以及FlinkCDC连接过程中也是需要提供数据库的用户和密码的,而且添加当前用户为SQLServer管理员。

(2).开启署理服务SQLServer Agent


(3).开启CDC模式

对于要获取数据变动的数据库和表都要开启CDC模式
数据库开启CDC:
EXEC sys.sp_cdc_enable_db;
判定当前数据库(datagather)是否启动CDC成功:
SELECT is_cdc_enabled FROM sys.databases WHERE name = ‘datagather’;

开启当前数据库下表的CDC模式
EXEC sys.sp_cdc_enable_table
@source_schema = ‘dbo’,
@source_name = ‘a_a_test’,
@role_name = ‘cdc_role’;
– schema_name 是表所属的架构(schema)的名称。
– table_name 是要启用 CDC 跟踪的表的名称。
– cdc_role 是 CDC 使用的角色的名称。如果没有指定角色名称,系统将创建一个默认角色。

启用 CDC 后,SQL Server 将自动跟踪启用了 CDC 的表上的数据更改,并将更改信息存储在 CDC 干系的表中,您可以使用这些信息举行数据更改追踪和同步。
– 查询在当前数据库下全部的表:
SELECT * FROM INFORMATION_SCHEMA.TABLES

如果有多个数据库和多个表,按照以上使用逐一举行开启工作。
4.Flink CDC Connector整合SpringBoot

(1).目次布局


(2).版本阐明

此中version1和version2分别对应代码的版本,version1使用了Flink CDC Connector2.2.0版本,变动数据是在序列化器中获取的,version2使用了FlinkCDC Connector3.0.1(现在最新版本),变动数据是在自定义Sink中获取的,RichSinkFunction 提供了更多的功能和灵活性。它允许用户访问 Flink 运行时的上下文信息,如状态管理、计时器和广播变量等。此外,RichSinkFunction 还可以处理异步 I/O 使用,进步数据输出的效率。
(3).项目设计思路

version1

该版本是原始版本,使用的Connector的版本均为2.2.0,针对差别范例的数据库,配置差别的启动监听器,在项目启动时候,通过读取配置文件flink.properties来获取差别数据库范例的CDC连接器的配置信息。


启动完成后,在差别范例的序列化器中获取到变动数据,封装成ConcurrentHashMap,根据变动的数据以及信息,按照范例创建异步任务拼接出insert, delete, update,使用范例的SQL语句,对目标库举行增删改的使用,完成数据的变动和高效使用。

version2

该版本是完善的版本,使用的Connector的版本均为3.0.1,针对差别范例的数据库,配置差别的启动监听器,在项目启动时候,通过读取配置文件flink.properties来获取差别数据库范例的CDC连接器的配置信息,自定义Sink而且封装统一的变动实体来吸取数据而不是map,来获取CDC过程获取到的变动信息,而不是直接在序列化器中,这样更加高效而且使得处理变动数据和Flink CDC Connector的序列化器。



由于差别数据库支持的SQL语法差别,联合高内聚低耦合的设计思想以及后期维护和可扩展性,具体变动业务通过策略模式和工厂模式联合,实现对差别范例的数据库组装新增, 删除,修改的SQL语句,通过反射调用,大大进步了应用程序的扩展性和可维护性。



差别数据库范例的实今世码如下所示:
(4).OceanBase数据库

现在验证OceanBase数据库的数据变动获取,把变动结果显现在目标数据库SQLServer中,在application.just配置文件中把flink.version1.oceanbase改为true。


由条件注解来控制@ConditionalOnExpression(“#{@environment.getProperty(‘flink.version1.oceanbase’) == ‘true’ && @environment.getProperty(‘flink.version2.oceanbase’) == ‘false’}”)
该条件注解的含义是flink.version1.oceanbase和flink.version2.oceanbase只有一个为true,另一个为false的时候才可以启动,这样的话每个范例数据库的CDC程序运行时只能同时存在一个。

可以看到version1中的oceanbase范例的CDC连接器启动成功,现在往表中插入一条数据,我们看oceanbase cdc 的连接器连接数据库的信息
新增使用

我们在该库下的a_a_test表中添加一条数据:

在oceanbase的序列化器中可以看到添加的变动数据信息

在我们的Runable接口中,可以看到针对目标数据库SQLServer,已经处理好了插入语句,语法是SQLServer的语法,如下图所示:


修改使用



删除使用





version2和version1的结果一致,此处不在演示.
(5).SQLServer数据库


现在验证SQLServer数据库的数据变动获取,把变动结果显现在目标数据库SQLServer中,在application.just配置文件中把flink.version2.sqlserver改为true


由条件注解来控制@ConditionalOnExpression(“#{@environment.getProperty(‘flink.version2.sqlserver’) == ‘true’ && @environment.getProperty(‘flink.version1.sqlserver’) == ‘false’}”)该条件注解的含义是flink.version1.sqlserver和flink.version2.sqlserver只有一个为true,另一个为false的时候才可以启动,这样的话每个范例数据库的CDC程序运行时只能同时存在一个

可以看到version2中的sqlserver范例的CDC连接器启动成功
SQLServer的验证和OceanBase一样,大家可以自行验证哦
以上是使用FlinkCDC整合SpringBoot获取变动数据的分享,希望可以给大家提供思路和资助

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4