Spark 的结构化 APIs——RDD,DataFrame, Dataset, SparkSQL 使用和原理总 ...

打印 上一主题 下一主题

主题 572|帖子 572|积分 1720

前言

在本文中,我们将探索 Spark 的结构化 APIs(DataFrames and Datasets)。我们还将看下 Spark SQL 引擎是怎样支撑高级的结构化 APIs 的。
当Spark SQL在早期的Spark 1.x 中初次引入时, 随后是DataFrames 继承了Spark 1.3中 SchemaRDDs ,此时我们第一次看到了Spark中的结构化 API。Spark SQL引入了高级表达操作函数,模拟类似SQL的语法,而Dataframe为后续版本中更多的结构奠基了底子,为Spark计算查询中的高性能操作铺平了门路。
但是,在我们讨论新的结构化API之前,让我们先来看看简单的RDD编程API模型,让我们简要了解一下Spark中没有结构是什么感觉。

RDD的底层是什么?

RDD是Spark中最根本的抽象。有三个与RDD相关的紧张特征:


  • 依赖关系
  • 分区(带有一些位置信息)
  • 计算函数artition => Iterator[T]
首先,这三个都是简单的RDD编程API模型的构成部分,全部高级功能都是在这个模型上构建的。首先,必要一个依赖项列表,该列表指示Spark怎样使用其输入构造RDD。当必要重现效果时,Spark可以根据这些依赖关系重新创建RDD,并在其上复制操作, 这个特性使RDD具有弹性。

其次,分区为Spark提供了拆分工作的本领,以便跨 executors 并行化分区上的计算。在某些环境下(例如,从hdfs读取数据),Spark 通过位置信息将任务发送至距离数据进的 executor 上。这样一来,通过网络传输的数据就少了。

最后,RDD有一个计算函数,它为将存储在RDD中的数据生成一个Iterator[T]。

简单又优雅!然而,这种原始模式存在一些题目。首先,计算函数(或计算)对Spark来说是不透明的。也就是说,Spark不知道你在计算函数中做什么。无论实行的是连接、筛选、选择还是聚合,Spark都只将其视为lambda表达式。另一个题目是Iterator[T]数据类型对于Python rdd来说也是不透明的;Spark只知道它是Python中的泛型对象。

这种不透明性显然拦阻了Spark将计算重新安排为高效查询操持的本领。那么办理方案是什么呢?

结构化 Spark

Spark 2.x 介绍告终构化 Spark 的几个关键方案。一种是在数据分析中发现的通用模式来表达计算, 这些模式表现为高级操作,如过滤、选择、计数、聚合、平均和分组,这为开辟人员提供了简单清楚的使用方式。

Spark 提供了许多通用的算子,每个算子都是有一个固定的计算逻辑,在构建 Spark 步伐的时候,我们可以使用这些通用的算子,这样 Spark 在构建查询操持的时候就知道该怎样优化。

终极这些结构化温和序的 schema 可以让你以表格的形式构建数据,比如 SQL 表或者 sheet,只要按照支持的结构化数据类型(后文会说到)。

那么结构化的利益是什么呢?

主要长处和利益

结构化带来了许多利益,包罗更好的性能和跨Spark组件的空间效率。在讨论DataFrame和Dataset api的使用时,我们将进一步探究这些长处,但现在我们将集中讨论其他长处: 表达性简单性可组合性同等性

让我们首先用一个简单的代码片断演示表达性和可组合性。在下面的示例中,我们盼望聚合每个名字的全部年龄,按名字分组,然后取年龄的平均值——这是数据分析和发现中的常见模式。假如我们要为此使用低级RDD API,代码将如下所示:
  1. # In Python
  2. # Create an RDD of tuples (name, age)
  3. dataRDD = sc.parallelize([("Brooke", 20), ("Denny", 31), ("Jules", 30),
  4.                           ("TD", 35), ("Brooke", 25)])
  5. # Use map and reduceByKey transformations with their lambda
  6. # expressions to aggregate and then compute average
  7. agesRDD = (dataRDD
  8.            .map(lambda x: (x[0], (x[1], 1)))
  9.            .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
  10.            .map(lambda x: (x[0], x[1][0]/x[1][1])))
复制代码
这段代码告诉Spark怎样使用一串lambda函数聚合键并计算平均值,没有人会质疑这段代码的艰涩难懂和难以阅读。换句话说,代码指示Spark怎样计算查询,它对Spark来说是完全不透明的,因为它没有转达意图。此外,Scala中等效的RDD代码看起来与这里所示的Python代码非常差异。

相比之下,假如我们用高级DSL算子和DataFrame API来表达雷同的查询,从而指示Spark做什么更加明确:
  1. # In Python
  2. from pyspark.sql import SparkSession
  3. from pyspark.sql.functions import avg
  4. # Create a DataFrame using SparkSession
  5. spark = (SparkSession
  6.   .builder
  7.   .appName("AuthorsAges")
  8.   .getOrCreate())
  9. # Create a DataFrame
  10. data_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30),
  11.   ("TD", 35), ("Brooke", 25)], ["name", "age"])
  12. # Group the same names together, aggregate their ages, and compute an average
  13. avg_df = data_df.groupBy("name").agg(avg("age"))
  14. # Show the results of the final execution
  15. avg_df.show()
  16. +------+--------+
  17. |  name|avg(age)|
  18. +------+--------+
  19. |Brooke|    22.5|
  20. | Jules|    30.0|
  21. |    TD|    35.0|
  22. | Denny|    31.0|
  23. +------+--------+
复制代码
这个版本的代码比之前的版本更具表现力,也更简单,因为我们使用了高级DSL算子和api来告诉Spark该做什么。现实上,当我们使用这些算子来构建 Spark 步伐的时候,Spark 可以明确我们的计算意图,然后优化查询操持。

有些人以为,只使用高级的、表达性的DSL算子,限制了开辟人员定制查询本领。实在没必要这样,现有的算子根本上可以满足已有的查询方式,假如觉得不满足也可以在算子和低级 RDD API 间切换查询方式。

除了易于阅读外,Spark的高级api的结构化还在其组件和语言之间引入了同等性。例如,这里显示的Scala代码与前面的Python代码做同样的事变- API看起来险些雷同:
  1. // In Scala
  2. import org.apache.spark.sql.functions.avg
  3. import org.apache.spark.sql.SparkSession
  4. // Create a DataFrame using SparkSession
  5. val spark = SparkSession
  6. .builder
  7. .appName("AuthorsAges")
  8. .getOrCreate()
  9. // Create a DataFrame of names and ages
  10. val dataDF = spark.createDataFrame(Seq(("Brooke", 20), ("Brooke", 25),
  11.                                        ("Denny", 31), ("Jules", 30), ("TD", 35))).toDF("name", "age")
  12. // Group the same names together, aggregate their ages, and compute an average
  13. val avgDF = dataDF.groupBy("name").agg(avg("age"))
  14. // Show the results of the final execution
  15. avgDF.show()
  16. +------+--------+
  17. |  name|avg(age)|
  18. +------+--------+
  19. |Brooke|    22.5|
  20. | Jules|    30.0|
  21. |    TD|    35.0|
  22. | Denny|    31.0|
  23. +------+--------+
复制代码
因为Spark SQL引擎是构建高级结构化api的底子,才使得这些简单性和表达性都是大概的。正是由于这个支持全部Spark组件的引擎,我们才得到了同一的api。无论是在结构化流还是MLlib中表达对DataFrame的查询,我们总是将DataFrame作为结构化数据进行转换和操作。我们将在本文背面更深入地了解Spark SQL引擎,但现在让我们探索那些用于常见操作的api和dsl,以及怎样使用它们进行数据分析。

DataFrame API

Spark dataframe在结构、格式和一些特定操作上受到了pandas dataframe的启发,它就像是带有定名列和schema的分布式内存表,此中每个列都有特定的数据类型:整数、字符串、数组、映射、实数、日期、时间戳等。在人的眼中,Spark DataFrame就像一张表。如下表所示:

当数据被可视化为结构化表时,它不但易于明确,而且当涉及到你大概盼望对行和列实行的常见操作时,也易于使用。Dataframe是不可变的,Spark保持全部转换的血缘关系。你可以添加或更改列的名称和数据类型,在生存从前版本的同时创建新的Dataframe, DataFrame中的定名列及其关联的Spark数据类型可以在 schema 中声明。

在生成 schema 之前我们必要检查下这些结构化数据类型在 Spark 中是否可用,然后我们接下来看怎样使用 schema 生成一个 DataFrame,去验证上表中的数据。

Spark的根本数据类型

与其支持的编程语言相匹配,Spark支持根本的内部数据类型。这些数据类型可以在Spark应用步伐中声明,也可以在schema中界说。例如,在Scala中,我们可以界说或声明特定列名的类型为String、Byte、Long或Map等。这里,我们界说了与Spark数据类型相关的变量名:
  1. $SPARK_HOME/bin/spark-shell
  2. scala> import org.apache.spark.sql.types._
  3. import org.apache.spark.sql.types._
  4. scala> val nameTypes = StringType
  5. nameTypes: org.apache.spark.sql.types.StringType.type = StringType
  6. scala> val firstName = nameTypes
  7. firstName: org.apache.spark.sql.types.StringType.type = StringType
  8. scala> val lastName = nameTypes
  9. lastName: org.apache.spark.sql.types.StringType.type = StringType
复制代码
Spark支持的根本Scala数据类型如下表所示。它们都是DataTypes类的子类型,除了DecimalType:

Spark支持类似的Python根本数据类型,如下表所示:


Spark的结构化和复杂数据类型

对于复杂的数据分析,将不但仅处理简单或根本的数据类型,数据将是复杂的,通常是结构化的或嵌套的,必要Spark来处理这些复杂的数据类型。它们有多种形式:映射、数组、结构、日期、时间戳、字段等。Spark支持的Scala结构化数据类型如下表所示:

Python中Spark支持的等价结构化数据类型如下表所示:

虽然这些表展示了支持的大量类型,但在为数据界说schema时,更紧张的是了解这些类型是怎样组合在一起的。

Schemas 和创建 DataFrames

Spark中的schema 为DataFrame界说了列名和相关的数据类型。通常,当从外部数据源读取结构化数据时,schema 就会发挥作用(后续文章我会详细说明)。与采取读时schema 的方法相比,预先界说schema 有三个利益:


  • Spark 不在必要推断数据类型
  • 可以防止Spark仅仅为了读取文件的大部分内容来确定Schema 而创建单独的作业,这对于大型数据文件来说既昂贵又耗时。
  • 假如数据与 schema 不匹配,可以及早发现错误。
因此,发起从数据源读取大文件时始终预先界说Schema 。接下来举个简单例子,让我们为前边说的数据表中的数据界说一个Schema ,并使用该schema 创建一个DataFrame。
界说 Schema 的两种方法
Spark允许以两种方式界说schema。一种方法是以编程的方式界说它,另一种方法是使用数据界说语言(Data Definition Language, DDL)字符串,后者更简单,更轻易阅读。

要以编程方式为具有三个定名列的DataFrame界说schema:author, title 和 pages ,可以使用Spark DataFrame API。例如:
  1. // In Scala
  2. import org.apache.spark.sql.types._
  3. val schema = StructType(Array(StructField("author", StringType, false),
  4.                               StructField("title", StringType, false),
  5.                               StructField("pages", IntegerType, false)))
复制代码
  1. # In Python
  2. from pyspark.sql.types import *
  3. schema = StructType([StructField("author", StringType(), False),
  4.                      StructField("title", StringType(), False),
  5.                      StructField("pages", IntegerType(), False)])
复制代码
使用DDL界说雷同的schema要简单得多:
  1. // In Scala
  2. val schema = "author STRING, title STRING, pages INT"
复制代码
  1. # In Python
  2. schema = "author STRING, title STRING, pages INT"
复制代码
可以选择任何喜欢的方式来界说schema。对于许多例子,两种都会用:
  1. # In Python
  2. from pyspark.sql import SparkSession
  3. # Define schema for our data using DDL
  4. schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING,
  5.   `Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"
  6. # Create our static data
  7. data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter",
  8. "LinkedIn"]],
  9.        [2, "Brooke","Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter",
  10. "LinkedIn"]],
  11.        [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web",
  12. "twitter", "FB", "LinkedIn"]],
  13.        [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568,
  14. ["twitter", "FB"]],
  15.        [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web",
  16. "twitter", "FB", "LinkedIn"]],
  17.        [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568,
  18. ["twitter", "LinkedIn"]]
  19.       ]
  20. # Main program
  21. if __name__ == "__main__":
  22.    # Create a SparkSession
  23.    spark = (SparkSession
  24.      .builder
  25.      .appName("Example-3_6")
  26.      .getOrCreate())
  27.    # Create a DataFrame using the schema defined above
  28.    blogs_df = spark.createDataFrame(data, schema)
  29.    # Show the DataFrame; it should reflect our table above
  30.    blogs_df.show()
  31.    # Print the schema used by Spark to process the DataFrame
  32.    print(blogs_df.printSchema())
复制代码
从控制台中运行此步伐将产生以下输出:
  1. $ spark-submit Example-3_6.py
  2. ...
  3. +-------+---------+-------+-----------------+---------+-----+------------------+
  4. |Id     |First    |Last   |Url              |Published|Hits |Campaigns         |
  5. +-------+---------+-------+-----------------+---------+-----+------------------+
  6. |1      |Jules    |Damji  |https://tinyurl.1|1/4/2016 |4535 |[twitter,...]     |
  7. |2      |Brooke   |Wenig  |https://tinyurl.2|5/5/2018 |8908 |[twitter,...]     |
  8. |3      |Denny    |Lee    |https://tinyurl.3|6/7/2019 |7659 |[web, twitter...] |
  9. |4      |Tathagata|Das    |https://tinyurl.4|5/12/2018|10568|[twitter, FB]     |
  10. |5      |Matei    |Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter,...]|
  11. |6      |Reynold  |Xin    |https://tinyurl.6|3/2/2015 |25568|[twitter,...]     |
  12. +-------+---------+-------+-----------------+---------+-----+------------------+
  13. root
  14. |-- Id: integer (nullable = false)
  15. |-- First: string (nullable = false)
  16. |-- Last: string (nullable = false)
  17. |-- Url: string (nullable = false)
  18. |-- Published: string (nullable = false)
  19. |-- Hits: integer (nullable = false)
  20. |-- Campaigns: array (nullable = false)
  21. |    |-- element: string (containsNull = false)
复制代码
假如盼望在代码的其他地方使用此schema,只需实行blogs_df.schema,它将返回schema 界说:
  1. StructType(List(StructField("Id",IntegerType,false),
  2. StructField("First",StringType,false),
  3. StructField("Last",StringType,false),
  4. StructField("Url",StringType,false),
  5. StructField("Published",StringType,false),
  6. StructField("Hits",IntegerType,false),
  7. StructField("Campaigns",ArrayType(StringType,true),false)))
复制代码
假如要从JSON文件读取数据,而不是创建静态数据,那么schema 界说将是雷同的。让我们用一个Scala示例来说明雷同的代码,这次是从JSON文件中读取:
  1. // In Scala
  2. package main.scala.chapter3
  3. import org.apache.spark.sql.SparkSession
  4. import org.apache.spark.sql.types._
  5. object Example3_7 {
  6.   def main(args: Array[String]) {
  7.     val spark = SparkSession
  8.     .builder
  9.     .appName("Example-3_7")
  10.     .getOrCreate()
  11.     if (args.length <= 0) {
  12.       println("usage Example3_7 <file path to blogs.json>")
  13.       System.exit(1)
  14.     }
  15.     // Get the path to the JSON file
  16.     val jsonFile = args(0)
  17.     // Define our schema programmatically
  18.     val schema = StructType(Array(StructField("Id", IntegerType, false),
  19.                                  StructField("First", StringType, false),
  20.                                  StructField("Last", StringType, false),
  21.                                  StructField("Url", StringType, false),
  22.                                  StructField("Published", StringType, false),
  23.                                  StructField("Hits", IntegerType, false),
  24.                                  StructField("Campaigns", ArrayType(StringType), false)))
  25.     // Create a DataFrame by reading from the JSON file
  26.     // with a predefined schema
  27.     val blogsDF = spark.read.schema(schema).json(jsonFile)
  28.     // Show the DataFrame schema as output
  29.     blogsDF.show(false)
  30.     // Print the schema
  31.     println(blogsDF.printSchema)
  32.     println(blogsDF.schema)
  33.   }
  34. }
复制代码
毫不希奇,Scala步伐的输出与Python步伐的输出没有什么差异:
  1. +---+---------+-------+-----------------+---------+-----+----------------------+
  2. |Id |First    |Last   |Url              |Published|Hits |Campaigns             |
  3. +---+---------+-------+-----------------+---------+-----+----------------------+
  4. |1  |Jules    |Damji  |https://tinyurl.1|1/4/2016 |4535 |[twitter, LinkedIn]   |
  5. |2  |Brooke   |Wenig  |https://tinyurl.2|5/5/2018 |8908 |[twitter, LinkedIn]   |
  6. |3  |Denny    |Lee    |https://tinyurl.3|6/7/2019 |7659 |[web, twitter,...]    |
  7. |4  |Tathagata|Das    |https://tinyurl.4|5/12/2018|10568|[twitter, FB]         |
  8. |5  |Matei    |Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB,...]|
  9. |6  |Reynold  |Xin    |https://tinyurl.6|3/2/2015 |25568|[twitter, LinkedIn]   |
  10. +---+---------+-------+-----------------+---------+-----+----------------------+
  11. root
  12. |-- Id: integer (nullable = true)
  13. |-- First: string (nullable = true)
  14. |-- Last: string (nullable = true)
  15. |-- Url: string (nullable = true)
  16. |-- Published: string (nullable = true)
  17. |-- Hits: integer (nullable = true)
  18. |-- Campaigns: array (nullable = true)
  19. |    |-- element: string (containsNull = true)
  20. StructType(StructField("Id",IntegerType,true),
  21.     StructField("First",StringType,true),
  22.     StructField("Last",StringType,true),
  23.     StructField("Url",StringType,true),
  24.     StructField("Published",StringType,true),
  25.     StructField("Hits",IntegerType,true),
  26.     StructField("Campaigns",ArrayType(StringType,true),true))
复制代码
现在我们已经了解了怎样在DataFrame中使用结构化数据和schema,接下来让我们关注一下DataFrame的列和行,以及使用DataFrame API对它们进行操作意味着什么。

Columns 和 Expressions

如前所述,dataframe中的定名列在概念上类似于pandas或R dataframe或RDBMS表中的定名列:

它们描述一种类型的字段。可以按列的名称列出全部列,并且可以使用关系表达式或计算表达式对列的值实行操作。在Spark支持的语言中,列是具有公共方法的对象(由Column类型表现)。

还可以在列上使用逻辑或数学表达式。例如,可以使用expr("columnName * 5")或(expr("columnName - 5") > col(anothercolumnName))创建一个简单的表达式,此中columnName是Spark类型(整数,字符串等)。expr()是pyspark.sql.functions (Python)和org.apache.spark.sql.functions (Scala)包的一部分。与这些包中的任何其他函数一样,expr()接受Spark将作为表达式解析的参数,并计算效果。
   NOTE
Scala、Java和Python都有与列相关的公共方法。我们留意到Spark文档同时引用了col和Column。Column是对象的名称,而col()是返回Column的标准内置函数。
  让我们看一些例子,看看我们可以用Spark中的列做些什么。每个示例背面都有它的输出:
  1. // In Scala
  2. scala> import org.apache.spark.sql.functions._
  3. scala> blogsDF.columns
  4. res2: Array[String] = Array(Campaigns, First, Hits, Id, Last, Published, Url)
  5. // Access a particular column with col and it returns a Column type
  6. scala> blogsDF.col("Id")
  7. res3: org.apache.spark.sql.Column = id
  8. // Use an expression to compute a value
  9. scala> blogsDF.select(expr("Hits * 2")).show(2)
  10. // or use col to compute value
  11. scala> blogsDF.select(col("Hits") * 2).show(2)
  12. +----------+
  13. |(Hits * 2)|
  14. +----------+
  15. |      9070|
  16. |     17816|
  17. +----------+
  18. // Use an expression to compute big hitters for blogs
  19. // This adds a new column, Big Hitters, based on the conditional expression
  20. blogsDF.withColumn("Big Hitters", (expr("Hits > 10000"))).show()
  21. +---+---------+-------+---+---------+-----+--------------------+-----------+
  22. | Id|    First|   Last|Url|Published| Hits|           Campaigns|Big Hitters|
  23. +---+---------+-------+---+---------+-----+--------------------+-----------+
  24. |  1|    Jules|  Damji|...| 1/4/2016| 4535| [twitter, LinkedIn]|      false|
  25. |  2|   Brooke|  Wenig|...| 5/5/2018| 8908| [twitter, LinkedIn]|      false|
  26. |  3|    Denny|    Lee|...| 6/7/2019| 7659|[web, twitter, FB...|      false|
  27. |  4|Tathagata|    Das|...|5/12/2018|10568|       [twitter, FB]|       true|
  28. |  5|    Matei|Zaharia|...|5/14/2014|40578|[web, twitter, FB...|       true|
  29. |  6|  Reynold|    Xin|...| 3/2/2015|25568| [twitter, LinkedIn]|       true|
  30. +---+---------+-------+---+---------+-----+--------------------+-----------+
  31. // Concatenate three columns, create a new column, and show the
  32. // newly created concatenated column
  33. blogsDF
  34.   .withColumn("AuthorsId", (concat(expr("First"), expr("Last"), expr("Id"))))
  35.   .select(col("AuthorsId"))
  36.   .show(4)
  37. +-------------+
  38. |    AuthorsId|
  39. +-------------+
  40. |  JulesDamji1|
  41. | BrookeWenig2|
  42. |    DennyLee3|
  43. |TathagataDas4|
  44. +-------------+
  45. // These statements return the same value, showing that
  46. // expr is the same as a col method call
  47. blogsDF.select(expr("Hits")).show(2)
  48. blogsDF.select(col("Hits")).show(2)
  49. blogsDF.select("Hits").show(2)
  50. +-----+
  51. | Hits|
  52. +-----+
  53. | 4535|
  54. | 8908|
  55. +-----+
  56. // Sort by column "Id" in descending order
  57. blogsDF.sort(col("Id").desc).show()
  58. blogsDF.sort($"Id".desc).show()
  59. +--------------------+---------+-----+---+-------+---------+-----------------+
  60. |           Campaigns|    First| Hits| Id|   Last|Published|              Url|
  61. +--------------------+---------+-----+---+-------+---------+-----------------+
  62. | [twitter, LinkedIn]|  Reynold|25568|  6|    Xin| 3/2/2015|https://tinyurl.6|
  63. |[web, twitter, FB...|    Matei|40578|  5|Zaharia|5/14/2014|https://tinyurl.5|
  64. |       [twitter, FB]|Tathagata|10568|  4|    Das|5/12/2018|https://tinyurl.4|
  65. |[web, twitter, FB...|    Denny| 7659|  3|    Lee| 6/7/2019|https://tinyurl.3|
  66. | [twitter, LinkedIn]|   Brooke| 8908|  2|  Wenig| 5/5/2018|https://tinyurl.2|
  67. | [twitter, LinkedIn]|    Jules| 4535|  1|  Damji| 1/4/2016|https://tinyurl.1|
  68. +--------------------+---------+-----+---+-------+---------+-----------------+
复制代码
在最后一个例子中,表达式blogs_df.sort(col("Id").desc)和blogs_df.sort($"Id".desc)是雷同的。它们都按降序对名为Id的DataFrame列进行排序: 一个使用显式函数col("id")返回column对象,而另一个在列的名称前使用$,这是Spark中的一个函数,将名为Id的列转换为column。
   NOTE
这里我们只列举了最简单的,并且只在Column对象上使用了几个方法。要得到Column对象的全部公共方法的完备列表,请参考Spark文档。
  DataFrame中的列对象不能孤立存在.
每一个 Column 对象都是记录中一行的一部分,全部的行一起构成了一个DataFrame,正如我们将在本文背面看到的,它现实上是Scala中的Dataset[row]。

Rows

Spark中的行是一个通用的row对象,包含一个或多个列。每一列可以是雷同的数据类型(例如,整数或字符串),或者它们可以有差异的类型(整数、字符串、映射、数组等)。因为Row是Spark中的一个对象,并且是一个有序的字段聚集,以是你可以用Spark支持的每种语言实例化Row,并通过从0开始的索引访问它的字段:
  1. // In Scala
  2. import org.apache.spark.sql.Row
  3. // Create a Row
  4. val blogRow = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015",
  5.   Array("twitter", "LinkedIn"))
  6. // Access using index for individual items
  7. blogRow(1)
  8. res62: Any = Reynold
复制代码
  1. # In Python
  2. from pyspark.sql import Row
  3. blog_row = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015",
  4.                ["twitter", "LinkedIn"])
  5. # access using index for individual items
  6. blog_row[1]
  7. 'Reynold'
复制代码
假如必要快速交互和探索,可以使用Row对象来创建dataframe:
  1. # In Python   
  2. rows = [Row("Matei Zaharia", "CA"), Row("Reynold Xin", "CA")]
  3. authors_df = spark.createDataFrame(rows, ["Authors", "State"])
  4. authors_df.show()
复制代码
  1. // In Scala
  2. val rows = Seq(("Matei Zaharia", "CA"), ("Reynold Xin", "CA"))
  3. val authorsDF = rows.toDF("Author", "State")
  4. authorsDF.show()
  5. +-------------+-----+
  6. |       Author|State|
  7. +-------------+-----+
  8. |Matei Zaharia|   CA|
  9. |  Reynold Xin|   CA|
  10. +-------------+-----+
复制代码
但是,在实践中,我们通常盼望像前面所示的那样从文件中读取dataframe。在大多数环境下,由于文件将非常庞大,因此界说schema并使用它是创建dataframe的一种更快、更有效的方法。

在创建了大型分布式DataFrame之后,我们将盼望对实在行一些常见的数据操作。让我们来看看可以使用结构化api中的高级关系操作符实行的一些Spark操作。

通用的 DataFrame 算子

要在DataFrame上实行常见的数据操作,首先必要从生存结构化数据的数据源加载一个DataFrame。Spark提供了一个接口 DataFrameReader,它使我们能够从多种数据源(如JSON, CSV, Parquet, Text, Avro, ORC等格式)中读取数据到DataFrame中。同样,要以特定格式将DataFrame写回数据源,Spark使用 DataFrameWriter。
DataFrameReader 和 dataframerwriter 的使用
在Spark中读写非常简单,因为这些高层次的抽象和社区的贡献可以连接到各种各样的数据源,包罗常见的NoSQL存储、rdbms、流引擎(如Apache Kafka和Kinesis)等等。

首先,让我们读取一个包含旧金山消防局呼叫数据的大型CSV文件,如前所述,我们将为该文件界说一个schema,并使用DataFrameReader类及其方法告诉Spark该做什么。因为这个文件包含28列和超过4,380,660条记录(原始数据集有60多个列。我们删除了一些不必要的列,删除了null或无效值的记录,并添加了一个额外的Delay列),以是界说schema 比让Spark推断 schema 更有效。
   NOTE
假如不想指定模式,Spark可以以较低的成本从示例中推断 schema。例如,可以使用samplingRatio选项:
// In Scala
val sampleDF = spark
.read
.option(“samplingRatio”, 0.001)
.option(“header”, true)
.csv(“”“/databricks-datasets/learning-spark-v2/
sf-fire/sf-fire-calls.csv”“”)
  下边是代码实现:
  1. # In Python, define a schema
  2. from pyspark.sql.types import *
  3. # Programmatic way to define a schema
  4. fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
  5.                 StructField('UnitID', StringType(), True),
  6.                 StructField('IncidentNumber', IntegerType(), True),
  7.                 StructField('CallType', StringType(), True),                  
  8.                 StructField('CallDate', StringType(), True),      
  9.                 StructField('WatchDate', StringType(), True),
  10.                 StructField('CallFinalDisposition', StringType(), True),
  11.                 StructField('AvailableDtTm', StringType(), True),
  12.                 StructField('Address', StringType(), True),      
  13.                 StructField('City', StringType(), True),      
  14.                 StructField('Zipcode', IntegerType(), True),      
  15.                 StructField('Battalion', StringType(), True),                 
  16.                 StructField('StationArea', StringType(), True),      
  17.                 StructField('Box', StringType(), True),      
  18.                 StructField('OriginalPriority', StringType(), True),      
  19.                 StructField('Priority', StringType(), True),      
  20.                 StructField('FinalPriority', IntegerType(), True),      
  21.                 StructField('ALSUnit', BooleanType(), True),      
  22.                 StructField('CallTypeGroup', StringType(), True),
  23.                 StructField('NumAlarms', IntegerType(), True),
  24.                 StructField('UnitType', StringType(), True),
  25.                 StructField('UnitSequenceInCallDispatch', IntegerType(), True),
  26.                 StructField('FirePreventionDistrict', StringType(), True),
  27.                 StructField('SupervisorDistrict', StringType(), True),
  28.                 StructField('Neighborhood', StringType(), True),
  29.                 StructField('Location', StringType(), True),
  30.                 StructField('RowID', StringType(), True),
  31.                 StructField('Delay', FloatType(), True)])
  32. # Use the DataFrameReader interface to read a CSV file
  33. sf_fire_file = "/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv"
  34. fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)
复制代码
  1. // In Scala it would be similar
  2. val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  3.                                   StructField("UnitID", StringType, true),
  4.                                   StructField("IncidentNumber", IntegerType, true),
  5.                                   StructField("CallType", StringType, true),
  6.                                   StructField("Location", StringType, true),
  7.                                   ...
  8.                                   ...
  9.                                   StructField("Delay", FloatType, true)))
  10. // Read the file using the CSV DataFrameReader
  11. val sfFireFile="/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv"
  12. val fireDF = spark.read.schema(fireSchema)
  13. .option("header", "true")
  14. .csv(sfFireFile)
复制代码
spark.read.csv()函数读入CSV文件并返回包含 schema 中指定类型的行和定名列的DataFrame。

想以特定格式将DataFrame写入外部数据源,可以使用DataFrameWriter接口。与DataFrameReader一样,它支持多个数据源。Parquet,一种流行的列示存储格式,是默认格式;它使用快速压缩来压缩数据。假如DataFrame被写成Parquet,那么 scheme 将作为Parquet元数据的一部分生存。在这种环境下,后续读取回DataFrame不必要手动提供 schema。
将DataFrame生存为Parquet文件或SQL表
常见的数据操作是查询和转换数据,然后以Parquet格式持久化DataFrame或将其生存为SQL表。持久化转换后的DataFrame与读取它一样简单。例如,要在读取数据框后将其作为文件持久化,将实行以下操作:
  1. // In Scala to save as a Parquet file
  2. val parquetPath = ...
  3. fireDF.write.format("parquet").save(parquetPath)
复制代码
  1. # In Python to save as a Parquet file
  2. parquet_path = ...
  3. fire_df.write.format("parquet").save(parquet_path)
复制代码
或者作为表生存:
  1. // In Scala to save as a table
  2. val parquetTable = ... // name of the table
  3. fireDF.write.format("parquet").saveAsTable(parquetTable)
复制代码
  1. # In Python
  2. parquet_table = ... # name of the table
  3. fire_df.write.format("parquet").saveAsTable(parquet_table)
复制代码
在读取数据之后,让我们浏览一下对dataframe实行的一些常见操作。
转换和操作
现在,在内存中有了一个由San Francisco Fire Department调用构成的分布式DataFrame,作为开辟人员,要做的第一件事就是检查数据,看看列是什么样子。它们是正确的类型吗?有必要转换成差异类型的吗?它们有空值吗?
投影和过滤
关系术语中的投影是一种方法,通过使用过滤器只返回与特定关系条件匹配的行。在Spark中,投影是用select()方法完成的,而过滤器可以用filter()或where()方法表现。我们可以使用这种技术来查询数据集的特定信息:
  1. # In Python
  2. few_fire_df = (fire_df
  3.   .select("IncidentNumber", "AvailableDtTm", "CallType")
  4.   .where(col("CallType") != "Medical Incident"))
  5. few_fire_df.show(5, truncate=False)
复制代码
  1. // In Scala
  2. val fewFireDF = fireDF
  3. .select("IncidentNumber", "AvailableDtTm", "CallType")
  4. .where($"CallType" =!= "Medical Incident")   
  5. fewFireDF.show(5, false)
  6. +--------------+----------------------+--------------+
  7. |IncidentNumber|AvailableDtTm         |CallType      |
  8. +--------------+----------------------+--------------+
  9. |2003235       |01/11/2002 01:47:00 AM|Structure Fire|
  10. |2003235       |01/11/2002 01:51:54 AM|Structure Fire|
  11. |2003235       |01/11/2002 01:47:00 AM|Structure Fire|
  12. |2003235       |01/11/2002 01:47:00 AM|Structure Fire|
  13. |2003235       |01/11/2002 01:51:17 AM|Structure Fire|
  14. +--------------+----------------------+--------------+
  15. only showing top 5 rows
复制代码
重定名、添加和删除列
有时出于样式或约定的原因必要重定名特定列,有时则是为了可读性或简洁性。有时候数据集的列中大概会有空格,例如,列名“IncidentNumber ”和“IncidentNumber”。列名中的空格大概会有题目,特别是当想要将DataFrame写入或生存为Parquet文件时(这是禁止的)。

通过使用StructField在 schema 中指定所需的列名,就像我们所做的那样,我们有效地更改了效果DataFrame中的全部名称。

或者,可以使用withColumnRenamed()方法选择性地重定名列。例如,让我们将列Delay的名称更改为ResponseDelayd inMins,并检察超过五分钟的响应时间:
  1. # In Python
  2. new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")
  3. (new_fire_df
  4. .select("ResponseDelayedinMins")
  5. .where(col("ResponseDelayedinMins") > 5)
  6. .show(5, False))
复制代码
  1. // In Scala
  2. val newFireDF = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins")
  3. newFireDF
  4. .select("ResponseDelayedinMins")
  5. .where($"ResponseDelayedinMins" > 5)
  6. .show(5, false)
复制代码
将返回一个新的重定名列:
  1. +---------------------+
  2. |ResponseDelayedinMins|
  3. +---------------------+
  4. |5.233333             |
  5. |6.9333334            |
  6. |6.116667             |
  7. |7.85                 |
  8. |77.333336            |
  9. +---------------------+
  10. only showing top 5 rows
复制代码
  NOTE
因为DataFrame转换是不可变的,以是当我们使用withcolumnrename()重定名一个列时,我们会得到一个新的DataFrame,同时生存原来的列名称。
  修改列的内容或其类型是数据查询过程中常见的操作。在某些环境下,数据是原始的或脏的,或者其类型不适合作为关系操作符的参数提供。例如,在数据集中,列CallDate、WatchDate和AlarmDtTm是字符串,而不是Unix时间戳或SQL日期,这两者都是Spark支持的,并且可以在转换或操作期间(例如,在基于日期或基于时间的数据分析期间)轻松操作。

那么我们怎样将它们转换成更可用的格式呢?这非常简单,这要归功于一些高级API方法。Spark.sql.functions有一组to/from date/timestamp函数,比如to_timestamp()和to_date(),我们可以使用它们来实现这个目标:
  1. # In Python
  2. fire_ts_df = (new_fire_df
  3.               .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
  4.               .drop("CallDate")
  5.               .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
  6.               .drop("WatchDate")
  7.               .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"),
  8.                                                         "MM/dd/yyyy hh:mm:ss a"))
  9.               .drop("AvailableDtTm"))
  10. # Select the converted columns
  11. (fire_ts_df
  12. .select("IncidentDate", "OnWatchDate", "AvailableDtTS")
  13. .show(5, False))
复制代码
  1. // In Scala
  2. val fireTsDF = newFireDF
  3. .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
  4. .drop("CallDate")
  5. .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
  6. .drop("WatchDate")
  7. .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"),
  8.                                             "MM/dd/yyyy hh:mm:ss a"))
  9. .drop("AvailableDtTm")
  10. // Select the converted columns
  11. fireTsDF
  12. .select("IncidentDate", "OnWatchDate", "AvailableDtTS")
  13. .show(5, false)
复制代码
这些查询包含了相当大的工作量,让我们来分析一下它们做了什么:

  • 将现有列的数据类型从字符串转换为spark支持的时间戳。
  • 在适当的环境下,使用格式字符串"MM/dd/yyyy"或"MM/dd/yyyy hh: MM:ss a"指定的新格式。
  • 转换为新数据类型后,删除旧列,并将新列作为 withColumn()的第一个参数。
  • 将新修改的DataFrame 实例化为 fire_ts_df。
查询的新列效果如下:
  1. +-------------------+-------------------+-------------------+
  2. |IncidentDate       |OnWatchDate        |AvailableDtTS      |
  3. +-------------------+-------------------+-------------------+
  4. |2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:58:43|
  5. |2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:10:17|
  6. |2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:47:00|
  7. |2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:54|
  8. |2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:47:00|
  9. +-------------------+-------------------+-------------------+
  10. only showing top 5 rows
复制代码
现在我们已经修改了日期,我们可以使用spark.sql.functions中的函数进行查询,如dayofmonth()、dayofyear()和dayofweek()来进一步查询我们的数据。我们可以找出在过去七天内记录了多少电话,或者我们可以通过这个查询检察数据集中包含了多少年的消防部分电话:
  1. # In Python
  2. (fire_ts_df
  3. .select(year('IncidentDate'))
  4. .distinct()
  5. .orderBy(year('IncidentDate'))
  6. .show())
复制代码
  1. // In Scala
  2. fireTsDF
  3. .select(year($"IncidentDate"))
  4. .distinct()
  5. .orderBy(year($"IncidentDate"))
  6. .show()
  7. +------------------+
  8. |year(IncidentDate)|
  9. +------------------+
  10. |              2000|
  11. |              2001|
  12. |              2002|
  13. |              2003|
  14. |              2004|
  15. |              2005|
  16. |              2006|
  17. |              2007|
  18. |              2008|
  19. |              2009|
  20. |              2010|
  21. |              2011|
  22. |              2012|
  23. |              2013|
  24. |              2014|
  25. |              2015|
  26. |              2016|
  27. |              2017|
  28. |              2018|
  29. +------------------+
复制代码
到现在为止,我们已经探究了一些常见的数据操作:读取和写入数据框架.
界说schema 并在读取DataFrame时使用它。
将DataFrame生存为Parquet文件或表。
从现有DataFrame投射和过滤选定的列。
以及修改、重定名和删除列。
聚合
假如我们想知道最常见的火警呼叫类型是什么,或者哪个邮政编码占了最多的电话,该怎么办?这类题目在数据分析和查询中很常见。
Dataframes 上的一些转换和算子,如groupBy()、orderBy()和count(),提供了按列名进行聚合的本领,然后在它们之间聚合计数。
  1. # In Python
  2. (fire_ts_df
  3. .select("CallType")
  4. .where(col("CallType").isNotNull())
  5. .groupBy("CallType")
  6. .count()
  7. .orderBy("count", ascending=False)
  8. .show(n=10, truncate=False))
复制代码
  1. // In Scala
  2. fireTsDF
  3. .select("CallType")
  4. .where(col("CallType").isNotNull)
  5. .groupBy("CallType")
  6. .count()
  7. .orderBy(desc("count"))
  8. .show(10, false)
  9. +-------------------------------+-------+
  10. |CallType                       |count  |
  11. +-------------------------------+-------+
  12. |Medical Incident               |2843475|
  13. |Structure Fire                 |578998 |
  14. |Alarms                         |483518 |
  15. |Traffic Collision              |175507 |
  16. |Citizen Assist / Service Call  |65360  |
  17. |Other                          |56961  |
  18. |Outside Fire                   |51603  |
  19. |Vehicle Fire                   |20939  |
  20. |Water Rescue                   |20037  |
  21. |Gas Leak (Natural and LP Gases)|17284  |
  22. +-------------------------------+-------+
复制代码
  NOTE
DataFrame API也提供了collect()方法,但是对于非常大的DataFrame来说,这是资源繁重(昂贵)且伤害的,因为它大概导致内存不敷(OOM)异常。与count()差异,它向驱动步伐返回单个数字,collect()返回整个DataFrame或Dataset中全部Row对象的聚集。假如想检察一些Row记录,最好使用take(n),它将只返回DataFrame的前n个Row对象。
  其他常见的DataFrame 算子
与我们看到的全部其他方法一样,DataFrame API提供了描述性统计方法,如min()、max()、sum()和avg()。让我们看一些示例,展示怎样使用SF消防局的数据集计算它们。

在这里,我们计算警报的总和,平均响应时间,以及我们数据集中全部火警调用的最小和最大响应时间,以Pythonic 的方式导入PySpark函数,以免与内置Python函数冲突:
  1. # In Python
  2. import pyspark.sql.functions as F
  3. (fire_ts_df
  4. .select(F.sum("NumAlarms"), F.avg("ResponseDelayedinMins"),
  5.          F.min("ResponseDelayedinMins"), F.max("ResponseDelayedinMins"))
  6. .show())
复制代码
  1. // In Scala
  2. import org.apache.spark.sql.{functions => F}
  3. fireTsDF
  4. .select(F.sum("NumAlarms"), F.avg("ResponseDelayedinMins"),
  5.           F.min("ResponseDelayedinMins"), F.max("ResponseDelayedinMins"))
  6. .show()
  7. +--------------+--------------------------+--------------------------+---------+
  8. |sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(...) |
  9. +--------------+--------------------------+--------------------------+---------+
  10. |       4403441|         3.902170335891614|               0.016666668|1879.6167|
  11. +--------------+--------------------------+--------------------------+---------+
复制代码
对于数据科学工作内容中常见的更高级的统计需求,请阅读API文档,了解stat()、describe()、correlation()、covariance()、sampleBy()、approxQuantile()、frequentItems()等方法。

正如我们所看到的,用DataFrames的高级API和DSL 算子进行链式查询是很轻易的。假如我们实验对rdd做同样的事变,我们无法想象代码的不透明性和不可读性!

接下来,我们将把重点转移到Dataset API,并探索这两个API怎样为开辟人员提供同一的结构化接口,用于编程Spark。然后,我们将检察RDD、DataFrame和Dataset API之间的关系,并资助您确定何时使用哪个API以及为什么使用。

The Dataset API

Spark 2.0将DataFrame和Dataset api同一为具有类似接口的结构化api,这样开辟人员只必要学习一组api。Datasets 有两种特征: 界说类型和未界说类型的 API:

从概念上讲,可以将Scala中的DataFrame视为通用对象聚集Dataset[Row]的别名,此中Row是一个通用的无类型JVM对象,可以包含差异类型的字段。相比之下,Dataset 是Scala中强类型JVM对象的聚集或Java中的类。或者,正如Dataset文档所说,Dataset是:


  • 域特定对象的强类型聚集,可以使用函数或关系操作并行转换。每个数据集(在Scala中)也有一个称为DataFrame的未类型化视图,它是一个行数据集。
有类型 Objects、无类型 Objects 和通用 Rows

在Spark支持的语言中,Datasets 只在Java和Scala中故意义,而在Python和R中只有DataFrames 故意义。这是因为Python和R不是编译时类型安全的。类型是在实行期间动态推断或赋值的,而不是在编译期间。在Scala和Java中则相反,类型在编译时绑定到变量和对象。然而,在Scala中,DataFrame只是untyped Dataset[Row]的别名,下图进行了简单的对比:

Row是Spark中的通用对象类型,包含可以使用索引访问的混淆类型聚集。在内部,Spark操作Row对象,将它们转换为其他语言的等效类型。例如,Int作为Row中的一个字段将被映射或转换为IntegerType或IntegerType(),分别用于Scala或Java和Python:
  1. // In Scala
  2. import org.apache.spark.sql.Row
  3. val row = Row(350, true, "Learning Spark 2E", null)
复制代码
  1. # In Python
  2. from pyspark.sql import Row
  3. row = Row(350, True, "Learning Spark 2E", None)
复制代码
通过索引访问:
  1. // In Scala
  2. row.getInt(0)
  3. res23: Int = 350
  4. row.getBoolean(1)
  5. res24: Boolean = true
  6. row.getString(2)
  7. res25: String = Learning Spark 2E
复制代码
  1. # In Python
  2. row[0]
  3. Out[13]: 350
  4. row[1]
  5. Out[14]: True
  6. row[2]
  7. Out[15]: 'Learning Spark 2E'
复制代码
相比之下,有类型的对象是JVM中现实的Java或Scala类对象。Dataset中的每个元素都映射到一个JVM对象。

创建 Datasets

与从数据源创建 DataFrames 一样,在创建 Dataset 时,必须了解 schema。换句话说,您必要知道数据类型。虽然使用JSON和CSV数据可以推断 schema,但对于大型数据集,这是资源密集型的(昂贵的)。在Scala中创建数据集时,为效果数据集指定模式的最简单方法是使用case类。在Java中,使用JavaBean类。
Scala: Case classes
当我们盼望将自己的特定领域对象实例化为Dataset时,可以通过在Scala中界说case类来实现。

我们的文件有如下的JSON字符串行:
  1. {"device_id": 198164, "device_name": "sensor-pad-198164owomcJZ", "ip":
  2. "80.55.20.25", "cca2": "PL", "cca3": "POL", "cn": "Poland", "latitude":
  3. 53.080000, "longitude": 18.620000, "scale": "Celsius", "temp": 21,
  4. "humidity": 65, "battery_level": 8, "c02_level": 1408,"lcd": "red",
  5. "timestamp" :1458081226051}
复制代码
为了将每个JSON条目表现为DeviceIoTData,一个特定于领域的对象,我们可以界说一个Scala case类:
  1. case class DeviceIoTData (battery_level: Long, c02_level: Long,
  2.                           cca2: String, cca3: String, cn: String, device_id: Long,
  3.                           device_name: String, humidity: Long, ip: String, latitude: Double,
  4.                           lcd: String, longitude: Double, scale:String, temp: Long,
  5.                           timestamp: Long)
复制代码
一旦界说,我们可以使用它来读取文件并将返回的Dataset[Row]转换为Dataset[DeviceIoTData](输出被截断以适合页面):
  1. // In Scala
  2. val ds = spark.read
  3. .json("/databricks-datasets/learning-spark-v2/iot-devices/iot_devices.json")
  4. .as[DeviceIoTData]
  5. ds: org.apache.spark.sql.Dataset[DeviceIoTData] = [battery_level...]
  6. ds.show(5, false)
  7. +-------------|---------|----|----|-------------|---------|---+
  8. |battery_level|c02_level|cca2|cca3|cn           |device_id|...|
  9. +-------------|---------|----|----|-------------|---------|---+
  10. |8            |868      |US  |USA |United States|1        |...|
  11. |7            |1473     |NO  |NOR |Norway       |2        |...|
  12. |2            |1556     |IT  |ITA |Italy        |3        |...|
  13. |6            |1080     |US  |USA |United States|4        |...|
  14. |4            |931      |PH  |PHL |Philippines  |5        |...|
  15. +-------------|---------|----|----|-------------|---------|---+
  16. only showing top 5 rows
复制代码

Dataset 算子

正如你可以对 Dataframe 实行转换和算子操作一样,你也可以对 Dataset 实行转换和算子操作。根据操作的类型,效果会有所差异:
  1. // In Scala
  2. val filterTempDS = ds.filter({d => {d.temp > 30 && d.humidity > 70})
  3. filterTempDS: org.apache.spark.sql.Dataset[DeviceIoTData] = [battery_level...]
  4. filterTempDS.show(5, false)
  5. +-------------|---------|----|----|-------------|---------|---+
  6. |battery_level|c02_level|cca2|cca3|cn           |device_id|...|
  7. +-------------|---------|----|----|-------------|---------|---+
  8. |0            |1466     |US  |USA |United States|17       |...|
  9. |9            |986      |FR  |FRA |France       |48       |...|
  10. |8            |1436     |US  |USA |United States|54       |...|
  11. |4            |1090     |US  |USA |United States|63       |...|
  12. |4            |1072     |PH  |PHL |Philippines  |81       |...|
  13. +-------------|---------|----|----|-------------|---------|---+
  14. only showing top 5 rows
复制代码
在这个查询中,我们使用一个函数作为Dataset方法filter()的参数。这是一个具有许多署名的重载方法。我们使用的版本是filter(func: (T) > Boolean): Dataset[T],它接受一个lambda函数func: (T) > Boolean作为参数。

lambda函数的参数是DeviceIoTData类型的JVM对象。因此,我们可以使用点(.)表现法访问它的各个数据字段,就像在Scala类或JavaBean中一样。
下面是另一个例子,效果是另一个更小的 Dataset:
  1. // In Scala
  2. case class DeviceTempByCountry(temp: Long, device_name: String, device_id: Long,
  3.   cca3: String)
  4. val dsTemp = ds
  5.   .filter(d => {d.temp > 25})
  6.   .map(d => (d.temp, d.device_name, d.device_id, d.cca3))
  7.   .toDF("temp", "device_name", "device_id", "cca3")
  8.   .as[DeviceTempByCountry]
  9. dsTemp.show(5, false)
  10. +----+---------------------+---------+----+
  11. |temp|device_name          |device_id|cca3|
  12. +----+---------------------+---------+----+
  13. |34  |meter-gauge-1xbYRYcj |1        |USA |
  14. |28  |sensor-pad-4mzWkz    |4        |USA |
  15. |27  |sensor-pad-6al7RTAobR|6        |USA |
  16. |27  |sensor-pad-8xUD6pzsQI|8        |JPN |
  17. |26  |sensor-pad-10BsywSYUF|10       |USA |
  18. +----+---------------------+---------+----+
  19. only showing top 5 rows
复制代码
或者可以只检索数据集的第一行:
  1. val device = dsTemp.first()
  2. println(device)
  3. device: DeviceTempByCountry =
  4. DeviceTempByCountry(34,meter-gauge-1xbYRYcj,1,USA)
复制代码
或者,可以使用列名表达雷同的查询,然后逼迫转换为Dataset[DeviceTempByCountry]:
  1. // In Scala
  2. val dsTemp2 = ds
  3.   .select($"temp", $"device_name", $"device_id", $"device_id", $"cca3")
  4.   .where("temp > 25")
  5.   .as[DeviceTempByCountry]
复制代码
  NOTE
从语义上讲,select()类似于前一个查询中的map(),因为这两个查询都选择字段并生成雷同的效果。
  当我们使用数据集时,底层Spark SQL引擎处理JVM对象的创建、转换、序列化和反序列化。它还在Dataset编码器的资助下处理java之外的堆内存管理。

DataFrames 和 Datasets 比较

到现在为止,我们大概想知道为什么以及何时应该使用 DataFrames 或 Datasets。在许多环境下,两者都可以,这取决于正在使用的语言,但在某些环境下,此中一种比另一种更可取。下面是一些例子:


  • 假如你想让 Spark 做什么,而不是怎么做,使用 DataFrames 或 Datasets。
  • 假如您想要丰富的语义、高级抽象和DSL操作符,使用 DataFrames 或 Datasets。
  • 假如你想要严格的编译时类型安全,并且不介怀为特定的Dataset[T]创建多个case类,请使用Datasets。
  • 假如你的处理必要高级表达式、过滤器、映射、聚合、计算平均值或总和、SQL查询、列访问或在半结构化数据上使用关系操作符,请使用 DataFrames 或 Datasets。
  • 假如你的步伐要求关系转换类似于sql的查询,请使用dataframe。
  • 假如你想利用和受益于钨编码器的高效序列化,使用 Datasets。
  • 假如盼望同一、代码优化和简化Spark组件之间的api,请使用dataframe。
  • 假如你是R语言用户,请使用DataFrames。
  • 假如你是Python用户,请使用dataframe,假如必要更多控制,可以使用rdd。
  • 假如必要空间和速度效率,请使用dataframe。
  • 假如盼望在编译期间而不是在运行时捕捉错误,请选择下图所示的适当API。


什么时候使用 RDD

你大概会问: rdd被贬为二等公民了吗?它们被弃用了吗?
答案是一个响亮的否定!
RDD API将继承得到支持,尽管全部未来的开辟工作都将在Spark 2.x和Spark 3.0 中进行,将继承使用DataFrame接口和语义,而不是使用rdd。
在某些环境下,你大概必要思量使用rdd,例如:


  • 是否正在使用rdd编写的第三方软件包
  • 可以放弃Dataframs和Dataset提供的代码优化、有效的空间利用和性能优势吗
  • 想要精确地指示Spark怎样做查询
构建高效查询和生成紧凑代码的过程是Spark SQL引擎的工作。它是我们不停在研究的结构化api构建的底子。

Spark SQL 和底层引擎

在编程层面,Spark SQL允许开辟人员使用 schema 对结构化数据发出与ANSI SQL:2003兼容的查询。自从在Spark 1.3中引入以来,Spark SQL已经发展成为一个强大的引擎,许多高级结构化功能都建立在它的底子上。除了允许你对你的数据发出类似SQL的查询,Spark SQL引擎:


  • 同一Spark组件,并允许 Java、Scala、Python和R中的 DataFrames/Datasets 的抽象,从而简化告终构化数据集的处理。
  • 连接Apache Hive metastore和表。
  • 从结构化文件格式(JSON, CSV, Text, Avro, Parquet, ORC等)中以特定模式读写结构化数据,并将数据转换为暂时表。
  • 提供交互式Spark SQL shell,用于快速数据查询。
  • 通过标准数据库JDBC/ODBC连接器提供与外部工具之间的桥接。
  • 为JVM生成优化的查询操持和紧凑的代码,以便终极实行。
下图显示了Spark SQL与其他组件间的交互:

Spark SQL引擎的焦点是Catalyst优化器和Project Tungsten。它们一起支持高级的DataFrame和Dataset api以及SQL查询。

The Catalyst Optimizer

Catalyst优化器接受计算查询并将其转换为实行操持。它经历了四个变化阶段,如下图所示:

  • 分析
  • 逻辑优化
  • 生成物理操持
  • 代码生成

下面两个示例代码块将经历雷同的过程,终极得到类似的查询操持和雷同的实行字节码。也就是说,不管你使用哪种语言,你的计算过程都是一样的,产生的字节码大概是一样的:
  1. # In Python
  2. count_mnm_df = (mnm_df
  3.                 .select("State", "Color", "Count")
  4.                 .groupBy("State", "Color")
  5.                 .agg(sum("Count")
  6.                      .alias("Total"))
  7.                 .orderBy("Total", ascending=False))
复制代码
  1. -- In SQL
  2. SELECT State, Color, sum(Count) AS Total
  3. FROM MNM_TABLE_NAME
  4. GROUP BY State, Color
  5. ORDER BY Total DESC
复制代码
要检察Python代码经历的差异阶段,可以在DataFrame上使用count_mnm_df.explain(True)方法。或者,要检察差异的逻辑和物理操持,可以在Scala中调用df.queryExecution.logical或df.queryExecution.optimizedPlan:
  1. count_mnm_df.explain(True)
  2. == Parsed Logical Plan ==
  3. 'Sort ['Total DESC NULLS LAST], true
  4. +- Aggregate [State#10, Color#11], [State#10, Color#11, sum(Count#12) AS...]
  5.    +- Project [State#10, Color#11, Count#12]
  6.       +- Relation[State#10,Color#11,Count#12] csv
  7. == Analyzed Logical Plan ==
  8. State: string, Color: string, Total: bigint
  9. Sort [Total#24L DESC NULLS LAST], true
  10. +- Aggregate [State#10, Color#11], [State#10, Color#11, sum(Count#12) AS...]
  11.    +- Project [State#10, Color#11, Count#12]
  12.       +- Relation[State#10,Color#11,Count#12] csv
  13. == Optimized Logical Plan ==
  14. Sort [Total#24L DESC NULLS LAST], true
  15. +- Aggregate [State#10, Color#11], [State#10, Color#11, sum(Count#12) AS...]
  16.    +- Relation[State#10,Color#11,Count#12] csv
  17. == Physical Plan ==
  18. *(3) Sort [Total#24L DESC NULLS LAST], true, 0
  19. +- Exchange rangepartitioning(Total#24L DESC NULLS LAST, 200)
  20.    +- *(2) HashAggregate(keys=[State#10, Color#11], functions=[sum(Count#12)],
  21. output=[State#10, Color#11, Total#24L])
  22.       +- Exchange hashpartitioning(State#10, Color#11, 200)
  23.          +- *(1) HashAggregate(keys=[State#10, Color#11],
  24. functions=[partial_sum(Count#12)], output=[State#10, Color#11, count#29L])
  25.             +- *(1) FileScan csv [State#10,Color#11,Count#12] Batched: false,
  26. Format: CSV, Location:
  27. InMemoryFileIndex[file:/Users/jules/gits/LearningSpark2.0/chapter2/py/src/...
  28. dataset.csv], PartitionFilters: [], PushedFilters: [], ReadSchema:
  29. struct<State:string,Color:string,Count:int>
复制代码
让我们思量另一个DataFrame计算示例。下面的Scala代码经历了类似的过程,底层引擎优化了它的逻辑和物理操持:
  1. // In Scala
  2. // Users DataFrame read from a Parquet table
  3. val usersDF  = ...
  4. // Events DataFrame read from a Parquet table
  5. val eventsDF = ...
  6. // Join two DataFrames
  7. val joinedDF = users
  8. .join(events, users("id") === events("uid"))
  9. .filter(events("date") > "2015-01-01")
复制代码
在颠末初始分析阶段后,查询操持由Catalyst优化器进行转换和重新分列, 如下图:

让我们逐一检察四个查询优化阶段。
阶段一:分析
Spark SQL引擎首先为SQL或DataFrame查询生成一个抽象语法树(AST)。在这个初始阶段,任何列或表的名称都将通过查询内部Catalog来解析,Catalog是Spark SQL的编程接口,包含列、数据类型、函数、表、数据库等名称的列表。一旦它们都被成功地办理了,查询就会进入下一个阶段。
阶段二:逻辑优化
该阶段包罗两个内部阶段,应用基于标准规则的优化方法,Catalyst优化器将首先构建一组多个操持,然后使用其基于成本的优化器(CBO)为每个操持分配成本。这些操持以操作员树的形式睁开,它们大概包罗,例如常数折叠、谓词下推、投影剪枝、布尔表达式简化等过程。这个逻辑操持是物理操持的输入。
阶段三:生成物理操持
在此阶段,Spark SQL使用与Spark实行引擎中可用的物理操作符匹配的物理操作符,为所选逻辑操持生成最优物理操持。
阶段四:生成代码
查询优化的最后阶段涉及生成在每台呆板上运行的高效Java字节码。由于Spark SQL可以对内存中加载的数据集进行操作,因此Spark可以使用最先进的编译器技术来生成代码以加快实行速度。换句话说,它就像一个编译器。促进整个阶段代码生成的Project Tungsten在这里发挥了作用。

什么是全阶段代码生成?这是一个物理查询优化阶段,它将整个查询压缩为单个函数,消除了虚拟函数调用,并为中心数据使用CPU寄存器。Spark 2.0中引入的第二代Tungsten引擎使用这种方法生成紧凑的RDD代码以供终极实行。这种流线型策略显着进步了CPU效率和性能。

总结

在本文中,我们深入探究了Spark的结构化api,从Spark结构的历史和长处开始。
通过说明性的常见数据操作和代码示例,我们演示了高级DataFrame和Dataset API比低级RDD API更具表现力和直观性。结构化api旨在简化大型数据集的处理,为常见的数据操作提供特定于领域的操作符,从而进步代码的可读性。
我们根据差异用例场景探究了何时使用rdd、数据框架和数据集。
最后,我们深入了解了Spark SQL引擎的主要组件——Catalyst优化器和Project tungsten——是怎样支持结构化高级api和DSL操作符的。正如你所看到的,无论使用哪种支持Spark的语言,Spark查询都要经历雷同的优化过程,从逻辑和物理操持构建到终极的紧凑代码生成。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王國慶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表