CDP Integration with Hudi in Practice - spark-shell


[0] About This Article

This article walks through examples of working with Hudi tables from the spark-shell.

Software versions:
  Hudi: 1.0.0
  Hadoop: 3.1.1.7.3.1.0-197
  Hive: 3.1.3000.7.3.1.0-197
  Spark: 3.4.1.7.3.1.0-197
  CDP: 7.3.1
[1] Using spark-shell

1 - Deploy the Hudi Jar Package

  [root@cdp73-1 ~]# for i in $(seq 1 6); do scp /opt/software/hudi-1.0.0/packaging/hudi-spark-bundle/target/hudi-spark3.4-bundle_2.12-1.0.0.jar cdp73-$i:/opt/cloudera/parcels/CDH/lib/spark3/jars/; done
  hudi-spark3.4-bundle_2.12-1.0.0.jar    100%  105MB 418.2MB/s   00:00
  hudi-spark3.4-bundle_2.12-1.0.0.jar    100%  105MB 304.8MB/s   00:00
  hudi-spark3.4-bundle_2.12-1.0.0.jar    100%  105MB 365.0MB/s   00:00
  hudi-spark3.4-bundle_2.12-1.0.0.jar    100%  105MB 406.1MB/s   00:00
  hudi-spark3.4-bundle_2.12-1.0.0.jar    100%  105MB 472.7MB/s   00:00
  hudi-spark3.4-bundle_2.12-1.0.0.jar    100%  105MB 447.1MB/s   00:00
  [root@cdp73-1 ~]#
2 - Launch spark-shell

  spark-shell --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:1.0.0 \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
    --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
    --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

3 - Initialize the Project

  // spark-shell
  import scala.collection.JavaConversions._
  import org.apache.spark.sql.SaveMode._
  import org.apache.hudi.DataSourceReadOptions._
  import org.apache.hudi.DataSourceWriteOptions._
  import org.apache.hudi.common.table.HoodieTableConfig._
  import org.apache.hudi.config.HoodieWriteConfig._
  import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
  import org.apache.hudi.common.model.HoodieRecord
  import spark.implicits._
  val tableName = "trips_table"
  val basePath = "hdfs:///tmp/trips_table"

4 - Create the Table

The first commit will automatically initialize the table if it does not already exist at the specified base path.
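If you prefer to create the table explicitly before writing, the following is a minimal sketch, not part of the original walkthrough, using Hudi's Spark SQL CREATE TABLE syntax; the column list and table name simply mirror the trips_table example below, and it assumes the spark-shell session and basePath from step 3.

  // Hypothetical sketch: create the Hudi table explicitly instead of relying
  // on the first write to initialize it at basePath.
  spark.sql(s"""
    CREATE TABLE IF NOT EXISTS trips_table (
      ts BIGINT,
      uuid STRING,
      rider STRING,
      driver STRING,
      fare DOUBLE,
      city STRING
    ) USING hudi
    PARTITIONED BY (city)
    LOCATION '$basePath'
  """)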
5 - Insert Data

  // spark-shell
  val columns = Seq("ts","uuid","rider","driver","fare","city")
  val data =
    Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
      (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70,"san_francisco"),
      (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90,"san_francisco"),
      (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"),
      (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));
  var inserts = spark.createDataFrame(data).toDF(columns:_*)
  inserts.write.format("hudi").
    option("hoodie.datasource.write.partitionpath.field", "city").
    option("hoodie.embed.timeline.server", "false").
    option("hoodie.table.name", tableName).
    mode(Overwrite).
    save(basePath)

   [Mapping to Hudi write operations] Hudi provides a range of write operations, both batch and incremental, for writing data into Hudi tables, each with different semantics and performance. When no record key is configured (see the notes on keys below), bulk_insert is selected as the write operation; this matches the non-default behavior of Spark's Parquet data source.
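As a contrast, here is a small sketch, not from the original post, showing how pinning a record key routes the write through upsert semantics rather than bulk_insert. It reuses the inserts DataFrame, tableName, and basePath from step 5; the choice of uuid as record key and ts as precombine field is an assumption for illustration.

  // Hedged sketch: configure a record key explicitly so Hudi performs an
  // upsert instead of falling back to bulk_insert for keyless writes.
  inserts.write.format("hudi").
    option("hoodie.datasource.write.recordkey.field", "uuid").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.datasource.write.operation", "upsert").
    option("hoodie.datasource.write.partitionpath.field", "city").
    option("hoodie.embed.timeline.server", "false").
    option("hoodie.table.name", tableName).
    mode(Overwrite).
    save(basePath)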
6 - Query Data

  // spark-shell
  val tripsDF = spark.read.format("hudi").load(basePath)
  tripsDF.createOrReplaceTempView("trips_table")
  spark.sql("SELECT uuid, fare, ts, rider, driver, city FROM trips_table WHERE fare > 20.0").show()
  spark.sql("SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM trips_table").show()

7 - Update Data

  // Let's read data from the target Hudi table, modify the fare column for rider-D, and update it.
  val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-D").withColumn("fare", col("fare") * 10)
  updatesDf.write.format("hudi").
    option("hoodie.datasource.write.operation", "upsert").
    option("hoodie.embed.timeline.server", "false").
    option("hoodie.datasource.write.partitionpath.field", "city").
    option("hoodie.table.name", tableName).
    mode(Append).
    save(basePath)

8 - Merge Data

  // spark-shell
  val adjustedFareDF = spark.read.format("hudi").
    load(basePath).limit(2).
    withColumn("fare", col("fare") * 10)
  adjustedFareDF.write.format("hudi").
    option("hoodie.embed.timeline.server", "false").
    mode(Append).
    save(basePath)
  // Notice the fare column has been updated but all other columns remain intact.
  spark.read.format("hudi").load(basePath).show()


9 - Delete Data

  // spark-shell
  // Let's delete rider: rider-F
  val deletesDF = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-F")
  deletesDF.write.format("hudi").
    option("hoodie.datasource.write.operation", "delete").
    option("hoodie.datasource.write.partitionpath.field", "city").
    option("hoodie.table.name", tableName).
    option("hoodie.embed.timeline.server", "false").
    mode(Append).
    save(basePath)
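To confirm the delete took effect, a quick read-back (a sketch, not in the original post) can check that no rider-F rows remain:

  // Re-read the table; the count for rider-F should now be 0.
  spark.read.format("hudi").load(basePath).filter($"rider" === "rider-F").count()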

10 - Data Indexing

  import scala.collection.JavaConversions._
  import org.apache.spark.sql.SaveMode._
  import org.apache.hudi.DataSourceReadOptions._
  import org.apache.hudi.DataSourceWriteOptions._
  import org.apache.hudi.common.table.HoodieTableConfig._
  import org.apache.hudi.config.HoodieWriteConfig._
  import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
  import org.apache.hudi.common.model.HoodieRecord
  import spark.implicits._
  val tableName = "trips_table_index"
  val basePath = "hdfs:///tmp/hudi_indexed_table"
  val columns = Seq("ts","uuid","rider","driver","fare","city")
  val data =
    Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
      (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70,"san_francisco"),
      (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90,"san_francisco"),
      (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"),
      (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));
  var inserts = spark.createDataFrame(data).toDF(columns:_*)
  inserts.write.format("hudi").
    option("hoodie.datasource.write.partitionpath.field", "city").
    option("hoodie.table.name", tableName).
    option("hoodie.write.record.merge.mode", "COMMIT_TIME_ORDERING").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option("hoodie.embed.timeline.server", "false").
    mode(Overwrite).
    save(basePath)
  // Create record index and secondary index for the table
  spark.sql(s"CREATE TABLE hudi_indexed_table USING hudi LOCATION '$basePath'")
  // Create bloom filter expression index on driver column
  spark.sql(s"CREATE INDEX idx_bloom_driver ON hudi_indexed_table USING bloom_filters(driver) OPTIONS(expr='identity')");
  // It would show bloom filter expression index
  spark.sql(s"SHOW INDEXES FROM hudi_indexed_table");
  // Query on driver column would prune the data using the idx_bloom_driver index
  spark.sql(s"SELECT uuid, rider FROM hudi_indexed_table WHERE driver = 'driver-S'");
  // Create column stat expression index on ts column
  spark.sql(s"CREATE INDEX idx_column_ts ON hudi_indexed_table USING column_stats(ts) OPTIONS(expr='from_unixtime', format = 'yyyy-MM-dd')");
  // Shows both expression indexes
  spark.sql(s"SHOW INDEXES FROM hudi_indexed_table");
  // Query on ts column would prune the data using the idx_column_ts index
  spark.sql(s"SELECT * FROM hudi_indexed_table WHERE from_unixtime(ts, 'yyyy-MM-dd') = '2023-09-24'");
  // To create secondary index, first create the record index
  spark.sql(s"SET hoodie.metadata.record.index.enable=true");
  spark.sql(s"CREATE INDEX record_index ON hudi_indexed_table (uuid)");
  // Create secondary index on rider column
  spark.sql(s"CREATE INDEX idx_rider ON hudi_indexed_table (rider)");
  // Expression index and secondary index should show up
  spark.sql(s"SHOW INDEXES FROM hudi_indexed_table");
  // Query on rider column would leverage the secondary index idx_rider
  spark.sql(s"SELECT * FROM hudi_indexed_table WHERE rider = 'rider-E'");
  // Update a record and query the table based on indexed columns
  spark.sql(s"UPDATE hudi_indexed_table SET rider = 'rider-B', driver = 'driver-N', ts = '1697516137' WHERE rider = 'rider-A'");
  // Data skipping would be performed using column stat expression index
  spark.sql(s"SELECT uuid, rider FROM hudi_indexed_table WHERE from_unixtime(ts, 'yyyy-MM-dd') = '2023-10-17'");
  // Data skipping would be performed using bloom filter expression index
  spark.sql(s"SELECT * FROM hudi_indexed_table WHERE driver = 'driver-N'");
  // Data skipping would be performed using secondary index
  spark.sql(s"SELECT * FROM hudi_indexed_table WHERE rider = 'rider-B'");
  // Drop all the indexes
  spark.sql(s"DROP INDEX secondary_index_idx_rider on hudi_indexed_table");
  spark.sql(s"DROP INDEX record_index on hudi_indexed_table");
  spark.sql(s"DROP INDEX expr_index_idx_bloom_driver on hudi_indexed_table");
  spark.sql(s"DROP INDEX expr_index_idx_column_ts on hudi_indexed_table");
  // No indexes should show up for the table
  spark.sql(s"SHOW INDEXES FROM hudi_indexed_table");
  spark.sql(s"SET hoodie.metadata.record.index.enable=false");


