图书《数据资产管理核心技能与应用》核心章节节选-3.1.2. 从Spark 执行计划 ...

打印 上一主题 下一主题

主题 900|帖子 900|积分 2700

本文节选自清华大学出版社出版的图书《数据资产管理核心技能与应用》,作者为张永清等著。
从Spark 执行计划中获取数据血缘
因为数据处理任务会涉及到数据的转换和处理,所以从数据任务中解析血缘也是获取数据血缘的渠道之一,Spark 是大数据中数据处理最常用的一个技能组件,既可以做及时任务的处理,也可以做离线任务的处理。Spark在执行每一条SQL语句的时候,都会生成一个执行计划,这一点和很多数据库的做法很类似,都是SQL语句在执行时,先生成执行计划。如下图3-1-10所示,在Spark的官方文档链接https://spark.apache.org/docs/latest/sql-ref-syntax-qry-explain.html#content中,有明确提到,可以根据EXPLAIN关键字来获取执行计划,这和很多数据库查看执行计划的方式很类似。

图3-1-10
Spark底层生成执行计划以及处理执行计划的过程如下图3-1-11所示。本文节选自清华大学出版社出版的图书《数据资产管理核心技能与应用》,作者为张永清等著。
 

图3-1-11

从图中可以看到,
1、 执行SQL语句大概Data Frame时,会先生成一个Unresolved Logical Plan,就是没有做过任那边理和分析的逻辑执行计划,仅仅会从SQL语法的角度做一些基础性的校验。
2、 之后通过获取Catalog的数据,对需要执行的SQL语句做表名、列名的进一步分析校验,从而生成一个可以直接运行的逻辑执行计划。
3、 但是Spark底层会有个优化器来生成一个最优的执行操作方式,从而生成一个优化后的最佳逻辑执行计划。
4、 将最终确定下来的逻辑执行计划转换为物理执行计划,转换为最终的代码举行执行。
Spark的执行计划其实就是数据处理的过程计划,会将SQL语句大概DataFrame 做解析,并且结合Catalog一起,生成最终数据转换和处理的代码。所以可以从Spark的执行计划中,获取到数据的转换逻辑,从而解析到数据的血缘。但是spark的执行计划都是在spark底层内部自动处理的,如何获取到每次Spark任务的执行计划的信息呢?其着实Spark底层有一套Listener的架构设计,可以通过Spark Listener 来获取到spark 底层很多执行的数据信息。
在spark的源码中,以Scala的形式提供了一个org.apache.spark.sql.util.QueryExecutionListener  trait (类似Java 语言的接口),来作为Spark SQL等任务执行的监听器。在org.apache.spark.sql.util.QueryExecutionListener  中提供了如下表3-1-2所示的两个方法。
表3-1-2

方法名
描述
def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit
执行乐成时,调用的方法,其中包括了执行计划参数,这里的执行计划可以是逻辑计划大概物理计划
def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit
执行失败时,调用的方法,其中同样也包括了执行计划参数,这里的执行计划可以是逻辑计划大概物理计划
因此可以借用QueryExecutionListener  来主动让Spark在执行任务时,将执行计划信息推送到自己的系统大概数据库中,然后再做进一步的解析,如下图3-1-12所示。本文节选自清华大学出版社出版的图书《数据资产管理核心技能与应用》,作者为张永清等著。

图3-1-12
  1. import org.apache.spark.internal.Logging
  2. import org.apache.spark.sql.SparkSession
  3. import org.apache.spark.sql.execution.QueryExecution
  4. import org.apache.spark.sql.util.QueryExecutionListener
  5. case class PlanExecutionListener(sparkSession: SparkSession) extends QueryExecutionListener with Logging{
  6.   override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = withErrorHandling(qe) {
  7.     // 执行成功时,调用解析执行计划的方法
  8.     planParser(qe)
  9.   }
  10.   override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = withErrorHandling(qe) {
  11.   }
  12.   private def withErrorHandling(qe: QueryExecution)(body: => Unit): Unit = {
  13.     try
  14.       body
  15.     catch {
  16.       case NonFatal(e) =>
  17.         val ctx = qe.sparkSession.sparkContext
  18.         logError(s"Unexpected error occurred during lineage processing for application: ${ctx.appName} #${ctx.applicationId}", e)
  19.     }
  20.   }
  21.   def planParser(qe: QueryExecution): Unit = {
  22.     logInfo("----------- start to get spark  analyzed LogicPlan--------")
  23.       //解析执行计划,并且将执行计划的数据发送到自有的系统或者数据库中
  24.       ......
  25.   }
  26. }
复制代码
上面的代码中,实现了QueryExecutionListener 这个trait中的onSuccess和onFailure这两个方法,只有在onSuccess时,才需要获取执行计划的数据,因为只有onSuccess时的血缘才是有用的。
实现好了自界说的QueryExecutionListener后,可以通过sparkSession.listenerManager.register来将自己实现的PlanExecutionListener 注册到Spark会话中,listenerManager是Spark中Listener的管理器。
在获取到执行计划时,需要再结合Catalog一起,来进一步解析血缘的数据,如下图3-1-13所示

图3-1-13

Spark 中常见的执行计划实现类如下表3-1-3所示,获取数据血缘时,就是需要从如下的这些执行计划中解析血缘关系。本文节选自清华大学出版社出版的图书《数据资产管理核心技能与应用》,作者为张永清等著。
表3-1-3

执行计划实现类
描述
org.apache.spark.sql.execution.datasources.LogicalRelation
一样平常用于解析字段级的关联关系
org.apache.spark.sql.catalyst.catalog.HiveTableRelation
Hive 表关联关系的执行计划,一样平常用于SQL执行时,存在关联查询的情况会出现该执行计划。
org.apache.spark.sql.hive.execution.InsertIntoHiveTable
一样平常是在执行insert into 的SQL 语句时才会产生的执行计划,比方insert into xxx_table(colum1,column2) values("4","zhangsan")
org.apache.spark.sql.execution.datasources
.InsertIntoHadoopFsRelationCommand
一样平常用于执行类似    sparkSession
      .read
      .table("xx_source_table ")
      .limit(10)
      .write
      .mode(SaveMode.Append)
      .insertInto("xx_target_table ")产生的执行计划。
org.apache.spark.sql.hive.execution.
CreateHiveTableAsSelectCommand
一样平常是在执行create table xxx_table as的SQL 语句时才会产生的执行计划,比方create table xx_target_table as select * from xx_source_table
org.apache.spark.sql.execution.command
.CreateDataSourceTableAsSelectCommand
一样平常用于执行类似sparkSession
      .read
      .table("xx_source_table")
      .limit(10)
      .write
      .mode(SaveMode.Append)
      .saveAsTable("xx_target_table")产生的执行计划。
org.apache.spark.sql.execution.datasources
.InsertIntoDataSourceCommand
一样平常用于将SQL查询结果写入到一张表中,比如insert into xxx_target_table select * from xxx_source_table
 
如下是以org.apache.spark.sql.execution.datasources
.InsertIntoHadoopFsRelationCommand 为例的spark 执行计划的数据,如下数据已经将原始的执行计划转换为了json格式的数据,方便做展示。
.................更多内容,请参考清华大学出版社出版的图书《数据资产管理核心技能与应用》,作者为张永清等著
 

 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

欢乐狗

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表