29、Spark写数据到Hudi时,同步hive表的一些坑

打印 上一主题 下一主题

主题 852|帖子 852|积分 2556

1.hudi的同步hive表没有comment

原以为hudi同步的hive表是根据数据写入的dataframe的schema创建的。就和spark write hive时类似,查察源码后发现不是。
1.1 hudi同步hive的模式

HMS , JDBC , HIVESQL。我这儿常用的是HMS和JDBC

各个同步模式对应的实行器:

1.2 schema天生

我们可以看到schema天生的代码块。先从提交的commit中获取元数据信息,没有的话则从数据文件中获取schema。两种方式获取到的schema都是没有comment信息的。
org.apache.hudi.common.table.TableSchemaResolver#getTableParquetSchema



  1.   /**
  2.    * Gets the schema for a hoodie table. Depending on the type of table, read from any file written in the latest
  3.    * commit. We will assume that the schema has not changed within a single atomic write.
  4.    *
  5.    * @return Parquet schema for this table
  6.    * @throws Exception
  7.    */
  8.   private MessageType getTableParquetSchemaFromDataFile() throws Exception {
  9.     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
  10.     try {
  11.       switch (metaClient.getTableType()) {
  12.         case COPY_ON_WRITE:
  13.           // If this is COW, get the last commit and read the schema from a file written in the
  14.           // last commit
  15.           HoodieInstant lastCommit =
  16.               activeTimeline.getCommitsTimeline().filterCompletedInstants().lastInstant().orElseThrow(() -> new InvalidTableException(metaClient.getBasePath()));
  17.           HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
  18.               .fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class);
  19.           String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
  20.               .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for commit "
  21.                   + lastCommit + ", could not get schema for table " + metaClient.getBasePath() + ", Metadata :"
  22.                   + commitMetadata));
  23.           return readSchemaFromBaseFile(new Path(filePath));
  24.         case MERGE_ON_READ:
  25.           // If this is MOR, depending on whether the latest commit is a delta commit or
  26.           // compaction commit
  27.           // Get a datafile written and get the schema from that file
  28.           Option<HoodieInstant> lastCompactionCommit =
  29.               metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
  30.           LOG.info("Found the last compaction commit as " + lastCompactionCommit);
  31.           Option<HoodieInstant> lastDeltaCommit;
  32.           if (lastCompactionCommit.isPresent()) {
  33.             lastDeltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
  34.                 .findInstantsAfter(lastCompactionCommit.get().getTimestamp(), Integer.MAX_VALUE).lastInstant();
  35.           } else {
  36.             lastDeltaCommit =
  37.                 metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
  38.           }
  39.           LOG.info("Found the last delta commit " + lastDeltaCommit);
  40.           if (lastDeltaCommit.isPresent()) {
  41.             HoodieInstant lastDeltaInstant = lastDeltaCommit.get();
  42.             // read from the log file wrote
  43.             commitMetadata = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get(),
  44.                 HoodieCommitMetadata.class);
  45.             Pair<String, HoodieFileFormat> filePathWithFormat =
  46.                 commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
  47.                     .filter(s -> s.contains(HoodieLogFile.DELTA_EXTENSION)).findAny()
  48.                     .map(f -> Pair.of(f, HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> {
  49.                       // No Log files in Delta-Commit. Check if there are any parquet files
  50.                       return commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
  51.                           .filter(s -> s.contains((metaClient.getTableConfig().getBaseFileFormat().getFileExtension())))
  52.                           .findAny().map(f -> Pair.of(f, HoodieFileFormat.PARQUET)).orElseThrow(() ->
  53.                               new IllegalArgumentException("Could not find any data file written for commit "
  54.                               + lastDeltaInstant + ", could not get schema for table " + metaClient.getBasePath()
  55.                               + ", CommitMetadata :" + commitMetadata));
  56.                     });
  57.             switch (filePathWithFormat.getRight()) {
  58.               case HOODIE_LOG:
  59.                 return readSchemaFromLogFile(lastCompactionCommit, new Path(filePathWithFormat.getLeft()));
  60.               case PARQUET:
  61.                 return readSchemaFromBaseFile(new Path(filePathWithFormat.getLeft()));
  62.               default:
  63.                 throw new IllegalArgumentException("Unknown file format :" + filePathWithFormat.getRight()
  64.                     + " for file " + filePathWithFormat.getLeft());
  65.             }
  66.           } else {
  67.             return readSchemaFromLastCompaction(lastCompactionCommit);
  68.           }
  69.         default:
  70.           LOG.error("Unknown table type " + metaClient.getTableType());
  71.           throw new InvalidTableException(metaClient.getBasePath());
  72.       }
  73.     } catch (IOException e) {
  74.       throw new HoodieException("Failed to read data schema", e);
  75.     }
  76.   }
复制代码
1.3建表DDL

获取到schema后,我们再看建表行为。
org.apache.hudi.hive.ddl.DDLExecutor#createTable 定义了这个接口建表方法。有两个实现类,一个是
org.apache.hudi.hive.ddl.HMSDDLExecutor。另一个是 org.apache.hudi.hive.ddl.QueryBasedDDLExecutor

首先,看org.apache.hudi.hive.ddl.HMSDDLExecutor#createTable方法:
ddl操作中利用的字段信息在HiveSchemaUtil.convertMapSchemaToHiveFieldSchema天生,可以直接在这个方法里看到字段的comment信息是直接写死为空字符串的。


再看,org.apache.hudi.hive.ddl.QueryBasedDDLExecutor#createTable方法。
方法里是通过HiveSchemaUtil.generateCreateDDL方法直接天生的ddl建表语句的。这个方法里generateSchemaString方法来天生字段信息的。在这个方法里,也是没有涉及comment信息的。



1.4结论

同步hive表是在 数据写入hudi目次后,根据目次里的schema来创建的hive表,所以创建的hive表没有带着dataframe的comment信息。需要手动实行修改字段comment。
2.追加comment

2.1.利用spark.sql的方式修改comment

用spark.sql()的方式实行 修改comment的sql语句,会调用hudi里的AlterHoodieTableChangeColumnCommand类。这个里面会比较schema,革新sparksession里的catalog信息,会让使命hang住。(为什么hang住没去排查)大概操作就是写一个利用新的schema的空数据集到hudi来实现schema更新。
org.apache.spark.sql.hudi.command.AlterHoodieTableChangeColumnCommand。


2.2利用hive-sql的方式修改comment

用hive-jdbc的方式实行修改sql语句。这个方式不会更新hive表里的 TBLPROPERTIES 的 'spark.sql.sources.schema.part.0’信息。
利用dataframe的schame.tojson ,去修改 ‘spark.sql.sources.schema.part.0’ 信息
  1.   /**
  2.    * 将 dataframe 中的comment加到 hudi的hive表中
  3.    *
  4.    * @param df      dataframe
  5.    * @param dbTable hive表
  6.    * @param spark   spark session
  7.    */
  8.   def addCommentForSyncHive(df: DataFrame, dbTable: String, spark: SparkSession, writeOptions: mutable.Map[String, String]): Unit = {
  9.     val comment: Map[String, String] = df.schema.map(sf => (sf.name, sf.getComment().getOrElse(""))).toMap
  10.     info(s"数据集的字段名->备注为:\n${comment.mkString("\n")}")
  11.     val jdbcUrlOption = writeOptions.get(DataSourceWriteOptions.HIVE_URL.key())
  12.     val jdbcUserOption = writeOptions.get(DataSourceWriteOptions.HIVE_USER.key())
  13.     val jdbcPassOption = writeOptions.get(DataSourceWriteOptions.HIVE_PASS.key())
  14.     assert(jdbcUrlOption.isDefined, s"${DataSourceWriteOptions.HIVE_URL.key()} 必须被指定")
  15.     val connection = DbUtil.createHiveConnection(
  16.       jdbcUrlOption.get, jdbcUserOption.getOrElse(""), jdbcPassOption.getOrElse("")
  17.     )
  18.     val stmt = connection.createStatement()
  19.     //需要手动更新hive表中的spark.sql.sources.schema.part.0信息
  20.     stmt.execute(s"ALTER TABLE $dbTable SET TBLPROPERTIES ('spark.sql.sources.schema.part.0' = '${df.schema.json}')")
  21.     // 获取表字段和类型
  22.     val tableSchema = spark.sql(s"DESCRIBE $dbTable")
  23.       .select("col_name", "data_type")
  24.       .collect()
  25.       .map(row => (row.getString(0), row.getString(1)))
  26.     tableSchema.foreach { case (column, dataType) =>
  27.       if (comment.contains(column) && !Seq("ym", "ymd").contains(column)) {
  28.         val newComment = comment.getOrElse(column, "")
  29.         val sql = s"""ALTER TABLE $dbTable CHANGE COLUMN $column $column $dataType COMMENT '$newComment'"""
  30.         info(s"添加备注执行sql:$sql")
  31.         try {
  32.           stmt.execute(sql)
  33.         } catch {
  34.           case e:Throwable =>
  35.             warn("添加备注sql执行失败")
  36.         }
  37.       }
  38.     }
  39.     stmt.close()
  40.     connection.close()
  41.   }
复制代码
修改’spark.sql.sources.schema.part.0’时,因为schema带有备注,会很长,导致超过hive表元数据mysql表字段的长度限制。去mysql中修改这个长度限制(table_params表PARAM_VALUE字段)。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

小秦哥

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表