马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
启动 spark-sql
由于 iceberg 相干的 jars 已经在 ${SPARK_HOME}/jars 目次,以是不用 --jars 大概 --package 参数。
- spark-shell --master local[1] \
- --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
- --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
- --conf spark.sql.catalog.spark_catalog.type=hive
复制代码 Compaction – 合并
特别是及时计算,每次插入特别少的记录,导致生成大量的文件。
表参考上一节的 orders
- CALL spark_catalog.system.rewrite_data_files(table => 'iceberg.orders', options => map('min-input-files','2'));
复制代码 rewrite_data_files 参数说明和输出说明
Partial Progress
Compaction之前,把文件举行分组。
每组文件合并之后,就生成一个新的快照,如许新的查询就可以利用合并的快照。也能防止大 Compaction 的 OOM 标题。必要平衡创建快照的代价和Partial Progress 的收益。可以用 max-commits 调节 compaction job 的最多提交次数。
- call spark_catalog.system.rewrite_data_files(
- table => 'tablename',
- strategy => 'binpack',
- where => 'created_at between "2024-09-20 00:00:00" and "2024-09-26 23:59:59"',
- options => map(
- 'rewrite-job-order', 'byte-asc',
- 'target-file-size-bytes',1073741824,
- 'max-file-group-size-bytes',10737418240,
- 'partial-progress-enabled', 'true'
- )
- );
复制代码 注意: created_at 是字段名。
自动合并
使用调度系统大概 crontab。
隐式分区信息
分区信息不依赖于文件的物理结构,是在 snapshot 文件和 manifest 文件中。
Partition 进化
表刚开始大概按 year 举行分区。
- CREATE TABLE spark_catalog.memembers(...) PARTITIONED BY years(registration_ts) USING iceberg;
复制代码 随后大概改成按 months 举行分区
- ALTER TABLE spark_catalog.memembers(...) ADD PARTITION FIELD months(registration_ts);
复制代码 还可以删除分区字段
- ALTER TABLE spark_catalog.memembers(...) DROP PARTITION FIELD BUCKET(24,ID);
复制代码 更改分区字段,新的写入立刻生效。老的数据用 rewriteDataFiles 生效。
Copy-on-Write 和 Merge-on-Read 的对比
| 更新模式 | 读速度 | 写速度(updates/deletes) | 最佳实践 |
| : – | :-- | :__ | :-- |
| Copy-on-Write | 最快 | 最慢 | |
| Merge-on-Read(position deletes) | 快 | 快 | 使用常规合并减少读开销 |
| Merge-on-Read(equality deletes) | 慢 | 最快 | 使用更频繁的合并减少读开销 |
设置
- CREATE TABLE spark_catalog.people (
- id int,
- first_name string,
- last_name string)
- TBLPROPERTIES (
- 'write.delete.mode'='copy-on-write',
- 'write.update.mode'='merge-on-read',
- 'write.merge.mode'='merge-on-read'
- )
复制代码 其他考虑事项
统计信息收集(Metrics Collection)
如果表的字段非常多,大概照成 meta 文件特别大,可以关闭某些字段的收集大概限制收集的内容。
- alter table spark_catalog.dbname.tablename set TBLPROPERTIES (
- 'write.metadata.metrics.column.col1' = 'none',
- 'write.metadata.metrics.column.col2' = 'full',
- 'write.metadata.metrics.column.col3' = 'counts',
- 'write.metadata.metrics.column.col4' = 'truncate(16)');
复制代码 none: 不收集
counts: 仅收集 count(values, distinct values, null values),没有最大最小值。
truncate(xx): count 和最大最小值截断到指定位数。如 count distinct values时,仅统计截断后的 distinct。
full: 全部收集
Iceberg 默认 truncate(16)
重新 Manifests (Rewriting Manifests)
- call spark_catalog.system.rewrite_manifests('MyTable');
复制代码 如果抛出 OOM,可以关闭 spark caching。
- call spark_catalog.system.rewrite_manifests('MyTable', false);
复制代码 优化存储(Optimizing Storage)
设置快照保存的数目
- call spark_catalog.system.expire_snapshots('MyTable',
- TIMESTAMP('2024-09-10 00:00:00.000', 100));
复制代码 第2个参数设置了在此之前的快照都可以被扫除。如果快照的数目少于第3个参数设置的数目,则不扫除。
第3个参数设置了保存近来100个快照。
删除指定的快照ID,以下示例删除 ID 为 53 的快照。
- call spark_catalog.system.expire_snapshots('MyTable',
- snapshot_ids => ARRAY(53));
复制代码 删除孤立文件
没有包含在 manifest 中的文件,如失败的作业创建的文件。
- call spark_catalog.system.remove_orphan_files('MyTable');
复制代码 写入分布式模式(Write Distribution Mode)
- ALTER TABLE spark_catalog.MyTable SET TBLPROPERTIES(
- 'write.distribution-mode'='hash',
- 'write.delete.distribution-mode'='none',
- 'write.update.distribution-mode'='range',
- 'write.merge.distribution-mode'='hash',
- )
复制代码 对象存储考虑事项(Object Storage Considerations)
设置对象存储,会在表目次下创建多个写目次,分散 prefix 的压力。
- alter table orders set tblproperties ('write.object-storage.enabled'=true);
复制代码 数据文件布隆过滤器(Datafile Bloom Filters)
- ALTER TABLE spark_catalog.MyTable SET TBLPROPERTIES(
- 'write.parquet.bloom-filter-enabled.column.col1' = true,
- 'write.parquet.bloom-filter-max-bytes' = 1048576
- )
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |