大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(3) ...

打印 上一主题 下一主题

主题 878|帖子 878|积分 2634

Paimon的下载及安装,并且相识了主键表的引擎以及changelog-producer的含义参考:


  • 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(1)
利用Paimon表做lookup join,集成mysql cdc等参考:


  • 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(2)
利用Paimon的Tag兼容Hive,Branch管理等参考:


  • 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(3)
大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(3)

本日,我们继续快速相识下近来比较火的Apache Paimon:


  • 官方文档:https://paimon.apache.org/docs/1.0/
  • 推荐阅读:当流计算邂逅数据湖:Paimon 的宿世今世
1 利用Tag兼容Hive



  • Paimon 的每一次写都会天生一个 Immutable 的快照,快照可以被 Time Travel 的读取。
  • 但在大多数情况下,作业会天生过多的快照,所以根据表设置,快照会在合适的时间点被过期。快照过期还会删除旧的数据文件,过期快照的汗青数据将无法再查询。
  • 要解决此问题,可以基于快照创建 Tag。Tag 将维护快照的清单和数据文件。

    • 典型的用法是每天创建Tag(如下图所示),然后可以维护每天的汗青数据以举行批式查询。
    • 推荐在 ODS 层使用 Tag 来替代 Hive 的分区,但是后续的 DWD 和 DWS 不建议。


1.1 Tag创建

1.1.1 自动创建

  1. -- Flink SQL
  2. CREATE TABLE t (
  3.     k INT PRIMARY KEY NOT ENFORCED,
  4.     f0 INT,
  5.     ...
  6. ) WITH (
  7.     'tag.automatic-creation' = 'process-time',  -- 基于process-time自动创建
  8.     'tag.creation-period' = 'daily',            -- 创建间隔:每天
  9.     'tag.creation-delay' = '10 m',              -- 延迟10min
  10.     'tag.num-retained-max' = '90'               -- 最大保存90天
  11. );
复制代码


  • 上面设置表明每天0点10分钟创建一个 Tag,最大保存3个月的 Tag,Flink 流式写入,自动创建 Tags,自动清理 Tags。
1.1.2 利用Action包创建Tag

  1. <FLINK_HOME>/bin/flink run \
  2.     /path/to/paimon-flink-action-1.0.0.jar \
  3.     create_tag \
  4.     --warehouse <warehouse-path> \
  5.     --database <database-name> \
  6.     --table <table-name> \
  7.     --tag_name <tag-name> \
  8.     [--snapshot <snapshot_id>] \
  9.     [--time_retained <time-retained>] \
  10.     [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
复制代码


  • 如果未设置snapshot, 那么默认snapshot_id默认为最新
  • 固然,也能删除Tag、回滚Tag,可以参考官网下令:

    • Manage Tags | Apache Paimon

1.2 利用Tag映射



  • 有了 Tag 后,可以在 Flink SQL 或者 Spark SQL 里使用 Time Travel 来查询 Tags;
  • 但是这给业务带来了一个问题,老的 Hive SQL 如何兼容?老的 Hive 可是一个全量分区表,而 Paimon 表是一个非分区主键表,Hive 数据仓库的传统使用更习惯于使用分区来指定查询的 Tag。
  • paimon引入了 metastore.tag-to-partition 和 metastore.tag-to-partition.preview'(设置此参数可以让 Hive SQL 查询到未 Tag 的分区,比如当前最新数据) 来将未分区的主键表映射到 Hive metastore 中的分区表,并映射分区字段为 Tag 查询。
  • Flink 联合 Paimon 打造的入湖架构如下:

    • 通过 Flink CDC 一键全增量一体入湖到 Paimon,此任务可以设置 Tag 的自动创建,然后通过 Paimon 的本领,将 Tag 映射为 Hive 的分区,完全兼容原有 Hive SQL 的用法。
    • 优势如下:

      • 架构链路复杂度低,不再因为各种组件的问题导致链路延时,你只用运维这一个流作业,而且可以完全兼容原有 Hive SQL 用法。
      • 时延低:延时取决于流作业的 Checkpoint Interval,数据最低1分钟实时可见 (建议1-5分钟)。不但如此,Paimon 也提供了流读的本领,让你完身分钟级的 Streaming 计算,也可以写到下游别的存储。
      • 存储成本低:得益于湖格式的 Snapshot 管理,加上 LSM 的文件复用,比如同样是存储 100天的快照,原有 Hive 数仓 100 天需要 100 份的存储,Paimon 在某些增量数据不多的场景只需要 2 份的存储,大幅节省存储资源。
      • 计算成本低:得益于 LSM 的增量合并本领,此条链路只有增量数据的处理,没有全量的合并。可能有用户会担心,常驻的流作业会斲丧更多的资源,对 Paimon 来说,你可以打开纯异步 Compaction 的机制,以 Paimon 良好的性能表现,只用少量的资源即可完成同步,Paimon 另有整库同步等本领资助节省资源。



1.2.1 tag-to-partition

  1. -- 创建映射的paimon表
  2. Flink SQL> drop table if exists mydb_t;
  3. Flink SQL> CREATE TABLE mydb_t (
  4.     pk INT,
  5.     col1 STRING,
  6.     col2 STRING
  7. ) WITH (
  8.     'bucket' = '-1',
  9.     -- Only Hive Engine can be used to query these upsert-to-partitioned tables.
  10.     -- 将tag映射为hive分区
  11.     'metastore.tag-to-partition' = 'dt'
  12. );
  13. -- 插入数据
  14. -- snapshot=1
  15. Flink SQL> INSERT INTO mydb_t VALUES (1, '10', '100'), (2, '20', '200');
  16. -- snapshot=2
  17. Flink SQL> INSERT INTO mydb_t VALUES (3, '30', '300'), (4, '40', '400');
复制代码


  • 然后,利用action包创建Tag
  1. # 利用action 包创建tag\
  2. # 依旧利用hive元数据做catalog
  3. [root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \
  4.     /opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \
  5.     create_tag \
  6.     --warehouse hdfs://centos01:8020/user/hive/warehouse \
  7.     --database paimon_db \
  8.     --table mydb_t \
  9.     --tag_name '2025-02-18' \
  10.         --catalog-conf metastore=hive \
  11.     --catalog-conf uri=thrift://centos01:9083 \
  12.     --snapshot 1
  13.    
  14. [root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \
  15.     /opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \
  16.     create_tag \
  17.     --warehouse hdfs://centos01:8020/user/hive/warehouse \
  18.     --database paimon_db \
  19.     --table mydb_t \
  20.     --tag_name '2025-02-19' \
  21.         --catalog-conf metastore=hive \
  22.     --catalog-conf uri=thrift://centos01:9083 \
  23.     --snapshot 2
复制代码


  • 我们就可以在hive中查看分区,且查询数据
  1. 0: jdbc:hive2://192.168.42.101:10000> show partitions mydb_t;
  2. +----------------+
  3. |   partition    |
  4. +----------------+
  5. | dt=2025-02-18  |
  6. | dt=2025-02-19  |
  7. +----------------+
  8. 2 rows selected (0.438 seconds)
  9. 0: jdbc:hive2://192.168.42.101:10000> select * from mydb_t a where dt = '2025-02-18';
  10. +-------+---------+---------+-------------+
  11. | a.pk  | a.col1  | a.col2  |    a.dt     |
  12. +-------+---------+---------+-------------+
  13. | 1     | 10      | 100     | 2025-02-18  |
  14. | 2     | 20      | 200     | 2025-02-18  |
  15. +-------+---------+---------+-------------+
  16. 2 rows selected (3.27 seconds)
  17. 0: jdbc:hive2://192.168.42.101:10000> select * from mydb_t a where dt = '2025-02-19';
  18. +-------+---------+---------+-------------+
  19. | a.pk  | a.col1  | a.col2  |    a.dt     |
  20. +-------+---------+---------+-------------+
  21. | 1     | 10      | 100     | 2025-02-19  |
  22. | 2     | 20      | 200     | 2025-02-19  |
  23. | 3     | 30      | 300     | 2025-02-19  |
  24. | 4     | 40      | 400     | 2025-02-19  |
  25. +-------+---------+---------+-------------+
复制代码
1.2.2 tag-to-partition.preview



  • 上述示例只能查询已经创建的tag,但Paimon是一个实时数据湖,您还需要查询最新的数据。因此,Paimon提供了一个预览功能
  • 'metastore.tag-to-partition.preview'可选值如下:

    • “none”:不自动创建标签;
    • “process-time”:基于呆板时间,当处理时间超过周期时间加上耽误时,创建标签;
    • “watermark”:基于输入的watermark,当watermark超过周期时间加上耽误时,创建标签;
    • “batch”:在批处理场景中,任务完成后天生当前快照对应的标签。

  1. Flink SQL> drop table if exists mydb_preview;
  2. Flink SQL> CREATE TABLE mydb_preview (
  3.     pk INT,
  4.     col1 STRING,
  5.     col2 STRING
  6. ) WITH (
  7.     'bucket' = '-1',
  8.     'metastore.tag-to-partition' = 'dt',
  9.     -- paimon会基于process-time提前创建partitions
  10.     'metastore.tag-to-partition.preview' = 'process-time'
  11. );
  12. -- snapshot=1
  13. Flink SQL> INSERT INTO mydb_preview VALUES (1, '10', '100'), (2, '20', '200');
  14. -- create tag '2025-02-19' for snapshot 1
  15. [root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \
  16.     /opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \
  17.     create_tag \
  18.     --warehouse hdfs://centos01:8020/user/hive/warehouse \
  19.     --database paimon_db \
  20.     --table mydb_preview \
  21.     --tag_name '2025-02-19' \
  22.         --catalog-conf metastore=hive \
  23.     --catalog-conf uri=thrift://centos01:9083 \
  24.     --snapshot 1
  25. 0: jdbc:hive2://192.168.42.101:10000> show partitions mydb_preview;;
  26. +----------------+
  27. |   partition    |
  28. +----------------+
  29. | dt=2025-02-19  |
  30. | dt=2025-02-20  |
  31. +----------------+
  32. 2 rows selected (0.085 seconds)
  33. 0: jdbc:hive2://192.168.42.101:10000> select * from mydb_preview a where dt = '2025-02-19';
  34. +-------+---------+---------+-------------+
  35. | a.pk  | a.col1  | a.col2  |    a.dt     |
  36. +-------+---------+---------+-------------+
  37. | 1     | 10      | 100     | 2025-02-19  |
  38. | 2     | 20      | 200     | 2025-02-19  |
  39. +-------+---------+---------+-------------+
  40. 2 rows selected (0.292 seconds)
  41. 0: jdbc:hive2://192.168.42.101:10000> select * from mydb_preview a where dt = '2025-02-20';
  42. +-------+---------+---------+-------------+
  43. | a.pk  | a.col1  | a.col2  |    a.dt     |
  44. +-------+---------+---------+-------------+
  45. | 1     | 10      | 100     | 2025-02-20  |
  46. | 2     | 20      | 200     | 2025-02-20  |
  47. +-------+---------+---------+-------------+
  48. 2 rows selected (0.263 seconds)
  49. -- new data in '2025-02-20'
  50. Flink SQL> INSERT INTO mydb_preview VALUES (3, '30', '300'), (4, '40', '400');
  51. 0: jdbc:hive2://192.168.42.101:10000> select * from mydb_preview a where dt = '2025-02-20';
  52. +-------+---------+---------+-------------+
  53. | a.pk  | a.col1  | a.col2  |    a.dt     |
  54. +-------+---------+---------+-------------+
  55. | 1     | 10      | 100     | 2025-02-20  |
  56. | 2     | 20      | 200     | 2025-02-20  |
  57. | 3     | 30      | 300     | 2025-02-20  |
  58. | 4     | 40      | 400     | 2025-02-20  |
  59. +-------+---------+---------+-------------+
复制代码
2 Branch分支管理



  • 在流式数据处理中,修正数据具有挑战性,因为它可能会影响现有数据,而用户会看到流式的临时结果,这是不盼望的。
  • 我们假设现有工作流正在处理的分支是main分支。通过创建自定义数据分支,可以在现有表上对新任务举行实验性测试和数据验证,而无需制止现有的读取/写入工作流,也无需从主分支复制数据。
  • 通过合并或替换分支操纵,用户可以完成数据的修正。
  1. -- 1、创建paimon表
  2. Flink SQL> drop table if exists flink_branch_demo;
  3. Flink SQL> CREATE TABLE flink_branch_demo (
  4.     dt STRING NOT NULL,
  5.     name STRING NOT NULL,
  6.     amount BIGINT,
  7.     PRIMARY KEY (dt, name) NOT ENFORCED
  8. ) PARTITIONED BY (dt)
  9. WITH (
  10.     'connector' = 'paimon'
  11. );
  12. -- 2、创建一个专门用于流写的分支streambranch, 这个分支将负责接收实时流入的数据。
  13. [root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \
  14.     /opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \
  15.     create_branch \
  16.     --warehouse hdfs://centos01:8020/user/hive/warehouse \
  17.     --database paimon_db \
  18.     --table flink_branch_demo \
  19.     --branch_name streambranch \
  20.     --catalog-conf metastore=hive \
  21.     --catalog-conf uri=thrift://centos01:9083
  22. -- 3、设置流写分支的属性
  23. Flink SQL> ALTER TABLE  `flink_branch_demo$branch_streambranch` SET (
  24.     'bucket' = '4',
  25.     'changelog-producer' = 'lookup'
  26. );
  27. -- 4、设置回滚分支(如果要实现分支回滚必须要设置该参数)
  28. Flink SQL> ALTER TABLE flink_branch_demo SET ( 'scan.fallback-branch' = 'streambranch' );
  29. -- 5、写入数据
  30. -- 5-1、主分支写入数据
  31. Flink SQL> insert into flink_branch_demo values  ('20240725', 'apple', 3), ('20240725', 'banana', 5);
  32. Flink SQL> select * from flink_branch_demo;
  33. +----------+--------+--------+
  34. |       dt |   name | amount |
  35. +----------+--------+--------+
  36. | 20240725 |  apple |      3 |
  37. | 20240725 | banana |      5 |
  38. +----------+--------+--------+
  39. 2 rows in set
  40. --  5-2、再往streambranch分支写入数据
  41. Flink SQL> INSERT INTO `flink_branch_demo$branch_streambranch`
  42. VALUES ('20240725', 'apple', 666), ('20240725', 'peach', 999), ('20240726', 'cherry', 33), ('20240726', 'pear', 88);
  43. -- 5-3、查询主分支
  44. -- 20240725分区的新的数据没有生效! 那说明原表已经有的分区的数据,在streambranch写入这些分区的数据,原表是不会更新的,只要是往原表里面写了某个分区的数据,那么这个分区的数据以写入原表主分支的为准。
  45. -- 原表主分支没有的分区的数据,则按照streambranch读取,因为设置了原表的 'scan.fallback-branch' = 'streambranch' ,读取原表可以查到streambranch这部分的数据。
  46. Flink SQL> select * from flink_branch_demo;
  47. +----------+--------+--------+
  48. |       dt |   name | amount |
  49. +----------+--------+--------+
  50. | 20240726 | cherry |     33 | -- 26号分区主表没有,使用了分支表中的数据
  51. | 20240726 |   pear |     88 |
  52. | 20240725 |  apple |      3 | -- 25号的分区使用了主表中的数据
  53. | 20240725 | banana |      5 |
  54. +----------+--------+--------+
  55. -- 5-4、查询流分支
  56. Flink SQL> select * from `flink_branch_demo$branch_streambranch` ;
  57. +----------+--------+--------+
  58. |       dt |   name | amount |
  59. +----------+--------+--------+
  60. | 20240726 | cherry |     33 |
  61. | 20240726 |   pear |     88 |
  62. | 20240725 |  apple |    666 |
  63. | 20240725 |  peach |    999 |
  64. +----------+--------+--------+
  65. 4 rows in set
  66. -- 6、合并分支
  67. -- 合并分支表操作(Fast Forward),即:删除主表的一切数据,并将分支表的一切数据拷贝到主表
  68. [root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \
  69.     /opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \
  70.     fast_forward \
  71.     --warehouse hdfs://centos01:8020/user/hive/warehouse \
  72.     --database paimon_db \
  73.     --table flink_branch_demo \
  74.     --branch_name streambranch \
  75.     --catalog-conf metastore=hive \
  76.     --catalog-conf uri=thrift://centos01:9083
  77. -- 再次查询主表
  78. Flink SQL> select * from flink_branch_demo;
  79. +----------+--------+--------+
  80. |       dt |   name | amount |
  81. +----------+--------+--------+
  82. | 20240726 | cherry |     33 |
  83. | 20240726 |   pear |     88 |
  84. | 20240725 |  apple |    666 |
  85. | 20240725 |  peach |    999 |
  86. +----------+--------+--------+
  87. -- 7、数据回归到主分支版本(注意:不进行上面合并操作)
  88. Flink SQL> ALTER TABLE flink_branch_demo RESET( 'scan.fallback-branch');
复制代码
3 追加表(Append table)



  • 如果一个表没有定义主键,那它就是一个追加表(Append Table)。与主键表相比,追加表无法直接接收变更日记,也不能直接通过 upsert 更新数据,只能接收追加数据。
使用场景或优势说明批量写入和批量读取类似于常规的 Hive 分区表,实用于大规模数据的批量处理。友好的对象存储良好的兼容性和适应性,支持 S3、OSS 等对象存储。时间穿越和回滚支持数据的时间观光和回滚功能,方便数据的汗青查询和恢复。低成本的删除和更新在批量数据操纵中,可以或许以较低的计算和资源成本举行删除和更新操纵。流式接收中的小文件自动合并在流式写入过程中,自动处理小文件合并,减少存储碎片。队列形式的流式读写支持如队列般的流式读写操纵,可以像消息队列一样处理数据。高性能查询通过顺序和索引实现的高效查询性能。 3.1 流式处理



  • Append Table可以通过 Flink 举行非常机动的流式写入,并可以像队列一样通过 Flink 举行读取。
  • 唯一的区别是其耽误为分钟级别,但其优势在于非常低的成本以及可以或许举行过滤和投影下推。
3.1.1 小文件自动合并



  • 在流式写入作业中,如果没有定义分桶(bucket),写入器不会举行压缩;
  • 相反,将使用压缩协调器(Compact Coordinator)扫描小文件并将压缩任务传递给压缩工作者(Compact Worker)。
  • 流式模式下,如果在 Flink 中运行插入 SQL,拓扑结构将如下所示:



  • 留意:

    • 上面的压缩任务不会引起反压。
    • 如果设置 write-only 为 true,压缩协调器(Compact Coordinator)和压缩工作者(Compact Worker)将在拓扑中被移除。
    • 自动压缩仅在 Flink 引擎的流模式下被支持。可以通过 Paimon 在 Flink 中启动压缩作业,并通过设置 write-only 禁用全部其他压缩。

3.1.2 流式查询



  • 追加表可以像消息队列一样使用,举行流式查询,与主键表类似,有两个选项可以举行流式读取:

    • 默认模式:流式读取在首次启动时天生表的最新快照,并继续读取最新的增量记录。
    • 增量模式:可以指定 scan.mode 或 scan.snapshot-id 或 scan.timestamp-millis 或 scan.file-creation-time-millis 举行增量读取。

  • 追加表的流式查询类似 Flink-Kafka,默认情况下不包管顺序。如果数据需要某种顺序,也需要思量定义桶键(bucket-key),即Bucketed Append。
3.2 查询优化

3.2.1 按照顺序跳过查询



  • Paimon 默认在清单文件中记录每个字段的最大值和最小值。
  • 在查询时,根据查询的 WHERE 条件,通过清单中的统计信息举行文件过滤。如果过滤效果良好,查询时间可以从分钟级别加速到毫秒级别。
  • 然而,数据分布并不总是能有用过滤,因此如果可以根据 WHERE 条件中的字段对数据举行排序,将会更高效。

    • 详细可参考:Flink COMPACT Action or Flink COMPACT Procedure or Spark COMPACT Procedure.

3.2.2 按文件索引跳过数据



  • 如下代码所示,可以使用文件索引,会在读取端通过索引过滤文件

    • 定义 file-index.bloom-filter.columns 后,Paimon 将为每个文件创建相应的索引文件。
    • 如果索引文件太小,它将直接存储在清单中,否则将存储在数据文件的目录中。
    • 每个数据文件对应一个索引文件,该文件有独立的定义,可以包罗差别范例的多列索引。
    1. CREATE TABLE my_table (
    2.     product_id BIGINT,
    3.     price DOUBLE,
    4.     sales BIGINT
    5. ) WITH (
    6.     'file-index.bloom-filter.columns' = 'product_id',
    7.     'file-index.bloom-filter.product_id.items' = '200'
    8. );
    复制代码

  • 索引种类如下所示:
    1. # 布隆过滤器索引
    2. file-index.bloom-filter.columns:指定需要创建布隆过滤器索引的列。
    3. file-index.bloom-filter.<column_name>.fpp:配置布隆过滤器的误报率(False Positive Probability)。
    4. file-index.bloom-filter.<column_name>.items:配置每个数据文件中预期的唯一项数量。
    5. # Bitmap(位图索引):
    6. file-index.bitmap.columns:指定需要创建位图索引的列。
    7. # Bit-Slice Index Bitmap(位切片索引位图):
    8. file-index.bsi.columns:指定需要创建位切片索引(BSI)的列。
    9. 如果想为现有表添加文件索引,且不进行任何数据重写,可以使用rewrite_file_index过程。
    10. 在使用该过程之前,可以使用ALTER子句来为表配置file-index.<filter-type>.columns。
    11. 可以参考:
    12. https://paimon.apache.org/docs/1.0/flink/procedures/#procedures
    复制代码

    • 布隆过滤器索引和位图索引的区别
       特性布隆过滤器索引(Bloom Filter Index)位图索引(Bitmap Index)计划目标快速判断某个值是否可能存在,减少磁盘 I/O精确查询低基数列,支持多条件组合查询实现原理基于哈希函数的概率型数据结构基于Bitmap的精确索引结构实用数据范例高基数列(如唯一 ID、字符串等)低基数列(如性别、状态等)查询范例等值查询(=)等值查询(=)和多条件组合查询(AND、OR)存储效率存储空间小,得当大规模数据集低基数列存储效率高,高基数列存储开销大查询效率查询速率快,但存在误报率查询效率高,无误报率更新代价较低较高实用场景大数据集的快速过滤低基数列的精确查询和多条件组合查询   

    • 位图索引和位切片索引的区别
       特性Bitmap Index(位图索引)Bit-Slice Index (BSI)(位切片索引)实用数据范例低基数(即列中唯一值的数目较少)的任意范例(如罗列、状态等)高基数的数值型数据(如金额、时间戳等)查询范例等值查询、范围查询范围查询、聚合查询(如 SUM、MAX 等)存储效率低基数列高效,高基数列存储开销大高基数列存储效率高实现复杂度简单复杂更新代价较高较高
3.3 Bucketed Append



  • 可以指定 bucket 和 bucket-key 以创建一个Bucketed Append表。
  • 在Bucketed Append中,差别桶内的数据是严格有序的,流式读取将按写入顺序准确地传输记录。这样可以优化数据处理和查询性能。
  1. -- 创建Bucketed Append表
  2. CREATE TABLE my_table (
  3.     product_id BIGINT,
  4.     price DOUBLE,
  5.     sales BIGINT
  6. ) WITH (
  7.     'bucket' = '8',
  8.     'bucket-key' = 'product_id'
  9. );
复制代码

3.3.1 有界流



  • 流式泉源(Streaming Source)也可以是有界的,可以通过指定 scan.bounded.watermark 来定义有界流模式的竣事条件。
  • 比方,指定kafka源并声明watermark 的定义。当使用此kafka源写入Paimon表时,Paimon表的快照将天生相应的watermark,以便流式读取此Paimon表时可以使用有界watermark的功能。
  1. -- 临时表
  2. drop TEMPORARY table if exists order_from_kafka;
  3. CREATE TEMPORARY TABLE order_from_kafka (
  4.     `user` int,
  5.     product STRING,
  6.     order_time TIMESTAMP(3),
  7.     WATERMARK FOR order_time AS order_time - INTERVAL '8' HOUR - INTERVAL '5' SECOND
  8. ) WITH (
  9.     'connector' = 'kafka',
  10.     'topic' = 'orders_test',
  11.     'properties.bootstrap.servers' = 'localhost:9092',
  12.     'format' = 'json',
  13.         'properties.group.id' = 'tGroup',
  14.         'json.fail-on-missing-field' = 'false',
  15.         'scan.startup.mode' = 'earliest-offset',
  16.         'json.ignore-parse-errors' = 'true'
  17. );
  18. -- 创建topic
  19. /opt/apps/kafka_2.12-2.6.2/bin/kafka-topics.sh --create --topic orders_test --replication-factor 1 --partitions 1 --bootstrap-server centos01:9092
  20. -- paimon追加表
  21. drop table if exists paimon_r;
  22. CREATE TABLE paimon_r (
  23.     `user` int,
  24.     product STRING,
  25.     order_time TIMESTAMP(3),
  26.     WATERMARK FOR order_time AS order_time - INTERVAL '8' HOUR - INTERVAL '5' SECOND
  27. ) WITH (
  28.     'connector' = 'paimon'
  29. );
  30. -- 将Kafka表中的数据实时插入到Paimon表中:
  31. INSERT INTO paimon_r SELECT * FROM order_from_kafka;
  32. -- 启动有界流任务读取 Paimon 表
  33. -- 1696126500000 2023-10-01 10:15:00
  34. -- 当Flink处理过程中遇到第一个水印值大于或等于这个时间点的记录时,
  35. -- 它会停止继续读取后续的数据,即使数据源中还有更晚时间点的数据。
  36. Flink SQL> SELECT * FROM paimon_r /*+ OPTIONS('scan.bounded.watermark'='1696126500000') */;
  37. +----+-------------+--------------------------------+-------------------------+
  38. | op |        user |                        product |              order_time |
  39. +----+-------------+--------------------------------+-------------------------+
  40. | +I |        1001 |                      iPhone 15 | 2023-10-01 10:00:00.000 |
  41. | +I |        1002 |                    MacBook Pro | 2023-10-01 10:05:00.000 |
  42. | +I |        1003 |                    AirPods Pro | 2023-10-01 10:10:00.000 |
  43. | +I |        1004 |                       iPad Air | 2023-10-01 10:15:00.000 |
  44. +----+-------------+--------------------------------+-------------------------+
  45. Received a total of 4 rows
  46. -- 启动命令行生产者,模拟数据源源源不断地生产数据(每隔一段时间插入1条数据)
  47. /opt/apps/kafka_2.12-2.6.2/bin/kafka-console-producer.sh --topic orders_test --bootstrap-server centos01:9092
  48. {"user": 1001, "product": "iPhone 15", "order_time": "2023-10-01 10:00:00"}
  49. {"user": 1002, "product": "MacBook Pro", "order_time": "2023-10-01 10:05:00"}
  50. {"user": 1003, "product": "AirPods Pro", "order_time": "2023-10-01 10:10:00"}
  51. -- "2023-10-01 10:15:00" 时候watermark是1696126495000,即:2023-10-01 10:14:55
  52. -- 此时有界流并未结束
  53. {"user": 1004, "product": "iPad Air", "order_time": "2023-10-01 10:15:00"}
  54. -- "2023-10-01 10:20:00" 时候watermark是1696126795000
  55. -- 即:2023-10-01 10:19:55 > 2023-10-01 10:15:00(1696126500000)
  56. -- 停止继续读取后续的数据,即使数据源中还有更晚时间点的数据
  57. {"user": 1005, "product": "Apple Watch", "order_time": "2023-10-01 10:20:00"}
  58. {"user": 1006, "product": "Apple Watch", "order_time": "2023-10-02 08:00:00"}
  59. {"user": 1007, "product": "Apple Watch", "order_time": "2023-10-03 08:20:00"}
  60. {"user": 1008, "product": "Apple Watch", "order_time": "2024-10-03 08:20:00"}
  61. {"user": 1009, "product": "Apple Watch", "order_time": "2025-10-03 08:20:00"}
复制代码
3.3.2 批处理(Batch)



  • 通过设置 spark.sql.sources.v2.bucketing.enabled 为 true,Spark 将识别 V2 数据源报告的特定分布,并在须要时尝试避免shuffle。
  • 如下代码所示,如果两个表具有类似的分桶策略和类似数目的桶,昂贵的 join shuffle 操纵将被避免。
  1. -- 在必要时尝试避免shuffle
  2. SET spark.sql.sources.v2.bucketing.enabled = true;
  3. -- 事实表
  4. CREATE TABLE FACT_TABLE (
  5.     order_id INT,
  6.     f1 STRING
  7. ) TBLPROPERTIES ('bucket'='10', 'bucket-key' = 'order_id');
  8. -- 维度表
  9. CREATE TABLE DIM_TABLE (
  10.     order_id INT,
  11.     f2 STRING
  12. ) TBLPROPERTIES ('bucket'='10', 'primary-key' = 'order_id');
  13. SELECT
  14.         *
  15. FROM
  16.         FACT_TABLE
  17. JOIN
  18.         DIM_TABLE
  19. ON
  20.         FACT_TABLE.order_id = DIM_TABLE.order_id;
复制代码
注:


  • Paimon还有其他功能,这里就不再介绍,可以参考官网自行相识。比方:

    • Paimon 在 Flink 1.17 及后续版本中支持使用 UPDATE 更新主键表记录、使用DELETE删除change-log数据;
    • 流式读取表时指定consumer-id,防止快照因为过期而被删除;
    • paimon提供了包罗有关每个表的元数据和信息的系统表,比方创建的快照和使用的选项。用户可以通过批量查询访问系统表。
      1. -- 快照表 Snapshots Table
      2. SELECT * FROM ws_t$snapshots;
      3. -- 模式表 Schemas Table
      4. SELECT * FROM ws_t$schemas;
      5. -- 选项表 Options Table
      6. SELECT * FROM ws_t$options;
      7. -- 标签表 Tags Table
      8. SELECT * FROM ws_t$tags;
      9. -- 审计日志表 Audit log Table
      10. SELECT * FROM ws_t$audit_log;
      11. ......
      复制代码
    • 可以集成其他引擎,如spark引擎等
    • Paimon表支持分区过期设置
    • 缩放Bucket官方示例

      • Rescale Bucket | Apache Paimon



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

老婆出轨

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表