基于Flink MySQL CDC技术实现生意业务告警

打印 上一主题 下一主题

主题 845|帖子 845|积分 2535

前言

CDC 的全称是 Change Data Capture,是一种用于捕获数据库变动数据的技术。例如 MySQL 对数据的所有变动都会写入到 binlog,CDC 就可以通过监听 binlog 文件来实现对 MySQL 数据变动的捕获,然后做进一步的处理惩罚。
Flink CDC 将CDC技术和 Flink 流盘算整合到一起,把CDC捕获到的数据变动作为 Flink数据源,以实现对数据变动的流式处理惩罚。通过 Flink CDC 可以轻松实现以下功能:


  • 数据同步 将一个数据库中的数据变化实时同步到另一个数据库或数据存储中,实现数据的实时备份和迁移
  • 实时数据分析 捕获数据库的变动数据,并将其作为实时流数据源输入到 Flink 举行实时分析和处理惩罚,举行实时报表生成、实时监控、实时推荐等应用
  • 数据集成和 ETL 在数据集成和 ETL过程中,使用 Flink CDC 可以实现对源数据库的实时数据抽取,然后举行数据转换和加载到目的体系中
本文就来实现一个简朴的 Flink 作业,通过 Flink CDC 技术来监控 MySQL 中的用户生意业务记录,针对频繁生意业务和大额生意业务举行风控诉警。
需求描述

用户生意业务记录存储在 MySQL 的 user_trade 表中,编写一个 Flink 作业实现对用户生意业务记录的监听,针对每个用户在一分钟内,若生意业务次数超过十次,或者生意业务金额超过一万元的,生成一条生意业务告警记录并写入 user_trade_alert 表,由业务体系触发告警操作。
需求实现

前期准备

执行DDL语句,完成表的创建
  1. CREATE TABLE user_trade
  2. (
  3.     id         BIGINT(20) NOT NULL AUTO_INCREMENT PRIMARY KEY,
  4.     user_id    BIGINT(20) NOT NULL,
  5.     amount     BIGINT(20) NOT NULL,
  6.     trade_time DATETIME   NOT NULL
  7. ) COMMENT '用户交易';
  8. CREATE TABLE user_trade_alert
  9. (
  10.     id           BIGINT(20)    NOT NULL AUTO_INCREMENT PRIMARY KEY,
  11.     user_id      BIGINT(20)    NOT NULL,
  12.     alert_reason VARCHAR(1024) NOT NULL
  13. ) COMMENT '用户交易告警';
复制代码
引入Maven依赖,mysql-connector-java 是底子,因为要读写MySQL数据库;flink-connector-jdbc 是 Flink 提供的JDBC 连接器,用于 Flink 读写数据库;flink-connector-mysql-cdc 是 Flink 提供的针对 MySQL 的 CDC 实现。
  1. <dependency>
  2.     <groupId>mysql</groupId>
  3.     <artifactId>mysql-connector-java</artifactId>
  4.     <version>8.0.31</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>org.apache.flink</groupId>
  8.     <artifactId>flink-connector-jdbc</artifactId>
  9.     <version>3.2.0-1.19</version>
  10. </dependency>
  11. <dependency>
  12.     <groupId>org.apache.flink</groupId>
  13.     <artifactId>flink-connector-mysql-cdc</artifactId>
  14.     <version>3.2.0</version>
  15. </dependency>
复制代码
Flink作业编写

1、先对数据实体建模。UserTrade 对应用户生意业务记录,UserTradeStat 对应用户生意业务的统计,UserTradeAlert 对应用户生意业务的告警。
  1. @Data
  2. public static class UserTrade {
  3.     private Long id;
  4.     @JSONField(name = "user_id")
  5.     private Long userId;
  6.     private Long amount;
  7.     @JSONField(name = "trade_time")
  8.     private Long tradeTime;
  9. }
  10. @Data
  11. @NoArgsConstructor
  12. @AllArgsConstructor
  13. public static class UserTradeAlert {
  14.     private Long userId;
  15.     private String alertReason;
  16. }
  17. @Data
  18. @NoArgsConstructor
  19. @AllArgsConstructor
  20. public static class UserTradeStat {
  21.     private Long userId;
  22.     private long totalAmount;
  23.     private long total;
  24. }
复制代码
2、构建 MySqlSource,Flink cdc 的核心数据源,它会连接到 MySQL 主库并消耗 binlog 日记来获取数据变动记录,然后转发给下游算子处理惩罚。
  1. private static MySqlSource<UserTrade> ofMySqlSource(String database, String table) {
  2.     return MySqlSource.<UserTrade>builder()
  3.             .hostname(DB_HOST)
  4.             .port(DB_PORT)
  5.             .databaseList(database)
  6.             .tableList(database + "." + table)
  7.             .username(DB_USERNAME)
  8.             .password(DB_PASSWORD)
  9.             .deserializer(new UserTradeDeserialization()).build();
  10. }
复制代码
Flink CDC 获取到的数据被封装成org.apache.kafka.connect.source.SourceRecord 类,我们要自界说反序列化器,将获取到的日记记录转化成 UserTrade 实体对象。
  1. public static class UserTradeDeserialization implements DebeziumDeserializationSchema<UserTrade> {
  2.     transient JsonConverter jsonConverter;
  3.     @Override
  4.     public void deserialize(SourceRecord record, Collector<UserTrade> collector) throws Exception {
  5.         // 交易数据,不考虑[删改]的场景
  6.         byte[] bytes = getJsonConverter().fromConnectData(record.topic(), record.valueSchema(), record.value());
  7.         UserTrade userTrade = JSON.parseObject(bytes).getJSONObject("payload").getObject("after", UserTrade.class);
  8.         // 时区问题
  9.         userTrade.setTradeTime(userTrade.getTradeTime() - Duration.ofHours(8L).toMillis());
  10.         collector.collect(userTrade);
  11.     }
  12.     @Override
  13.     public TypeInformation<UserTrade> getProducedType() {
  14.         return TypeInformation.of(UserTrade.class);
  15.     }
  16.     JsonConverter getJsonConverter() {
  17.         if (jsonConverter == null) {
  18.             this.jsonConverter = new JsonConverter();
  19.             HashMap<String, Object> configs = new HashMap(2);
  20.             configs.put("converter.type", ConverterType.VALUE.getName());
  21.             configs.put("schemas.enable", true);
  22.             this.jsonConverter.configure(configs);
  23.         }
  24.         return jsonConverter;
  25.     }
  26. }
复制代码
3、因为是统计每分钟内用户的生意业务次数和生意业务额度,必然要用到窗口盘算,所以要编写窗口处理惩罚函数。UserTradeAggregateFunction 是对窗口内数据的聚合处理惩罚,UserTradeWindowFunction 是对窗口盘算结果的处理惩罚,假如满意告警规则,则会生成一个 UserTradeAlert 对象交给下游算子处理惩罚。
  1. public static class UserTradeAggregateFunction implements AggregateFunction<UserTrade, UserTradeStat, UserTradeStat> {
  2.     @Override
  3.     public UserTradeStat createAccumulator() {
  4.         return new UserTradeStat();
  5.     }
  6.     @Override
  7.     public UserTradeStat add(UserTrade value, UserTradeStat accumulator) {
  8.         System.err.println("add:" + value);
  9.         accumulator.setUserId(value.getUserId());
  10.         accumulator.setTotalAmount(accumulator.getTotalAmount() + value.amount);
  11.         accumulator.setTotal(accumulator.getTotal() + 1);
  12.         return accumulator;
  13.     }
  14.     @Override
  15.     public UserTradeStat getResult(UserTradeStat accumulator) {
  16.         return accumulator;
  17.     }
  18.     @Override
  19.     public UserTradeStat merge(UserTradeStat a, UserTradeStat b) {
  20.         return null;
  21.     }
  22. }
  23. public static class UserTradeWindowFunction implements WindowFunction<UserTradeStat, UserTradeAlert, Long, TimeWindow> {
  24.     @Override
  25.     public void apply(Long key, TimeWindow timeWindow, Iterable<UserTradeStat> iterable, Collector<UserTradeAlert> collector) throws Exception {
  26.         System.err.println("win:" + timeWindow.getStart() + "," + timeWindow.getEnd());
  27.         iterable.forEach(stat -> {
  28.             if (stat.getTotal() > 10 || stat.getTotalAmount() > 1000000) {
  29.                 LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timeWindow.getStart()), ZoneId.systemDefault());
  30.                 LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timeWindow.getEnd()), ZoneId.systemDefault());
  31.                 String alertReason = "用户: " + key + " 在[" + startTime + "," + endTime + "]期间产生交易:"
  32.                         + stat.getTotal() + "笔,交易金额:" + stat.getTotalAmount() + ",触发告警规则.";
  33.                 collector.collect(new UserTradeAlert(key, alertReason));
  34.             }
  35.         });
  36.     }
  37. }
复制代码
4、窗口算子生成的 UserTradeAlert 对象会交给 Sink 算子写进数据库里,所以最后还差一个 SinkFunction,因为是写进MySQL,所以这里构建一个 JdbcSink。
  1. private static SinkFunction<UserTradeAlert> ofMysqlSink() {
  2.     return JdbcSink.<UserTradeAlert>sink("INSERT INTO user_trade_alert (user_id,alert_reason) VALUES (?,?)",
  3.             (ps, value) -> {
  4.                 ps.setLong(1, value.getUserId());
  5.                 ps.setString(2, value.getAlertReason());
  6.             }, JdbcExecutionOptions.builder()
  7.                     .withBatchIntervalMs(100)
  8.                     .withMaxRetries(0)
  9.                     .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  10.                     .withUrl("jdbc:mysql://ip:port/flink_db?useSSL=false")
  11.                     .withUsername(DB_USERNAME)
  12.                     .withPassword(DB_PASSWORD)
  13.                     .withDriverName("com.mysql.cj.jdbc.Driver")
  14.                     .build());
  15. }
复制代码
5、所有组件都写完了,最后就是启动 Flink 执行环境,把整个作业串起来。
  1. private static final String DB_HOST = "your_ip";
  2. private static final int DB_PORT = 3306;
  3. private static final String DB_USERNAME = "admin";
  4. private static final String DB_PASSWORD = "xxx";
  5. public static void main(String[] args) throws Exception {
  6.     StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
  7.     environment.fromSource(ofMySqlSource("flink_db", "user_trade"), WatermarkStrategy
  8.                     .<UserTrade>forMonotonousTimestamps()
  9.                     .withTimestampAssigner((value, time) -> value.getTradeTime()), "mysql-cdc")
  10.             .setParallelism(1)
  11.             .keyBy(UserTrade::getUserId)
  12.             .window(TumblingEventTimeWindows.of(Duration.ofSeconds(60L)))
  13.             .aggregate(new UserTradeAggregateFunction(), new UserTradeWindowFunction())
  14.             .addSink(ofMysqlSink());
  15.     environment.execute();
  16. }
复制代码
功能验证

提交并运行Flink作业,往MySQL user_trade 表写入一些用户生意业务记录,当这些生意业务记录触发告警规则时,Flink 作业就会往 user_trade_alert 表生成如下告警记录示例:
iduser_idalert_reason11用户: 1 在[2024-09-13T15:29:10,2024-09-13T15:29:15]期间产生生意业务:1笔,生意业务金额:9999999,触发告警规则.32用户: 2 在[2024-09-13T15:30:25,2024-09-13T15:30:30]期间产生生意业务:11笔,生意业务金额:21,触发告警规则. 尾巴

Flink MySQL CDC 具有强大的功能特性,为实时数据处理惩罚提供了高效可靠的解决方案。它能够实现对 MySQL 数据库的实时数据捕获,确保数据的实时性和正确性。可以快速响应数据库中的数据变化,将变动数据实时传输到 Flink 流处理惩罚引擎中举行进一步的分析和处理惩罚。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

前进之路

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表