Flink CDC
About CDC
What is CDC?
CDC is short for Change Data Capture. The core idea is to monitor and capture database changes (inserts, updates, and deletes of rows or tables), record those changes completely in the order they occur, and write them to an MQ for other services to subscribe to and consume.
Categories of CDC
CDC solutions fall into two main categories: query-based and Binlog-based.

|  | Query-based | Binlog-based |
| --- | --- | --- |
| Open-source products | Sqoop, DataX | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures every data change | No | Yes |
| Latency | High | Low |
| Adds load on the database | Yes | No |

Query-based tools all work in batch mode (a job only runs once enough data has accumulated or a time interval has elapsed), so high latency is unavoidable. Binlog-based streaming works at per-row granularity: as soon as a row changes, the change is picked up.
Flink CDC
The Flink community developed the flink-cdc-connectors component, a source connector that can read both full snapshots and incremental changes directly from databases such as MySQL and PostgreSQL.
It is open source: https://github.com/ververica/flink-cdc-connectors
Integrating Flink CDC in Java
MySQL setup
Run the initialization SQL
# Create the test database
create database test;

# In the test database, create the student, t1, t2, t3 tables and insert data
use test;
CREATE TABLE `student` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `age` int(11) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 4 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `student` VALUES (1, 'joy', 18);
INSERT INTO `student` VALUES (2, 'tom', 123123);
CREATE TABLE t1(
  `id` VARCHAR(255) PRIMARY KEY,
  `name` VARCHAR(255)
);
CREATE TABLE t2(
  `id` VARCHAR(255) PRIMARY KEY,
  `name` VARCHAR(255)
);
CREATE TABLE t3(
  `id` VARCHAR(255) PRIMARY KEY,
  `name` VARCHAR(255)
);

# Create the test_route database
create database test_route;

# In the test_route database, create the t1, t2, t3 tables
use test_route;
CREATE TABLE t1(
  `id` VARCHAR(255) PRIMARY KEY,
  `name` VARCHAR(255)
);
CREATE TABLE t2(
  `id` VARCHAR(255) PRIMARY KEY,
  `name` VARCHAR(255)
);
CREATE TABLE t3(
  `id` VARCHAR(255) PRIMARY KEY,
  `name` VARCHAR(255)
);

# Insert data into t1, t2, t3 in the test_route database
use test_route;
INSERT INTO t1 VALUES('1001','zhangsan');
INSERT INTO t1 VALUES('1002','lisi');
INSERT INTO t1 VALUES('1003','wangwu');
INSERT INTO t2 VALUES('1004','zhangsan');
INSERT INTO t2 VALUES('1005','lisi');
INSERT INTO t2 VALUES('1006','wangwu');
INSERT INTO t3 VALUES('1001','F');
INSERT INTO t3 VALUES('1002','F');
INSERT INTO t3 VALUES('1003','M');
Enable Binlog
By default, a standard MySQL installation keeps its cnf file under /etc.
# Add the following settings to enable Binlog for the `test` and `test_route` databases
# Server id
server-id = 1
# Time zone; if the database time zone is not set, the Flink MySQL CDC connector fails to start
default-time-zone = '+8:00'
# Enable binlog; this value is used as the binlog file name prefix
log-bin=mysql-bin
# Binlog format; CDC tools (Debezium/Flink CDC, Maxwell, etc.) require ROW format
binlog_format=row
# Databases to log; adjust to your environment
binlog-do-db=test
binlog-do-db=test_route
Set the database time zone
Permanent change: edit my.cnf as shown above (the configuration already includes it); just remember to restart.

default-time-zone = '+8:00'
Changing it at runtime (without editing my.cnf):

# MySQL 8 (SET PERSIST also survives restarts, via mysqld-auto.cnf)
set persist time_zone='+8:00';
# MySQL 5.x (global setting, lost on restart)
set global time_zone='+8:00';
Restart MySQL
Note: after changing my.cnf you must restart MySQL for the settings to take effect!
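After the restart, a quick sanity check (not part of the original walkthrough) can confirm that binlog and the time zone took effect:

# Expect log_bin = ON and binlog_format = ROW
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
# Expect +8:00
SELECT @@global.time_zone;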
Code
Dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Flink CDC dependencies: start -->
    <!-- Flink core dependency, provides the core Flink APIs -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink streaming Java API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink client tooling, including the CLI and utilities -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink connector base package with the common connector functionality -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Kafka connector for integrating with Apache Kafka; not needed here, so it stays
         commented out. The code can use another MQ instead. -->
    <!--<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.2.0-1.19</version>
    </dependency>-->
    <!-- Flink Table Planner, generates execution plans for the Table API and SQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Table API bridge, connects the DataStream API with the Table API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink JSON format support -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Enables the Web UI (port 8081); disabled by default -->
    <!--<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>1.19.1</version>
    </dependency>-->
    <!-- MySQL CDC connector.
         The org.apache.flink artifact targets MySQL 8.0;
         see https://blog.csdn.net/kakaweb/article/details/129441408 for details.
    -->
    <dependency>
        <!-- For MySQL 8.0 -->
        <!--<groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>3.1.0</version>-->
        <!-- For MySQL 5.7; versions 2.3.0 and 3.0.1 both work -->
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <!--<version>2.3.0</version>-->
        <version>3.0.1</version>
    </dependency>
    <!-- lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!-- gson utilities -->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.11.0</version>
    </dependency>
    <!-- OGNL expressions -->
    <dependency>
        <groupId>ognl</groupId>
        <artifactId>ognl</artifactId>
        <version>3.1.1</version>
    </dependency>
    <!-- hutool utilities -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.26</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.31</version>
    </dependency>
</dependencies>
<name>cdc-test</name>
<description>cdc-test</description>
<properties>
    <java.version>11</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.6.13</spring-boot.version>
    <flink.version>1.19.0</flink.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
yaml
# Flink CDC configuration
flink-cdc:
  mysql:
    hostname: 192.168.132.101
    port: 3306
    username: root
    password: 12345678
    databaseList: test
    tableList: test.student, test.t1
    includeSchemaChanges: false
    parallelism: 1
    enableCheckpointing: 5000
FlinkCDCConfig
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Flink CDC configuration
 */
@Data
@Configuration
@ConfigurationProperties("flink-cdc.mysql")
public class FlinkCDCConfig {
    /**
     * Database host
     */
    private String hostname;
    /**
     * Database port
     */
    private Integer port;
    /**
     * Database username
     */
    private String username;
    /**
     * Database password
     */
    private String password;
    /**
     * Database names
     */
    private String[] databaseList;
    /**
     * Table names (database.table)
     */
    private String[] tableList;
    /**
     * Whether to include schema changes
     */
    private Boolean includeSchemaChanges;
    /**
     * Parallelism
     */
    private Integer parallelism;
    /**
     * Checkpoint interval in milliseconds
     */
    private Integer enableCheckpointing;
}
Enums
OperatorTypeEnum
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Operation type enum
 */
@Getter
@AllArgsConstructor
public enum OperatorTypeEnum {
    /**
     * Insert
     */
    INSERT(1),
    /**
     * Update
     */
    UPDATE(2),
    /**
     * Delete
     */
    DELETE(3),
    ;

    private final int type;
}
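DataChangeSink below compares raw numeric codes against the enum values. If you prefer resolving the enum itself, a small lookup helper (not in the original code) could be added to OperatorTypeEnum; a minimal sketch:

/**
 * Resolves the enum constant for a numeric code.
 */
public static OperatorTypeEnum of(int type) {
    for (OperatorTypeEnum e : values()) {
        if (e.getType() == type) {
            return e;
        }
    }
    throw new IllegalArgumentException("Unknown operator type: " + type);
}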
StrategyEnum
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.beans.Introspector;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Table handling strategies
 * todo To support more tables, add new enum constants here
 */
@Getter
@AllArgsConstructor
public enum StrategyEnum {
    /**
     * Strategy for the student table
     */
    STUDENT("student", Student.class, Introspector.decapitalize(StudentLogHandler.class.getSimpleName())),
    ;

    /**
     * Table name
     */
    private final String tableName;
    /**
     * Class of the mapped POJO
     */
    private final Class<?> varClass;
    /**
     * Handler bean name
     */
    private final String handlerName;

    /**
     * Strategy selector: based on the tableName of the given DataChangeInfo, picks the matching
     * strategy from the predefined StrategyEnum constants and wraps it in a StrategyHandleSelector.
     *
     * @param dataChangeInfo change-data object
     * @return StrategyHandleSelector, or null if no strategy matches
     */
    public static StrategyHandleSelector getSelector(DataChangeInfo dataChangeInfo) {
        if (ObjUtil.isNull(dataChangeInfo)) {
            return null;
        }
        String tableName = dataChangeInfo.getTableName();
        StrategyHandleSelector selector = new StrategyHandleSelector();
        // Walk all strategy constants (StrategyEnum) looking for one matching the table name
        for (StrategyEnum strategyEnum : values()) {
            // On a match, populate the StrategyHandleSelector and return it
            if (strategyEnum.getTableName().equals(tableName)) {
                selector.setHandlerName(strategyEnum.handlerName);
                selector.setOperatorTime(dataChangeInfo.getOperatorTime());
                selector.setOperatorType(dataChangeInfo.getOperatorType());
                // hutool's JSONUtil (explicitly imported), not the local JSONUtil helper shown later
                JSONObject jsonObject = JSONUtil.parseObj(dataChangeInfo.getData());
                selector.setData(BeanUtil.copyProperties(jsonObject, strategyEnum.varClass));
                return selector;
            }
        }
        return null;
    }
}
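A short illustration of the selection flow, using hypothetical values and the DataChangeInfo and Student classes shown below, assuming a change event on the student table:

DataChangeInfo info = new DataChangeInfo();
info.setTableName("student");
info.setOperatorType(OperatorTypeEnum.INSERT.getType());
info.setOperatorTime(System.currentTimeMillis());
info.setData("{\"id\":1,\"name\":\"joy\",\"age\":18}");

StrategyHandleSelector selector = StrategyEnum.getSelector(info);
// selector.getHandlerName() -> "studentLogHandler"
// selector.getData()        -> a Student instance populated from the JSON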
POJOs
Student
import lombok.Data;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Student class, used for the demo
 */
@Data
public class Student {
    /**
     * id
     */
    private Integer id;
    /**
     * Name
     */
    private String name;
    /**
     * Age
     */
    private Integer age;
}
DataChangeInfo
import lombok.Data;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Change-data object
 */
@Data
public class DataChangeInfo {
    /**
     * Data before the change
     */
    private String beforeData;
    /**
     * Data after the change
     */
    private String afterData;
    /**
     * The affected data (before image for deletes, after image otherwise)
     */
    private String data;
    /**
     * Change type: 1 -> insert, 2 -> update, 3 -> delete
     */
    private Integer operatorType;
    /**
     * Binlog file name
     */
    private String fileName;
    /**
     * Current binlog read position
     */
    private Integer filePos;
    /**
     * Database name
     */
    private String database;
    /**
     * Table name
     */
    private String tableName;
    /**
     * Change time
     */
    private Long operatorTime;
}
Custom sinks
DataChangeSink
import cn.hutool.core.util.ObjUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.util.Map;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Typed sink that dispatches DataChangeInfo events to the matching BaseLogHandler
 */
@Slf4j
@Component
@AllArgsConstructor
public class DataChangeSink extends RichSinkFunction<DataChangeInfo> implements Serializable {
    /**
     * Cache of BaseLogHandler beans.
     * Spring injects every BaseLogHandler bean into this map, keyed by bean name.
     */
    private final Map<String, BaseLogHandler> strategyHandlerMap;

    @Override
    public void invoke(DataChangeInfo value, Context context) {
        log.info("Received raw change data: {}", value);
        // Pick the strategy for this table
        StrategyHandleSelector selector = StrategyEnum.getSelector(value);
        if (ObjUtil.isNull(selector)) {
            return;
        }
        BaseLogHandler<Object> handler = strategyHandlerMap.get(selector.getHandlerName());
        // insert
        if (selector.getOperatorType().equals(OperatorTypeEnum.INSERT.getType())) {
            handler.handleInsertLog(selector.getData(), selector.getOperatorTime());
            return;
        }
        // update
        if (selector.getOperatorType().equals(OperatorTypeEnum.UPDATE.getType())) {
            handler.handleUpdateLog(selector.getData(), selector.getOperatorTime());
            return;
        }
        // delete
        if (selector.getOperatorType().equals(OperatorTypeEnum.DELETE.getType())) {
            handler.handleDeleteLog(selector.getData(), selector.getOperatorTime());
        }
    }

    /**
     * Called before the sink starts processing
     */
    @Override
    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
    }

    /**
     * Called when the sink shuts down
     */
    @Override
    public void close() throws Exception {
        super.close();
    }
}
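The map injection relies on standard Spring behavior: when a constructor takes a Map<String, T>, Spring fills it with every bean of type T, keyed by bean name. For this demo, the injected map is roughly equivalent to the following manual wiring (illustrative only):

Map<String, BaseLogHandler> strategyHandlerMap = new HashMap<>();
// The bean name is the decapitalized class name, which matches StrategyEnum's handlerName
strategyHandlerMap.put("studentLogHandler", new StudentLogHandler());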
CustomSink
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Custom sink that determines the DML operation type from the raw JSON via OGNL
 *              expressions; meant to be paired with the stock JsonDebeziumDeserializationSchema
 */
@Slf4j
@Component
public class CustomSink extends RichSinkFunction<String> {
    @Override
    public void invoke(String json, Context context) throws Exception {
        // The op field has four values: c (create), u (update), d (delete), r (read/snapshot).
        // For updates the payload contains both the before and the after image.
        log.info("Received change event: {}", json);
        String op = JSONUtil.getValue(json, "op", String.class);
        // id of the affected row
        Integer id = null;
        // update
        if ("u".equals(op)) {
            id = JSONUtil.getValue(json, "after.id", Integer.class);
            log.info("Handling update, id={}", id);
            // handle the update here
        }
        // delete
        if ("d".equals(op)) {
            id = JSONUtil.getValue(json, "before.id", Integer.class);
            log.info("Handling delete, id={}", id);
            // handle the delete here
        }
        // insert
        if ("c".equals(op)) {
            log.info("Handling insert");
        }
    }

    // Called before the sink starts processing
    @Override
    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
    }

    // Called when the sink shuts down
    @Override
    public void close() throws Exception {
        super.close();
    }
}
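For reference, a hand-written, abridged example of the Debezium JSON envelope that JsonDebeziumDeserializationSchema emits for an update on test.student; the op, before.id and after.id paths read above come from this structure (field values are illustrative):

{
  "before": { "id": 1, "name": "joy", "age": 18 },
  "after":  { "id": 1, "name": "joy", "age": 19 },
  "source": { "db": "test", "table": "student", "file": "mysql-bin.000001", "pos": 1234 },
  "op": "u",
  "ts_ms": 1720000000000
}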
Custom deserializer: MySQLDeserialization
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Optional;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Custom deserializer that turns MySQL binlog records into DataChangeInfo objects
 */
@Slf4j
@Service
public class MySQLDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {
    public static final String TS_MS = "ts_ms";
    public static final String BIN_FILE = "file";
    public static final String POS = "pos";
    public static final String CREATE = "CREATE";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String UPDATE = "UPDATE";

    /**
     * Deserializes a source record into a change object.
     *
     * @param sourceRecord SourceRecord
     * @param collector    Collector<DataChangeInfo>
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
        try {
            // The topic has the form <prefix>.<database>.<table>; extract database and table name
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            String database = fields[1];
            String tableName = fields[2];
            Struct struct = (Struct) sourceRecord.value();
            final Struct source = struct.getStruct(SOURCE);
            DataChangeInfo dataChangeInfo = new DataChangeInfo();
            // Resolve the operation type: CREATE, UPDATE or DELETE
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toUpperCase();
            int eventType = type.equals(CREATE) ? OperatorTypeEnum.INSERT.getType() : UPDATE.equals(type) ?
                    OperatorTypeEnum.UPDATE.getType() : OperatorTypeEnum.DELETE.getType();
            // Usually only the latest image matters, but both are populated here for the demo.
            // Extract the before and after images and set them on the DataChangeInfo
            dataChangeInfo.setBeforeData(this.getJsonObject(struct, BEFORE).toJSONString());
            dataChangeInfo.setAfterData(this.getJsonObject(struct, AFTER).toJSONString());
            if (eventType == OperatorTypeEnum.DELETE.getType()) {
                dataChangeInfo.setData(this.getJsonObject(struct, BEFORE).toJSONString());
            } else {
                dataChangeInfo.setData(this.getJsonObject(struct, AFTER).toJSONString());
            }
            dataChangeInfo.setOperatorType(eventType);
            dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE))
                    .map(Object::toString)
                    .orElse(""));
            dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS))
                    .map(x -> Integer.parseInt(x.toString()))
                    .orElse(0));
            dataChangeInfo.setDatabase(database);
            dataChangeInfo.setTableName(tableName);
            dataChangeInfo.setOperatorTime(Optional.ofNullable(struct.get(TS_MS))
                    .map(x -> Long.parseLong(x.toString()))
                    .orElseGet(System::currentTimeMillis));
            // Emit the change object downstream
            collector.collect(dataChangeInfo);
        } catch (Exception e) {
            log.error("Failed to deserialize binlog record", e);
        }
    }

    /**
     * Extracts the before or after image from the record value.
     *
     * @param value        Struct
     * @param fieldElement field name ("before" or "after")
     * @return JSONObject
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}
Log handlers
BaseLogHandler
import java.io.Serializable;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Log handler interface
 * todo To support another table, create a class implementing BaseLogHandler with the relevant logic; see StudentLogHandler for a reference
 */
public interface BaseLogHandler<T> extends Serializable {
    /**
     * Handles an insert log entry
     *
     * @param data         data mapped onto the target model
     * @param operatorTime operation time
     */
    void handleInsertLog(T data, Long operatorTime);

    /**
     * Handles an update log entry
     *
     * @param data         data mapped onto the target model
     * @param operatorTime operation time
     */
    void handleUpdateLog(T data, Long operatorTime);

    /**
     * Handles a delete log entry
     *
     * @param data         data mapped onto the target model
     * @param operatorTime operation time
     */
    void handleDeleteLog(T data, Long operatorTime);
}
StrategyHandleSelector
import lombok.Data;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Strategy handle selector
 */
@Data
public class StrategyHandleSelector {
    /**
     * Name of the handler bean
     */
    private String handlerName;
    /**
     * Payload data
     */
    private Object data;
    /**
     * Operation time
     */
    private Long operatorTime;
    /**
     * Operation type
     */
    private Integer operatorType;
}
StudentLogHandler
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Handler for the Student table
 */
@Slf4j
@Service
public class StudentLogHandler implements BaseLogHandler<Student> {
    @Override
    public void handleInsertLog(Student data, Long operatorTime) {
        log.info("Handling insert log for the student table: {}", data);
    }

    @Override
    public void handleUpdateLog(Student data, Long operatorTime) {
        log.info("Handling update log for the student table: {}", data);
    }

    @Override
    public void handleDeleteLog(Student data, Long operatorTime) {
        log.info("Handling delete log for the student table: {}", data);
    }
}
JSONUtil
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import ognl.Ognl;
import ognl.OgnlContext;
import java.util.Map;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description JSON utility
 */
public class JSONUtil {
    /**
     * Converts the given JSON string into a Map whose keys are the JSON keys.
     * Values are mapped as follows:
     * 1. a string value stays a string, e.g. {"a":"b"}
     * 2. a nested JSON object becomes a Map, e.g. {"a":{"b":"2"}}
     * 3. an array becomes a List<Map>, e.g. {"a":[{"b":"2"},{"c":"3"}]}
     *
     * @param json input JSON string
     * @return map of the parsed JSON
     */
    public static Map<String, Object> transferToMap(String json) {
        Gson gson = new Gson();
        Map<String, Object> map = gson.fromJson(json, new TypeToken<Map<String, Object>>() {}.getType());
        return map;
    }

    /**
     * Reads the value at the given path from the JSON string.
     *
     * @param json  raw JSON string
     * @param path  OGNL path expression
     * @param clazz target class of the value
     * @return the value converted to clazz
     */
    @SuppressWarnings("unchecked")
    public static <T> T getValue(String json, String path, Class<T> clazz) {
        try {
            Map<String, Object> map = JSONUtil.transferToMap(json);
            OgnlContext ognlContext = new OgnlContext();
            ognlContext.setRoot(map);
            T value = (T) Ognl.getValue(path, ognlContext, ognlContext.getRoot(), clazz);
            return value;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
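A small usage sketch (hypothetical input) showing how the OGNL path walks the parsed map, mirroring how CustomSink reads the Debezium JSON:

String json = "{\"op\":\"u\",\"after\":{\"id\":1,\"name\":\"joy\"}}";
String op = JSONUtil.getValue(json, "op", String.class);         // "u"
Integer id = JSONUtil.getValue(json, "after.id", Integer.class); // 1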
MysqlEventListener
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import lombok.AllArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Listens for MySQL change events
 */
@Component
@AllArgsConstructor
public class MysqlEventListener implements ApplicationRunner {
    /**
     * Flink CDC configuration
     */
    private final FlinkCDCConfig flinkCDCConfig;

    /**
     * Custom sinks:
     * customSink: derives the operation type from the raw JSON via OGNL
     * dataChangeSink: derives the operation type from the Debezium Struct
     * Normally you pick one of the two.
     */
    private final CustomSink customSink;
    private final DataChangeSink dataChangeSink;

    /**
     * Custom deserializer
     */
    private final MySQLDeserialization mySQLDeserialization;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Default parallelism of the whole Flink job
        env.setParallelism(flinkCDCConfig.getParallelism());
        // Checkpoint interval
        env.enableCheckpointing(flinkCDCConfig.getEnableCheckpointing());
        // Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // todo Pick one of the two MySqlSource variants below
        // Custom deserializer:
        // MySqlSource<DataChangeInfo> mySqlSource = this.buildBaseMySqlSource(DataChangeInfo.class)
        //         .deserializer(mySQLDeserialization)
        //         .build();

        // Stock Flink CDC JSON deserializer:
        MySqlSource<String> mySqlSource = this.buildBaseMySqlSource(String.class)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        env.fromSource(mySqlSource,
                        WatermarkStrategy.noWatermarks(),
                        "mysql-source")
                // Parallelism of this source
                .setParallelism(flinkCDCConfig.getParallelism())
                // todo Pick the sink that matches the deserializer chosen above
                // .addSink(dataChangeSink); // pairs with mySQLDeserialization
                .addSink(customSink); // pairs with JsonDebeziumDeserializationSchema
        env.execute("mysql-stream-cdc");
    }

    /**
     * Builds the common part of the MySqlSourceBuilder.
     *
     * @param clazz class of the records the source produces (used for type inference only)
     * @param <T>   record type
     * @return MySqlSourceBuilder
     */
    private <T> MySqlSourceBuilder<T> buildBaseMySqlSource(Class<T> clazz) {
        return MySqlSource.<T>builder()
                .hostname(flinkCDCConfig.getHostname())
                .port(flinkCDCConfig.getPort())
                .username(flinkCDCConfig.getUsername())
                .password(flinkCDCConfig.getPassword())
                .databaseList(flinkCDCConfig.getDatabaseList())
                .tableList(flinkCDCConfig.getTableList())
                /* initial:   take a full snapshot first, then read incremental changes
                 * latest:    incremental changes only (no history)
                 * timestamp: start from the given timestamp (reads changes whose timestamp is >= it)
                 */
                .startupOptions(StartupOptions.latest())
                // Whether schema changes are emitted
                .includeSchemaChanges(flinkCDCConfig.getIncludeSchemaChanges())
                // Server time zone
                .serverTimeZone("GMT+8");
    }
}
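With the application running, changes to the monitored tables should show up in the logs. For example, against the test database created earlier (expected op codes noted per statement):

use test;
# emits op = c
INSERT INTO student VALUES (3, 'jerry', 20);
# emits op = u
UPDATE student SET age = 21 WHERE id = 3;
# emits op = d
DELETE FROM student WHERE id = 3;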