spark集成hudi详解

打印 上一主题 下一主题

主题 837|帖子 837|积分 2511

目录
  
  Spark 3 支持矩阵
  使用 Hudi 运行 spark-shell:
  创建表
  插入数据是关键!
  读取数据:
  索引数据
  常见写入策略
  常用配置项
  索引相干
  更新数据 
  归并数据
  删除数据
  时间观光查询
  增量查询
  
  
  Spark 3 支持矩阵

  hudi支持的 Spark 3 版本0.15.x 版本3.5.x(默认版本)、3.4.x、3.3.x、3.2.x、3.1.x、3.0.x0.14.x 版本3.4.x(默认版本)、3.3.x、3.2.x、3.1.x、3.0.x0.13.x 版本3.3.x(默认版本)、3.2.x、3.1.x0.12.x 版本3.3.x(默认版本)、3.2.x、3.1.x0.11.x 版本3.2.x(默认版本,仅限 Spark 捆绑包)、3.1.x0.10.x 版本3.1.x(默认版本)、3.0.x0.7.0 - 0.9.03.0.x 版本0.6.0 及更早版本不支持   从解压的目录中
  使用 Hudi 运行 spark-shell:

  1. # For Spark versions: 3.2 - 3.5
  2. export SPARK_VERSION=3.5 # or 3.4, 3.3, 3.2
  3. spark-shell --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.15.0 \
  4. --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  5. --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  6. --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  7. --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
复制代码
解释:
  这段配置旨在通过 spark-shell 引入 Apache Hudi 的功能模块,配置序列化器及 Spark SQL 扩展,使 Spark 能够高效地管理和查询 Hudi 数据表。
  启动 Spark Shell,并通过 --packages 选项动态引入 Apache Hudi 的依赖包。
  spark.ql.extensions配置:
  

  • 为 Spark 的 SQL 引擎添加 Hudi 的原生支持。
  • 扩展 Spark SQL 的本领,使其支持 Hudi 特有的操纵,好比 MERGE INTO 和 Hudi 表管理操纵。 
  在 Kryo 序列化上,建议用户设置此配置以减少 Kryo 序列化开销:
  1. --conf 'spark.kryo.registrator=org.apache.spark.HoodieKryoRegistrar'
复制代码
实用于 Spark 3.2 及更高版本,使用 scala 2.12 版本和额外的配置:
  1. --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
复制代码
接下来进入spark shell了,默认支持的是scala,或者sparksql或者python
  假如想用java,必要写一个java项目,然后:
  1. spark-submit --class JavaSparkExample --master local my-spark-app.jar
复制代码
创建表

  示例,用scala新建一个表名,和一个路径
  1. // spark-shell
  2. import scala.collection.JavaConversions._
  3. import org.apache.spark.sql.SaveMode._
  4. import org.apache.hudi.DataSourceReadOptions._
  5. import org.apache.hudi.DataSourceWriteOptions._
  6. import org.apache.hudi.common.table.HoodieTableConfig._
  7. import org.apache.hudi.config.HoodieWriteConfig._
  8. import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
  9. import org.apache.hudi.common.model.HoodieRecord
  10. import spark.implicits._
  11. val tableName = "trips_table"
  12. val basePath = "file:///tmp/trips_table"
复制代码
Spark SQL创建表现例:
  1. -- create a Hudi table that is partitioned.
  2. CREATE TABLE hudi_table (
  3.     ts BIGINT,
  4.     uuid STRING,
  5.     rider STRING,
  6.     driver STRING,
  7.     fare DOUBLE,
  8.     city STRING
  9. ) USING HUDI
  10. PARTITIONED BY (city);
复制代码
插入数据是关键!

  示例1:假如已经有data,我们插入数据并读取数据:
  1. //定以basePath
  2. val basePath = "file:///tmp/trips_table"
  3. //插入数据
  4. data.write.format("hudi")
  5.     .options(Map(
  6.         "hoodie.table.name" -> tableName,
  7.         "hoodie.datasource.write.base.path" -> basePath,
  8.         "hoodie.datasource.write.recordkey.field" -> "id",
  9.         "hoodie.datasource.write.partitionpath.field" -> "date",
  10.         "hoodie.datasource.write.precombine.field" -> "timestamp"
  11.     ))
  12.     .mode("overwrite")
  13.     .save()
  14. //读取数据
  15. val hudiDf = spark.read.format("hudi")
  16.     .load(basePath)
  17. hudiDf.show()
复制代码
示例2:(官方文档的例子)
  1. // spark-shell
  2. val columns = Seq("ts","uuid","rider","driver","fare","city")
  3. val data =
  4.   Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
  5.     (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
  6.     (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
  7.     (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"    ),
  8.     (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));
  9. var inserts = spark.createDataFrame(data).toDF(columns:_*)
  10. inserts.write.format("hudi").
  11.   option("hoodie.datasource.write.partitionpath.field", "city").
  12.   option("hoodie.table.name", tableName).
  13.   mode(Overwrite).
  14.   save(basePath)
复制代码
示例3 :Spark SQL
  1. INSERT INTO hudi_table
  2. VALUES
  3. (1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
  4. (1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),
  5. (1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),
  6. (1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
  7. (1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'    ),
  8. (1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo'    ),
  9. (1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai'      ),
  10. (1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
复制代码
读取数据:

  1. // spark-shell
  2. val tripsDF = spark.read.format("hudi").load(basePath)
  3. tripsDF.createOrReplaceTempView("trips_table")
  4. spark.sql("SELECT uuid, fare, ts, rider, driver, city FROM  trips_table WHERE fare > 20.0").show()
  5. spark.sql("SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM  trips_table").show()
复制代码
Spark SQL:
  1. SELECT ts, fare, rider, driver, city FROM  hudi_table WHERE fare > 20.0;
复制代码
索引数据

  1. -- Create a table with primary key
  2. CREATE TABLE hudi_indexed_table (
  3.     ts BIGINT,
  4.     uuid STRING,
  5.     rider STRING,
  6.     driver STRING,
  7.     fare DOUBLE,
  8.     city STRING
  9. ) USING HUDI
  10. options(
  11.     primaryKey ='uuid',
  12.     hoodie.datasource.write.payload.class = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"
  13. )
  14. PARTITIONED BY (city);
  15. INSERT INTO hudi_indexed_table
  16. VALUES
  17. (1695159649,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
  18. (1695091554,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),
  19. (1695046462,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),
  20. (1695332066,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
  21. (1695516137,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'    ),
  22. (1695376420,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo'    ),
  23. (1695173887,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai'      ),
  24. (1695115999,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
  25. -- Create bloom filter expression index on city column
  26. CREATE INDEX idx_bloom_city ON hudi_indexed_table USING bloom_filters(city) OPTIONS(expr='identity');
  27. -- It would show bloom filter expression index
  28. SHOW INDEXES FROM hudi_indexed_table;
  29. -- Query on city column would prune the data using the idx_bloom_city index
  30. SELECT uuid, rider FROM hudi_indexed_table WHERE city = 'san_francisco';
复制代码
这一段spark sql我们来仔细的分析:
  起首看这个配置:
  

  • OverwriteWithLatestAvroPayload:

    • 配置写入策略,表现对于同一个主键,只保留最新的记录。

  还有别的写入策略:
  
  常见写入策略

  a. OverwriteWithLatestAvroPayload
  

  • 作用: 这是默认的写入策略,基于主键进行更新。假如主键冲突,保留最新的记录。
  • 用例: 适合大多数场景,保证数据按最新的值覆盖。
  b. DefaultHoodieRecordPayload
  

  • 作用: 基于主键的归并策略。假如冲突发生,会比较指定的字段(如时间戳),保留最新的记录。
  • 配置:

    • 必要设置 hoodie.datasource.write.precombine.field,指定用于比较的字段(如时间戳)。

  • 用例: 数据记录带有逻辑时间戳,按时间戳更新最新数据。
  c. EmptyHoodieRecordPayload
  

  • 作用: 忽略所有写入记录,仅打扫现有记录。
  • 用例: 在必要逻辑删除数据时使用。
  d. BulkInsertAvroPayload
  

  • 作用: 实用于 BULK_INSERT 模式,直接插入数据,不进行任何去重或归并。
  • 用例: 初次导入数据或全量导入的场景。
  e. DeleteOperationAvroPayload
  

  • 作用: 标记记录为已删除。
  • 用例: 逻辑删除记录,团结查询可排除这些记录。
  f. 自定义策略
  

  • 作用: Hudi 支持用户自定义 Payload 类,用户可以通过继承 HoodieRecordPayload 接口实现特定的业务逻辑。
  • 用例: 必要复杂的自定义更新逻辑时。

  常用配置项

  
hoodie.datasource.write.operationupsert指定写入操纵范例。值可以是 insert、upsert、bulk_insert、delete。
  
hoodie.datasource.write.precombine.field_hoodie_commit_time在记录冲突时,用于比较保留最新记录的字段(如时间戳)。
  
hoodie.datasource.write.table.typeCOPY_ON_WRITE表范例,支持 COPY_ON_WRITE 和 MERGE_ON_READ。
  
hoodie.datasource.write.partitionpath.field指定用于数据分区的字段。多个字段可用逗号分隔。
  
hoodie.datasource.write.recordkey.field主键字段,用于唯一标识每条记录。多个字段可用逗号分隔。
  
hoodie.datasource.write.keygenerator.classorg.apache.hudi.keygen.SimpleKeyGenerator主键生成策略类,如 SimpleKeyGenerator、ComplexKeyGenerator 等。
  
hoodie.upsert.shuffle.parallelism200Upsert 操纵的并行度。
  
hoodie.insert.shuffle.parallelism200Insert 操纵的并行度。
  
hoodie.bulkinsert.shuffle.parallelism200Bulk Insert 操纵的并行度。
  
hoodie.clean.automatictrue是否启用主动清理逾期文件。
  
hoodie.cleaner.commits.retained10清理时保留的近来 N 次提交记录。
  
hoodie.keep.max.commits30最大保留的提交数量。
  
hoodie.keep.min.commits20最小保留的提交数量。
  
hoodie.datasource.hive_sync.enablefalse是否启用 Hive 同步。
  
hoodie.datasource.hive_sync.partition_extractor_classorg.apache.hudi.hive.MultiPartKeysValueExtractorHive 分区提取器类。
  
  索引相干

  1. CREATE INDEX idx_bloom_city ON hudi_indexed_table USING bloom_filters(city) OPTIONS(expr='identity');
复制代码


  • 创建一个基于 city 列的布隆过滤器索引。
  • 布隆过滤器索引的作用:

    • 快速判断某个值是否大概存在,减少不必要的分区扫描。
    • 在分区裁剪(partition pruning)中非常高效。

  • OPTIONS(expr='identity'):

    • 表现索引使用列值的直接映射(identity 函数),不对列值进行转换。

  hudi默认会把主键设为布隆过滤器索引,假如必要非主键列的查询优化,可以像上面一样自己设定索引。
  

  • 布隆过滤器会根据主键字段(hoodie.datasource.write.recordkey.field 配置项)主动生成。(默认的布隆过滤器索引和文件组是逐一对应的,文件组是一批数据的所有历史版本,一个文件片就是一个历史版本。hudi为什么快,就是因为能通过索引去找对应的文件组进行归并操纵)
  • 默认环境下,主键字段由用户在写入时指定,通常用于唯一标识一条记录。
  • 假如未显式指定主键字段,Hudi 会尝试使用默认值 _row_key。
  
   查询验证索引:
  1. SHOW INDEXES FROM hudi_indexed_table;
  2. SELECT uuid, rider FROM hudi_indexed_table WHERE city = 'san_francisco';
复制代码


  • SHOW INDEXES:显示表中的索引,包罗布隆过滤器索引。
  • 查询 city='san_francisco' 时,Spark 会优先使用布隆过滤器索引来裁剪分区。
  
  更新数据 

  1. // Lets read data from target Hudi table, modify fare column for rider-D and update it.
  2. val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-D").withColumn("fare", col("fare") * 10)
  3. updatesDf.write.format("hudi").
  4.   option("hoodie.datasource.write.operation", "upsert").
  5.   option("hoodie.datasource.write.partitionpath.field", "city").
  6.   option("hoodie.table.name", tableName).
  7.   mode(Append).
  8.   save(basePath)
复制代码
SparkSQL
  1. UPDATE hudi_table SET fare = 25.0 WHERE rider = 'rider-D';
复制代码

  归并数据

  1. // spark-shell
  2. val adjustedFareDF = spark.read.format("hudi").
  3.   load(basePath).limit(2).
  4.   withColumn("fare", col("fare") * 10)
  5. adjustedFareDF.write.format("hudi").
  6. option("hoodie.datasource.write.payload.class","com.payloads.CustomMergeIntoConnector").
  7. mode(Append).
  8. save(basePath)
  9. // Notice Fare column has been updated but all other columns remain intact.
  10. spark.read.format("hudi").load(basePath).show()
复制代码
将调解后的票价值添加到原始表中,并保留所有其他字段。 请参阅 此处 以获取 .com.payloads.CustomMergeIntoConnectorcom.payloads.CustomMergeIntoConnector
  逐段解释: 
  1. val adjustedFareDF = spark.read.format("hudi").
  2.   load(basePath).limit(2).
  3.   withColumn("fare", col("fare") * 10)
复制代码


  • spark.read.format("hudi").load(basePath):从 Hudi 表中读取数据,basePath 是 Hudi 表所在的路径。
  • .limit(2):仅限读取前两行数据(用于测试和验证)。
  • .withColumn("fare", col("fare") * 10):对 fare 列的所有值进行更新,将其值乘以 10,这会生成一个新的 DataFrame adjustedFareDF。
  使用自定义的归并策略进行写入: 
  1. adjustedFareDF.write.format("hudi").
  2.   option("hoodie.datasource.write.payload.class", "com.payloads.CustomMergeIntoConnector").
  3.   mode(Append).
  4.   save(basePath)
复制代码


  • .write.format("hudi"):指定命据写入 Hudi 表。
  • .option("hoodie.datasource.write.payload.class", "com.payloads.CustomMergeIntoConnector"):这行代码指定使用一个自定义的归并策略 CustomMergeIntoConnector。自定义的归并策略定义了如何归并新旧记录,这个策略通常涉及到如那边理冲突、更新现有数据或添加新数据。该策略类 com.payloads.CustomMergeIntoConnector 必要在类路径中可用,而且必须实现 Hudi 提供的归并接口。接口地址:Hudi CustomMergeIntoConnector · GitHub
  • .mode(Append):指定写入模式为 Append,表现将数据追加到现有的 Hudi 表中,而不是覆盖已有数据。
  • .save(basePath):将调解后的数据写入到 Hudi 表的路径 basePath 中。
  Spark SQL:(注:和上面scala的需求略微不一样)
  1. -- source table using Hudi for testing merging into target Hudi table
  2. CREATE TABLE fare_adjustment (ts BIGINT, uuid STRING, rider STRING, driver STRING, fare DOUBLE, city STRING)
  3. USING HUDI;
  4. INSERT INTO fare_adjustment VALUES
  5. (1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',-2.70 ,'san_francisco'),
  6. (1695530237068,'3f3d9565-7261-40e6-9b39-b8aa784f95e2','rider-K','driver-U',64.20 ,'san_francisco'),
  7. (1695241330902,'ea4c36ff-2069-4148-9927-ef8c1a5abd24','rider-H','driver-R',66.60 ,'sao_paulo'    ),
  8. (1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',1.85,'chennai'      );
  9. MERGE INTO hudi_table AS target
  10. USING fare_adjustment AS source
  11. ON target.uuid = source.uuid
  12. WHEN MATCHED THEN UPDATE SET target.fare = target.fare + source.fare
  13. WHEN NOT MATCHED THEN INSERT *
  14. ;
复制代码


  • MERGE INTO hudi_table AS target:指定目标表 hudi_table,将数据归并到此表。
  • USING fare_adjustment AS source:指定源表 fare_adjustment,它提供了要归并的数据。
  • ON target.uuid = source.uuid:定义归并条件,根据 uuid 字段进行匹配。即,只有当目标表和源表中的记录有相同的 uuid 时,才会进行更新或插入。
  • WHEN MATCHED THEN UPDATE SET target.fare = target.fare + source.fare:当目标表和源表的 uuid 匹配时,实验更新操纵,将目标表中的 fare 与源表中的 fare 相加。
  • WHEN NOT MATCHED THEN INSERT *:假如目标表中没有与源表匹配的记录,则将源表中的整行数据插入到目标表中。
  • 匹配(MATCHED):在归并过程中,Hudi 会根据 uuid 字段找到目标表中已经存在的记录。假如在目标表中找到与源表中的 uuid 匹配的记录,就会实验 UPDATE 操纵,将目标表中的 fare 值与源表中的 fare 值相加(target.fare = target.fare + source.fare)。
  • 未匹配(NOT MATCHED):假如在目标表中没有找到与源表中的记录匹配的 uuid,则会实验 INSERT 操纵,将源表中的数据插入到目标表中。
  
  删除数据

  1. // spark-shell
  2. // Lets  delete rider: rider-D
  3. val deletesDF = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-F")
  4. deletesDF.write.format("hudi").
  5.   option("hoodie.datasource.write.operation", "delete").
  6.   option("hoodie.datasource.write.partitionpath.field", "city").
  7.   option("hoodie.table.name", tableName).
  8.   mode(Append).
  9.   save(basePath)
复制代码
Spark SQL:
  1. DELETE FROM hudi_table WHERE uuid = '3f3d9565-7261-40e6-9b39-b8aa784f95e2';
复制代码

  时间观光查询

  就是查询某个时间点之前的最新数据
  1. spark.read.format("hudi").
  2.   option("as.of.instant", "20210728141108100").
  3.   load(basePath)
  4. spark.read.format("hudi").
  5.   option("as.of.instant", "2021-07-28 14:11:08.200").
  6.   load(basePath)
  7. // It is equal to "as.of.instant = 2021-07-28 00:00:00"
  8. spark.read.format("hudi").
  9.   option("as.of.instant", "2021-07-28").
  10.   load(basePath)
复制代码
Spark SQL:(Requires Spark 3.2+)
  1. -- time travel based on commit time, for eg: `20220307091628793`
  2. SELECT * FROM hudi_table TIMESTAMP AS OF '20220307091628793' WHERE id = 1;
  3. -- time travel based on different timestamp formats
  4. SELECT * FROM hudi_table TIMESTAMP AS OF '2022-03-07 09:16:28.100' WHERE id = 1;
  5. SELECT * FROM hudi_table TIMESTAMP AS OF '2022-03-08' WHERE id = 1;
复制代码

  增量查询

  增量查询使你能够获取自上次查询以来发生变更的记录,而不必要重新扫描整个表,从而进步查询效率。
  1. // spark-shell
  2. // 读取 Hudi 表并创建临时视图
  3. spark.read.format("hudi").load(basePath).createOrReplaceTempView("trips_table")
  4. // 这里的commits是一个Array[String]类型
  5. val commits = spark.sql("SELECT DISTINCT(_hoodie_commit_time) AS commitTime FROM  trips_table ORDER BY commitTime").map(k => k.getString(0)).take(50)
  6. val beginTime = commits(commits.length - 2) // commit time we are interested in
  7. // incrementally query data
  8. val tripsIncrementalDF = spark.read.format("hudi").
  9.   option("hoodie.datasource.query.type", "incremental").
  10.   option("hoodie.datasource.read.begin.instanttime", 0).
  11.   load(basePath)
  12. tripsIncrementalDF.createOrReplaceTempView("trips_incremental")
  13. spark.sql("SELECT `_hoodie_commit_time`, fare, rider, driver, uuid, ts FROM  trips_incremental WHERE fare > 20.0").show()
复制代码
接下来逐一解读: 
  1. 获取近来的提交时间
  1. val commits = spark.sql("SELECT DISTINCT(_hoodie_commit_time) AS commitTime FROM trips_table ORDER BY commitTime").map(k => k.getString(0)).take(50)
  2. val beginTime = commits(commits.length - 2) // commit time we are interested in
复制代码


  • 这段代码查询了 _hoodie_commit_time 字段,获取了 Hudi 表的不同提交时间(提交时间是 Hudi 用来跟踪数据变更的时间戳)。
  • commits 获取了前 50 个提交时间,并将它们按时间排序。
  • beginTime 选取了 倒数第二个提交时间,即我们关注的增量查询的起始时间(beginTime 表现你想查询的变更开始时间)。
  2. 进行增量查询 
  1. val tripsIncrementalDF = spark.read.format("hudi").
  2.   option("hoodie.datasource.query.type", "incremental").
  3.   option("hoodie.datasource.read.begin.instanttime", 0).
  4.   load(basePath)
复制代码


  • hoodie.datasource.query.type 设置为 "incremental",表现我们要实验增量查询。
  • hoodie.datasource.read.begin.instanttime 设置为 0,这是起始查询时间,用来定义增量查询的时间范围。通常这会设为之前查询的提交时间 beginTime,用来确保只查询自上次提交以来的数据。

    • 注意:在此代码中,0 是一个占位符,应将其替换为实际的 beginTime。

  
  Spark SQL:
  1. -- syntax
  2. hudi_table_changes(table or path, queryType, beginTime [, endTime]);
  3. -- table or path: table identifier, example: db.tableName, tableName,
  4. --                or path for of your table, example: path/to/hudiTable  
  5. --                in this case table does not need to exist in the metastore,
  6. -- queryType: incremental query mode, example: latest_state, cdc  
  7. --            (for cdc query, first enable cdc for your table by setting cdc.enabled=true),
  8. -- beginTime: instantTime to begin query from, example: earliest, 202305150000,
  9. -- endTime: optional instantTime to end query at, example: 202305160000,
  10. -- incrementally query data by table name
  11. -- start from earliest available commit, end at latest available commit.  
  12. SELECT * FROM hudi_table_changes('db.table', 'latest_state', 'earliest');
  13. -- start from earliest, end at 202305160000.  
  14. SELECT * FROM hudi_table_changes('table', 'latest_state', 'earliest', '202305160000');  
  15. -- start from 202305150000, end at 202305160000.
  16. SELECT * FROM hudi_table_changes('table', 'latest_state', '202305150000', '202305160000');
复制代码
解释:
  1. hudi_table_changes(table or path, queryType, beginTime [, endTime]);
复制代码


  • table or path:可以是 Hudi 表的标识符或存储路径。

    • db.tableName 或 tableName 表现表名。
    • path/to/hudiTable 表现表所在的路径。

  • queryType:增量查询模式,通常可以是以下范例:

    • latest_state:查询从指定时间点到最新提交的所有数据变更。
    • cdc:用于变化数据捕捉(Change Data Capture)查询。要启用 CDC,表必须先设置 cdc.enabled=true。

  • beginTime:查询开始的时间点。可以是:

    • earliest:从最早的可用提交开始。
    • 具体的时间戳(比方:202305150000)表现查询从某个特定的时间点开始。

  • endTime(可选):查询竣事的时间点。可以是:

    • 具体的时间戳(比方:202305160000)表现查询停止到某个特定的时间点。

  
  在 Apache Hudi 中,默认环境下,创建的表的存储范例是 Copy-on-Write (COW)
  怎么换成MOR呢:
  1. // spark-shell
  2. inserts.write.format("hudi").
  3.   ...
  4.   option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  5.   ...
复制代码
Spark SQL:
  1. CREATE TABLE hudi_table (
  2.     uuid STRING,
  3.     rider STRING,
  4.     driver STRING,
  5.     fare DOUBLE,
  6.     city STRING
  7. ) USING HUDI TBLPROPERTIES (type = 'mor')
  8. PARTITIONED BY (city);
复制代码
Hudi 还答应用户指定记录键,该键将用于唯一标识 Hudi 表中的记录。这很有用,而且 对于支持索引和集群等功能至关紧张,这些功能以一致的方式分别加速 UpSert 和查询。其他一些 此处详细先容了 Key 的上风。为此,Hudi 支持 广泛的内置密钥生成器,可以轻松生成记录 键。在没有用户配置的密钥的环境下,Hudi 将主动生成高度可压缩的记录密钥。
  1. // spark-shell
  2. inserts.write.format("hudi").
  3. ...
  4. option("hoodie.datasource.write.recordkey.field", "uuid").
  5. ...
复制代码
Spark SQL:
  1. CREATE TABLE hudi_table (
  2.     ts BIGINT,
  3.     uuid STRING,
  4.     rider STRING,
  5.     driver STRING,
  6.     fare DOUBLE,
  7.     city STRING
  8. ) USING HUDI TBLPROPERTIES (primaryKey = 'uuid')
  9. PARTITIONED BY (city);
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! 更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

温锦文欧普厨电及净水器总代理

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表