ToB企服应用市场:ToB评测及商务社交产业平台

标题: Flink1.18 同步 MySQL 到 Doris [打印本页]

作者: 种地    时间: 2024-9-17 11:26
标题: Flink1.18 同步 MySQL 到 Doris
一、前言        

        利用Apache Flink实现数据同步的ETL(抽取、转换、加载)过程通常涉及从源系统(如数据库、消息队列或文件)中抽取数据,进行须要的转换,然后将数据加载到目标系统(如另一个数据库或数据仓库)。在这里,我们将展示怎样利用Apache Flink来实现一个从MySQL或Oracle数据库抽取数据并同步到另一个Doris数据库的ETL过程。
Flink官网:Apache Flink CDC | Apache Flink CDC
开启 Mysql Binlog
修改我们的设置文件 my.cnf,增长:
  1. server_id=1
  2. log_bin=mysql-bin
  3. binlog_format=ROW
  4. expire_logs_days=30
复制代码
  重启 mysql 
  1. # 判断MySQL是否已经开启binlog   on  为打开状态
  2. SHOW VARIABLES LIKE 'log_bin';   
  3. # 查看MySQL的binlog模式
  4. show global variables like "binlog%";
  5. # 查看日志开启状态
  6. show variables like 'log_%';
  7. # 刷新log日志,立刻产生一个新编号的binlog日志文件,跟重启一个效果
  8. flush logs;
  9. # 清空所有binlog日志
  10. reset master;
复制代码
二、预备 Flink Standalone 集群

1、下载 Flink 1.18.0,解压后得到 flink-1.18.0 目录。 利用下面的下令跳转至 Flink 目录下,并且设置 FLINK_HOME 为 flink-1.18.0 所在目录。
  1. tar -xzf flink-*.tgz
  2. cd flink-1.18.0
  3. export FLINK_HOME=/usr/local/flink-1.18.0
复制代码
2、通过在 conf/flink-conf.yaml 设置文件追加下列参数开启 checkpoint,每隔 3 秒做一次 checkpoint。
  1. execution.checkpointing.interval: 3000
复制代码
 3、利用下面的下令启动 Flink 集群。
  1. cd /usr/local/flink-1.18.0
  2. ./bin/start-cluster.sh
复制代码
Flink如今作为背景历程运行。您可以利用以下下令检查其状态:
  1. ps aux | grep flink
复制代码
 启动成功的话,可以在 http://localhost:8081/访问到 Flink Web UI,如下所示:

多次实行 start-cluster.sh 可以拉起多个 TaskManager。
   假如想修改端口,可以在 conf/flink-conf.yaml 中修改 rest.port
  要快速停止集群和所有正在运行的组件,您可以利用提供的脚本:
  1. ./bin/stop-cluster.sh
复制代码
三、通过 FlinkCDC cli 提交任务 

支持的连接器:
连接器类型支持的外部系统Apache DorisSink     
MySQLSource     
StarRocksSink     
1、下载下面列出的二进制压缩包,并解压得到目录 flink-cdc-3.1.0; flink-cdc-3.1.0-bin.tar.gz flink-cdc-3.1.0 下会包罗 bin、lib、log、conf 四个目录。
2、下载下面列出的 connector 包,并且移动到 lib 目录下; 下载链接只对已发布的版本有用, SNAPSHOT 版本必要本地基于 master 或 release- 分支编译.

3、编写任务设置 yaml 文件 下面给出了一个整库同步的示例文件 mysql-to-doris.yaml:
  1. ################################################################################
  2. # Description: Sync MySQL all tables to Doris
  3. ################################################################################
  4. source:
  5.   type: mysql
  6.   hostname: localhost
  7.   port: 3306
  8.   username: root
  9.   password: 123456
  10.   tables: app_db.\.*
  11.   server-id: 5400-5404
  12.   server-time-zone: UTC
  13. sink:
  14.   type: doris
  15.   fenodes: 127.0.0.1:8030
  16.   username: root
  17.   password: ""
  18.   table.create.properties.light_schema_change: true
  19.   table.create.properties.replication_num: 1
  20. pipeline:
  21.   name: Sync MySQL Database to Doris
  22.   parallelism: 2
复制代码
其中: source 中的 tables: app_db.\.* 通过正则匹配同步 app_db 下的所有表。 sink 添加 table.create.properties.replication_num 参数是由于 Docker 镜像中只有一个 Doris BE 节点。
4、最后,通过下令行提交任务到 Flink Standalone cluster
  1. bash bin/flink-cdc.sh mysql-to-doris.yaml
复制代码
提交成功后,返复书息如:
  1. Pipeline has been submitted to cluster.
  2. Job ID: ae30f4580f1918bebf16752d4963dc54
  3. Job Description: Sync MySQL Database to Doris
复制代码
在 Flink Web UI,可以看到一个名为 Sync MySQL Database to Doris 的任务正在运行。

四、Route the changes 

Flink CDC 提供了将源表的表结构/数据路由到其他表名的设置,借助这种本领,我们能够实现表名库名替换,整库同步等功能。 下面提供一个设置文件说明:
  1. ################################################################################
  2. # Description: Sync MySQL all tables to Doris
  3. ################################################################################
  4. source:
  5.    type: mysql
  6.    hostname: localhost
  7.    port: 3306
  8.    username: root
  9.    password: 123456
  10.    tables: app_db.\.*
  11.    server-id: 5400-5404
  12.    server-time-zone: UTC
  13. sink:
  14.    type: doris
  15.    fenodes: 127.0.0.1:8030
  16.    benodes: 127.0.0.1:8040
  17.    username: root
  18.    password: ""
  19.    table.create.properties.light_schema_change: true
  20.    table.create.properties.replication_num: 1
  21. route:
  22.    - source-table: app_db.orders
  23.      sink-table: ods_db.ods_orders
  24.    - source-table: app_db.shipments
  25.      sink-table: ods_db.ods_shipments
  26.    - source-table: app_db.products
  27.      sink-table: ods_db.ods_products
  28. pipeline:
  29.    name: Sync MySQL Database to Doris
  30.    parallelism: 2
复制代码
通过上面的 route 设置,会将 app_db.orders 表的结构和数据同步到 ods_db.ods_orders 中。从而实现数据库迁移的功能。 特别地,source-table 支持正则表达式匹配多表,从而实现分库分表同步的功能,比方下面的设置:
  1. route:
  2.   - source-table: app_db.order\.*
  3.     sink-table: ods_db.ods_orders
复制代码
 这样,就可以将诸如 app_db.order01、app_db.order02、app_db.order03 的表汇总到 ods_db.ods_orders 中。注意,现在还不支持多表中存在相同主键数据的场景,将在后续版本支持。
   以下扩展笔记
  五、通过Flink Sql Client 方式与 Flink 进行交互

支持的连接器:
ConnectorDatabaseDrivermongodb-cdc     
MongoDB Driver: 4.9.1mysql-cdc     
JDBC Driver: 8.0.28oceanbase-cdc     
OceanBase Driver: 2.4.xoracle-cdc     
Oracle Driver: 19.3.0.0postgres-cdc     
JDBC Driver: 42.5.1sqlserver-cdc     
JDBC Driver: 9.4.1.jre8tidb-cdc     
JDBC Driver: 8.0.27db2-cdc     
Db2 Driver: 11.5.0.0vitess-cdc     
MySql JDBC Driver: 8.0.26 下表显示了Flink CDC连接器和Flink 之间的版本映射: 
Flink® CDC VersionFlink® Version1.0.01.11.*1.1.01.11.*1.2.01.12.*1.3.01.12.*1.4.01.13.*2.0.*1.13.*2.1.*1.13.*2.2.*1.13.*, 1.14.*2.3.*1.13.*, 1.14.*, 1.15.*, 1.16.*2.4.*1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.*3.0.*1.14.*, 1.15.*, 1.16.*, 1.17.*, 1.18.* 利用下面的下令启动 Flink SQL CLI
  1. #在flink目录下
  2. ./bin/sql-client.sh
复制代码
然后建表语句创建Flink表 ,以下展示将flink_source.source_test表实时同步到flink_sink、flink_sink_second的sink_test表,Mysql同步Mysql。
  1. # 每 3 秒做一次 checkpoint,用于测试,生产配置建议5到10分钟
  2. Flink SQL> SET execution.checkpointing.interval = 3s;
  3. [INFO] Session property has been set.
  4. Flink SQL> CREATE TABLE source_test (
  5. >   user_id STRING,
  6. >   user_name STRING,
  7. >   PRIMARY KEY (user_id) NOT ENFORCED
  8. > ) WITH (
  9. >    'connector' = 'mysql-cdc',
  10. >    'hostname' = '192.168.3.31',
  11. >    'port' = '3306',
  12. >    'username' = 'root',
  13. >    'password' = '******',
  14. >    'database-name' = 'flink_source',
  15. >    'table-name' = 'source_test'
  16. > );
  17. [INFO] Execute statement succeed.
  18. Flink SQL> CREATE TABLE sink_test (
  19. >   user_id STRING,
  20. >   user_name STRING,
  21. >   PRIMARY KEY (user_id) NOT ENFORCED
  22. > ) WITH (
  23. >    'connector' = 'jdbc',
  24. >    'url' = 'jdbc:mysql://192.168.3.31:3306/flink_sink',
  25. >    'driver' = 'com.mysql.cj.jdbc.Driver',
  26. >    'username' = 'root',
  27. >    'password' = '******',
  28. >    'table-name' = 'sink_test'
  29. > );
  30. [INFO] Execute statement succeed.
  31. Flink SQL> CREATE TABLE sink_test_second (
  32. >   user_id STRING,
  33. >   user_name STRING,
  34. >   PRIMARY KEY (user_id) NOT ENFORCED
  35. > ) WITH (
  36. >    'connector' = 'jdbc',
  37. >    'url' = 'jdbc:mysql://192.168.3.31:3306/flink_sink_second',
  38. >    'driver' = 'com.mysql.cj.jdbc.Driver',
  39. >    'username' = 'root',
  40. >    'password' = '******',
  41. >    'table-name' = 'sink_test'
  42. > );
  43. [INFO] Execute statement succeed.
  44. Flink SQL> insert into sink_test select * from source_test;
  45. [INFO] Submitting SQL update statement to the cluster...
  46. [INFO] SQL update statement has been successfully submitted to the cluster:
  47. Job ID: 0c49758cc251699f0b4acd6c9f735e6e
  48. Flink SQL> insert into sink_test_second select * from source_test;
  49. [INFO] Submitting SQL update statement to the cluster...
  50. [INFO] SQL update statement has been successfully submitted to the cluster:
  51. Job ID: ecea685a715d7d40ee1a94aac3236c18
  52. Flink SQL>
复制代码
  注意必要将flink-sql-connector-mysql-cdc-3.1.0.jar放到{flink-1.18.0}/lib/ 下 。
  下载 Flink CDC 相关 Jar 包:Central Repository: com/ververica/flink-sql-connector-mysql-cdc
 假如报如下错误:将flink-connector-jdbc-3.2.0-1.18.jar放到{flink-1.18.0}/lib/ 下 。
  1. [ERROR] Could not execute SQL statement. Reason:
  2. org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
  3. Available factory identifiers are:
  4. blackhole
  5. datagen
  6. filesystem
  7. mysql-cdc
  8. print
  9. python-input-format
复制代码
JDBC SQL 连接器:JDBC 连接器(利用地点)允许利用 JDBC 驱动向恣意类型的关系型数据库读取或者写入数据。针对关系型数据库怎样通过创建 JDBC 连接器来实行 SQL 查询。假如在 DDL 中定义了主键,JDBC sink 将以 upsert 模式与外部系统交换 UPDATE/DELETE 消息;否则,它将以 append 模式与外部系统交换消息且不支持消耗 UPDATE/DELETE 消息。下载JDBC SQL连接器的依靠包,放到目录 {flink-1.18.0}/lib/ 下,用于利用JDBC SQL连接器连接MySQL的sink库。 
Mysql连接器选项:
OptionRequiredDefaultTypeDescriptionconnectorrequired(none)String指定要利用的连接器, 这里应该是 'mysql-cdc'.hostnamerequired(none)StringMySQL 数据库服务器的 IP 地点或主机名。usernamerequired(none)String连接到 MySQL 数据库服务器时要利用的 MySQL 用户的名称。passwordrequired(none)String连接 MySQL 数据库服务器时利用的密码。database-namerequired(none)String要监视的 MySQL 服务器的数据库名称。数据库名称还支持正则表达式,以监视多个与正则表达式匹配的表。table-namerequired(none)String必要监视的 MySQL 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。注意:MySQL CDC 连接器在正则匹配表名时,会把用户填写的 database-name, table-name 通过字符串 `\\.` 连接成一个全路径的正则表达式,然后利用该正则表达式和 MySQL 数据库中表的全限定名进行正则匹配。portoptional3306IntegerMySQL 数据库服务器的整数端标语。server-idoptional(none)String读取数据利用的 server id,server id 可以是个整数或者一个整数范围,好比 '5400' 或 '5400-5408', 建议在 'scan.incremental.snapshot.enabled' 参数为启用时,设置成整数范围。因为在当前 MySQL 集群中运行的所有 slave 节点,标记每个 salve 节点的 id 都必须是唯一的。 以是当连接器参加 MySQL 集群作为另一个 slave 节点(并且具有唯一 id 的情况下),它就可以读取 binlog。 默认情况下,连接器会在 5400 和 6400 之间天生一个随机数,但是我们建议用户明确指定 Server id。scan.incremental.snapshot.enabledoptionaltrueBoolean增量快照是一种读取表快照的新机制,与旧的快照机制相比, 增量快照有许多长处,包罗: (1)在快照读取期间,Source 支持并发读取, (2)在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint, (3)在快照读取之前,Source 不必要数据库锁权限。 假如希望 Source 并行运行,则每个并行 Readers 都应该具有唯一的 Server id,以是 Server id 必须是类似 `5400-6400` 的范围,并且该范围必须大于并行度。 请查阅 增量快照读取 章节了解更多详细信息。scan.incremental.snapshot.chunk.sizeoptional8096Integer表快照的块大小(行数),读取表的快照时,捕捉的表被拆分为多个块。scan.snapshot.fetch.sizeoptional1024Integer读取表快照时每次读取数据的最大条数。scan.startup.modeoptionalinitialStringMySQL CDC 消耗者可选的启动模式, 合法的模式为 "initial","earliest-offset","latest-offset","specific-offset" 和 "timestamp"。 请查阅 启动模式 章节了解更多详细信息。scan.startup.specific-offset.fileoptional(none)String在 "specific-offset" 启动模式下,启动位点的 binlog 文件名。scan.startup.specific-offset.posoptional(none)Long在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置。scan.startup.specific-offset.gtid-setoptional(none)String在 "specific-offset" 启动模式下,启动位点的 GTID 集合。scan.startup.specific-offset.skip-eventsoptional(none)Long在指定的启动位点后必要跳过的事件数量。scan.startup.specific-offset.skip-rowsoptional(none)Long在指定的启动位点后必要跳过的数据行数量。server-time-zoneoptional(none)String数据库服务器中的会话时区, 比方: "Asia/Shanghai". 它控制 MYSQL 中的时间戳类型怎样转换为字符串。 更多请参考 这里. 假如没有设置,则利用ZoneId.systemDefault()来确定服务器时区。debezium.min.row. count.to.stream.resultoptional1000Integer在快照操作期间,连接器将查询每个包罗的表,以天生该表中所有行的读取事件。 此参数确定 MySQL 连接是否将表的所有用果拉入内存(速率很快,但必要大量内存), 或者效果是否必要流式传输(传输速率可能较慢,但适用于非常大的表)。 该值指定了在连接器对效果进行流式处理之前,表必须包罗的最小行数,默认值为1000。将此参数设置为`0`以跳过所有表大小检查,并始终在快照期间对所有用果进行流式处理。connect.timeoutoptional30sDuration连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。connect.max-retriesoptional3Integer连接器应重试以创建 MySQL 数据库服务器连接的最大重试次数。connection.pool.sizeoptional20Integer连接池大小。jdbc.properties.*optional20String转达自定义 JDBC URL 属性的选项。用户可以转达自定义属性,如 'jdbc.properties.useSSL' = 'false'.heartbeat.intervaloptional30sDuration用于跟踪最新可用 binlog 偏移的发送心跳事件的间隔。debezium.*optional(none)String将 Debezium 的属性转达给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕捉数据更改。 For example: 'debezium.snapshot.mode' = 'never'. 查察更多关于 Debezium 的 MySQL 连接器属性scan.incremental.close-idle-reader.enabledoptionalfalseBoolean是否在快照结束后关闭空闲的 Reader。 此特性必要 flink 版本大于即是 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 必要设置为 true。 Jdbc连接器选项:
参数是否必填默认值类型描述 connector

必填(none)String指定利用什么类型的连接器,这里应该是'jdbc'。 url

必填(none)StringJDBC 数据库 url。 table-name

必填(none)String连接到 JDBC 表的名称。 driver

可选(none)String用于连接到此 URL 的 JDBC 驱动类名,假如不设置,将自动从 URL 中推导。 compatible-mode

可选(none)String数据库的兼容模式。 username

可选(none)StringJDBC 用户名。假如指定了 'username' 和 'password' 中的任一参数,则两者必须都被指定。 password

可选(none)StringJDBC 密码。 connection.max-retry-timeout

可选60sDuration最大重试超时时间,以秒为单位且不应该小于 1 秒。 scan.partition.column

可选(none)String用于将输入进行分区的列名。请参阅下面的分区扫描部分了解更多详情。 scan.partition.num

可选(none)Integer分区数。 scan.partition.lower-bound

可选(none)Integer第一个分区的最小值。 scan.partition.upper-bound

可选(none)Integer最后一个分区的最大值。 scan.fetch-size

可选0Integer每次循环读取时应该从数据库中获取的行数。假如指定的值为 '0',则该设置项会被忽略。 scan.auto-commit

可选trueBoolean在 JDBC 驱动程序上设置 auto-commit 标志, 它决定了每个语句是否在事务中自动提交。有些 JDBC 驱动程序,特别是 Postgres,可能必要将此设置为 false 以便流化效果。 lookup.cache

可选NONE 枚举类型
可选值: NONE, PARTIAL维表的缓存策略。 现在支持 NONE(不缓存)和 PARTIAL(只在外部数据库中查找数据时缓存)。 lookup.cache.max-rows

可选(none)Integer维表缓存的最大行数,若超过该值,则最老的行记录将会过期。 利用该设置时 "lookup.cache" 必须设置为 "ARTIAL”。请参阅下面的 Lookup Cache 部分了解更多详情。 lookup.partial-cache.expire-after-write

可选(none)Duration在记录写入缓存后该记录的最大保存时间。 利用该设置时 "lookup.cache" 必须设置为 "ARTIAL”。请参阅下面的 Lookup Cache 部分了解更多详情。 lookup.partial-cache.expire-after-access

可选(none)Duration在缓存中的记录被访问后该记录的最大保存时间。 利用该设置时 "lookup.cache" 必须设置为 "ARTIAL”。请参阅下面的 Lookup Cache 部分了解更多详情。 lookup.partial-cache.cache-missing-key

可选trueBoolean是否缓存维表中不存在的键,默认为true。 利用该设置时 "lookup.cache" 必须设置为 "ARTIAL”。 lookup.max-retries

可选3Integer查询数据库失败的最大重试次数。 sink.buffer-flush.max-rows

可选100Integerflush 前缓存记录的最大值,可以设置为 '0' 来禁用它。 sink.buffer-flush.interval

可选1sDurationflush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并设置适当的 flush 时间间隔。 sink.max-retries

可选3Integer写入记录到数据库失败后的最大重试次数。 sink.parallelism

可选(none)Integer用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:利用与上游链式算子相同的并行度。 六、利用数据流API

包罗以下Maven依靠项(通过Maven中央库提供):
  1. <dependency>
  2.   <groupId>org.apache.flink</groupId>
  3.   <!-- 添加与你的数据库匹配的依赖 -->
  4.   <artifactId>flink-connector-mysql-cdc</artifactId>
  5.   <!-- 该依赖仅适用于稳定发布,SNAPSHOT依赖需要根据master或release分支自行构建。 -->
  6.   <version>3.1.0</version>
  7. </dependency>
复制代码
  1. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
  4. import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
  5. public class MySqlSourceExample {
  6.   public static void main(String[] args) throws Exception {
  7.     MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
  8.             .hostname("yourHostname")
  9.             .port(yourPort)
  10.             .databaseList("yourDatabaseName") // 设置捕获的数据库
  11.             .tableList("yourDatabaseName.yourTableName") // 设置捕获的表
  12.             .username("yourUsername")
  13.             .password("yourPassword")
  14.             .deserializer(new JsonDebeziumDeserializationSchema()) // 将SourceRecord转换为JSON字符串
  15.             .build();
  16.     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  17.     // 启用检查点
  18.     env.enableCheckpointing(3000);
  19.     env
  20.       .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
  21.       // 设置4个并行源任务
  22.       .setParallelism(4)
  23.       .print().setParallelism(1); // 为sink设置并行度为1,以保持消息顺序
  24.     env.execute("Print MySQL Snapshot + Binlog");
  25.   }
  26. }
复制代码



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4