Apache Iceberg 与 Spark整合-使用教程(Iceberg 官方文档剖析) ...

打印 上一主题 下一主题

主题 952|帖子 952|积分 2856

官方文档链接(Spark整合Iceberg)


1.Getting Started

Spark 目前是举行 Iceberg 操纵最丰富的计算引擎。官方建议从 Spark 开始,以理解 Iceberg 的概念和功能。
The latest version of Iceberg is 1.6.1.(2024年9月24日11:45:55)
在 Spark shell 中使用 Iceberg,需使用 --packages 选项:
  1. spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1
复制代码
Tips:

如果您想将 Iceberg 包罗在 Spark 安装中,请将 iceberg-spark-runtime-3.5_2.12 Jar 添加到 Spark 的 jars 文件夹中。
Adding Catalogs 添加目次

Iceberg 提供了目次功能,使 SQL 命令能够管理表并通过名称加载它们。目次通过以下属性举行配置:spark.sql.catalog.(catalog_name)。
创建一个名为 local 的基于路径的目次,用于管理 $PWD/warehouse 下的表,并为 Spark 的内置目次添加对 Iceberg 表的支持:
  1. spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1 \
  2.     --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  3.     --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  4.     --conf spark.sql.catalog.spark_catalog.type=hive \
  5.     --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
  6.     --conf spark.sql.catalog.local.type=hadoop \
  7.     --conf spark.sql.catalog.local.warehouse=$PWD/warehouse
复制代码
创建 Iceberg 表

在 Spark 中创建第一个 Iceberg 表,可以使用 spark-sql shell 或 spark.sql(...) 来运行 CREATE TABLE 命令:

  1. -- local 是上述定义的基于路径的目录
  2. CREATE TABLE local.db.table (id bigint, data string) USING iceberg;
复制代码
Iceberg 目次支持完整的 SQL DDL 命令,包括:


  • CREATE TABLE ... PARTITIONED BY
  • CREATE TABLE ... AS SELECT
  • ALTER TABLE
  • DROP TABLE
写入数据

创建表后,可以使用 INSERT INTO 向表中插入数据:
  1. INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');
  2. INSERT INTO local.db.table SELECT id, data FROM source WHERE length(data) = 1;
复制代码
Iceberg 还支持行级 SQL 更新,包括 MERGE INTO 和 DELETE FROM:
  1. MERGE INTO local.db.target t
  2. USING (SELECT * FROM updates) u
  3. ON t.id = u.id
  4. WHEN MATCHED THEN UPDATE SET t.count = t.count + u.count
  5. WHEN NOT MATCHED THEN INSERT *;
复制代码
别的,Iceberg 支持通过新的 v2 DataFrame 写入 API 写入 DataFrames:
  1. spark.table("source").select("id", "data")
  2.      .writeTo("local.db.table").append()
复制代码
旧的写入 API 得到支持,但不推荐使用。
读取数据

要使用 SQL 读取数据,可以在 SELECT 查询中使用 Iceberg 表的名称:
  1. SELECT count(1) as count, data
  2. FROM local.db.table
  3. GROUP BY data;
复制代码
SQL 也是检查表的推荐方式。要查看表中的全部快照,可以使用快照元数据表:
  1. SELECT * FROM local.db.table.snapshots;
复制代码
输出:
  1. +-------------------------+----------------+-----------+-----------+----------------------------------------------------+
  2. | committed_at            | snapshot_id    | parent_id | operation | manifest_list                                      |
  3. +-------------------------+----------------+-----------+-----------+----------------------------------------------------+
  4. | 2019-02-08 03:29:51.215 | 57897183625154 | null      | append    | s3://.../table/metadata/snap-57897183625154-1.avro |
  5. |                         |                |           |           |                                                    |
  6. | ...                     | ...            | ...       | ...       | ...                                                |
  7. +-------------------------+----------------+-----------+-----------+----------------------------------------------------+
复制代码
DataFrame 读取也得到了支持,可以通过名称引用表:
  1. val df = spark.table("local.db.table")
  2. df.count()
复制代码
这就是在 Spark 中使用 Iceberg 创建、写入和读取表的基本步调。

Spark 和 Iceberg 的范例兼容性

Spark type to Iceberg type

Spark 范例Iceberg 范例备注booleanbooleanshortintegerbyteintegerintegerintegerlonglongfloatfloatdoubledoubledatedatetimestamp带时区的时间戳timestamp_ntz不带时区的时间戳charstringvarcharstringstringstringbinarybinarydecimaldecimalstructstructarraylistmapmap Tips:


  • 数字范例(integer、long、float、double、decimal)在写入时支持提升。例如,可以将 Spark 范例 short、byte、integer 或 long 写入 Iceberg 范例 long。
  • 可以使用 Spark 的 binary 范例写入 Iceberg 固定范例,但会举行长度验证。
Iceberg type to Spark type

Iceberg 范例Spark 范例备注booleanbooleanintegerintegerlonglongfloatfloatdoubledoubledatedatetime不支持带时区的时间戳timestamp不带时区的时间戳timestamp_ntzstringstringuuidstringfixedbinarybinarybinarydecimaldecimalstructstructlistarraymapmap 2.Spark DDL

(1)CREATE TABLE 创建表

Spark 3 can create tables in any Iceberg catalog with the clause USING iceberg:
  1. CREATE TABLE prod.db.sample (
  2.     id bigint NOT NULL COMMENT '唯一ID',
  3.     data string)
  4. USING iceberg;
复制代码

Iceberg会将Spark中的列范例转换为相应的Iceberg范例。
创建表的命令(包括CTAS和RTAS)支持一系列Spark创建选项,包括:


  • PARTITIONED BY (partition-expressions):配置分区。
  • LOCATION ‘(fully-qualified-uri)’:设置表的位置。
  • COMMENT ‘table documentation’:设置表描述。
  • TBLPROPERTIES (‘key’=‘value’, …):设置表配置。
Tips:CREATE TABLE ... LIKE ...语法不受支持。
(2)PARTITIONED BY 分区表的创建


  1. CREATE TABLE prod.db.sample (
  2.     id bigint,
  3.     data string,
  4.     category string)
  5. USING iceberg
  6. PARTITIONED BY (category);
复制代码
PARTITIONED BY子句支持转换表达式以创建隐藏分区
  1. CREATE TABLE prod.db.sample (
  2.     id bigint,
  3.     data string,
  4.     category string,
  5.     ts timestamp)
  6. USING iceberg
  7. PARTITIONED BY (bucket(16, id), days(ts), category);
复制代码
支持的分区转换表达式:


  • year(ts):按年分区
  • month(ts):按月分区
  • day(ts)date(ts):等同于按日期整数分区
  • hour(ts)date_hour(ts):等同于按日期整数和小时分区
  • bucket(N, col):按哈希值取模N的分区
  • truncate(L, col):按截断值分区(字符串按照给定长度截断;整数和长整型按区间截断,例如 truncate(10, i) 生成分区 0, 10, 20, 30, …)
   注:为了向后兼容,旧语法 years(ts)、months(ts)、days(ts) 和 hours(ts) 也被支持。
  
(3)创建表(CTAS)

Iceberg 支持使用 SparkCatalog 举行原子操纵的 CREATE TABLE AS SELECT(CTAS)。在使用 SparkSessionCatalog 时,CTAS 是不原子的。
基本语法:
  1. CREATE TABLE prod.db.sample
  2. USING iceberg
  3. AS SELECT ...;
复制代码


  • 新创建的表不会继承源表的分区规范和表属性。可以使用 PARTITIONED BY 和 TBLPROPERTIES 来声明新表的分区规范和表属性
示例:
  1. CREATE TABLE prod.db.sample
  2. USING iceberg
  3. PARTITIONED BY (part)
  4. TBLPROPERTIES ('key'='value')
  5. AS SELECT ...;
复制代码
(4)更换表(RTAS)



  • 原子更换表操纵会创建一个新快照,保存表的历史记录。
基本语法:
  1. REPLACE TABLE prod.db.sample
  2. USING iceberg
  3. AS SELECT ...;
复制代码
示例:
  1. REPLACE TABLE prod.db.sample
  2. USING iceberg
  3. PARTITIONED BY (part)
  4. TBLPROPERTIES ('key'='value')
  5. AS SELECT ...;
复制代码
(5)创建或更换表

  1. CREATE OR REPLACE TABLE prod.db.sample
  2. USING iceberg
  3. AS SELECT ...;
复制代码

  • 更换表时的影响

    • 如果使用 REPLACE TABLE 命令来更换一个表,而且新的查询效果的模式(schema)或分区规范(partition spec)发生了变化,那么原有的模式和分区会被新的内容更换。

  • 如何避免修改

    • 如果想保持表的现有模式和分区稳定,可以使用 INSERT OVERWRITE 命令,而不是 REPLACE TABLE。这样做可以更新表的数据,但不会影响表的布局或分区设置。

  • 表属性的处理处罚

    • 在 REPLACE TABLE 命令中,如果你定义了新的表属性,这些新属性会与现有的属性归并。
    • 如果新属性与现有属性相同,则保持稳定;如果差别,则会更新现有的属性。



  • 使用 REPLACE TABLE 会改变表的布局和分区。
  • 使用 INSERT OVERWRITE 可以仅更新数据而不影响布局。
  • 新的表属性会与旧的表属性归并,存在冲突时会更新。

(6)删除表



  • 在 0.14 之前,运行 DROP TABLE 将从目次中移除表,并删除表内容。
  • 从 0.14 开始,DROP TABLE 仅从目次中移除表。要删除表内容,需使用 DROP TABLE PURGE。
基本语法:


  • 删除表:
  1. DROP TABLE prod.db.sample;
复制代码


  • 删除表及其内容:
  1. DROP TABLE prod.db.sample PURGE;
复制代码

(7)修改表

1.重定名表 ALTER TABLE … RENAME TO:
  1. ALTER TABLE prod.db.sample RENAME TO prod.db.new_name;
复制代码
2.设置或移除表属性:


  • 设置属性 ALTER TABLE … SET TBLPROPERTIES
    1. ALTER TABLE prod.db.sample SET TBLPROPERTIES (
    2.     'read.split.target-size'='268435456'
    3. );
    复制代码
  • 移除属性 ALTER TABLE … UNSET TBLPROPERTIES
    1. ALTER TABLE prod.db.sample UNSET TBLPROPERTIES ('read.split.target-size');
    复制代码
  • 设置表解释
    1. ALTER TABLE prod.db.sample SET TBLPROPERTIES (
    2.     'comment' = 'A table comment.'
    3. );
    复制代码


  • 添加、删除和重定名列

    • 添加列 ALTER TABLE … ADD COLUMN
      1. ALTER TABLE prod.db.sample ADD COLUMNS (
      2.     new_column string comment 'new_column docs'
      3. );
      复制代码



  • 添加嵌套字段

    • 创建布局列
      1. ALTER TABLE prod.db.sample ADD COLUMN point struct<x: double, y: double>;
      复制代码
    • 向布局中添加字段
      1. ALTER TABLE prod.db.sample ADD COLUMN point.z double;
      复制代码
    • 创建嵌套数组列
      1. ALTER TABLE prod.db.sample ADD COLUMN points array<struct<x: double, y: double>>;
      复制代码
    • 向数组中的布局添加字段
      1. ALTER TABLE prod.db.sample ADD COLUMN points.element.z double;
      复制代码
    • 创建映射列
      1. ALTER TABLE prod.db.sample ADD COLUMN points map<struct<x: int>, struct<a: int>>;
      复制代码
    • 向映射值布局中添加字段
      1. ALTER TABLE prod.db.sample ADD COLUMN points.value.b int;
      复制代码

  • 调解列的位置


    • 在其他列后添加新列
      1. ALTER TABLE prod.db.sample ADD COLUMN new_column bigint AFTER other_column;
      复制代码
    • 在最前面添加新列
      1. ALTER TABLE prod.db.sample ADD COLUMN nested.new_column bigint FIRST;
      复制代码



  • Note: Altering a map ‘key’ column by adding columns is not allowed. Only map values can be updated.
  • 修改map的“键”列以添加列是不答应的,只能更新map的“值”。

6.重定名列 RENAME COLUMN
使用 RENAME COLUMN 可以重定名任何字段:
  1. ALTER TABLE prod.db.sample RENAME COLUMN data TO payload;
  2. ALTER TABLE prod.db.sample RENAME COLUMN location.lat TO latitude;
复制代码
在 Iceberg 中,嵌套重定名命令只会影响到最底层的字段(即叶子字段)。
将 location.lat 重定名为 location.latitude。
  1. location
  2. └─ lat
  3. └─ long
复制代码
执行命令后,布局变为:
  1. location
  2. └─ latitude
  3. └─ long
复制代码
这里的 location 仍然是父布局,但 lat 字段被重定名为 latitude,而其他字段保持稳定。这就是嵌套重定名的含义。
7.修改列范例 ALTER COLUMN
使用 ALTER COLUMN 来修改列的范例,前提是这种更新是安全的。安全更新包括:


  • int 到 bigint
  • float 到 double
  • decimal(P,S) 到 decimal(P2,S) (当 P2 > P 时)

  1. ALTER TABLE prod.db.sample ALTER COLUMN measurement TYPE double;
复制代码
8.更新列解释
  1. ALTER TABLE prod.db.sample ALTER COLUMN measurement TYPE double COMMENT 'unit is bytes per second';
  2. ALTER TABLE prod.db.sample ALTER COLUMN measurement COMMENT 'unit is kilobytes per second';
复制代码
9. 重新排序列
使用 FIRST 和 AFTER 子句,可以重新排序顶级列或布局中的列:
  1. ALTER TABLE prod.db.sample ALTER COLUMN col FIRST;
  2. ALTER TABLE prod.db.sample ALTER COLUMN nested.col AFTER other_col;
复制代码
10. 更改列的可空性
对于非可空列,可以使用 DROP NOT NULL 更改可空性:
  1. ALTER TABLE prod.db.sample ALTER COLUMN id DROP NOT NULL;
复制代码
注意:不能通过 SET NOT NULL 将可空列更改为非可空列,因为 Iceberg 无法确保是否存在空值。
11. 删除列
要删除列,可以使用 DROP COLUMN:
  1. ALTER TABLE prod.db.sample DROP COLUMN id;
  2. ALTER TABLE prod.db.sample DROP COLUMN point.z;
复制代码

ALTER TABLE SQL 扩展

Iceberg 支持使用 ADD PARTITION FIELD 命令向分区规范中添加新的分区字段:
  1. ALTER TABLE prod.db.sample ADD PARTITION FIELD catalog; -- 身份变换
复制代码
也可以使用差别的分区变换,例如:
  1. ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id);
  2. ALTER TABLE prod.db.sample ADD PARTITION FIELD truncate(4, data);
  3. ALTER TABLE prod.db.sample ADD PARTITION FIELD year(ts);
复制代码
使用可选的 AS 关键字可以为分区字段指定自定义名称:
  1. ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id) AS shard;
复制代码


  • 添加分区字段是一个元数据操纵,不会更改现有表的数据。新数据将按照新的分区方式写入,但现有数据仍将保持在旧的分区布局中。在元数据表中,旧数据文件的新的分区字段将体现为 null 值。

当表的分区发生变化时,动态分区覆盖举动将发生变化,因为动态覆盖会隐式更换分区。要显式覆盖,请使用新的 DataFrameWriterV2 API。

如果从按天禀区迁移到按小时分区,动态分区覆盖举动将有所差别。例如,如果原本是按天禀区,改为按小时分区,覆盖操纵将只覆盖小时分区,而不再覆盖天禀区。
如果需要从日分区迁移到小时分区,建议保存日分区字段,以确保现有元数据表查询能够继续正常工作。

3.Spark Queries

选择表中的全部记录

  1. SELECT * FROM prod.db.table;
复制代码


  • prod 是目次,db 是定名空间,table 是表名。
访问元数据表:
可以使用 Iceberg 表名作为定名空间查询元数据表,例如,要读取特定 Iceberg 表的文件元数据:

使用 DataFrame 查询

要将 Iceberg 表加载为 Spark 中的 DataFrame,可以使用以下命令:
  1. val df = spark.table("prod.db.table")
复制代码
使用此命令可以利用 Spark DataFrame 操纵的全范围对 DataFrame 举行操纵。
加载 DataFrame 后,可以执行各种操纵。例

  • 体现 DataFrame:
    1. df.show()
    复制代码
  • 过滤记录:
    1. val filteredDf = df.filter($"columnName" === "someValue")
    复制代码
  • 分组和聚合:
    1. val aggregatedDf = df.groupBy("columnName").count()
    复制代码
  • 写回 Iceberg 表:
    1. filteredDf.write.format("iceberg").mode("append").save("prod.db.table")
    复制代码

Iceberg 表的时间观光(Time Travel)

从 Spark 3.3 开始,Iceberg 支持在 SQL 查询中使用 TIMESTAMP AS OF 或 VERSION AS OF 子句举行时间观光。
时间观光查询示例:

  • 基于时间戳的查询

    • 时间观光到1986年10月26日01:21:00
      1. SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';
      复制代码

  • 基于快照 ID 的查询

    • 时间观光到快照 ID 为 10963874102873 的快照
      1. SELECT * FROM prod.db.table VERSION AS OF 10963874102873;
      复制代码

  • 基于分支的查询

    • 时间观光到 audit-branch 的最新快照
      1. SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';
      复制代码

  • 基于标签的查询

    • 时间观光到 historical-snapshot 标签引用的快照
      1. SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot';
      复制代码

时间戳也可以以 Unix 时间戳(秒)提供:


  • 使用 Unix 时间戳查询
    1. SELECT * FROM prod.db.table TIMESTAMP AS OF 499162860;
    2. SELECT * FROM prod.db.table FOR SYSTEM_TIME AS OF 499162860;
    复制代码
可以使用类似于元数据表的语法指定分支或标签:


  • 指定分支
    1. SELECT * FROM prod.db.table.`branch_audit-branch`;
    复制代码
  • 指定标签
    1. SELECT * FROM prod.db.table.`tag_historical-snapshot`;
    复制代码
(包罗“-”的标识符无效,因此必须使用反引号转义。)
差别的时间观光查询可以使用快照的架构或表的架构:


  • 使用快照的架构(基于时间戳和快照 ID 的查询)
    1. SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';
    2. SELECT * FROM prod.db.table VERSION AS OF 10963874102873;
    3. SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot';
    4. SELECT * FROM prod.db.table.`tag_historical-snapshot`;
    复制代码
  • 使用表的架构(基于分支的查询)
    1. SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';
    2. SELECT * FROM prod.db.table.`branch_audit-branch`;
    复制代码

Iceberg 中的DataFrame时间观光

Iceberg 支持在 DataFrame API 中使用四个 Spark 读取选项来选择特定的表快照或特定时间的快照:

  • snapshot-id: 选择特定的表快照。
  • as-of-timestamp: 选择某个时间点的当前快照,单元为毫秒。
  • branch: 选择指定分支的最新快照。注意,目前不能将分支与时间戳联合使用。
  • tag: 选择与指定标签关联的快照。标签也不能与时间戳联合使用。
示例:

  • 时间观光到1986年10月26日01:21:00
    1. spark.read
    2.     .option("as-of-timestamp", "499162860000")
    3.     .format("iceberg")
    4.     .load("path/to/table")
    复制代码
  • 时间观光到快照 ID 为 10963874102873 的快照
    1. spark.read
    2.     .option("snapshot-id", 10963874102873L)
    3.     .format("iceberg")
    4.     .load("path/to/table")
    复制代码
  • 时间观光到标签 historical-snapshot
    1. spark.read
    2.     .option(SparkReadOptions.TAG, "historical-snapshot")
    3.     .format("iceberg")
    4.     .load("path/to/table")
    复制代码
  • 时间观光到 audit-branch 的最新快照
    1. spark.read
    2.     .option(SparkReadOptions.BRANCH, "audit-branch")
    3.     .format("iceberg")
    4.     .load("path/to/table")
    复制代码

Iceberg增量读取


  • start-snapshot-id: 用于增量扫描的起始快照 ID(不包括该快照)。
  • end-snapshot-id: 用于增量扫描的结束快照 ID(包括该快照)。这是可选的。如果省略,将默认为当前快照。
示例代码
  1. // 获取在 start-snapshot-id (10963874102873L) 之后追加的数据,直到 end-snapshot-id (63874143573109L)
  2. spark.read
  3.   .format("iceberg")
  4.   .option("start-snapshot-id", "10963874102873")
  5.   .option("end-snapshot-id", "63874143573109")
  6.   .load("path/to/table")
复制代码


  • 当前仅支持从追加操纵中获取数据,无法支持更换、覆盖或删除操纵。
  • 增量读取适用于 V1 和 V2 格式版本。
  • Spark 的 SQL 语法不支持增量读取。


Iceberg 支持使用元数据表来检查表的历史和快照。元数据表通过在原始表名称后添加元数据表名来辨认。例如,查看 db.table 的历史可以使用 db.table.history。
表历史查询

要体现表的历史,可以执行以下查询:
  1. SELECT * FROM prod.db.table.history;
复制代码
made_current_atsnapshot_idparent_idis_current_ancestor2019-02-08 03:29:51.2155781947118336215154NULLtrue2019-02-08 03:47:55.94851792995261850568305781947118336215154true2019-02-09 16:24:30.132964100402475335445179299526185056830false2019-02-09 16:32:47.33629998756080624373305179299526185056830true2019-02-09 19:42:03.91989245587860605834792999875608062437330true2019-02-09 19:49:16.34365367338231819750458924558786060583479true 元数据日记条目查询

要查看表的元数据日记条目,可以执行以下查询:
  1. SELECT * FROM prod.db.table.metadata_log_entries;
复制代码
timestampfilelatest_snapshot_idlatest_schema_idlatest_sequence_number2022-07-28 10:43:52.93s3://…/table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.jsonnullnullnull2022-07-28 10:43:57.487s3://…/table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json170260833677645300012022-07-28 10:43:58.25s3://…/table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json95890649397670977402 快照查询

  1. SELECT * FROM prod.db.table.snapshots;
复制代码
示例效果:
committed_atsnapshot_idparent_idoperationmanifest_listsummary2019-02-08 03:29:51.21557897183625154nullappends3://…/table/metadata/snap-57897183625154-1.avro{ added-records -> 2478404, total-records -> 2478404, added-data-files -> 438, total-data-files -> 438, spark.app.id -> application_1520379288616_155055 }
Apache Iceberg 是一个高性能的表格式,用于大数据处理处罚。下面我将根据你提到的各个方面,概述 Iceberg 的使用。
Entries 快照的详细信息

Entries 表提供了有关表中每个快照的详细信息。查询示例如下:
  1. SELECT * FROM my_catalog.db.my_table.entries;
复制代码

4.Spark Write

1. 特性支持

Iceberg 利用 Apache Spark 的 DataSourceV2 API,支持多种写入方式。差别版本的 Spark 对某些功能的支持水平差别:
功能Spark 3备注SQL 插入✔️⚠ 需要 spark.sql.storeAssignmentPolicy=ANSI(自 Spark 3.0 默认)SQL 归并✔️⚠ 需要 Iceberg Spark 扩展SQL 覆盖插入✔️⚠ 需要 spark.sql.storeAssignmentPolicy=ANSISQL 删除✔️⚠ 行级删除需要 Iceberg Spark 扩展SQL 更新✔️⚠ 需要 Iceberg Spark 扩展DataFrame 附加写入✔️DataFrame 覆盖写入✔️DataFrame CTAS 和 RTAS✔️⚠ 需要 DSv2 API 2. 使用 SQL 写入

Spark 3 支持 SQL 的 INSERT INTO、MERGE INTO 和 INSERT OVERWRITE 操纵。
INSERT INTO

用于向表中追加新数据:
  1. INSERT INTO prod.db.table VALUES (1, 'a'), (2, 'b');
  2. INSERT INTO prod.db.table SELECT ...;
复制代码
MERGE INTO

支持对目标表举行行级更新。Iceberg 通过重写包罗需要更新行的数据文件来实现 MERGE INTO。
推荐使用 MERGE INTO 而不是 INSERT OVERWRITE,因为它只更换受影响的数据文件,避免了因分区变化导致的数据覆盖差别等问题。
  1. MERGE INTO prod.db.target t   -- 目标表
  2. USING (SELECT ...) s          -- 源更新
  3. ON t.id = s.id                -- 用于找到更新的条件
  4. WHEN MATCHED AND s.op = 'delete' THEN DELETE
  5. WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
  6. WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1
  7. WHEN NOT MATCHED THEN INSERT *;
复制代码
可以根据条件添加多个 WHEN MATCHED 子句。如果源数据中的多条记录匹配同一目标表的行,将会抛出错误。

INSERT OVERWRITE

在 Iceberg 中,INSERT OVERWRITE 答应用查询效果更换表中的数据。此操纵是原子的,确保数据的同等性。
1. 覆盖举动

Iceberg 支持两种覆盖模式:静态模式动态模式


  • 静态覆盖模式(默认):

    • 通过将 PARTITION 子句转换为过滤器来确定要覆盖的分区。
    • 如果省略了 PARTITION 子句,将更换表中的全部分区。

  • 动态覆盖模式(推荐):

    • 仅更换由 SELECT 查询产生的行所在的分区。
    • 通过设置 spark.sql.sources.partitionOverwriteMode=dynamic 来启用。

2. 示例表布局

以下是一个示例日记表的 DDL 定义:
  1. CREATE TABLE prod.my_app.logs (
  2.     uuid string NOT NULL,
  3.     level string NOT NULL,
  4.     ts timestamp NOT NULL,
  5.     message string)
  6. USING iceberg
  7. PARTITIONED BY (level, hours(ts))
复制代码
3. 动态覆盖示例

当 Spark 的覆盖模式为动态时,以下查询会更换全部包罗查询效果的分区:
  1. INSERT OVERWRITE prod.my_app.logs
  2. SELECT uuid, first(level), first(ts), first(message)
  3. FROM prod.my_app.logs
  4. WHERE cast(ts as date) = '2020-07-01'
  5. GROUP BY uuid
复制代码
在动态模式下,仅会更换 2020 年 7 月 1 日的小时分区。
4. 静态覆盖示例

在静态模式下,如果没有 PARTITION 子句,将更换全部现有行:
  1. INSERT OVERWRITE prod.my_app.logs
  2. SELECT uuid, first(level), first(ts), first(message)
  3. FROM prod.my_app.logs
  4. WHERE cast(ts as date) = '2020-07-01'
  5. GROUP BY uuid
复制代码
这将删除表中全部行,只写入 7 月 1 日的日记。
要仅覆盖特定分区,可以添加 PARTITION 子句:
  1. INSERT OVERWRITE prod.my_app.logs
  2. PARTITION (level = 'INFO')
  3. SELECT uuid, first(level), first(ts), first(message)
  4. FROM prod.my_app.logs
  5. WHERE level = 'INFO'
  6. GROUP BY uuid
复制代码
注意:静态模式无法像动态模式那样更换小时分区,因为 PARTITION 子句只能引用表列,而不能引用隐藏分区。

DELETE

DELETE FROM 查询答应根据条件过滤来删除表中的行。

  • 删除指定时间范围内的记录:
    1. DELETE FROM prod.db.table
    2. WHERE ts >= '2020-05-01 00:00:00' AND ts < '2020-06-01 00:00:00'
    复制代码
  • 删除all_events表中 session_time 小于good_events表中的最小 session_time 的记录:
    1. DELETE FROM prod.db.all_events
    2. WHERE session_time < (SELECT MIN(session_time) FROM prod.db.good_events)
    复制代码
  • 删除orders表中存在于returned_orders表的订单:
    1. DELETE FROM prod.db.orders AS t1
    2. WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid)
    复制代码
注意:


  • 如果删除条件匹配整个分区,Iceberg 将执行元数据仅删除(metadata-only delete)。
  • 如果删除条件匹配单个行,Iceberg 将仅重写受影响的数据文件。
UPDATE


  • 更新指定时间范围内的记录的字段值:
    1. UPDATE prod.db.table
    2. SET c1 = 'update_c1', c2 = 'update_c2'
    3. WHERE ts >= '2020-05-01 00:00:00' AND ts < '2020-06-01 00:00:00'
    复制代码
  • 更新all_events表中 session_time 小于good_events表中的最小 session_time 的记录:
    1. UPDATE prod.db.all_events
    2. SET session_time = 0, ignored = true
    3. WHERE session_time < (SELECT MIN(session_time) FROM prod.db.good_events)
    复制代码
  • 更新orders表中存在于returned_orders表的订单状态:
    1. UPDATE prod.db.orders AS t1
    2. SET order_status = 'returned'
    3. WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid)
    复制代码

Iceberg 分支写入指南



  • 分支存在性:在执行任何写入操纵之前,分支必须已经存在。可以使用 Spark DDL 命令创建分支。
  • 模式验证:在向分支写入数据时,将验证表的当前模式。
通过 SQL 写入


  • 插入到审计分支
    1. INSERT INTO prod.db.table.branch_audit VALUES (1, 'a'), (2, 'b');
    复制代码
  • 归并到审计分支
    1. MERGE INTO prod.db.table.branch_audit t
    2. USING (SELECT ...) s        
    3. ON t.id = s.id         
    4. WHEN ...
    复制代码
  • 更新审计分支
    1. UPDATE prod.db.table.branch_audit AS t1
    2. SET val = 'c';
    复制代码
  • 从审计分支删除
    1. DELETE FROM prod.db.table.branch_audit WHERE id = 2;
    复制代码
  • WAP 分支写入
    1. SET spark.wap.branch = audit-branch;
    2. INSERT INTO prod.db.table VALUES (3, 'c');
    复制代码
通过 DataFrame 写入


  • 插入到审计分支
    1. val data: DataFrame = ...
    2. data.writeTo("prod.db.table.branch_audit").append()
    复制代码
  • 覆盖审计分支
    1. val data: DataFrame = ...
    2. data.writeTo("prod.db.table.branch_audit").overwritePartitions()
    复制代码

后续继续更新~

列位可以移步博主Apache Iceberg专栏,大概对您理解Iceberg有所资助

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

勿忘初心做自己

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表