聊聊流式数据湖Paimon(四)

打印 上一主题 下一主题

主题 691|帖子 691|积分 2077

Partial Update

数据打宽

通过不同的流写不同的字段,打宽了数据的维度,填充了数据内容;如下所示:
  1. --FlinkSQL参数设置
  2. set
  3.     `table.dynamic-table-options.enabled` = `true`;
  4. SET
  5.     `env.state.backend` = `rocksdb`;
  6. SET
  7.     `execution.checkpointing.interval` = `60000`;
  8. SET
  9.     `execution.checkpointing.tolerable-failed-checkpoints` = `3`;
  10. SET
  11.     `execution.checkpointing.min-pause` = `60000`;
  12. --创建Paimon catalog
  13. CREATE CATALOG paimon WITH (
  14.     'type' = 'paimon',
  15.     'metastore' = 'hive',
  16.     'uri' = 'thrift://localhost:9083',
  17.     'warehouse' = 'hdfs://paimon',
  18.     'table.type' = 'EXTERNAL'
  19. );
  20. --创建Partial update结果表
  21. CREATE TABLE if not EXISTS paimon.dw.order_detail (
  22.     `order_id` string,
  23.     `product_type` string,
  24.     `plat_name` string,
  25.     `ref_id` bigint,
  26.     `start_city_name` string,
  27.     `end_city_name` string,
  28.     `create_time` timestamp(3),
  29.     `update_time` timestamp(3),
  30.     `dispatch_time` timestamp(3),
  31.     `decision_time` timestamp(3),
  32.     `finish_time` timestamp(3),
  33.     `order_status` int,
  34.     `binlog_time` bigint,
  35.     PRIMARY KEY (order_id) NOT ENFORCED
  36. ) WITH (
  37.     'bucket' = '20',
  38.     -- 指定20个bucket
  39.     'bucket-key' = 'order_id',
  40.     -- 记录排序字段
  41.     'sequence.field' = 'binlog_time',
  42.     -- 选择 full-compaction ,在compaction后产生完整的changelog
  43.     'changelog-producer' = 'full-compaction',
  44.     -- compaction 间隔时间
  45.     'changelog-producer.compaction-interval' = '2 min',
  46.     'merge-engine' = 'partial-update',
  47.     -- 忽略DELETE数据,避免运行报错
  48.     'partial-update.ignore-delete' = 'true'
  49. );
  50. INSERT INTO
  51.     paimon.dw.order_detail
  52. -- order_info表提供主要字段
  53. SELECT
  54.     order_id,
  55.     product_type,
  56.     plat_name,
  57.     ref_id,
  58.     cast(null as string) as start_city_name,
  59.     cast(null as string) as end_city_name,
  60.     create_time,
  61.     update_time,
  62.     dispatch_time,
  63.     decision_time,
  64.     finish_time,
  65.     order_status,
  66.     binlog_time
  67. FROM
  68.     paimon.ods.order_info
  69.     /*+ OPTIONS ('scan.mode'='latest') */
  70. union
  71. all
  72. -- order_address表提供城市字段
  73. SELECT
  74.     order_id,
  75.     cast(null as string) as product_type,
  76.     cast(null as string) as plat_name,
  77.     cast(null as bigint) as ref_id,
  78.     start_city_name,
  79.     end_city_name,
  80.     cast(null as timestamp(3)) as create_time,
  81.     cast(null as timestamp(3)) as update_time,
  82.     cast(null as timestamp(3)) as dispatch_time,
  83.     cast(null as timestamp(3)) as decision_time,
  84.     cast(null as timestamp(3)) as finish_time,
  85.     cast(null as int) as order_status,
  86.     binlog_time
  87. FROM
  88.     paimon.ods.order_address
  89.     /*+ OPTIONS ('scan.mode'='latest') */
  90. ;
复制代码
完整的Changlog

Paimon中的表被多流填充数据且打宽维度后,支持流读、批读的方式提供完整的Changelog给下游。
Sequence-Group

配置:'fields.G.sequence-group'='A,B'
由字段G控制是否更新字段A, B;总得来说,G的值如果为null或比更新值大将不更新A,B;如下单测
  1. public void testSequenceGroup() {
  2.     sql(
  3.             "CREATE TABLE SG ("
  4.                     + "k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED)"
  5.                     + " WITH ("
  6.                     + "'merge-engine'='partial-update', "
  7.                     + "'fields.g_1.sequence-group'='a,b', "
  8.                     + "'fields.g_2.sequence-group'='c,d');");
  9.     sql("INSERT INTO SG VALUES (1, 1, 1, 1, 1, 1, 1)");
  10.     // g_2 should not be updated
  11.     sql("INSERT INTO SG VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT))");
  12.     // select *
  13.     assertThat(sql("SELECT * FROM SG")).containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 1, 1, 1));
  14.     // projection
  15.     assertThat(sql("SELECT c, d FROM SG")).containsExactlyInAnyOrder(Row.of(1, 1));
  16.     // g_1 should not be updated
  17.     sql("INSERT INTO SG VALUES (1, 3, 3, 1, 3, 3, 3)");
  18.     assertThat(sql("SELECT * FROM SG")).containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 3, 3, 3));
  19.     // d should be updated by null
  20.     sql("INSERT INTO SG VALUES (1, 3, 3, 3, 2, 2, CAST(NULL AS INT))");
  21.     sql("INSERT INTO SG VALUES (1, 4, 4, 4, 2, 2, CAST(NULL AS INT))");
  22.     sql("INSERT INTO SG VALUES (1, 5, 5, 3, 5, CAST(NULL AS INT), 4)");
  23.     assertThat(sql("SELECT a, b FROM SG")).containsExactlyInAnyOrder(Row.of(4, 4));
  24.     assertThat(sql("SELECT c, d FROM SG")).containsExactlyInAnyOrder(Row.of(5, null));
  25. }
复制代码
其作用是:

  • 在多个数据流更新期间的无序问题。每个数据流都定义自己的序列组。
  • 真正的部分更新,而不仅仅是非空值的更新。
  • 接受删除记录来撤销部分列。
Changelog-Producer

Paimon通过Changelog-Producer支持生成changelog,并支持下游以流读、批读的形式读取changelog。
Changelog的生成有多种方式,input、lookup、full-compaction;其生成代价是由低到高。
None

不查找旧值,不额外写Changelog;但会下游任务中通过ChangelogNormalize算子补足Changelog。
Input

不查找旧值,额外写Changelog;适用与CDC的数据源。
Lookup

查找旧值,额外写Changelog;如果不是CDC数据源,需要通过LookupCompaction查找旧值,即在 compaction 的过程中, 会去向高层查找本次新增 key 的旧值, 如果没有查找到, 那么本次的就是新增 key, 如果有查找到, 那么就生成完整的 UB 和 UA 消息。
Full-Compaction

查找旧值,额外写Changelog;在 full compact 的过程中, 其实数据都会被写到最高层, 所以所有 value 的变化都是可以推演出来的.
数据一致性

数据版本

通过Flink的checkpoint机制,生成Snapshot并标记版本,即,一个Snapshot对应数据的一个版本。
比如 Job-A 基于 Table-A 的 Snapshot-20 产出了 Table-B 的 Snapshot-11。Job-B 基于 Table-A 的Snapshot-20产出了 Table-C 的 Snapshot-15。那么 Job-C 的查询就应该基于 Table-B 的 Snapshot-11 和 Table-C 的 Snapshot-15 进行计算,明确了数据版本,从而实现计算的一致性。

生成的snapshot-xx,就是数据的版本号。
数据对齐

将 Checkpoint 插入到两个 Snapshot 的数据之间。如果当前的 Snapshot 还没有完全被消费,这个 Checkpoint 的触发会被推迟,从而实现按照 Snapshot 对数据进行划分和对齐。

实现分为两个部分。

  • 在提交阶段,需要去血缘关系表中查询上下游表的一致性版本,并且基于查询结果给对应的上游表设置起始的消费位置。
  • 在运行阶段,按照消费的 Snapshot 来协调 Checkpoint,在 Flink 的 Checkpoint Coordinator 向 Source 发出 Checkpoint 的请求时,会强制要求将 Checkpoint 插入到两个 Snapshot 的数据之间。如果当前的 Snapshot 还没有完全被消费,这个 Checkpoint 的触发会被推迟,从而实现按照 Snapshot 对数据进行划分和处理。
数据血缘

概念

数据从产生到消费的整个流转过程中所经历的各种转换、处理和流动的轨迹。数据血缘提供了数据的来源、去向以及中间处理过程的透明度,帮助用户理解数据如何在系统中被处理和移动,以及数据是如何从原始状态转化为最终的可消费形态。
实现

在checkpoint的提交时将数据的血缘关系写入到System Table,记录血缘关系。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王國慶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表