Flink CDC 同步 Mysql 数据

诗林  金牌会员 | 2024-12-13 03:06:08 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 517|帖子 517|积分 1551

一、Flink CDC、Flink、CDC各有啥关系

  Flink:流式盘算框架,不包含 Flink CDC,和 Flink CDC不要紧
  CDC:是一种思想,理念,不涉及某一门具体的技术。CDC 是变更数据捕捉(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变更记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如过滤、关联、分组、统计等。目前专业做数据库事件担当和剖析的中间件是Debezium,如果是捕捉Mysql,还有Canal。
  Flink CDC:是 CDC 的一种实现而已,不属于 Flink 子版块。这个技术是阿里开辟的。目的是为了丰富 Flink 的生态。
1.1 概述

  Flink CDC 基于数据库日志的 Change Data Caputre 技术,实现了全量和增量的一体化读取能力,并借助 Flink 良好的管道能力和丰富的上下游生态,支持捕捉多种数据库的变更,并将这些变更实时同步到下游存储。
1.2 和 jdbc Connectors 对比

  JDBC Connectors 连接器,确实可以读取外部的 数据库。比如:MySQL、Oracle、SqlServer等。但是,JDBC连数据库,只是瞬时操纵,没办法连续监听数据库的数据变化。
  Flink CDC Connectors,可以实现数据库的变更捕捉,能够连续不断地把变更数据同步到下游的体系中。
官网概述:https://ververica.github.io/flink-cdc-connectors/
github链接:https://github.com/ververica/flink-cdc-connectors
二、利用

  FlinkCDC 同步数据,有两种方式,一种是 FlinkSQL 的方式,一种是Flink DataStream 和 Table API 的方式。
  我这里直接用的是 ieda 测试的 DataStream 方式。
  代码来自:https://github.com/yclxiao/flink-cdc-demo/tree/main/src/main/java/com/yclxiao/flinkcdcdemo
  CloudAcctProfit2DwsHdjProfitRecordAPI.java
  1. import com.alibaba.fastjson.JSON;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.ververica.cdc.connectors.mysql.source.MySqlSource;
  4. import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
  5. import com.xiaoqiang.utils.JdbcUtil;
  6. import org.apache.commons.lang3.StringUtils;
  7. import org.apache.commons.lang3.time.DateFormatUtils;
  8. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  9. import org.apache.flink.api.common.functions.FilterFunction;
  10. import org.apache.flink.api.common.functions.FlatMapFunction;
  11. import org.apache.flink.configuration.Configuration;
  12. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  13. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  14. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  15. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  16. import org.apache.flink.util.Collector;
  17. import org.slf4j.Logger;
  18. import org.slf4j.LoggerFactory;
  19. import java.sql.Connection;
  20. import java.sql.DriverManager;
  21. import java.sql.ResultSet;
  22. import java.sql.Statement;
  23. import java.util.*;
  24. public class CloudAcctProfit2DwsHdjProfitRecordAPI {
  25.     private static final Logger LOG = LoggerFactory.getLogger(CloudAcctProfit2DwsHdjProfitRecordAPI.class);
  26.     private static String MYSQL_HOST = "x.x.x.x";
  27.     private static int MYSQL_PORT = 3306;
  28.     private static String MYSQL_USER = "root";
  29.     private static String MYSQL_PASSWD = "xiaoqiang";
  30.     private static String SYNC_DB = "league_test";
  31.     private static List<String> SYNC_TABLES = Arrays.asList("league_test.oc_settle_profit");
  32.     public static void main(String[] args) throws Exception {
  33.         MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
  34.                 .hostname(MYSQL_HOST)
  35.                 .port(MYSQL_PORT)
  36.                 .databaseList(SYNC_DB) // set captured database
  37.                 .tableList(String.join(",", SYNC_TABLES)) // set captured table
  38.                 .username(MYSQL_USER)
  39.                 .password(MYSQL_PASSWD)
  40.                 .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
  41.                 .build();
  42.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  43.         env.setParallelism(1);
  44.         env.enableCheckpointing(5000);
  45.         DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source" + "xiaoqiang-flink");
  46.         List<String> tableList = getTableList();
  47.         System.out.println("tableList--->"+tableList);
  48.         for (String tbl : tableList) {
  49.             SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, "oc_settle_profit");
  50. //            SingleOutputStreamOperator<String> cleanStream = clean(filterStream);
  51.             // 流的数据sink出去
  52.             filterStream.addSink(new CustomDealDataSink())
  53.                     .name("sink " + tbl);
  54.         }
  55.         env.execute("xiaoqiang-flink");
  56.     }
  57.     /**
  58.      * 自定义sink
  59.      */
  60.     private static class CustomDealDataSink extends RichSinkFunction<String> {
  61.         private transient Connection coalitiondbConnection;
  62.         private transient Statement coalitiondbStatement;
  63.         private transient Connection cloudConnection;
  64.         private transient Statement cloudStatement;
  65.         @Override
  66.         public void open(Configuration parameters) throws Exception {
  67.             super.open(parameters);
  68.             // 在这里初始化 JDBC 连接
  69.             coalitiondbConnection = DriverManager.getConnection("jdbc:mysql://x.x.x.x:3306/league_test", "root", "");
  70.             coalitiondbStatement = coalitiondbConnection.createStatement();
  71.             cloudConnection = DriverManager.getConnection("jdbc:mysql://x.x.x.x:3306/cloud_test", "root", "");
  72.             cloudStatement = cloudConnection.createStatement();
  73.         }
  74.         @Override
  75.         public void invoke(String value, Context context) throws Exception {
  76.             // 解析拿到的CDC-JSON数据
  77.             JSONObject rowJson = JSON.parseObject(value);
  78.             String outNo = rowJson.getString("out_no");
  79.             Integer userType = rowJson.getInteger("user_type");
  80.             String id = rowJson.getString("id");
  81.             String payOrderNo = rowJson.getString("pay_order_no");
  82.             String title = rowJson.getString("title");
  83.             String fromUserId = rowJson.getString("from_user_id");
  84.             String fromAccountId = rowJson.getString("from_account_id");
  85.             String userId = rowJson.getString("user_id");
  86.             String accountId = rowJson.getString("account_id");
  87.             Integer amount = rowJson.getInteger("amount");
  88.             Integer profitState = rowJson.getInteger("profit_state");
  89.             Date profitTime = rowJson.getTimestamp("profit_time");
  90.             Integer refundState = rowJson.getInteger("refund_state");
  91.             Date refundTime = rowJson.getTimestamp("refund_time");
  92.             Date addTime = rowJson.getTimestamp("add_time");
  93.             String remark = rowJson.getString("remark");
  94.             String acctCircle = rowJson.getString("acct_circle");
  95.             Integer fromUserType = rowJson.getInteger("from_user_type");
  96.             String companyId = rowJson.getString("company_id");
  97.             String bizCompanyId = rowJson.getString("biz_company_id");
  98. //            if (1 != profitState || !"PG11111".equals(acctCircle)) {
  99. //                return;
  100. //            }
  101. //
  102. //            // 读取相关表的数据(与其他表进行关联)
  103. //            Integer bizType = null;
  104. //            String contributeUserId = null;
  105. //            String relationBrandOwnerId = null;
  106. //            ResultSet virtualOrderResultSet = coalitiondbStatement.executeQuery("select * from tc_virtual_order where order_type != 2 and id = '" + outNo + "'");
  107. //            // 如果是tc_virtual_order订单(上岗卡、安心卡、课程)
  108. //            if (virtualOrderResultSet.next()) {
  109. //                // 处理数据逻辑
  110. //                Integer virtualOrder4OrderType = virtualOrderResultSet.getInt("order_type");
  111. //                String virtualOrder4CompanyId = virtualOrderResultSet.getString("company_id");
  112. //                String virtualOrder4BrandId = virtualOrderResultSet.getString("brand_id");
  113. //                // 上岗卡订单排掉,因为已经有别的任务处理了
  114. //                if (virtualOrder4OrderType == 2) {
  115. //                    return;
  116. //                }
  117. //                // orderType转换
  118. //                if (virtualOrder4OrderType == 6) {
  119. //                    bizType = 10;
  120. //                } else if (virtualOrder4OrderType == 1) {
  121. //                    bizType = 11;
  122. //                } else if (virtualOrder4OrderType == 5) {
  123. //                    bizType = 12;
  124. //                }
  125. //                // userType转换
  126. //                if (virtualOrder4OrderType == 6 && userType == 92) {
  127. //                    contributeUserId = virtualOrder4CompanyId;
  128. //                } else if (virtualOrder4OrderType == 1 && userType == 92) {
  129. //                    contributeUserId = virtualOrder4CompanyId;
  130. //                } else if (virtualOrder4OrderType == 5 && userType == 92) {
  131. //                    contributeUserId = virtualOrder4CompanyId;
  132. //                }
  133. //                // relationBrandOwnerId转换
  134. //                if (virtualOrder4OrderType == 6 && userType == 90) {
  135. //                    relationBrandOwnerId = virtualOrder4BrandId;
  136. //                } else if (virtualOrder4OrderType == 1 && userType == 90) {
  137. //                    relationBrandOwnerId = virtualOrder4BrandId;
  138. //                } else if (virtualOrder4OrderType == 5 && userType == 90) {
  139. //                    relationBrandOwnerId = virtualOrder4BrandId;
  140. //                }
  141. //                // remark转换
  142. //                if (virtualOrder4OrderType == 1 || virtualOrder4OrderType == 5) {
  143. //                    remark = title;
  144. //                }
  145. //            } else {
  146. //                // 如果不是tc_virtual_order的数据,则可能是其他数据,此处只保留好到家实物商品数据
  147. //                if (StringUtils.isBlank(payOrderNo)) {
  148. //                    return;
  149. //                }
  150. //                ResultSet acctPayOrderResultSet = cloudStatement.executeQuery("select * from acct_pay_order t where t.id = '" + payOrderNo + "'");
  151. //                if (!acctPayOrderResultSet.next()) {
  152. //                    return;
  153. //                }
  154. //                Integer payCate = acctPayOrderResultSet.getInt("pay_cate");
  155. //                if (200100 != payCate) { // 好到家实物商品类型
  156. //                    return;
  157. //                }
  158. //
  159. //                bizType = 20;
  160. //                if (userType == 92 && StringUtils.isNotBlank(bizCompanyId)) {
  161. //                    contributeUserId = bizCompanyId;
  162. //                } else if (userType == 90 && StringUtils.isNotBlank(bizCompanyId)) {
  163. //                    ResultSet brandOwnerIdResultSet = cloudStatement.executeQuery("select * from uc_brand_partner t where t.company_id = '" + bizCompanyId + "'");
  164. //                    if (brandOwnerIdResultSet.next()) {
  165. //                        relationBrandOwnerId = brandOwnerIdResultSet.getString("brand_owner_id");
  166. //                    }
  167. //                }
  168. //            }
  169. //            if (StringUtils.isBlank(remark)) {
  170. //                remark = title;
  171. //            }
  172.             // 数据写入到mysql
  173.             String insertSql = "INSERT INTO dws_profit_record_hdj_flink_api (id, show_profit_id, order_no, from_user_id, from_user_type, user_id,\n" +
  174.                     "                                                    user_type, amount, profit_time, state, acct_circle, biz_type,\n" +
  175.                     "                                                    contribute_user_id, relation_brand_owner_id, remark, add_time)\n" +
  176.                     "VALUES ('" + id + "', '" + "JSD" + id + "', '" + outNo + "', '" + fromUserId + "', " + fromUserType + ", '" + userId + "', " + userType + ",\n" +
  177.                     "        " + amount + ", '" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT")) + "', " + profitState + ", '" + acctCircle + "', " + 1 + ", " + (StringUtils.isBlank("123") ? null : "'" + "contributeUserId" + "'") + ", " + (StringUtils.isBlank("relationBrandOwnerId") ? null : "'" + "relationBrandOwnerId" + "'") + ", '" + remark + "',\n" +
  178.                     "        '" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT")) + "');";
  179.             cloudStatement.execute("delete from dws_profit_record_hdj_flink_api where id = '" + id + "'");
  180.             System.out.println("insertSql--->"+insertSql);
  181.             cloudStatement.execute(insertSql);
  182.         }
  183.         @Override
  184.         public void close() throws Exception {
  185.             super.close();
  186.             // 在这里关闭 JDBC 连接
  187.             coalitiondbStatement.close();
  188.             coalitiondbConnection.close();
  189.             cloudStatement.close();
  190.             cloudConnection.close();
  191.         }
  192.     }
  193.     /**
  194.      * 清晰数据
  195.      *
  196.      * @param source
  197.      * @return
  198.      */
  199.     private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {
  200.         return source.flatMap(new FlatMapFunction<String, String>() {
  201.             @Override
  202.             public void flatMap(String row, Collector<String> out) throws Exception {
  203.                 try {
  204.                     LOG.info("============================row:{}", row);
  205.                     JSONObject rowJson = JSON.parseObject(row);
  206.                     String op = rowJson.getString("op");
  207.                     //history,insert,update
  208.                     if (Arrays.asList("r", "c", "u").contains(op)) {
  209.                         out.collect(rowJson.getJSONObject("after").toJSONString());
  210.                     } else {
  211.                         LOG.info("filter other op:{}", op);
  212.                     }
  213.                 } catch (Exception ex) {
  214.                     LOG.warn("filter other format binlog:{}", row);
  215.                 }
  216.             }
  217.         });
  218.     }
  219.     /**
  220.      * 过滤数据
  221.      *
  222.      * @param source
  223.      * @param table
  224.      * @return
  225.      */
  226.     private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {
  227.         return source.filter(new FilterFunction<String>() {
  228.             @Override
  229.             public boolean filter(String row) throws Exception {
  230.                 try {
  231.                     JSONObject rowJson = JSON.parseObject(row);
  232.                     JSONObject source = rowJson.getJSONObject("source");
  233.                     String tbl = source.getString("table");
  234.                     return table.equals(tbl);
  235.                 } catch (Exception ex) {
  236.                     ex.printStackTrace();
  237.                     return false;
  238.                 }
  239.             }
  240.         });
  241.     }
  242.     private static List<String> getTableList() {
  243.         List<String> tables = new ArrayList<>();
  244.         String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";
  245.         List<JSONObject> tableList = JdbcUtil.executeQuery(MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);
  246.         for (JSONObject jsob : tableList) {
  247.             String schemaName = jsob.getString("TABLE_SCHEMA");
  248.             String tblName = jsob.getString("TABLE_NAME");
  249.             String schemaTbl = schemaName + "." + tblName;
  250.             if (SYNC_TABLES.contains(schemaTbl)) {
  251.                 tables.add(tblName);
  252.             }
  253.         }
  254.         return tables;
  255.     }
  256. }
复制代码
  JdbcUtil.java
  1. import com.alibaba.fastjson.JSONObject;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import java.sql.*;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. public class JdbcUtil {
  8.     static {
  9.         try {
  10. //            Class.forName("com.mysql.cj.jdbc.Driver");
  11.             Class.forName("com.mysql.jdbc.Driver");
  12.         } catch (ClassNotFoundException e) {
  13.             e.printStackTrace();
  14.         }
  15.     }
  16.     private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class);
  17.     public static void main(String[] args) throws SQLException {
  18.     }
  19.     public static List<JSONObject> executeQuery(String hostUrl, int port, String user, String password, String sql) {
  20.         List<JSONObject> beJson = new ArrayList<>();
  21.         String connectionUrl = String.format("jdbc:mysql://%s:%s/league_test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai", hostUrl, port);
  22.         Connection con = null;
  23.         try {
  24.             con = DriverManager.getConnection(connectionUrl, user, password);
  25.             PreparedStatement ps = con.prepareStatement(sql);
  26.             ResultSet rs = ps.executeQuery();
  27.             beJson = resultSetToJson(rs);
  28.         } catch (SQLException e) {
  29.             e.printStackTrace();
  30.         } catch (Exception e) {
  31.             e.printStackTrace();
  32.         } finally {
  33.             try {
  34.                 con.close();
  35.             } catch (Exception e) {
  36.             }
  37.         }
  38.         return beJson;
  39.     }
  40.     private static List<JSONObject> resultSetToJson(ResultSet rs) throws SQLException {
  41.         List<JSONObject> list = new ArrayList<>();
  42.         ResultSetMetaData metaData = rs.getMetaData();
  43.         int columnCount = metaData.getColumnCount();
  44.         while (rs.next()) {
  45.             JSONObject jsonObj = new JSONObject();
  46.             for (int i = 1; i <= columnCount; i++) {
  47.                 String columnName = metaData.getColumnLabel(i);
  48.                 String value = rs.getString(columnName);
  49.                 jsonObj.put(columnName, value);
  50.             }
  51.             list.add(jsonObj);
  52.         }
  53.         return list;
  54.     }
  55. }
复制代码
  pom.xml:
  1.         <dependency>
  2.             <groupId>com.ververica</groupId>
  3.             <artifactId>flink-connector-mysql-cdc</artifactId>
  4.             <version>2.4.0</version>
  5.         </dependency>
复制代码
2.1 Mysql 打开 bin-log 功能

  og_bin 的Value如果为ON代表开启,如果为OFF代表关闭,MySQL8.0默认是开启的:
  1. # 查看是否开启binlog
  2. mysql> SHOW VARIABLES LIKE '%log_bin%';
复制代码
  关闭状态:



  • log_bin为ON代表MySQL已经开启binlog日志记录
  • log_bin_basename设置了binlog的文件路径及文件前缀名
  • log_bin_index设置了binlog索引文件的路径
  开启状态:

  1. # 在centos中mysql的配置文件一般都在/etc/mysql目录下,如果不在可以通过 find / -name "my.cnf" 查找
  2. vi /etc/mysql/my.cnf
  3. # 服务ID
  4. server-id=1
  5. # binlog 配置 只要配置了log_bin地址 就会开启
  6. log_bin = /var/lib/mysql/mysql_bin
  7. # 日志存储天数 默认0 永久保存
  8. # 如果数据库会定期归档,建议设置一个存储时间不需要一直存储binlog日志,理论上只需要存储归档之后的日志
  9. expire_logs_days = 30
  10. # binlog最大值
  11. max_binlog_size = 1024M
  12. # 规定binlog的格式,binlog有三种格式statement、row、mixad,默认使用statement,建议使用row格式
  13. binlog_format = ROW
  14. # 在提交n次事务后,进行binlog的落盘,0为不进行强行的刷新操作,而是由文件系统控制刷新日志文件,如果是在线交易和账有关的数据建议设置成1,如果是其他数据可以保持为0即可
  15. sync_binlog = 1
  16. # 重启MySQL服务使配置生效
  17. systemctl restart mysqld / service mysql restart
  18. # 查看日志列表
  19. SHOW MASTER LOGS;
复制代码
可参考:MySQL 开启设置binlog以及通过binlog恢复数据
2.2 在 Mysql 中建库建表准备

  1. CREATE DATABASE IF NOT EXISTS cloud_test;
  2. CREATE DATABASE IF NOT EXISTS league_test;
  3. CREATE TABLE league_test.oc_settle_profit (
  4.         id                           varchar(32),
  5.         show_profit_id               varchar(32),
  6.         order_no                     varchar(32),
  7.         from_user_id                 varchar(32),
  8.         from_user_type               int(11),
  9.         user_id                      varchar(32),
  10.         user_type                    int(11),
  11.         rate                         int(11),
  12.         amount                       int(11),
  13.         type                         int(11),
  14.         add_time                     datetime,
  15.         state                        int(11),
  16.         expect_profit_time           datetime,
  17.         profit_time                  datetime,
  18.         profit_mode                  int(11),
  19.         opt_code                     varchar(32),
  20.         opt_name                     varchar(32),
  21.         acct_circle                  varchar(32),
  22.         process_state                int(11),
  23.         parent_id                    varchar(32),
  24.         keep_account_from_user_id    varchar(32),
  25.         keep_account_from_bm_user_id varchar(32),
  26.         keep_account_user_id         varchar(32),
  27.         keep_account_bm_user_id      varchar(32),
  28.         biz_type                     int(11),
  29.         remark                       varchar(32),
  30.         contribute_user_id           varchar(32),
  31.         relation_brand_owner_id      varchar(32),
  32.         PRIMARY KEY (id) USING BTREE
  33. );
  34. CREATE TABLE cloud_test.dws_profit_record_hdj_flink_api (
  35.         id                      varchar(32),
  36.         show_profit_id          varchar(32),
  37.         order_no                varchar(32),
  38.         from_user_id            varchar(32),
  39.         from_user_type          int(11),
  40.         user_id                 varchar(32),
  41.         user_type               int(11),
  42.         amount                  int(11),
  43.         profit_time             datetime,
  44.         state                   int(11),
  45.         acct_circle             varchar(32),
  46.         biz_type                int(11),
  47.         contribute_user_id      varchar(32),
  48.         relation_brand_owner_id varchar(32),
  49.         remark                  varchar(32),
  50.         add_time                datetime,
  51.         PRIMARY KEY (id) USING BTREE
  52.         );
复制代码
2.3 遇到的坑

2.3.1 The MySQL server has a timezone offset (0 seconds ahead of UTC)

  用 JDBC 连接 Mysql 的时间报错:The MySQL server has a timezone offset (0 seconds ahead of UTC)
  缘故原由:从错误即可知道是时区的错误。
  1. show variables like '%time_zone%';
  2. Variable_name   |Value |
  3. ----------------+------+
  4. time_zone       |SYSTEM|
  5. // 或者下面这条命令
  6. SELECT @@global.time_zone;
复制代码
  办理:利用 root 用户登录 mysql,再执行 set global time_zone='+8:00' 命令。
  注意:一开始改成了 SET GLOBAL time_zone = 'Asia/Shanghai',但并不好使。
2.3.2 自动转换datetime为时间戳题目

  Apache Flink 是一种流处理框架,它可以读取 MySQL binlog 日志,并将其转换为流数据。然而,由于Flink内部采取的时间戳格式与 MySQL 的 datetime 格式差别,所以在抓取 binlog 时,须要进行一定的转换才华正确地剖析数据。
  自界说类:DateTimeConverter 实现 CustomConverter 接口,重写对应方法对 mysql 的时间类型进行尺度转换
  1. import io.debezium.spi.converter.CustomConverter;
  2. import io.debezium.spi.converter.RelationalColumn;
  3. import org.apache.kafka.connect.data.SchemaBuilder;
  4. import java.time.*;
  5. import java.time.format.DateTimeFormatter;
  6. import java.util.Properties;
  7. /**
  8. * @BelongsProject:
  9. * @BelongsPackage:
  10. * @Author:
  11. * @CreateTime:
  12. * @Description: TODO 实现CustomConverter接口,重写对应方法对mysql的时间类型进行标准转换
  13. * @Version: 1.0
  14. */
  15. public class DateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn>{
  16.     private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
  17.     private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
  18.     private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
  19.     private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
  20.     private ZoneId timestampZoneId = ZoneId.systemDefault();
  21.     @Override
  22.     public void configure(Properties props) {
  23.     }
  24.     @Override
  25.     public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
  26.         String sqlType = column.typeName().toUpperCase();
  27.         SchemaBuilder schemaBuilder = null;
  28.         Converter converter = null;
  29.         if ("DATE".equals(sqlType)) {
  30.             schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
  31.             converter = this::convertDate;
  32.         }
  33.         if ("TIME".equals(sqlType)) {
  34.             schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
  35.             converter = this::convertTime;
  36.         }
  37.         if ("DATETIME".equals(sqlType)) {
  38.             schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
  39.             converter = this::convertDateTime;
  40.         }
  41.         if ("TIMESTAMP".equals(sqlType)) {
  42.             schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
  43.             converter = this::convertTimestamp;
  44.         }
  45.         if (schemaBuilder != null) {
  46.             registration.register(schemaBuilder, converter);
  47.         }
  48.     }
  49.    
  50.     private String convertDate(Object input) {
  51.         if (input == null) return null;
  52.         if (input instanceof LocalDate) {
  53.             return dateFormatter.format((LocalDate) input);
  54.         }
  55.         
  56.         if (input instanceof Integer) {
  57.             LocalDate date = LocalDate.ofEpochDay((Integer) input);
  58.             return dateFormatter.format(date);
  59.         }
  60.         return String.valueOf(input);
  61.     }
  62.     private String convertTime(Object input) {
  63.         if (input == null) return null;
  64.         if (input instanceof Duration) {
  65.             Duration duration = (Duration) input;
  66.             long seconds = duration.getSeconds();
  67.             int nano = duration.getNano();
  68.             LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
  69.             return timeFormatter.format(time);
  70.         }
  71.         return String.valueOf(input);
  72.     }
  73.     private String convertDateTime(Object input) {
  74.         if (input == null) return null;
  75.         if (input instanceof LocalDateTime) {
  76.             return datetimeFormatter.format((LocalDateTime) input).replaceAll("T", " ");
  77.         }
  78.         return String.valueOf(input);
  79.     }
  80.     private String convertTimestamp(Object input) {
  81.         if (input == null) return null;
  82.         if (input instanceof ZonedDateTime) {
  83.             // mysql的timestamp会转成UTC存储,这里的zonedDatetime都是UTC时间
  84.             ZonedDateTime zonedDateTime = (ZonedDateTime) input;
  85.             LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
  86.             return timestampFormatter.format(localDateTime).replaceAll("T", " ");
  87.         }
  88.         return String.valueOf(input);
  89.     }
  90. }
复制代码
  引入:
  1. public class FlinkSourceUtil {
  2.     public static MySqlSource<JSONObject> getMySqlSource(ParameterTool parameterTool,List<String> databases, List<String> tables){
  3.         Properties props = new Properties();
  4.         props.setProperty("useSSL", "false");
  5.         props.setProperty("allowPublicKeyRetrieval", "true");
  6.         Properties debeziumProperties = new Properties();
  7.         debeziumProperties.setProperty("converters", "dateConverters");
  8.         debeziumProperties.setProperty("dateConverters.type", "com.xxx.util.DateTimeConverter");
  9.         debeziumProperties.setProperty("dateConverters.format.date", "yyyy-MM-dd");
  10.         debeziumProperties.setProperty("dateConverters.format.time", "HH:mm:ss");
  11.         debeziumProperties.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss");
  12.         debeziumProperties.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss");
  13.         debeziumProperties.setProperty("dateConverters.format.timestamp.zone", "UTC+8");
  14.         
  15.         String[] databaseArray = databases.toArray(new String[0]);
  16.         String[] tableArray = tables.toArray(new String[0]);
  17.         MySqlSource<JSONObject> mySqlSource = MySqlSource.<JSONObject>builder()
  18.                 .hostname(parameterTool.get(Constant.MYSQl_SOURCE_HOST_NAME))
  19.                 .port(Integer.parseInt(parameterTool.get(Constant.MYSQl_SOURCE_PORT)))
  20.                 .databaseList(databaseArray)
  21.                 .tableList(tableArray)
  22.                 .username(parameterTool.get(Constant.MYSQL_SOURCE_USER_NAME))
  23.                 .password(parameterTool.get(Constant.MYSQL_SOURCE_PASSWORD))
  24.                 .deserializer(new MyDebeziumDeserializationSchema())
  25.                 .debeziumProperties(debeziumProperties)
  26.                 .startupOptions(StartupOptions.initial())
  27.                 .serverId(parameterTool.get(Constant.SERVER_ID))
  28.                 .jdbcProperties(props)
  29.                 .build();
  30.         return mySqlSource;
  31.     }
  32. }
复制代码
参考:
关于flinkCDC监听MySQL binlog时自动转换datetime为时间戳题目
FlinkCDC时间题目timestamp等
flink-core抓mysql-binlog,字段datetime会自动转换成时间戳,怎么办理?
2.3.3 重起程序会全量读取 binlog,我想要的是增量

  其实 MySqlSourceBuilder 是有一个方法特意指定 startUP mode 的,改造代码
  1.         MySqlSourceBuilder<String> mySqlSource = new MySqlSourceBuilder<>();
  2.         mySqlSource.startupOptions(StartupOptions.latest());
  3.         mySqlSource
  4.                 .hostname(MYSQL_HOST)
  5.                 .port(MYSQL_PORT)
  6.                 .databaseList(SYNC_DB) // set captured database
  7.                 .tableList(SYNC_TABLES) // set captured table
  8.                 .username(MYSQL_USER)
  9.                 .password(MYSQL_PASSWD)
  10.                 .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
  11.                 .debeziumProperties(debeziumProperties)
  12.                 .build();
  13.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14.         env.setParallelism(1);
  15.         env.enableCheckpointing(5000);
  16.         DataStreamSource<String> cdcSource = env.fromSource(mySqlSource.build(), WatermarkStrategy.noWatermarks(), "CDC Source" + "flinkcdc-kafka");
复制代码
参考:Flink cdc怎样只进行增量同步,差别步汗青数据(只读取binlog)
2.4 测试

  Idea 启动程序后,在 oc_settle_profit 表中插入数据后 dws_profit_record_hdj_flink_api 也可以同步插入相应的数据。
参考:
【博学谷学习记录】超强总结,专心分享|大数据之flinkCDC
一次打通FlinkCDC同步Mysql数据
三、番外

  用 Flink CDC 可以监控 Mysql,但无法监控 StarRocks,和官方扣问过,目前 StarRocks 并没有像 Mysql 这样被外部感知 DDL 操纵的 bin-log 功能,所以暂时还无法用 Flink CDC 监控 StarRocks。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

诗林

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表