SpringBoot集成Flink CDC实现binlog监听

打印 上一主题 下一主题

主题 905|帖子 905|积分 2715

Flink CDC

CDC相关介绍

CDC是什么?

CDC是Change Data Capture(变更数据获取)的简称。焦颔首脑是,监测并捕获数据库的变更(包罗数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完备记载下来,写入到MQ以供其他服务进行订阅及消费
CDC分类

CDC主要分为基于查询基于Binlog
基于查询基于Binlog开源产品Sqoop、DataXCanal、Maxwell、Debezium执行模式BatchStreaming是否可以捕获全部数据变化否是延迟性高延迟低延迟是否增长数据库压力是否 基于查询的都是Batch模式(即数据到达一定量后/一定时间才行会执行), 同时也由于这种模式, 那么延迟是必然高的, 而基于Streaming则是可以做到按条的粒度, 每条数据发生变化, 那么就会监听到
Flink CDC

Flink社区开发了flink-cdc-connectors组件,这是一个可以直接从MySQL、PostgreSQL 等数据库直接读取全量数据增量变更数据的source组件。
目前也已开源,开源地址:https://github.com/ververica/flink-cdc-connectors
Java中集成Flink CDC

MySQL相关设置

执行初始化SQL数据

  1. # 创建test数据库
  2. create database test;
  3. # 在test库中创建studnet, t1, t2, t3表, 插入数据
  4. use test;
  5. CREATE TABLE `student`  (
  6.   `id` int(11) NOT NULL AUTO_INCREMENT,
  7.   `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  8.   `age` int(11) NULL DEFAULT NULL,
  9.   PRIMARY KEY (`id`) USING BTREE
  10. ) ENGINE = InnoDB AUTO_INCREMENT = 4 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
  11. INSERT INTO `student` VALUES (1, 'joy', 18);
  12. INSERT INTO `student` VALUES (2, 'tom', 123123);
  13. CREATE TABLE t1(
  14.     `id` VARCHAR(255) PRIMARY KEY,
  15.     `name` VARCHAR(255)
  16. );
  17. CREATE TABLE t2(
  18.     `id` VARCHAR(255) PRIMARY KEY,
  19.     `name` VARCHAR(255)
  20. );
  21. CREATE TABLE t3(
  22.     `id` VARCHAR(255) PRIMARY KEY,
  23.     `name` VARCHAR(255)
  24. );
  25. # 创建test_route数据库
  26. create database test_route;
  27. # 在test_route库中创建t1, t2, t3表
  28. use test_route;
  29. CREATE TABLE t1(
  30.     `id` VARCHAR(255) PRIMARY KEY,
  31.     `name` VARCHAR(255)
  32. );
  33. CREATE TABLE t2(
  34.     `id` VARCHAR(255) PRIMARY KEY,
  35.     `name` VARCHAR(255)
  36. );
  37. CREATE TABLE t3(
  38.     `id` VARCHAR(255) PRIMARY KEY,
  39.     `name` VARCHAR(255)
  40. );
  41. # 在test_route数据库中的t1, t2, t3表插入数据
  42. use test_route;
  43. INSERT INTO t1 VALUES('1001','zhangsan');
  44. INSERT INTO t1 VALUES('1002','lisi');
  45. INSERT INTO t1 VALUES('1003','wangwu');
  46. INSERT INTO t2 VALUES('1004','zhangsan');
  47. INSERT INTO t2 VALUES('1005','lisi');
  48. INSERT INTO t2 VALUES('1006','wangwu');
  49. INSERT INTO t3 VALUES('1001','F');
  50. INSERT INTO t3 VALUES('1002','F');
  51. INSERT INTO t3 VALUES('1003','M');
复制代码
开启Binlog

通常来说默认安装MySQL的cnf都是存在/etc下的
  1. sudo vim /etc/my.cnf
复制代码
  1. # 添加如下配置信息,开启`test`以及`test_route`数据库的Binlog
  2. # 数据库id
  3. server-id = 1
  4. # 时区, 如果不修改数据库时区, 那么Flink MySQL CDC无法启动
  5. default-time-zone = '+8:00'
  6. # 启动binlog,该参数的值会作为binlog的文件名
  7. log-bin=mysql-bin
  8. # binlog类型,maxwell要求为row类型
  9. binlog_format=row
  10. # 启用binlog的数据库,需根据实际情况作出修改
  11. binlog-do-db=test
  12. binlog-do-db=test_route
复制代码
修改数据库时区

永世修改, 那么就修改my.cnf配置(刚刚配置已经修改了, 记得重启即可)
  1. default-time-zone = '+8:00'
复制代码
临时修改(重启会丢失)
  1. # MySQL 8 执行这个
  2. set persist time_zone='+8:00';
  3. # MySQL 5.x版本执行这个
  4. set time_zone='+8:00';
复制代码
重启MySQL

   留意了, 设置后必要重启MySQL!
  1. service mysqld restart
复制代码
代码

相关依赖

  1. <dependencies>
  2.     <dependency>
  3.         <groupId>org.springframework.boot</groupId>
  4.         <artifactId>spring-boot-starter-web</artifactId>
  5.     </dependency>
  6.     <!-- Flink CDC依赖 start-->
  7.     <!-- Flink核心依赖, 提供了Flink的核心API -->
  8.     <dependency>
  9.         <groupId>org.apache.flink</groupId>
  10.         <artifactId>flink-java</artifactId>
  11.         <version>${flink.version}</version>
  12.     </dependency>
  13.     <!--  Flink流处理Java API依赖 -->
  14.     <dependency>
  15.         <groupId>org.apache.flink</groupId>
  16.         <artifactId>flink-streaming-java</artifactId>
  17.         <version>${flink.version}</version>
  18.     </dependency>
  19.     <!-- Flink客户端工具依赖, 包含命令行界面和实用函数 -->
  20.     <dependency>
  21.         <groupId>org.apache.flink</groupId>
  22.         <artifactId>flink-clients</artifactId>
  23.         <version>${flink.version}</version>
  24.     </dependency>
  25.     <!-- Flink连接器基础包, 包含连接器公共功能 -->
  26.     <dependency>
  27.         <groupId>org.apache.flink</groupId>
  28.         <artifactId>flink-connector-base</artifactId>
  29.         <version>${flink.version}</version>
  30.     </dependency>
  31.     <!-- Flink Kafka连接器, 用于和Apache Kafka集成, 这里不需要集成, 所以注释掉, 代码可以使用其它的MQ代替 -->
  32.     <!--<dependency>
  33.             <groupId>org.apache.flink</groupId>
  34.             <artifactId>flink-connector-kafka</artifactId>
  35.             <version>3.2.0-1.19</version>
  36.         </dependency>-->
  37.     <!-- Flink Table Planner, 用于Table API和SQL的执行计划生成 -->
  38.     <dependency>
  39.         <groupId>org.apache.flink</groupId>
  40.         <artifactId>flink-table-planner_2.12</artifactId>
  41.         <version>${flink.version}</version>
  42.     </dependency>
  43.     <!-- Flink Table API桥接器, 连接DataStream API和Table API -->
  44.     <dependency>
  45.         <groupId>org.apache.flink</groupId>
  46.         <artifactId>flink-table-api-java-bridge</artifactId>
  47.         <version>${flink.version}</version>
  48.     </dependency>
  49.     <!-- Flink JSON格式化数据依赖 -->
  50.     <dependency>
  51.         <groupId>org.apache.flink</groupId>
  52.         <artifactId>flink-json</artifactId>
  53.         <version>${flink.version}</version>
  54.     </dependency>
  55.     <!-- 开启Web UI支持, 端口为8081, 默认为不开启-->
  56.     <!--<dependency>
  57.             <groupId>org.apache.flink</groupId>
  58.             <artifactId>flink-runtime-web</artifactId>
  59.             <version>1.19.1</version>
  60.         </dependency>-->
  61.     <!-- MySQL CDC依赖
  62.         org.apache.flink的适用MySQL 8.0
  63.          具体参照这篇博客 https://blog.csdn.net/kakaweb/article/details/129441408
  64.          -->
  65.     <dependency>
  66.         <!--MySQL 8.0适用-->
  67.         <!--<groupId>org.apache.flink</groupId>
  68.             <artifactId>flink-sql-connector-mysql-cdc</artifactId>
  69.             <version>3.1.0</version>-->
  70.         <!-- MySQL 5.7适用 , 2.3.0, 3.0.1均可用-->
  71.         <groupId>com.ververica</groupId>
  72.         <artifactId>flink-sql-connector-mysql-cdc</artifactId>
  73.         <!--<version>2.3.0</version>-->
  74.         <version>3.0.1</version>
  75.     </dependency>
  76.     <!-- lombok -->
  77.     <dependency>
  78.         <groupId>org.projectlombok</groupId>
  79.         <artifactId>lombok</artifactId>
  80.     </dependency>
  81.     <!-- gson工具类 -->
  82.     <dependency>
  83.         <groupId>com.google.code.gson</groupId>
  84.         <artifactId>gson</artifactId>
  85.         <version>2.11.0</version>
  86.     </dependency>
  87.     <!-- ognl表达式 -->
  88.     <dependency>
  89.         <groupId>ognl</groupId>
  90.         <artifactId>ognl</artifactId>
  91.         <version>3.1.1</version>
  92.     </dependency>
  93.     <!-- hutool工具类 -->
  94.     <dependency>
  95.         <groupId>cn.hutool</groupId>
  96.         <artifactId>hutool-all</artifactId>
  97.         <version>5.8.26</version>
  98.     </dependency>
  99.     <dependency>
  100.         <groupId>com.alibaba</groupId>
  101.         <artifactId>fastjson</artifactId>
  102.         <version>2.0.31</version>
  103.     </dependency>
  104. </dependencies>
  105. <name>cdc-test</name>
  106. <description>cdc-test</description>
  107. <properties>
  108.     <java.version>11</java.version>
  109.     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  110.     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  111.     <spring-boot.version>2.6.13</spring-boot.version>
  112.     <flink.version>1.19.0</flink.version>
  113. </properties>
  114. <dependencyManagement>
  115.     <dependencies>
  116.         <dependency>
  117.             <groupId>org.springframework.boot</groupId>
  118.             <artifactId>spring-boot-dependencies</artifactId>
  119.             <version>${spring-boot.version}</version>
  120.             <type>pom</type>
  121.             <scope>import</scope>
  122.         </dependency>
  123.     </dependencies>
  124. </dependencyManagement>
复制代码
yaml

  1. # Flink CDC相关配置
  2. flink-cdc:
  3.   mysql:
  4.     hostname: 192.168.132.101
  5.     port: 3306
  6.     username: root
  7.     password: 12345678
  8.     databaseList: test
  9.     tableList: test.student, test.t1
  10.     includeSchemaChanges: false
  11.     parallelism: 1
  12.     enableCheckpointing: 5000
复制代码
FlinkCDCConfig

  1. import lombok.Data;
  2. import org.springframework.boot.context.properties.ConfigurationProperties;
  3. import org.springframework.context.annotation.Configuration;
  4. /**
  5. * @author whiteBrocade
  6. * @version 1.0
  7. * @description: Flink CDC配置
  8. */
  9. @Data
  10. @Configuration
  11. @ConfigurationProperties("flink-cdc.mysql")
  12. public class FlinkCDCConfig {
  13.     /**
  14.      * 数据库地址
  15.      */
  16.     private String hostname;
  17.     /**
  18.      * 数据库端口
  19.      */
  20.     private Integer port;
  21.     /**
  22.      * 数据库用户名
  23.      */
  24.     private String username;
  25.     /**
  26.      * 数据库密码
  27.      */
  28.     private String password;
  29.     /**
  30.      * 数据库名
  31.      */
  32.     private String[] databaseList;
  33.     /**
  34.      * 表名
  35.      */
  36.     private String[] tableList;
  37.     /**
  38.      * 是否包含schema变更
  39.      */
  40.     private Boolean includeSchemaChanges;
  41.     /**
  42.      * 并行度
  43.      */
  44.     private Integer parallelism;
  45.     /**
  46.      * 检查点间隔, 单位毫秒
  47.      */
  48.     private Integer enableCheckpointing;
  49. }
复制代码
相关摆列

OperatorTypeEnum

  1. import lombok.AllArgsConstructor;
  2. import lombok.Getter;
  3. /**
  4. * @author whiteBrocade
  5. * @version 1.0
  6. * @description 操作类型枚举
  7. */
  8. @Getter
  9. @AllArgsConstructor
  10. public enum OperatorTypeEnum {
  11.     /**
  12.      * 新增
  13.      */
  14.     INSERT(1),
  15.     /**
  16.      * 修改
  17.      */
  18.     UPDATE(2),
  19.     /**
  20.      * 删除
  21.      */
  22.     DELETE(3),
  23.     ;
  24.     private final int type;
  25. }
复制代码
StrategyEnum

  1. import cn.hutool.core.bean.BeanUtil;
  2. import cn.hutool.core.util.ObjUtil;
  3. import cn.hutool.json.JSONObject;
  4. import cn.hutool.json.JSONUtil;
  5. import lombok.AllArgsConstructor;
  6. import lombok.Getter;
  7. import java.beans.Introspector;
  8. /**
  9. * @author whiteBrocade
  10. * @version 1.0
  11. * @description 表处理策略
  12. * todo 后续在这里新增相关枚举即可
  13. */
  14. @Getter
  15. @AllArgsConstructor
  16. public enum StrategyEnum {
  17.     /**
  18.      * Student处理策略
  19.      */
  20.     STUDENT("student", Student.class, Introspector.decapitalize(StudentLogHandler.class.getSimpleName())),
  21.     ;
  22.     /**
  23.      * 表名
  24.      */
  25.     private final String tableName;
  26.     /**
  27.      * class对象
  28.      */
  29.     private final Class<?> varClass;
  30.     /**
  31.      * 处理器名
  32.      */
  33.     private final String handlerName;
  34.     /**
  35.      * 策略选择器, 根据传入的 DataChangeInfo 对象中的 tableName 属性, 从一系列预定义的策略 (StrategyEnum) 中选择一个合适的处理策略, 并封装进 StrategyHandleSelector 对象中返回
  36.      *
  37.      * @param dataChangeInfo 数据变更对象
  38.      * @return StrategyHandlerSelector
  39.      */
  40.     public static StrategyHandleSelector getSelector(DataChangeInfo dataChangeInfo) {
  41.         if (ObjUtil.isNull(dataChangeInfo)) {
  42.             return null;
  43.         }
  44.         String tableName = dataChangeInfo.getTableName();
  45.         StrategyHandleSelector selector = new StrategyHandleSelector();
  46.         // 遍历所有的策略枚举(StrategyEnum), 寻找与当前表名相匹配的策略
  47.         for (StrategyEnum strategyEnum : values()) {
  48.             // 如果找到匹配的策略, 创建并配置 StrategyHandleSelector
  49.             if (strategyEnum.getTableName().equals(tableName)) {
  50.                 selector.setHandlerName(strategyEnum.handlerName);
  51.                 selector.setOperatorTime(dataChangeInfo.getOperatorTime());
  52.                 selector.setOperatorType(dataChangeInfo.getOperatorType());
  53.                 JSONObject jsonObject = JSONUtil.parseObj(dataChangeInfo.getData());
  54.                 selector.setData(BeanUtil.copyProperties(jsonObject, strategyEnum.varClass));
  55.                 return selector;
  56.             }
  57.         }
  58.         return null;
  59.     }
  60. }
复制代码
pojo

Student

  1. import lombok.Data;
  2. /**
  3. * @author whiteBrocade
  4. * @version 1.0
  5. * @description 学生类, 用于演示
  6. */
  7. @Data
  8. public class Student {
  9.     /**
  10.      * id
  11.      */
  12.     private Integer id;
  13.     /**
  14.      * 姓名
  15.      */
  16.     private String name;
  17.     /**
  18.      * 年龄
  19.      */
  20.     private Integer age;
  21. }
复制代码
DataChangeInfo

  1. import lombok.Data;
  2. /**
  3. * @author whiteBrocade
  4. * @version 1.0
  5. * @description 数据变更对象
  6. */
  7. @Data
  8. public class DataChangeInfo {
  9.     /**
  10.      * 变更前数据
  11.      */
  12.     private String beforeData;
  13.     /**
  14.      * 变更后数据
  15.      */
  16.     private String afterData;
  17.     /**
  18.      * 操作的数据
  19.      */
  20.     private String data;
  21.     /**
  22.      * 变更类型 1->新增 2->修改 3->删除
  23.      */
  24.     private Integer operatorType;
  25.     /**
  26.      * binlog文件名
  27.      */
  28.     private String fileName;
  29.     /**
  30.      * binlog当前读取点位
  31.      */
  32.     private Integer filePos;
  33.     /**
  34.      * 数据库名
  35.      */
  36.     private String database;
  37.     /**
  38.      * 表名
  39.      */
  40.     private String tableName;
  41.     /**
  42.      * 变更时间
  43.      */
  44.     private Long operatorTime;
  45. }
复制代码
自界说Sink

DataChangeSink

  1. import cn.hutool.core.util.ObjUtil;
  2. import lombok.AllArgsConstructor;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.flink.api.common.functions.OpenContext;
  5. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  6. import org.springframework.stereotype.Component;
  7. import java.io.Serializable;
  8. import java.util.Map;
  9. /**
  10. * @author whiteBrocade
  11. * @version 1.0
  12. * @description
  13. */
  14. @Slf4j
  15. @Component
  16. @AllArgsConstructor
  17. public class DataChangeSink extends RichSinkFunction<DataChangeInfo> implements Serializable {
  18.     /**
  19.      * BaseLogHandler相关的缓存
  20.      * Spring自动将相关BaseLogHandler的Bean注入注入到本地缓存Map中
  21.      */
  22.     private final Map<String, BaseLogHandler> strategyHandlerMap;
  23.     @Override
  24.     public void invoke(DataChangeInfo value, Context context) {
  25.         log.info("收到变更原始数据:{}", value);
  26.         // 选择策略
  27.         StrategyHandleSelector selector = StrategyEnum.getSelector(value);
  28.         if (ObjUtil.isNull(selector)) {
  29.             return;
  30.         }
  31.         BaseLogHandler<Object> handler = strategyHandlerMap.get(selector.getHandlerName());
  32.         // insert操作
  33.         if (selector.getOperatorType().equals(OperatorTypeEnum.INSERT.getType())) {
  34.             handler.handleInsertLog(selector.getData(), selector.getOperatorTime());
  35.             return;
  36.         }
  37.         // update操作
  38.         if (selector.getOperatorType().equals(OperatorTypeEnum.UPDATE.getType())) {
  39.             handler.handleUpdateLog(selector.getData(), selector.getOperatorTime());
  40.             return;
  41.         }
  42.         // delete操作
  43.         if (selector.getOperatorType().equals(OperatorTypeEnum.DELETE.getType())) {
  44.             handler.handleDeleteLog(selector.getData(), selector.getOperatorTime());
  45.         }
  46.     }
  47.     /**
  48.      * 前置操作
  49.      */
  50.     @Override
  51.     public void open(OpenContext openContext) throws Exception {
  52.         super.open(openContext);
  53.     }
  54.     /**
  55.      * 后置操作
  56.      */
  57.     @Override
  58.     public void close() throws Exception {
  59.         super.close();
  60.     }
  61. }
复制代码
CustomSink

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.apache.flink.api.common.functions.OpenContext;
  3. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @author whiteBrocade
  7. * @version 1.0
  8. * @description 自定义Sink处理器, 这个是根据ognl表达式区分ddl语句类型, 搭配
  9. */
  10. @Slf4j
  11. @Component
  12. public class CustomSink extends RichSinkFunction<String> {
  13.     @Override
  14.     public void invoke(String json, Context context) throws Exception {
  15.         // op字段:  该字段也有4种取值,分别是C(create)、U(Update)、D(Delete)、Read
  16.         // 对于U操作,其数据部分同时包含了Before和After。
  17.         log.info("监听到数据: {}", json);
  18.         String op = JSONUtil.getValue(json, "op", String.class);
  19.         // 语句的id
  20.         Integer id = null;
  21.         // 如果是update语句
  22.         if ("u".equals(op)) {
  23.             id = JSONUtil.getValue(json, "after.id", Integer.class);
  24.             log.info("执行update语句");
  25.             // 执行update语句
  26.         }
  27.         // 如果是delete语句
  28.         if ("d".equals(op)) {
  29.             id = JSONUtil.getValue(json, "before.id", Integer.class);
  30.             log.info("执行delete语句");
  31.             // 执行删除语句
  32.         }
  33.         // 如果是新增
  34.         if ("c".equals(op)) {
  35.             log.info("执行insert语句");
  36.         }
  37.     }
  38.     // 前置操作
  39.     @Override
  40.     public void open(OpenContext openContext) throws Exception {
  41.         super.open(openContext);
  42.     }
  43.     // 后置操作
  44.     @Override
  45.     public void close() throws Exception {
  46.         super.close();
  47.     }
  48. }
复制代码
自界说反序列化器 MySQLDeserialization

  1. import com.alibaba.fastjson.JSONObject;
  2. import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
  3. import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
  4. import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
  5. import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
  6. import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
  7. import io.debezium.data.Envelope;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.apache.flink.api.common.typeinfo.TypeInformation;
  10. import org.apache.flink.util.Collector;
  11. import org.springframework.stereotype.Service;
  12. import java.util.List;
  13. import java.util.Optional;
  14. /**
  15. * @author whiteBrocade
  16. * @version 1.0
  17. * @description MySQL消息读取 自定义反序列化器
  18. */
  19. @Slf4j
  20. @Service
  21. public class MySQLDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {
  22.     public static final String TS_MS = "ts_ms";
  23.     public static final String BIN_FILE = "file";
  24.     public static final String POS = "pos";
  25.     public static final String CREATE = "CREATE";
  26.     public static final String BEFORE = "before";
  27.     public static final String AFTER = "after";
  28.     public static final String SOURCE = "source";
  29.     public static final String UPDATE = "UPDATE";
  30.     /**
  31.      * 反序列化数据, 转为变更JSON对象
  32.      *
  33.      * @param sourceRecord SourceRecord
  34.      * @param collector Collector<DataChangeInfo>
  35.      */
  36.     @Override
  37.     public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
  38.         try {
  39.             // 根据主题的格式,获取数据库名(database)和表名(tableName)
  40.             String topic = sourceRecord.topic();
  41.             String[] fields = topic.split("\\.");
  42.             String database = fields[1];
  43.             String tableName = fields[2];
  44.             Struct struct = (Struct) sourceRecord.value();
  45.             final Struct source = struct.getStruct(SOURCE);
  46.             DataChangeInfo dataChangeInfo = new DataChangeInfo();
  47.             // 获取操作类型  CREATE UPDATE DELETE
  48.             Envelope.Operation operation = Envelope.operationFor(sourceRecord);
  49.             String type = operation.toString().toUpperCase();
  50.             int eventType = type.equals(CREATE) ? OperatorTypeEnum.INSERT.getType() : UPDATE.equals(type) ?
  51.                     OperatorTypeEnum.UPDATE.getType() : OperatorTypeEnum.DELETE.getType();
  52.             // 一般情况是无需关心其之前之后数据的, 直接获取最新的日志数据即可, 但这里为了演示, 都进行输出
  53.             // 获取变更前和变更后的数据,并将其设置到DataChangeInfo对象中
  54.             dataChangeInfo.setBeforeData(this.getJsonObject(struct, BEFORE).toJSONString());
  55.             dataChangeInfo.setAfterData(this.getJsonObject(struct, AFTER).toJSONString());
  56.             if (eventType == OperatorTypeEnum.DELETE.getType()) {
  57.                 dataChangeInfo.setData(this.getJsonObject(struct, BEFORE).toJSONString());
  58.             } else {
  59.                 dataChangeInfo.setData(this.getJsonObject(struct, AFTER).toJSONString());
  60.             }
  61.             dataChangeInfo.setOperatorType(eventType);
  62.             dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE))
  63.                     .map(Object::toString)
  64.                     .orElse(""));
  65.             dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS))
  66.                     .map(x -> Integer.parseInt(x.toString()))
  67.                     .orElse(0));
  68.             dataChangeInfo.setDatabase(database);
  69.             dataChangeInfo.setTableName(tableName);
  70.             dataChangeInfo.setOperatorTime(Optional.ofNullable(struct.get(TS_MS))
  71.                     .map(x -> Long.parseLong(x.toString()))
  72.                     .orElseGet(System::currentTimeMillis));
  73.             // 输出数据
  74.             collector.collect(dataChangeInfo);
  75.         } catch (Exception e) {
  76.             log.error("反序列binlog失败", e);
  77.         }
  78.     }
  79.     /**
  80.      * 从源数据获取出变更之前或之后的数据
  81.      *
  82.      * @param value Struct
  83.      * @param fieldElement 字段
  84.      * @return JSONObject
  85.      */
  86.     private JSONObject getJsonObject(Struct value, String fieldElement) {
  87.         Struct element = value.getStruct(fieldElement);
  88.         JSONObject jsonObject = new JSONObject();
  89.         if (element != null) {
  90.             Schema afterSchema = element.schema();
  91.             List<Field> fieldList = afterSchema.fields();
  92.             for (Field field : fieldList) {
  93.                 Object afterValue = element.get(field);
  94.                 jsonObject.put(field.name(), afterValue);
  95.             }
  96.         }
  97.         return jsonObject;
  98.     }
  99.     @Override
  100.     public TypeInformation<DataChangeInfo> getProducedType() {
  101.         return TypeInformation.of(DataChangeInfo.class);
  102.     }
  103. }
复制代码
LogHandler

BaseLogHandler

  1. import java.io.Serializable;
  2. /**
  3. * @author whiteBrocade
  4. * @version 1.0
  5. * @description 日志处理器
  6. * todo 新建一个类实现该BaseLogHandler类, 添加相应的处理逻辑即可, 可参考StudentLogHandler实现
  7. */
  8. public interface BaseLogHandler<T> extends Serializable {
  9.     /**
  10.      * 日志处理
  11.      *
  12.      * @param data 数据转换后模型
  13.      * @param operatorTime 操作时间
  14.      */
  15.     void handleInsertLog(T data, Long operatorTime);
  16.     /**
  17.      * 日志处理
  18.      *
  19.      * @param data 数据转换后模型
  20.      * @param operatorTime 操作时间
  21.      */
  22.     void handleUpdateLog(T data, Long operatorTime);
  23.     /**
  24.      * 日志处理
  25.      *
  26.      * @param data 数据转换后模型
  27.      * @param operatorTime 操作时间
  28.      */
  29.     void handleDeleteLog(T data, Long operatorTime);
  30. }
复制代码
StrategyHandleSelector

  1. import lombok.Data;
  2. /**
  3. * @author whiteBrocade
  4. * @version 1.0
  5. * @description 策略处理选择器
  6. */
  7. @Data
  8. public class StrategyHandleSelector {
  9.     /**
  10.      * 策略处理器名称
  11.      */
  12.     private String handlerName;
  13.     /**
  14.      * 数据源
  15.      */
  16.     private Object data;
  17.     /**
  18.      * 操作时间
  19.      */
  20.     private Long operatorTime;
  21.     /**
  22.      * 操作类型
  23.      */
  24.     private Integer operatorType;
  25. }
复制代码
StudentLogHandler

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.stereotype.Service;
  3. /**
  4. * @author whiteBrocade
  5. * @version 1.0
  6. * @description Student对应处理器
  7. */
  8. @Slf4j
  9. @Service
  10. public class StudentLogHandler implements BaseLogHandler<Student> {
  11.     @Override
  12.     public void handleInsertLog(Student data, Long operatorTime) {
  13.         log.info("处理Student表的新增日志: {}", data);
  14.     }
  15.     @Override
  16.     public void handleUpdateLog(Student data, Long operatorTime) {
  17.         log.info("处理Student表的修改日志: {}", data);
  18.     }
  19.     @Override
  20.     public void handleDeleteLog(Student data, Long operatorTime) {
  21.         log.info("处理Student表的删除日志: {}", data);
  22.     }
  23. }
复制代码
JSONUtil

  1. import com.google.gson.Gson;
  2. import com.google.gson.reflect.TypeToken;
  3. import ognl.Ognl;
  4. import ognl.OgnlContext;
  5. import java.util.Map;
  6. /**
  7. * @author whiteBrocade
  8. * @version 1.0
  9. * @description: JSON工具类
  10. */
  11. public class JSONUtil {
  12.     /**
  13.      * 将指定JSON转为Map对象, Key类型为String,对应JSON的key
  14.      * Value分情况:
  15.      * 1. Value是字符串, 自动转为字符串, 例如:{"a","b"}
  16.      * 2. Value是其他JSON对象, 自动转为Map,例如::{"a":{"b":"2"}}
  17.      * 3. Value是数组, 自动转为list<Map>,例如::{"a":[:{"b":"2"},"c":"3"]}
  18.      *
  19.      * @param json 输入的的JSON对象
  20.      * @return 动态Map集合
  21.      */
  22.     public static Map<String, Object> transferToMap(String json) {
  23.         Gson gson = new Gson();
  24.         Map<String, Object> map = gson.fromJson(json, new TypeToken<Map<String, Object>>() {}.getType());
  25.         return map;
  26.     }
  27.     /**
  28.      * 获取指定JSON的指定路径的值
  29.      *
  30.      * @param json  原始JSON数据
  31.      * @param path  OGNL原则表达式
  32.      * @param clazz Value对应的目标类
  33.      * @return clazz对应的数据
  34.      */
  35.     public static <T> T getValue(String json, String path, Class<T> clazz) {
  36.         try {
  37.             Map<String, Object> map = JSONUtil.transferToMap(json);
  38.             OgnlContext ognlContext = new OgnlContext();
  39.             ognlContext.setRoot(map);
  40.             T value = (T) Ognl.getValue(path, ognlContext, ognlContext.getRoot(), clazz);
  41.             return value;
  42.         } catch (Exception e) {
  43.             throw new RuntimeException(e);
  44.         }
  45.     }
  46. }
复制代码
MysqlEventListener

  1. import com.ververica.cdc.connectors.mysql.source.MySqlSource;
  2. import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
  3. import com.ververica.cdc.connectors.mysql.table.StartupOptions;
  4. import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
  5. import lombok.AllArgsConstructor;
  6. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.springframework.boot.ApplicationArguments;
  9. import org.springframework.boot.ApplicationRunner;
  10. import org.springframework.stereotype.Component;
  11. /**
  12. * @author whiteBrocade
  13. * @version 1.0
  14. * @description MySQL变更监听
  15. */
  16. @Component
  17. @AllArgsConstructor
  18. public class MysqlEventListener implements ApplicationRunner {
  19.     /**
  20.      * Flink CDC相关配置
  21.      */
  22.     private final FlinkCDCConfig flinkCDCConfig;
  23.     /**
  24.      * 自定义Sink
  25.      * customSink: 通过ognl解析ddl语句类型
  26.      * dataChangeSink: 通过struct解析ddl语句类型
  27.      * 通常两个选择一个就行
  28.      */
  29.     private final CustomSink customSink;
  30.     private final DataChangeSink dataChangeSink;
  31.     /**
  32.      * 自定义反序列化处理器
  33.      */
  34.     private final MySQLDeserialization mySQLDeserialization;
  35.     @Override
  36.     public void run(ApplicationArguments args) throws Exception {
  37.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  38.         // 设置整个Flink程序的默认并行度
  39.         env.setParallelism(flinkCDCConfig.getParallelism());
  40.         // 设置checkpoint 间隔
  41.         env.enableCheckpointing(flinkCDCConfig.getEnableCheckpointing());
  42.         // 设置任务关闭的时候保留最后一次 CK 数据
  43.         env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  44.         // todo 下列的两个MySqlSource选择一个
  45.         // 自定义的反序列化器
  46.         // MySqlSource<DataChangeInfo> mySqlSource = this.buildBaseMySqlSource(DataChangeInfo.class)
  47.         //         .deserializer(mySQLDeserialization)
  48.         //         .build();
  49.         // Flink CDC自带的反序列化器
  50.         MySqlSource<String> mySqlSource = this.buildBaseMySqlSource(String.class)
  51.             .deserializer(new JsonDebeziumDeserializationSchema())
  52.             .build();
  53.         env.fromSource(mySqlSource,
  54.                        WatermarkStrategy.noWatermarks(),
  55.                        "mysql-source")
  56.             // 设置该数据源的并行度
  57.             .setParallelism(flinkCDCConfig.getParallelism())
  58.             // todo 根据上述的选择,选择对应的Sink
  59.             // .addSink(dataChangeSink); // 添加Sink, 这里配合mySQLDeserialization+dataChangeSink
  60.             .addSink(customSink);
  61.         env.execute("mysql-stream-cdc");
  62.     }
  63.     /**
  64.      * 构建基本的MySqlSourceBuilder
  65.      *
  66.      * @param clazz 返回的数据类型Class对象
  67.      * @param <T>   源数据中存储的类型
  68.      * @return MySqlSourceBuilder
  69.      */
  70.     private <T> MySqlSourceBuilder<T> buildBaseMySqlSource(Class<T> clazz) {
  71.         return MySqlSource.<T>builder()
  72.             .hostname(flinkCDCConfig.getHostname())
  73.             .port(flinkCDCConfig.getPort())
  74.             .username(flinkCDCConfig.getUsername())
  75.             .password(flinkCDCConfig.getPassword())
  76.             .databaseList(flinkCDCConfig.getDatabaseList())
  77.             .tableList(flinkCDCConfig.getTableList())
  78.             /* initial: 初始化快照,即全量导入后增量导入(检测更新数据写入)
  79.                  * latest: 只进行增量导入(不读取历史变化)
  80.                  * timestamp: 指定时间戳进行数据导入(大于等于指定时间错读取数据)
  81.                  */
  82.             .startupOptions(StartupOptions.latest())
  83.             .includeSchemaChanges(flinkCDCConfig.getIncludeSchemaChanges()) // 包括schema的改变
  84.             .serverTimeZone("GMT+8"); // 时区
  85.     }
  86. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

三尺非寒

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表