flinkcdc学习 (一)data source数据源

打印 上一主题 下一主题

主题 1725|帖子 1725|积分 5175

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
data source: 数据源连接

两种连接方式:Pipeline 连接器 和  Flink Source 连接器
1、Pipeline 连接器: 

Pipeline 连接器只支持mysql连接数据源 
   仅支持以下mysql版本 :


  • MySQL: 5.6, 5.7, 8.0.x
  • RDS MySQL: 5.6, 5.7, 8.0.x
  • PolarDB MySQL: 5.6, 5.7, 8.0.x
  • Aurora MySQL: 5.6, 5.7, 8.0.x
  • MariaDB: 10.x
  • PolarDB X: 2.0.1
2、连接参数

 1.基本连接参数:


2. 数据读取控制参数

这些参数用于控制数据从数据库的读取行为,确保 Flink CDC 的运行服从和行为符合预期。
tables.excludeoptional(none)String必要排除的 MySQL 数据库的表名,参数会在tables参数后发生排除作用。表名支持正则表达式,以排除满足正则表达式的多个表。
用法和tables参数相同
schema-change.enabledoptionaltrueBoolean是否发送模式更改事件,下游 sink 可以相应模式变动事件实现表布局同步,默认为true。
server-idoptional(none)String读取数据利用的 server id,server id 可以是个整数或者一个整数范围,比如 '5400' 或 '5400-5408', 发起在 'scan.incremental.snapshot.enabled' 参数为启用时,设置成整数范围。因为在当前 MySQL 集群中运行的所有 slave 节点,标记每个 salve 节点的 id 都必须是唯一的。 所以当连接器参加 MySQL 集群作为另一个 slave 节点(而且具有唯一 id 的环境下),它就可以读取 binlog。 默认环境下,连接器会在 5400 和 6400 之间天生一个随机数,但是我们发起用户明确指定 Server id。
scan.incremental.snapshot.chunk.sizeoptional8096Integer表快照的块大小(行数),读取表的快照时,捕捉的表被拆分为多个块。
scan.snapshot.fetch.sizeoptional1024Integer读取表快照时每次读取数据的最大条数。
scan.startup.modeoptionalinitialStringMySQL CDC 消费者可选的启动模式, 正当的模式为 "initial","earliest-offset","latest-offset","specific-offset","timestamp" 和 ""snapshot"。
scan.startup.specific-offset.fileoptional(none)String在 "specific-offset" 启动模式下,启动位点的 binlog 文件名。
scan.startup.specific-offset.posoptional(none)Long在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置。
scan.startup.specific-offset.gtid-setoptional(none)String在 "specific-offset" 启动模式下,启动位点的 GTID 聚集。
scan.startup.timestamp-millisoptional(none)Long在 "timestamp" 启动模式下,启动位点的毫秒时间戳。
scan.startup.specific-offset.skip-eventsoptional(none)Long在指定的启动位点后必要跳过的事件数目。
scan.startup.specific-offset.skip-rowsoptional(none)Long在指定的启动位点后必要跳过的数据行数目。
启动模式 #

设置选项scan.startup.mode指定 MySQL CDC 利用者的启动模式。有效枚举包罗:


  • initial (默认):在第一次启动时对受监视的数据库表实行初始快照,并继续读取最新的 binlog。
  • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
  • latest-offset:首次启动时,从不对受监视的数据库表实行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
  • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 聚集指定。
  • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
  • snapshot: 只举行快照阶段,跳过增量阶段,快照阶段读取结束退却出。
比方,可以在 YAML 设置文件中这样指定启动模式:
  1. source:
  2.   type: mysql
  3.   scan.startup.mode: earliest-offset                    # Start from earliest offset
  4.   scan.startup.mode: latest-offset                      # Start from latest offset
  5.   scan.startup.mode: specific-offset                    # Start from specific offset
  6.   scan.startup.mode: timestamp                          # Start from timestamp
  7.   scan.startup.mode: snapshot                          # Read snapshot only
  8.   scan.startup.specific-offset.file: 'mysql-bin.000003' # Binlog filename under specific offset startup mode
  9.   scan.startup.specific-offset.pos: 4                   # Binlog position under specific offset mode
  10.   scan.startup.specific-offset.gtid-set: 24DA167-...    # GTID set under specific offset startup mode
  11.   scan.startup.timestamp-millis: 1667232000000          # Timestamp under timestamp startup mode
复制代码
3. 数据库设置相干参数

这些参数主要用于适配数据库的设置,比如时区、并行度等。
connect.timeoutoptional30sDuration连接器在实验连接到 MySQL 数据库服务器后超时前应等候的最长时间。该时长不能少于250毫秒。
connect.max-retriesoptional3Integer连接器应重试以创建 MySQL 数据库服务器连接的最大重试次数。
connection.pool.sizeoptional20Integer连接池大小。
jdbc.properties.*optional20String传递自界说 JDBC URL 属性的选项。用户可以传递自界说属性,如 'jdbc.properties.useSSL' = 'false'.
heartbeat.intervaloptional30sDuration用于跟踪最新可用 binlog 偏移的发送心跳事件的隔断。
debezium.*optional(none)String将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕捉数据更改。 比方: 'debezium.snapshot.mode' = 'never'. 查察更多关于 Debezium 的 MySQL 连接器属性
scan.incremental.close-idle-reader.enabledoptionalfalseBoolean是否在快照结束后关闭空闲的 Reader。 此特性必要 flink 版本大于即是 1.14 而且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 必要设置为 true。
若 flink 版本大于即是 1.15,'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变动为 true,可以不用显式设置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。
scan.newly-added-table.enabledoptionalfalseBoolean是否启用动态加表特性,默认关闭。 此设置项只有作业从savepoint/checkpoint启动时才生效。
  1. jdbc.properties.connectionTimeout: 30000, -- 设置连接超时为 30 秒
  2. jdbc.properties.useSSL: true, -- 启用 SSL
  3. jdbc.properties.characterEncoding: UTF-8
复制代码
常见的 debezium.* 参数

以下是一些常见的 debezium.* 设置参数及其作用:

  • debezium.snapshot.mode

    • 用于指定快照模式。
    • 可选值包罗:

      • initial: 举行初始快照。
      • schema_only: 仅获取架构,不举行数据快照。
      • never: 不举行快照,只捕捉增量数据。


  • debezium.include.schema.changes

    • 指定是否包罗模式变动事件。
    • 设置为 true 时,捕捉模式变动(如添加列、修改列类型等)。

  • debezium.database.history.file.filename

    • 指定 Debezium 数据库汗青文件的位置,以便在重启时规复状态。

  • debezium.transforms

    • 用于设置 Debezium 的转换功能,可以对捕捉的数据举行格式化或修改。

  • debezium.key.converter 和 debezium.value.converter

    • 指定用于序列化和反序列化消息键和值的转换器,比方可以利用 JSON 或 Avro 格式。

  1. debezium.snapshot.mode: initial, -- 设置快照模式
  2. debezium.include.schema.changes: true, -- 包括模式变更 debezium.database.history.file.filename: /path/to/history/file -- 历史文件路径
复制代码
4.数据映射
MySQL typeCDC typeNOTETINYINT(n)TINYINTSMALLINT
TINYINT UNSIGNED
TINYINT UNSIGNED ZEROFILLSMALLINTINT
YEAR
MEDIUMINT
MEDIUMINT UNSIGNED
MEDIUMINT UNSIGNED ZEROFILL
SMALLINT UNSIGNED
SMALLINT UNSIGNED ZEROFILLINTBIGINT
INT UNSIGNED
INT UNSIGNED ZEROFILLBIGINTBIGINT UNSIGNED
BIGINT UNSIGNED ZEROFILL
SERIALDECIMAL(20, 0)FLOAT
FLOAT UNSIGNED
FLOAT UNSIGNED ZEROFILLFLOATREAL
REAL UNSIGNED
REAL UNSIGNED ZEROFILL
DOUBLE
DOUBLE UNSIGNED
DOUBLE UNSIGNED ZEROFILL
DOUBLE PRECISION
DOUBLE PRECISION UNSIGNED
DOUBLE PRECISION UNSIGNED ZEROFILL
FLOAT(p, s)
REAL(p, s)
DOUBLE(p, s)DOUBLENUMERIC(p, s)
NUMERIC(p, s) UNSIGNED
NUMERIC(p, s) UNSIGNED ZEROFILL
DECIMAL(p, s)
DECIMAL(p, s) UNSIGNED
DECIMAL(p, s) UNSIGNED ZEROFILL
FIXED(p, s)
FIXED(p, s) UNSIGNED
FIXED(p, s) UNSIGNED ZEROFILL
where p <= 38DECIMAL(p, s)NUMERIC(p, s)
NUMERIC(p, s) UNSIGNED
NUMERIC(p, s) UNSIGNED ZEROFILL
DECIMAL(p, s)
DECIMAL(p, s) UNSIGNED
DECIMAL(p, s) UNSIGNED ZEROFILL
FIXED(p, s)
FIXED(p, s) UNSIGNED
FIXED(p, s) UNSIGNED ZEROFILL
where 38 < p <= 65STRING在 MySQL 中,十进制数据类型的精度高达 65,但在 Flink 中,十进制数据类型的精度仅限于 38。所以,如果界说精度大于 38 的十进制列,则应将其映射到字符串以制止精度损失。BOOLEAN
TINYINT(1)
BIT(1)BOOLEANDATEDATETIME [(p)]TIME [(p)]TIMESTAMP [(p)]TIMESTAMP_LTZ [(p)]DATETIME [(p)]TIMESTAMP [(p)]CHAR(n)CHAR(n)VARCHAR(n)VARCHAR(n)BIT(n)BINARY(⌈(n + 7) / 8⌉)BINARY(n)BINARY(n)VARBINARY(N)VARBINARY(N)TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXTSTRINGTINYBLOB
BLOB
MEDIUMBLOB
LONGBLOBBYTES目前,对于 MySQL 中的 BLOB 数据类型,仅支持长度不大于 2147483647(2**31-1)的 blob。ENUMSTRINGJSONSTRINGJSON 数据类型将在 Flink 中转换为 JSON 格式的字符串。SET-暂不支持GEOMETRY
POINT
LINESTRING
POLYGON
MULTIPOINT
MULTILINESTRING
MULTIPOLYGON
GEOMETRYCOLLECTIONSTRINGMySQL 中的空间数据类型将转换为具有固定 Json 格式的字符串。 请参考 MySQL 空间数据类型映射 章节了解更多详细信息。 空间数据类型映射 

MySQL中除GEOMETRYCOLLECTION之外的空间数据类型都会转换为 Json 字符串,格式固定,如:
  1. {"srid": 0 , "type": "xxx", "coordinates": [0, 0]}
复制代码
字段srid标识界说几何体的 SRS,如果未指定 SRID,则 SRID 0 是新几何体值的默认值。 由于 MySQL 8+ 在界说空间数据类型时只支持特定的 SRID,因此在版本较低的MySQL中,字段srid将始终为 0。
字段type标识空间数据类型,比方POINT/LINESTRING/POLYGON。
字段coordinates表现空间数据的坐标。
对于GEOMETRYCOLLECTION,它将转换为 Json 字符串,格式固定,如:
  1. {"srid": 0 , "type": "GeometryCollection", "geometries": [{"type":"Point","coordinates":[10,10]}]}
复制代码
Geometrics字段是一个包罗所有空间数据的数组。
差别空间数据类型映射的示比方下:
Spatial data in MySQLJson String converted in FlinkPOINT(1 1){"coordinates":[1,1],"type":"oint","srid":0}LINESTRING(3 0, 3 3, 3 5){"coordinates":[[3,0],[3,3],[3,5]],"type":"LineString","srid":0}POLYGON((1 1, 2 1, 2 2, 1 2, 1 1)){"coordinates":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],"type":"olygon","srid":0}MULTIPOINT((1 1),(2 2)){"coordinates":[[1,1],[2,2]],"type":"MultiPoint","srid":0}MultiLineString((1 1,2 2,3 3),(4 4,5 5)){"coordinates":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],"type":"MultiLineString","srid":0}MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5))){"coordinates":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],"type":"MultiPolygon","srid":0}GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20)){"geometries":[{"type":"oint","coordinates":[10,10]},{"type":"oint","coordinates":[30,30]},{"type":"LineString","coordinates":[[15,15],[20,20]]}],"type":"GeometryCollection","srid":0}

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

惊落一身雪

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表