MySQL Connector #
MySQL CDC Pipeline 连接器答应从 MySQL 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。 本文形貌了怎样设置 MySQL CDC Pipeline 连接器。
依赖设置 #
由于 MySQL Connector 接纳的 GPLv2 协议与 Flink CDC 项目不兼容,我们无法在 jar 包中提供 MySQL 连接器。 您大概必要手动设置以下依赖,并在提交 YAML 作业时使用 Flink CDC CLI 的 --jar 参数将其传入:
依赖名称说明mysql:mysql-connector-java:8.0.27用于连接到 MySQL 数据库。 示例 #
从 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:
- source:
- type: mysql
- name: MySQL Source
- hostname: 127.0.0.1
- port: 3306
- username: admin
- password: pass
- tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
- server-id: 5401-5404
- sink:
- type: doris
- name: Doris Sink
- fenodes: 127.0.0.1:8030
- username: root
- password: pass
- pipeline:
- name: MySQL to Doris Pipeline
- parallelism: 4
复制代码 连接器设置项 #
OptionRequiredDefaultTypeDescriptionhostnamerequired(none)StringMySQL 数据库服务器的 IP 地点或主机名。portoptional3306IntegerMySQL 数据库服务器的整数端标语。usernamerequired(none)String连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称。passwordrequired(none)String连接 MySQL 数据库服务器时使用的密码。tablesrequired(none)String必要监视的 MySQL 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。 必要注意的是,点号(.)被视为数据库和表名的分隔符。 如果必要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。 例如,db0.., db1.user_table_[0-9]+, db[1-2].[app|web]order_.tables.excludeoptional(none)String必要扫除的 MySQL 数据库的表名,参数会在tables参数后发生扫除作用。表名支持正则表达式,以扫除满足正则表达式的多个表。 用法和tables参数雷同schema-change.enabledoptionaltrueBoolean是否发送模式更改事件,下游 sink 可以响应模式变更事件实现表结构同步,默认为true。server-idoptional(none)String读取数据使用的 server id,server id 可以是个整数或者一个整数范围,好比 ‘5400’ 或 ‘5400-5408’, 发起在 ‘scan.incremental.snapshot.enabled’ 参数为启用时,设置成整数范围。因为在当前 MySQL 集群中运行的全部 slave 节点,标志每个 salve 节点的 id 都必须是唯一的。 所以当连接器到场 MySQL 集群作为另一个 slave 节点(并且具有唯一 id 的环境下),它就可以读取 binlog。 默认环境下,连接器会在 5400 和 6400 之间天生一个随机数,但是我们发起用户明确指定 Server id。scan.incremental.snapshot.chunk.sizeoptional8096Integer表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。scan.snapshot.fetch.sizeoptional1024Integer读取表快照时每次读取数据的最大条数。scan.startup.modeoptionalinitialStringMySQL CDC 消费者可选的启动模式, 正当的模式为 “initial”,“earliest-offset”,“latest-offset”,“specific-offset”,“timestamp” 和 "“snapshot”。scan.startup.specific-offset.fileoptional(none)String在 “specific-offset” 启动模式下,启动位点的 binlog 文件名。scan.startup.specific-offset.posoptional(none)Long在 “specific-offset” 启动模式下,启动位点的 binlog 文件位置。scan.startup.specific-offset.gtid-setoptional(none)String在 “specific-offset” 启动模式下,启动位点的 GTID 集合。scan.startup.timestamp-millisoptional(none)Long在 “timestamp” 启动模式下,启动位点的毫秒时间戳。scan.startup.specific-offset.skip-eventsoptional(none)Long在指定的启动位点后必要跳过的事件数目。scan.startup.specific-offset.skip-rowsoptional(none)Long在指定的启动位点后必要跳过的数据行数目。connect.timeoutoptional30sDuration连接器在尝试连接到 MySQL 数据库服务器后超时前应等候的最长时间。该时长不能少于250毫秒。connect.max-retriesoptional3Integer连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。connection.pool.sizeoptional20Integer连接池大小。jdbc.properties.*optional20String通报自定义 JDBC URL 属性的选项。用户可以通报自定义属性,如 ‘jdbc.properties.useSSL’ = ‘false’.heartbeat.intervaloptional30sDuration用于跟踪最新可用 binlog 偏移的发送心跳事件的间隔。debezium.*optional(none)String将 Debezium 的属性通报给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕获数据更改。 例如: 'debezium.snapshot.mode' = 'never'. 检察更多关于 Debezium 的 MySQL 连接器属性scan.incremental.close-idle-reader.enabledoptionalfalseBoolean是否在快照竣事后关闭空闲的 Reader。 此特性必要 flink 版本大于等于 1.14 并且 ‘execution.checkpointing.checkpoints-after-tasks-finish.enabled’ 必要设置为 true。 若 flink 版本大于等于 1.15,‘execution.checkpointing.checkpoints-after-tasks-finish.enabled’ 默认值变更为 true,可以不消显式设置 ‘execution.checkpointing.checkpoints-after-tasks-finish.enabled’ = true。scan.newly-added-table.enabledoptionalfalseBoolean是否启用动态加表特性,默认关闭。 此设置项只有作业从savepoint/checkpoint启动时才生效。 启动模式 #
设置选项scan.startup.mode指定 MySQL CDC 使用者的启动模式。有效枚举包罗:
- initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
- earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
- latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的末了处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
- specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
- timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
- snapshot: 只进行快照阶段,跳过增量阶段,快照阶段读取竣事后退出。
例如,可以在 YAML 设置文件中如许指定启动模式:
- source:
- type: mysql
- scan.startup.mode: earliest-offset # Start from earliest offset
- scan.startup.mode: latest-offset # Start from latest offset
- scan.startup.mode: specific-offset # Start from specific offset
- scan.startup.mode: timestamp # Start from timestamp
- scan.startup.mode: snapshot # Read snapshot only
- scan.startup.specific-offset.file: 'mysql-bin.000003' # Binlog filename under specific offset startup mode
- scan.startup.specific-offset.pos: 4 # Binlog position under specific offset mode
- scan.startup.specific-offset.gtid-set: 24DA167-... # GTID set under specific offset startup mode
- scan.startup.timestamp-millis: 1667232000000 # Timestamp under timestamp startup mode
- # ...
复制代码 数据类型映射 #
MySQL typeCDC typeNOTETINYINT(n)TINYINTSMALLINT TINYINT UNSIGNED TINYINT UNSIGNED ZEROFILLSMALLINTINT YEAR MEDIUMINT MEDIUMINT UNSIGNED MEDIUMINT UNSIGNED ZEROFILL SMALLINT UNSIGNED SMALLINT UNSIGNED ZEROFILLINTBIGINT INT UNSIGNED INT UNSIGNED ZEROFILLBIGINTBIGINT UNSIGNED BIGINT UNSIGNED ZEROFILL SERIALDECIMAL(20, 0)FLOAT FLOAT UNSIGNED FLOAT UNSIGNED ZEROFILLFLOATREAL REAL UNSIGNED REAL UNSIGNED ZEROFILL DOUBLE DOUBLE UNSIGNED DOUBLE UNSIGNED ZEROFILL DOUBLE PRECISION DOUBLE PRECISION UNSIGNED DOUBLE PRECISION UNSIGNED ZEROFILLDOUBLENUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNED DECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where p <= 38DECIMAL(p, s)NUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNED DECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where 38 < p <= 65STRING在 MySQL 中,十进制数据类型的精度高达 65,但在 Flink 中,十进制数据类型的精度仅限于 38。所以,如果定义精度大于 38 的十进制列,则应将其映射到字符串以制止精度损失。BOOLEAN TINYINT(1) BIT(1)BOOLEANDATEDATETIME [§]TIME [§]TIMESTAMP [§]TIMESTAMP_LTZ [§]DATETIME [§]TIMESTAMP [§]CHAR(n)CHAR(n)VARCHAR(n)VARCHAR(n)BIT(n)BINARY(⌈(n + 7) / 8⌉)BINARY(n)BINARY(n)VARBINARY(N)VARBINARY(N)TINYTEXT TEXT MEDIUMTEXT LONGTEXTSTRINGTINYBLOB BLOB MEDIUMBLOB LONGBLOBBYTES现在,对于 MySQL 中的 BLOB 数据类型,仅支持长度不大于 2147483647(2**31-1)的 blob。ENUMSTRINGJSONSTRINGJSON 数据类型将在 Flink 中转换为 JSON 格式的字符串。SET-暂不支持GEOMETRY POINT LINESTRING POLYGON MULTIPOINT MULTILINESTRING MULTIPOLYGON GEOMETRYCOLLECTIONSTRINGMySQL 中的空间数据类型将转换为具有固定 Json 格式的字符串。 请参考 MySQL 空间数据类型映射 章节了解更多详细信息。 空间数据类型映射 #
MySQL中除GEOMETRYCOLLECTION之外的空间数据类型都会转换为 Json 字符串,格式固定,如:
- {"srid": 0 , "type": "xxx", "coordinates": [0, 0]}
复制代码 字段srid标识定义几何体的 SRS,如果未指定 SRID,则 SRID 0 是新几何体值的默认值。 由于 MySQL 8+ 在定义空间数据类型时只支持特定的 SRID,因此在版本较低的MySQL中,字段srid将始终为 0。
字段type标识空间数据类型,例如POINT/LINESTRING/POLYGON。
字段coordinates表示空间数据的坐标。
对于GEOMETRYCOLLECTION,它将转换为 Json 字符串,格式固定,如:
- {"srid": 0 , "type": "GeometryCollection", "geometries": [{"type":"Point","coordinates":[10,10]}]}
复制代码 Geometrics字段是一个包罗全部空间数据的数组。
差别空间数据类型映射的示例如下:
Spatial data in MySQLJson String converted in FlinkPOINT(1 1){“coordinates”:[1,1],“type”:“Point”,“srid”:0}LINESTRING(3 0, 3 3, 3 5){“coordinates”:[[3,0],[3,3],[3,5]],“type”:“LineString”,“srid”:0}POLYGON((1 1, 2 1, 2 2, 1 2, 1 1)){“coordinates”:[[[1,1],[2,1],[2,2],[1,2],[1,1]]],“type”:“Polygon”,“srid”:0}MULTIPOINT((1 1),(2 2)){“coordinates”:[[1,1],[2,2]],“type”:“MultiPoint”,“srid”:0}MultiLineString((1 1,2 2,3 3),(4 4,5 5)){“coordinates”:[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],“type”:“MultiLineString”,“srid”:0}MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5))){“coordinates”:[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],“type”:“MultiPolygon”,“srid”:0}GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20)){“geometries”:[{“type”:“Point”,“coordinates”:[10,10]},{“type”:“Point”,“coordinates”:[30,30]},{“type”:“LineString”,“coordinates”:[[15,15],[20,20]]}],“type”:“GeometryCollection”,“srid”:0} links:
MySQL | Apache Flink CDC
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |