大数据-业务数据采集-FlinkCDC

打印 上一主题 下一主题

主题 513|帖子 513|积分 1539

CDC

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
CDC 的种类

CDC 主要分为基于查询和基于 Binlog 两种方式,我们主要了解一下这两种之间的区别:
基于查询的 CDC基于 Binlog 的 CDC开源产品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium执行模式BatchStreaming是否可以捕获所有数据变化否是延迟性高延迟低延迟是否增加数据库压力是否FlinkCDC

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取【全量数据】和【增量变更数据】的 source 组件。而不需要使用类似 Kafka 之类的中间件中转数据
目前也已开源,开源地址:https://github.com/ververica/flink-cdc-connectors


ConnectorDatabaseDrivermongodb-cdcMongoDB: 3.6, 4.x, 5.0MongoDB Driver: 4.3.1mysql-cdcMySQL: 5.6, 5.7, 8.0.x
RDS MySQL: 5.6, 5.7, 8.0.x
PolarDB MySQL: 5.6, 5.7, 8.0.x
Aurora MySQL: 5.6, 5.7, 8.0.x
MariaDB: 10.x
PolarDB X: 2.0.1JDBC Driver: 8.0.27oceanbase-cdcOceanBase CE: 3.1.x
OceanBase EE (MySQL mode): 2.x, 3.xJDBC Driver: 5.1.4xoracle-cdcOracle: 11, 12, 19Oracle Driver: 19.3.0.0postgres-cdcPostgreSQL: 9.6, 10, 11, 12JDBC Driver: 42.2.12sqlserver-cdcSqlserver: 2012, 2014, 2016, 2017, 2019JDBC Driver: 7.2.2.jre8tidb-cdcTiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0JDBC Driver: 8.0.27db2-cdcDb2: 11.5DB2 Driver: 11.5.0.0DataStream:

  • 优点: 多库多表
  • 缺点: 需要自定义反序列化器(但灵活)
    FlinkSQL:
  • 优点: 不需要自定义反序列化器
  • 缺点: 单表
Demo

注意开启 binlog_format=ROW
my.ini
  1. log-bin=mysql-bin
  2. #binlog_format="STATEMENT"
  3. binlog_format="ROW"
  4. #binlog_format="MIXED"
  5. #service-id=1
复制代码

POM
  1.   <dependencies>
  2.         <dependency>
  3.             <groupId>org.apache.flink</groupId>
  4.             <artifactId>flink-java</artifactId>
  5.             <version>1.12.0</version>
  6.         </dependency>
  7.         <dependency>
  8.             <groupId>org.apache.flink</groupId>
  9.             <artifactId>flink-streaming-java_2.12</artifactId>
  10.             <version>1.12.0</version>
  11.         </dependency>
  12.         <dependency>
  13.             <groupId>org.apache.flink</groupId>
  14.             <artifactId>flink-clients_2.12</artifactId>
  15.             <version>1.12.0</version>
  16.         </dependency>
  17.         <dependency>
  18.             <groupId>org.apache.hadoop</groupId>
  19.             <artifactId>hadoop-client</artifactId>
  20.             <version>3.1.3</version>
  21.         </dependency>
  22.         <dependency>
  23.             <groupId>mysql</groupId>
  24.             <artifactId>mysql-connector-java</artifactId>
  25.             <version>5.1.49</version>
  26.         </dependency>
  27.         <dependency>
  28.             <groupId>com.alibaba.ververica</groupId>
  29.             <artifactId>flink-connector-mysql-cdc</artifactId>
  30.             <version>1.2.0</version>
  31.         </dependency>
  32.         <dependency>
  33.             <groupId>com.alibaba</groupId>
  34.             <artifactId>fastjson</artifactId>
  35.             <version>1.2.75</version>
  36.         </dependency>
  37.         <dependency>
  38.             <groupId>org.apache.flink</groupId>
  39.             <artifactId>flink-table-planner-blink_2.12</artifactId>
  40.             <version>1.12.0</version>
  41.         </dependency>
  42.     </dependencies>
复制代码
基于 DataStream

CustomerDeserialization.java
  1. package com.vipsoft;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
  4. import io.debezium.data.Envelope;
  5. import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
  6. import org.apache.flink.api.common.typeinfo.TypeInformation;
  7. import org.apache.flink.util.Collector;
  8. import org.apache.kafka.connect.data.Field;
  9. import org.apache.kafka.connect.data.Schema;
  10. import org.apache.kafka.connect.data.Struct;
  11. import org.apache.kafka.connect.source.SourceRecord;
  12. import java.util.List;
  13. public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
  14.     /**
  15.      * 封装的数据格式
  16.      * {
  17.      * "database":"",
  18.      * "tableName":"",
  19.      * "before":{"id":"","tm_name":""....},
  20.      * "after":{"id":"","tm_name":""....},
  21.      * "type":"c u d",
  22.      * //"ts":156456135615
  23.      * }
  24.      */
  25.     @Override
  26.     public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
  27.         //1.创建JSON对象用于存储最终数据
  28.         JSONObject result = new JSONObject();
  29.         //2.获取库名&表名
  30.         String topic = sourceRecord.topic();
  31.         String[] fields = topic.split("\\.");
  32.         String database = fields[1];
  33.         String tableName = fields[2];
  34.         Struct value = (Struct) sourceRecord.value();
  35.         //3.获取"before"数据
  36.         Struct before = value.getStruct("before");
  37.         JSONObject beforeJson = new JSONObject();
  38.         if (before != null) {
  39.             Schema beforeSchema = before.schema();
  40.             List<Field> beforeFields = beforeSchema.fields();
  41.             for (Field field : beforeFields) {
  42.                 Object beforeValue = before.get(field);
  43.                 beforeJson.put(field.name(), beforeValue);
  44.             }
  45.         }
  46.         //4.获取"after"数据
  47.         Struct after = value.getStruct("after");
  48.         JSONObject afterJson = new JSONObject();
  49.         if (after != null) {
  50.             Schema afterSchema = after.schema();
  51.             List<Field> afterFields = afterSchema.fields();
  52.             for (Field field : afterFields) {
  53.                 Object afterValue = after.get(field);
  54.                 afterJson.put(field.name(), afterValue);
  55.             }
  56.         }
  57.         //5.获取操作类型  CREATE UPDATE DELETE
  58.         Envelope.Operation operation = Envelope.operationFor(sourceRecord);
  59.         String type = operation.toString().toLowerCase();
  60.         if ("create".equals(type)) {
  61.             type = "insert";
  62.         }
  63.         //6.将字段写入JSON对象
  64.         result.put("database", database);
  65.         result.put("tableName", tableName);
  66.         result.put("before", beforeJson);
  67.         result.put("after", afterJson);
  68.         result.put("type", type);
  69.         //7.输出数据
  70.         collector.collect(result.toJSONString());
  71.     }
  72.     @Override
  73.     public TypeInformation<String> getProducedType() {
  74.         return BasicTypeInfo.STRING_TYPE_INFO;
  75.     }
  76. }
复制代码
FlinkCDC.java
  1. package com.vipsoft;
  2. import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
  3. import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
  4. import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
  5. import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
  6. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. public class FlinkCDC {
  9.     public static void main(String[] args) throws Exception {
  10.         //1.获取执行环境
  11.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.         env.setParallelism(1);
  13.         //2.通过FlinkCDC构建SourceFunction并读取数据
  14.         DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
  15.                 .hostname("localhost")
  16.                 .serverTimeZone("GMT+8")  //时区报错增加这个设置
  17.                 .port(3306)
  18.                 .username("root")
  19.                 .password("110")
  20.                 .databaseList("springboot")
  21.                 .tableList("springboot.sys_user")   //如果不添加该参数,则消费指定数据库中所有表的数据.如果指定,指定方式为db.table
  22.                 //.deserializer(new StringDebeziumDeserializationSchema())
  23.                 .deserializer(new CustomerDeserialization()) //使用自定义反序列化器
  24.                 .startupOptions(StartupOptions.initial())
  25.                 .build();
  26.         DataStreamSource<String> streamSource = env.addSource(sourceFunction);
  27.         //3.打印数据
  28.         streamSource.print();
  29.         //4.启动任务
  30.         env.execute("FlinkCDC");
  31.     }
  32. }
复制代码
运行效果


  • 默认 StringDebeziumDeserializationSchema

  • 自定义反序列化器

FlinkSQL
  1. package com.vipsoft;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.table.api.Table;
  6. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  7. import org.apache.flink.types.Row;
  8. public class FlinkCDCWithSQL {
  9.     public static void main(String[] args) throws Exception {
  10.         //1.获取执行环境
  11.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.         env.setParallelism(1);
  13.         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  14.         //2.DDL方式建表
  15.         tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +
  16.                 " id STRING NOT NULL, " +
  17.                 " username STRING, " +
  18.                 " nick_name STRING " +
  19.                 ") WITH ( " +
  20.                 " 'connector' = 'mysql-cdc', " +
  21.                 " 'hostname' = 'localhost', " +
  22.                 " 'port' = '3306', " +
  23.                 " 'username' = 'root', " +
  24.                 " 'password' = '110', " +
  25.                 " 'database-name' = 'springboot', " +
  26.                 " 'table-name' = 'sys_user' " +
  27.                 ")");
  28.         //3.查询数据
  29.         Table table = tableEnv.sqlQuery("select * from mysql_binlog");
  30.         //4.将动态表转换为流
  31.         DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
  32.         retractStream.print();
  33.         //5.启动任务
  34.         env.execute("FlinkCDCWithSQL");
  35.     }
  36. }
复制代码
运行效果


对比

通过对比,FlinkCDC 最舒服
FlinkCDCMaxwellCanal断点续传CKMySQL本地磁盘SQL -> 数据无无一对一(炸开处理)初始化功能有(多库多表)有(单表)无(单独查询历史数据)封装格式自定义JSONJSON(c/s自定义)高可用运行集群高可用无集群(ZK)插入对比

插入两条数据
  1. INSER INTO z_user_info VALUES(30,'zhang3','13800000000'),(31,'li4','13999999999')
复制代码

FlinkCDC 每条变化都会产生一条 json

Maxwell 每条变化都会产生一条 json

Canal 一次性执行的SQL,会产生一条JSON(两条数据组合在一起)【不方便,需要炸开解析】

更新对比
  1. UPDATE z_user_info SET user_name='wang5' WHERE id IN(30,31)
复制代码
FlinkCDC 包括了修改前的 before 数据

Maxwell 不包括修改前的数据

Canal 仍然是一条json

删除对比
  1. DELETE FROM z_user_info WHERE id IN(30,31)
复制代码
FlinkCDC 两条删除的 json 数据

Maxwell

Canal

【尚硅谷】Flink数据仓库视频教程

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

风雨同行

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表