【pyspark学习从入门到精通7】DataFrames_2

打印 上一主题 下一主题

主题 924|帖子 924|积分 2772

目录

创建 DataFrames
生成我们本身的 JSON 数据
创建 DataFrame
创建暂时表
简单的 DataFrame 查询
DataFrame API 查询
SQL 查询


创建 DataFrames

通常,您会通过利用 SparkSession(或在 PySpark shell 中调用 spark)导入数据来创建 DataFrame。
我们将讨论如何将数据导入到当地文件系统、Hadoop 分布式文件系统(HDFS)或其他云存储系统(比方,S3 或 WASB)。在本文中,我们将专注于在 Spark 内直接生成您本身的 DataFrame 数据或利用 Databricks 社区版中已经可用的数据源。
首先,我们将不访问文件系统,而是通过生成数据来创建 DataFrame。在这种情况下,我们将首先创建 stringJSONRDD RDD,然后将其转换为 DataFrame。这段代码片段创建了一个包含游泳者(他们的 ID、姓名、年龄和眼睛颜色)的 JSON 格式的 RDD。
生成我们本身的 JSON 数据

下面,我们将最初生成 stringJSONRDD RDD:
  1. stringJSONRDD = sc.parallelize(("""
  2. { "id": "123",
  3. "name": "Katie",
  4. "age": 19,
  5. "eyeColor": "brown"
  6. }""",
  7. """{
  8. "id": "234",
  9. "name": "Michael",
  10. "age": 22,
  11. "eyeColor": "green"
  12. }""",
  13. """{
  14. "id": "345",
  15. "name": "Simone",
  16. "age": 23,
  17. "eyeColor": "blue"
  18. }""")
  19. )
复制代码
现在我们已经创建了 RDD,我们将利用 SparkSession 的 read.json 方法(即 spark.read.json(...))将其转换为 DataFrame。我们还将利用 .createOrReplaceTempView 方法创建一个暂时表。
创建 DataFrame

以下是创建 DataFrame 的代码:
  1. swimmersJSON = spark.read.json(stringJSONRDD)
复制代码
创建暂时表

以下是创建暂时表的代码:
  1. swimmersJSON.createOrReplaceTempView("swimmersJSON")
复制代码
如前文所述,许多 RDD 操作是转换,这些转换直到执办法作操作时才执行。比方,在前面的代码片段中,sc.parallelize 是一个转换,当利用 spark.read.json 从 RDD 转换为 DataFrame 时执行。注意,在这段代码的条记本截图中(左下角附近),直到包含 spark.read.json 操作的第二个单元格,Spark 作业才执行。
为了进一步夸大这一点,在下图的右侧窗格中,我们展示了执行的 DAG 图。
在下面的截图中,您可以看到 Spark 作业的 parallelize 操作来自生成 RDD stringJSONRDD 的第一个单元格,而 map 和 mapPartitions 操作是创建 DataFrame 所需的操作:

需要注意的是,parallelize、map 和 mapPartitions 都是 RDD 转换。在 DataFrame 操作 spark.read.json(在本例中)中,不仅有 RDD 转换,另有将 RDD 转换为 DataFrame 的动作。这是一个重要的说明,由于纵然您正在执行 DataFrame 操作,要调试您的操作,您需要记住您将在 Spark UI 中明白 RDD 操作。
请注意,创建暂时表是一个 DataFrame 转换,而且在执行 DataFrame 动作之前不会执行(比方,要执行的 SQL 查询)。
简单的 DataFrame 查询

现在您已经创建了 swimmersJSON DataFrame,我们将可以大概在其上运行 DataFrame API 以及 SQL 查询。让我们从一个简单的查询开始,显示 DataFrame 中的所有行。
DataFrame API 查询

要利用 DataFrame API 执行此操作,您可以利用 show(<n>) 方法,该方法将前 n 行打印到控制台:
  1. # DataFrame API
  2. swimmersJSON.show()
复制代码
这将给出以下输出:

SQL 查询

如果您更倾向于编写 SQL 语句,您可以编写以下查询:
  1. spark.sql("select * from swimmersJSON").collect()
复制代码
这将给出以下输出:

我们利用了 .collect() 方法,它返回所有记录作为一个行对象(Row objects)的列表。请注意,您可以对 DataFrames 和 SQL 查询利用 collect() 或 show() 方法。只要确保,如果您利用 .collect(),这是针对小 DataFrame 的,由于它将返回 DataFrame 中的所有行,并将它们从执行器移回驱动步伐。您可以改用 take(<n>) 或 show(<n>),这答应您通过指定 <n> 来限制返回的行数:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

道家人

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表