SpringBoot集成Flink-CDC,实现对数据库数据的监听

  金牌会员 | 2024-6-13 23:02:23 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 760|帖子 760|积分 2280

一、什么是 CDC ?

  CDC 是 Change Data Capture(变更数据获取) 的简称。 焦点思想是,监测并捕捉数据库的变动(包括数据或数据表的插入、 更新以及删除等),将这些变更按发生的顺序完备记载下来,写入到消息中间件中以供其他服务进行订阅及消耗。
二、Flink-CDC 是什么?

CDC Connectors for Apache Flink是一组用于Apache Flink 的源毗连器,利用变更数据捕捉 (CDC) 从差异数据库获取变更。用于 Apache Flink 的 CDC 毗连器将 Debezium 集成为捕捉数据更改的引擎。以是它可以充分发挥 Debezium 的能力。
大概意思就是,Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、 PostgreSQL等数据库直接读取全量数据和增量变更数据的 source 组件。

Flink-CDC 开源地址: Apache/Flink-CDC
Flink-CDC 中文文档:Apache Flink CDC | Apache Flink CDC
三、SpringBoot 整合 Flink-CDC

3.1、怎样集成到SpringBoot中?

Flink 作业通常独立于一样平常的服务之外,专门编写代码,用 Flink 命令行工具来运行和克制。将Flink 作业集成到 Spring Boot 应用中并不常见,而且一样平常也不发起这样做,由于Flink作业一样平常运行在大数据环境中。
然而,在特殊需求下,我们可以做一些改变使 Flink 应用适应 Spring Boot 环境,比如在你的场景中利用 Flink CDC 进行 数据变更捕捉。将 Flink 作业以本地项目的方式启动,集成在 Spring Boot应用中,可以利用到 Spring 的便利性。


  • CommandLineRunner
  • ApplicationRunner
3.2、集成举例

1、CommandLineRunner
  1. @SpringBootApplication
  2. public class MyApp {
  3.   public static void main(String[] args) {
  4.     SpringApplication.run(MyApp.class, args);
  5.   }
  6.   @Bean
  7.   public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
  8.     return args -> {
  9.       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  10.       DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
  11.               .hostname("localhost")
  12.               .port(3306)
  13.               .username("flinkuser")
  14.               .password("flinkpw")
  15.               .databaseList("mydb") // monitor all tables under "mydb" database
  16.               .tableList("mydb.table1", "mydb.table2") // monitor only "table1" and "table2" under "mydb" database
  17.               .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
  18.               .build();
  19.       DataStreamSource<String> mysqlSource = env.addSource(sourceFunction);
  20.       
  21.       // formulate processing logic here, e.g., printing to standard output
  22.       mysqlSource.print();
  23.       // execute the Flink job within the Spring Boot application
  24.       env.execute("Flink CDC");
  25.     };
  26.   }
  27. }
复制代码
2、ApplicationRunner
  1. @SpringBootApplication
  2. public class FlinkCDCApplication implements ApplicationRunner {
  3.     public static void main(String[] args) {
  4.         SpringApplication.run(FlinkCDCApplication.class, args);
  5.     }
  6.     @Override
  7.     public void run(ApplicationArguments args) throws Exception {
  8.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  9.         // Configure your Flink job here
  10.         DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
  11.                 .hostname("localhost")
  12.                 .port(3306)
  13.                 .username("flinkuser")
  14.                 .password("flinkpw")
  15.                 .databaseList("mydb")
  16.                 // set other source options ...
  17.                 .deserializer(new StringDebeziumDeserializationSchema()) // Converts SourceRecord to String
  18.                 .build();
  19.         DataStream<String> cdcStream = env.addSource(sourceFunction);
  20.         // Implement your processing logic here
  21.         // For example:
  22.         cdcStream.print();
  23.         // Start the Flink job within the Spring Boot application
  24.         env.execute("Flink CDC with Spring Boot");
  25.     }
  26. }
复制代码
这次用例采用 ApplicationRunner,不外要改变一下,让 Flink CDC 作为 Bean 来实现。
四、功能实现

4.1、功能逻辑


总体来讲,不太想把 Flink CDC单独拉出来,更想让它依托于一个服务上,彻底当成一个组件。
其中在生产者中,我们将要进行实现:

4.2、所需环境



  • MySQL 5.7 +:确保源数据库已经开启  Binlog 日志功能,并且设置 Row 格式
  • Spring Boot 2.7.6:还是不要轻易利用 3.0 以上为好,有很多多少jar没有适配
  • RabbitMQ:适配即可
  • Flink CDC:特别留意版本
4.3、Flink CDC POM依赖

  1. <flink.version>1.13.6</flink.version>
  2. <dependency>
  3.    <groupId>org.apache.flink</groupId>
  4.    <artifactId>flink-clients_2.12</artifactId>
  5.    <version>${flink.version}</version>
  6. </dependency>
  7.    <dependency>
  8.    <groupId>org.apache.flink</groupId>
  9.    <artifactId>flink-java</artifactId>
  10.    <version>${flink.version}</version>
  11. </dependency>
  12. <dependency>
  13.    <groupId>org.apache.flink</groupId>
  14.    <artifactId>flink-streaming-java_2.12</artifactId>
  15.    <version>${flink.version}</version>
  16. </dependency>
  17. <!--mysql -cdc-->
  18. <dependency>
  19.    <groupId>com.ververica</groupId>
  20.    <artifactId>flink-connector-mysql-cdc</artifactId>
  21.    <version>2.0.0</version>
  22. </dependency>
  23. <dependency>
  24.    <groupId>org.projectlombok</groupId>
  25.    <artifactId>lombok</artifactId>
  26.    <version>1.18.10</version>
  27. </dependency>
  28. <dependency>
  29.    <groupId>cn.hutool</groupId>
  30.    <artifactId>hutool-all</artifactId>
  31.    <version>5.8.5</version>
  32. </dependency>
  33. <dependency>
  34.     <groupId>org.apache.commons</groupId>
  35.     <artifactId>commons-lang3</artifactId>
  36.     <version>3.10</version>
  37. </dependency>
  38. <dependency>
  39.    <groupId>com.alibaba</groupId>
  40.    <artifactId>fastjson</artifactId>
  41.    <version>2.0.42</version>
  42. </dependency>
复制代码
上面是一些Flink CDC必须的依赖,固然如果需要实现其他数据库,可以替换其他数据库的CDC jar。怎么安排jar包的位置和其余需要的jar,这个可自行调整。
4.4、代码展示

焦点类


  • MysqlEventListener:设置类
  • MysqlDeserialization:MySQL消息读取自定义序列化
  • DataChangeInfo:封装的变更对象
  • DataChangeSink:继续一个Flink提供的抽象类,用于定义数据的输出或“下沉”逻辑,sink 是Flink处理流的最后阶段,通常用于将数据写入外部体系,如数据库、文件体系、消息队列等
(1)通过 ApplicationRunner 接入 SpringBoot

  1. @Component
  2. public class MysqlEventListener implements ApplicationRunner {
  3.     private final DataChangeSink dataChangeSink;
  4.     public MysqlEventListener(DataChangeSink dataChangeSink) {
  5.         this.dataChangeSink = dataChangeSink;
  6.     }
  7.     @Override
  8.     public void run(ApplicationArguments args) throws Exception {
  9.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  10.         env.setParallelism(1);
  11.         DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSourceRemote();
  12.         DataStream<DataChangeInfo> streamSource = env
  13.                 .addSource(dataChangeInfoMySqlSource, "mysql-source")
  14.                 .setParallelism(1);
  15.         streamSource.addSink(dataChangeSink);
  16.         env.execute("mysql-stream-cdc");
  17.     }
  18.     private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSourceLocal() {
  19.         return MySqlSource.<DataChangeInfo>builder()
  20.                 .hostname("127.0.0.1")
  21.                 .port(3306)
  22.                 .username("root")
  23.                 .password("0507")
  24.                 .databaseList("flink-cdc-producer")
  25.                 .tableList("flink-cdc-producer.producer_content", "flink-cdc-producer.name_content")
  26.                 /*
  27.                  * initial初始化快照,即全量导入后增量导入(检测更新数据写入)
  28.                  * latest:只进行增量导入(不读取历史变化)
  29.                  * timestamp:指定时间戳进行数据导入(大于等于指定时间错读取数据)
  30.                  */
  31.                 .startupOptions(StartupOptions.latest())
  32.                 .deserializer(new MysqlDeserialization())
  33.                 .serverTimeZone("GMT+8")
  34.                 .build();
  35.     }
  36. }
复制代码
(2)自定义 MySQL 消息读取序列化

  1. public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {
  2.     public static final String TS_MS = "ts_ms";
  3.     public static final String BIN_FILE = "file";
  4.     public static final String POS = "pos";
  5.     public static final String CREATE = "CREATE";
  6.     public static final String BEFORE = "before";
  7.     public static final String AFTER = "after";
  8.     public static final String SOURCE = "source";
  9.     public static final String UPDATE = "UPDATE";
  10.     /**
  11.      * 反序列化数据,转为变更JSON对象
  12.      */
  13.     @Override
  14.     public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
  15.         String topic = sourceRecord.topic();
  16.         String[] fields = topic.split("\\.");
  17.         String database = fields[1];
  18.         String tableName = fields[2];
  19.         Struct struct = (Struct) sourceRecord.value();
  20.         final Struct source = struct.getStruct(SOURCE);
  21.         DataChangeInfo dataChangeInfo = new DataChangeInfo();
  22.         dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
  23.         dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
  24.         //5.获取操作类型  CREATE UPDATE DELETE
  25.         Envelope.Operation operation = Envelope.operationFor(sourceRecord);
  26. //        String type = operation.toString().toUpperCase();
  27. //        int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;
  28.         dataChangeInfo.setEventType(operation.name());
  29.         dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
  30.         dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));
  31.         dataChangeInfo.setDatabase(database);
  32.         dataChangeInfo.setTableName(tableName);
  33.         dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));
  34.         //7.输出数据
  35.         collector.collect(dataChangeInfo);
  36.     }
  37.     private Struct getStruct(Struct value, String fieldElement) {
  38.         return value.getStruct(fieldElement);
  39.     }
  40.     /**
  41.      * 从元数据获取出变更之前或之后的数据
  42.      */
  43.     private JSONObject getJsonObject(Struct value, String fieldElement) {
  44.         Struct element = value.getStruct(fieldElement);
  45.         JSONObject jsonObject = new JSONObject();
  46.         if (element != null) {
  47.             Schema afterSchema = element.schema();
  48.             List<Field> fieldList = afterSchema.fields();
  49.             for (Field field : fieldList) {
  50.                 Object afterValue = element.get(field);
  51.                 jsonObject.put(field.name(), afterValue);
  52.             }
  53.         }
  54.         return jsonObject;
  55.     }
  56.     @Override
  57.     public TypeInformation<DataChangeInfo> getProducedType() {
  58.         return TypeInformation.of(DataChangeInfo.class);
  59.     }
  60. }
复制代码
(3)封装的变更对象

  1. @Data
  2. public class DataChangeInfo implements Serializable {
  3.     /**
  4.      * 变更前数据
  5.      */
  6.     private String beforeData;
  7.     /**
  8.      * 变更后数据
  9.      */
  10.     private String afterData;
  11.     /**
  12.      * 变更类型 1新增 2修改 3删除
  13.      */
  14.     private String eventType;
  15.     /**
  16.      * binlog文件名
  17.      */
  18.     private String fileName;
  19.     /**
  20.      * binlog当前读取点位
  21.      */
  22.     private Integer filePos;
  23.     /**
  24.      * 数据库名
  25.      */
  26.     private String database;
  27.     /**
  28.      * 表名
  29.      */
  30.     private String tableName;
  31.     /**
  32.      * 变更时间
  33.      */
  34.     private Long changeTime;
  35. }
复制代码
  这里的 beforeData 、afterData直接存储 Struct 不好吗,还得费劲去来回转?
  
  我曾实行过利用 Struct 存放在对象中,但是无法进行序列化。详细原因可以网上搜索,大概自己实行一下。
  (4)定义 Flink 的 Sink

  1. @Component
  2. @Slf4j
  3. public class DataChangeSink extends RichSinkFunction<DataChangeInfo> {
  4.     transient RabbitTemplate rabbitTemplate;
  5.     transient ConfirmService confirmService;
  6.     transient TableDataConvertService tableDataConvertService;
  7.     @Override
  8.     public void invoke(DataChangeInfo value, Context context) {
  9.         log.info("收到变更原始数据:{}", value);
  10.         //转换后发送到对应的MQ
  11.         if (MIGRATION_TABLE_CACHE.containsKey(value.getTableName())) {
  12.             String routingKey = MIGRATION_TABLE_CACHE.get(value.getTableName());
  13.             //可根据需要自行进行confirmService的设计
  14.             rabbitTemplate.setReturnsCallback(confirmService);
  15.             rabbitTemplate.setConfirmCallback(confirmService);
  16.             rabbitTemplate.convertAndSend(EXCHANGE_NAME, routingKey, tableDataConvertService.convertSqlByDataChangeInfo(value));
  17.         }
  18.     }
  19.     /**
  20.      * 在启动SpringBoot项目是加载了Spring容器,其他地方可以使用@Autowired获取Spring容器中的类;但是Flink启动的项目中,
  21.      * 默认启动了多线程执行相关代码,导致在其他线程无法获取Spring容器,只有在Spring所在的线程才能使用@Autowired,
  22.      * 故在Flink自定义的Sink的open()方法中初始化Spring容器
  23.      */
  24.     @Override
  25.     public void open(Configuration parameters) throws Exception {
  26.         super.open(parameters);
  27.         this.rabbitTemplate = ApplicationContextUtil.getBean(RabbitTemplate.class);
  28.         this.confirmService = ApplicationContextUtil.getBean(ConfirmService.class);
  29.         this.tableDataConvertService = ApplicationContextUtil.getBean(TableDataConvertService.class);
  30.     }
  31. }
复制代码
(5)数据转换类接口和实现类

  1. public interface TableDataConvertService {
  2.     String convertSqlByDataChangeInfo(DataChangeInfo dataChangeInfo);
  3. }
复制代码
  1. @Service
  2. public class TableDataConvertServiceImpl implements TableDataConvertService {
  3.     @Autowired
  4.     Map<String, SqlGeneratorService> sqlGeneratorServiceMap;
  5.     @Override
  6.     public String convertSqlByDataChangeInfo(DataChangeInfo dataChangeInfo) {
  7.         SqlGeneratorService sqlGeneratorService = sqlGeneratorServiceMap.get(dataChangeInfo.getEventType());
  8.         return sqlGeneratorService.generatorSql(dataChangeInfo);
  9.     }
  10. }
复制代码
由于在 dataChangeInfo 中我们有封装对象的类型(CREATEDELETEUPDATE),以是我盼望通过差异类来进行差异的工作。于是就有了下面的类结构:

根据 dataChangeInfo 的类型去生成对应的 SqlGeneratorServiceImpl
   这是策略模式还是模板方法?
  
  策略模式(Strategy Pattern)答应在运行时选择算法的行为。在策略模式中,定义了一系列的算法(策略),并将每一个算法封装起来,使它们可以相互替换。策略模式答应算法独立于利用它的客户端进行变化。
  InsertSqlGeneratorServiceImpl、UpdateSqlGeneratorServiceImpl 和 DeleteSqlGeneratorServiceImpl 各自实现了 SqlGeneratorService 接口,这确实表明白一种策略。每一个实现类表现一个特定的SQL生成策略,并且可以相互替换,只要它们遵守同一个接口。
  
  模板方法模式(Template Method Pattern),则偏重于在抽象类中定义算法的框架,让子类实现算法的某些步调而不改变算法的结构。AbstractSqlGenerator 作为抽象类的存在是为了被继续,但如果它不含有模板方法(即没有定义算法骨架的方法),那它就不符合模板方法模式。
  
在实际应用中,一个设计可能同时团结了多个设计模式,大概在某些环境下,一种设计模式的实现可能看起来与另一种模式类似。在这种环境下,若 AbstractSqlGenerator 提供了更多的共享代码或默认实现表现出框架脚色,那么它可能更接近模板方法。而如果 AbstractSqlGenerator 仅仅作为一种接口实现方式,且策略之间可以相互替换,那么这确实更符合策略模式。
  值得留意的是,在 TableDataConvertServiceImpl 中,我们注入了一个 Map<String, SqlGeneratorService> sqlGeneratorServiceMap,通过它来进行详细实现类的获取。那么他是个什么东西呢?作用是什么呢?为什么可以通过它来获取呢?
@Resource、@Autowired 标注作用于 Map 类型时,如果 Map 的 key 为 String 类型,则 Spring 会将容器中所有类型符合 Map 的 value 对应的类型的 Bean 增加进来,用 Bean 的 id 或 name 作为 Map 的 key。
那么可以看到下面第六步,在进行DeleteSqlGeneratorServiceImpl装配的时间进行指定了名字@Service("DELETE"),方便通过dataChangeInfo获取。
(6)转换类部门代码

  1. public interface SqlGeneratorService {
  2.     String generatorSql(DataChangeInfo dataChangeInfo);
  3. }
  4. public abstract class AbstractSqlGenerator implements SqlGeneratorService {
  5.     @Override
  6.     public String generatorSql(DataChangeInfo dataChangeInfo) {
  7.         return null;
  8.     }
  9.      public String quoteIdentifier(String identifier) {
  10.         // 对字段名进行转义处理,这里简化为对其加反引号
  11.         // 实际应该处理数据库标识符的特殊字符
  12.         return "`" + identifier + "`";
  13.     }
  14. }
复制代码
  1. @Service("DELETE")
  2. @Slf4j
  3. public class DeleteSqlGeneratorServiceImpl extends AbstractSqlGenerator {
  4.     @Override
  5.     public String generatorSql(DataChangeInfo dataChangeInfo) {
  6.         String beforeData = dataChangeInfo.getBeforeData();
  7.         Map<String, Object> beforeDataMap = JSONObjectUtils.JsonToMap(beforeData);
  8.         StringBuilder wherePart = new StringBuilder();
  9.         for (String key : beforeDataMap.keySet()) {
  10.             Object beforeValue = beforeDataMap.get(key);
  11.             if ("create_time".equals(key)){
  12.                 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  13.                 beforeValue = dateFormat.format(beforeValue);
  14.             }
  15.             if (wherePart.length() > 0) {
  16.                 // 不是第一个更改的字段,增加逗号分隔
  17.                 wherePart.append(", ");
  18.             }
  19.             wherePart.append(quoteIdentifier(key)).append(" = ").append(formatValue(beforeValue));
  20.         }
  21.         log.info("wherePart : {}", wherePart);
  22.         return "DELETE FROM " + dataChangeInfo.getTableName() + " WHERE " + wherePart;
  23.     }
  24. }
复制代码
焦点代码如上所示,详细实现可自行设计。
五、源码获取

Github:incremental-sync-flink-cdc

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表