IT评测·应用市场-qidao123.com
标题:
SpringBoot集成Flink-CDC,实现对数据库数据的监听
[打印本页]
作者:
丝
时间:
2024-6-13 23:02
标题:
SpringBoot集成Flink-CDC,实现对数据库数据的监听
一、什么是 CDC ?
CDC 是
Change Data Capture
(变更数据获取) 的简称。 焦点思想是,监测并捕捉数据库的变动(包括数据或数据表的插入、 更新以及删除等),将这些变更按发生的顺序完备记载下来,写入到消息中间件中以供其他服务进行订阅及消耗。
二、Flink-CDC 是什么?
CDC Connectors for Apache Flink是一组用于Apache Flink 的源毗连器,利用变更数据捕捉 (CDC) 从差异数据库获取变更。用于 Apache Flink 的 CDC 毗连器将 Debezium 集成为捕捉数据更改的引擎。以是它可以充分发挥 Debezium 的能力。
大概意思就是,Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、 PostgreSQL等数据库直接读取全量数据和增量变更数据的 source 组件。
Flink-CDC 开源地址: Apache/Flink-CDC
Flink-CDC 中文文档:Apache Flink CDC | Apache Flink CDC
三、SpringBoot 整合 Flink-CDC
3.1、怎样集成到SpringBoot中?
Flink 作业通常独立于一样平常的服务之外,专门编写代码,用 Flink 命令行工具来运行和克制。将Flink 作业集成到 Spring Boot 应用中并不常见,而且一样平常也不发起这样做,由于Flink作业一样平常运行在大数据环境中。
然而,在特殊需求下,我们可以做一些改变使 Flink 应用适应 Spring Boot 环境,比如在你的场景中利用 Flink CDC 进行
数据变更捕捉
。将 Flink 作业以本地项目的方式启动,集成在 Spring Boot应用中,可以利用到 Spring 的便利性。
CommandLineRunner
ApplicationRunner
3.2、集成举例
1、CommandLineRunner
@SpringBootApplication
public class MyApp {
public static void main(String[] args) {
SpringApplication.run(MyApp.class, args);
}
@Bean
public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
return args -> {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.username("flinkuser")
.password("flinkpw")
.databaseList("mydb") // monitor all tables under "mydb" database
.tableList("mydb.table1", "mydb.table2") // monitor only "table1" and "table2" under "mydb" database
.deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
.build();
DataStreamSource<String> mysqlSource = env.addSource(sourceFunction);
// formulate processing logic here, e.g., printing to standard output
mysqlSource.print();
// execute the Flink job within the Spring Boot application
env.execute("Flink CDC");
};
}
}
复制代码
2、ApplicationRunner
@SpringBootApplication
public class FlinkCDCApplication implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(FlinkCDCApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure your Flink job here
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.username("flinkuser")
.password("flinkpw")
.databaseList("mydb")
// set other source options ...
.deserializer(new StringDebeziumDeserializationSchema()) // Converts SourceRecord to String
.build();
DataStream<String> cdcStream = env.addSource(sourceFunction);
// Implement your processing logic here
// For example:
cdcStream.print();
// Start the Flink job within the Spring Boot application
env.execute("Flink CDC with Spring Boot");
}
}
复制代码
这次用例采用 ApplicationRunner,不外要改变一下,让 Flink CDC 作为 Bean 来实现。
四、功能实现
4.1、功能逻辑
总体来讲,不太想把 Flink CDC单独拉出来,更想让它依托于一个服务上,彻底当成一个组件。
其中在生产者中,我们将要进行实现:
4.2、所需环境
MySQL 5.7 +
:确保源数据库已经开启 Binlog 日志功能,并且设置 Row 格式
Spring Boot 2.7.6
:还是不要轻易利用 3.0 以上为好,有很多多少jar没有适配
RabbitMQ
:适配即可
Flink CDC
:特别留意版本
4.3、Flink CDC POM依赖
<flink.version>1.13.6</flink.version>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!--mysql -cdc-->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.10</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.42</version>
</dependency>
复制代码
上面是一些Flink CDC必须的依赖,固然如果需要实现其他数据库,可以替换其他数据库的CDC jar。怎么安排jar包的位置和其余需要的jar,这个可自行调整。
4.4、代码展示
焦点类
MysqlEventListener
:设置类
MysqlDeserialization
:MySQL消息读取自定义序列化
DataChangeInfo
:封装的变更对象
DataChangeSink
:继续一个Flink提供的抽象类,用于定义数据的输出或“下沉”逻辑,sink 是Flink处理流的最后阶段,通常用于将数据写入外部体系,如数据库、文件体系、消息队列等
(1)通过 ApplicationRunner 接入 SpringBoot
@Component
public class MysqlEventListener implements ApplicationRunner {
private final DataChangeSink dataChangeSink;
public MysqlEventListener(DataChangeSink dataChangeSink) {
this.dataChangeSink = dataChangeSink;
}
@Override
public void run(ApplicationArguments args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSourceRemote();
DataStream<DataChangeInfo> streamSource = env
.addSource(dataChangeInfoMySqlSource, "mysql-source")
.setParallelism(1);
streamSource.addSink(dataChangeSink);
env.execute("mysql-stream-cdc");
}
private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSourceLocal() {
return MySqlSource.<DataChangeInfo>builder()
.hostname("127.0.0.1")
.port(3306)
.username("root")
.password("0507")
.databaseList("flink-cdc-producer")
.tableList("flink-cdc-producer.producer_content", "flink-cdc-producer.name_content")
/*
* initial初始化快照,即全量导入后增量导入(检测更新数据写入)
* latest:只进行增量导入(不读取历史变化)
* timestamp:指定时间戳进行数据导入(大于等于指定时间错读取数据)
*/
.startupOptions(StartupOptions.latest())
.deserializer(new MysqlDeserialization())
.serverTimeZone("GMT+8")
.build();
}
}
复制代码
(2)自定义 MySQL 消息读取序列化
public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {
public static final String TS_MS = "ts_ms";
public static final String BIN_FILE = "file";
public static final String POS = "pos";
public static final String CREATE = "CREATE";
public static final String BEFORE = "before";
public static final String AFTER = "after";
public static final String SOURCE = "source";
public static final String UPDATE = "UPDATE";
/**
* 反序列化数据,转为变更JSON对象
*/
@Override
public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
Struct struct = (Struct) sourceRecord.value();
final Struct source = struct.getStruct(SOURCE);
DataChangeInfo dataChangeInfo = new DataChangeInfo();
dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
//5.获取操作类型 CREATE UPDATE DELETE
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
// String type = operation.toString().toUpperCase();
// int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;
dataChangeInfo.setEventType(operation.name());
dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));
dataChangeInfo.setDatabase(database);
dataChangeInfo.setTableName(tableName);
dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));
//7.输出数据
collector.collect(dataChangeInfo);
}
private Struct getStruct(Struct value, String fieldElement) {
return value.getStruct(fieldElement);
}
/**
* 从元数据获取出变更之前或之后的数据
*/
private JSONObject getJsonObject(Struct value, String fieldElement) {
Struct element = value.getStruct(fieldElement);
JSONObject jsonObject = new JSONObject();
if (element != null) {
Schema afterSchema = element.schema();
List<Field> fieldList = afterSchema.fields();
for (Field field : fieldList) {
Object afterValue = element.get(field);
jsonObject.put(field.name(), afterValue);
}
}
return jsonObject;
}
@Override
public TypeInformation<DataChangeInfo> getProducedType() {
return TypeInformation.of(DataChangeInfo.class);
}
}
复制代码
(3)封装的变更对象
@Data
public class DataChangeInfo implements Serializable {
/**
* 变更前数据
*/
private String beforeData;
/**
* 变更后数据
*/
private String afterData;
/**
* 变更类型 1新增 2修改 3删除
*/
private String eventType;
/**
* binlog文件名
*/
private String fileName;
/**
* binlog当前读取点位
*/
private Integer filePos;
/**
* 数据库名
*/
private String database;
/**
* 表名
*/
private String tableName;
/**
* 变更时间
*/
private Long changeTime;
}
复制代码
这里的 beforeData 、afterData直接存储 Struct 不好吗,还得费劲去来回转?
我曾实行过利用 Struct 存放在对象中,但是无法进行序列化。详细原因可以网上搜索,大概自己实行一下。
(4)定义 Flink 的 Sink
@Component
@Slf4j
public class DataChangeSink extends RichSinkFunction<DataChangeInfo> {
transient RabbitTemplate rabbitTemplate;
transient ConfirmService confirmService;
transient TableDataConvertService tableDataConvertService;
@Override
public void invoke(DataChangeInfo value, Context context) {
log.info("收到变更原始数据:{}", value);
//转换后发送到对应的MQ
if (MIGRATION_TABLE_CACHE.containsKey(value.getTableName())) {
String routingKey = MIGRATION_TABLE_CACHE.get(value.getTableName());
//可根据需要自行进行confirmService的设计
rabbitTemplate.setReturnsCallback(confirmService);
rabbitTemplate.setConfirmCallback(confirmService);
rabbitTemplate.convertAndSend(EXCHANGE_NAME, routingKey, tableDataConvertService.convertSqlByDataChangeInfo(value));
}
}
/**
* 在启动SpringBoot项目是加载了Spring容器,其他地方可以使用@Autowired获取Spring容器中的类;但是Flink启动的项目中,
* 默认启动了多线程执行相关代码,导致在其他线程无法获取Spring容器,只有在Spring所在的线程才能使用@Autowired,
* 故在Flink自定义的Sink的open()方法中初始化Spring容器
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.rabbitTemplate = ApplicationContextUtil.getBean(RabbitTemplate.class);
this.confirmService = ApplicationContextUtil.getBean(ConfirmService.class);
this.tableDataConvertService = ApplicationContextUtil.getBean(TableDataConvertService.class);
}
}
复制代码
(5)数据转换类接口和实现类
public interface TableDataConvertService {
String convertSqlByDataChangeInfo(DataChangeInfo dataChangeInfo);
}
复制代码
@Service
public class TableDataConvertServiceImpl implements TableDataConvertService {
@Autowired
Map<String, SqlGeneratorService> sqlGeneratorServiceMap;
@Override
public String convertSqlByDataChangeInfo(DataChangeInfo dataChangeInfo) {
SqlGeneratorService sqlGeneratorService = sqlGeneratorServiceMap.get(dataChangeInfo.getEventType());
return sqlGeneratorService.generatorSql(dataChangeInfo);
}
}
复制代码
由于在 dataChangeInfo 中我们有封装对象的类型(
CREATE
、
DELETE
、
UPDATE
),以是我盼望通过差异类来进行差异的工作。于是就有了下面的类结构:
根据 dataChangeInfo 的类型去生成对应的
SqlGeneratorServiceImpl
。
这是策略模式还是模板方法?
策略模式(Strategy Pattern)答应在运行时选择算法的行为。在策略模式中,定义了一系列的算法(策略),并将每一个算法封装起来,使它们可以相互替换。策略模式答应算法独立于利用它的客户端进行变化。
InsertSqlGeneratorServiceImpl、UpdateSqlGeneratorServiceImpl 和 DeleteSqlGeneratorServiceImpl 各自实现了 SqlGeneratorService 接口,这确实表明白一种策略。每一个实现类表现一个特定的SQL生成策略,并且可以相互替换,只要它们遵守同一个接口。
模板方法模式(Template Method Pattern),则偏重于在抽象类中定义算法的框架,让子类实现算法的某些步调而不改变算法的结构。AbstractSqlGenerator 作为抽象类的存在是为了被继续,但如果它不含有模板方法(即没有定义算法骨架的方法),那它就不符合模板方法模式。
在实际应用中,一个设计可能同时团结了多个设计模式,大概在某些环境下,一种设计模式的实现可能看起来与另一种模式类似。在这种环境下,若 AbstractSqlGenerator 提供了更多的共享代码或默认实现表现出框架脚色,那么它可能更接近模板方法。而如果 AbstractSqlGenerator 仅仅作为一种接口实现方式,且策略之间可以相互替换,那么这确实更符合策略模式。
值得留意的是,在
TableDataConvertServiceImpl
中,我们注入了一个
Map<String, SqlGeneratorService> sqlGeneratorServiceMap
,通过它来进行详细实现类的获取。那么他是个什么东西呢?作用是什么呢?为什么可以通过它来获取呢?
@Resource、@Autowired
标注作用于 Map 类型时,如果 Map 的 key 为 String 类型,则 Spring 会将容器中所有类型符合 Map 的 value 对应的类型的 Bean 增加进来,用 Bean 的 id 或 name 作为 Map 的 key。
那么可以看到下面第六步,在进行
DeleteSqlGeneratorServiceImpl
装配的时间进行指定了名字
@Service("DELETE")
,方便通过dataChangeInfo获取。
(6)转换类部门代码
public interface SqlGeneratorService {
String generatorSql(DataChangeInfo dataChangeInfo);
}
public abstract class AbstractSqlGenerator implements SqlGeneratorService {
@Override
public String generatorSql(DataChangeInfo dataChangeInfo) {
return null;
}
public String quoteIdentifier(String identifier) {
// 对字段名进行转义处理,这里简化为对其加反引号
// 实际应该处理数据库标识符的特殊字符
return "`" + identifier + "`";
}
}
复制代码
@Service("DELETE")
@Slf4j
public class DeleteSqlGeneratorServiceImpl extends AbstractSqlGenerator {
@Override
public String generatorSql(DataChangeInfo dataChangeInfo) {
String beforeData = dataChangeInfo.getBeforeData();
Map<String, Object> beforeDataMap = JSONObjectUtils.JsonToMap(beforeData);
StringBuilder wherePart = new StringBuilder();
for (String key : beforeDataMap.keySet()) {
Object beforeValue = beforeDataMap.get(key);
if ("create_time".equals(key)){
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
beforeValue = dateFormat.format(beforeValue);
}
if (wherePart.length() > 0) {
// 不是第一个更改的字段,增加逗号分隔
wherePart.append(", ");
}
wherePart.append(quoteIdentifier(key)).append(" = ").append(formatValue(beforeValue));
}
log.info("wherePart : {}", wherePart);
return "DELETE FROM " + dataChangeInfo.getTableName() + " WHERE " + wherePart;
}
}
复制代码
焦点代码如上所示,详细实现可自行设计。
五、源码获取
Github:incremental-sync-flink-cdc
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/)
Powered by Discuz! X3.4