ToB企服应用市场:ToB评测及商务社交产业平台

标题: 大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 Spar [打印本页]

作者: 我爱普洱茶    时间: 2024-8-28 20:38
标题: 大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 Spar
点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:


章节内容

上节完成的内容如下:

SparkSession

在 Spark2.0 之前


在 Spark2.0 后


RDD(Resilient Distributed Dataset,弹性分布式数据集)

RDD 是 Spark 的基础抽象,它表示一个不可变的、分布式的数据集。
特点:

DataFrame

DataFrame 是一种基于 RDD 的分布式数据集,它具有定名的列。
特点:

DataSet

DataSet 是 Spark 1.6 引入的一个新的数据抽象,它结合了 RDD 的强类型优势和 DataFrame 的优化能力。
特点:

DataFrame & Dataset 创建

不要刻意区分: DF & DS,DF是一种特殊的DS:ds.transformation => ds
由 Range 生成 Dataset

在 spark-shell 中进行测试
  1. val numDS = spark.range(5, 100, 5)
  2. // orderBy 转换操作
  3. numDS.orderBy(desc("id")).show(5)
  4. // 统计信息
  5. numDS.describe().show
  6. // 显示 Schema 信息
  7. numDS.printSchema
  8. // 使用RDD执行同样的操作
  9. numDS.rdd.map(_.toInt).stats
  10. // 检查分区数
  11. numDS.rdd.getNumPartitions
复制代码
运行测试的过程如下图所示:

有聚集生成Dataset

Dataset = RDD[case class],在 spark-shell 中进行测试
  1. case class Person(name: String, age: Int, height: Int)
  2. // 注意 Seq 中元素的类型
  3. val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165))
  4. val ds1 = spark.createDataset(seq1)
  5. ds1.printSchema
  6. ds1.show
复制代码
实行的结果:

再来一个测试:
  1. val seq2 = Seq(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
  2. val ds2 = spark.createDataset(seq2)
  3. ds2.printSchema
  4. ds2.show
复制代码
实行的结果:

由聚集生成DataFrame

DataFrame = RDD[Row] + Schema
继承进行测试:
  1. val lst = List(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
  2. val df1 = spark.createDataFrame(lst).withColumnRenamed("_1", "name1").withColumnRenamed("_2", "age1").withColumnRenamed("_3", "height1")
  3. df1.orderBy("age1").show(10)
复制代码
实行的结果如下图所示:

RDD转成DataFrame

DataFrame = RDD[Row] + Schema
  1. val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
  2. val rdd1 = sc.makeRDD(arr).map(f => Row(f._1, f._2, f._3))
  3. val schema = StructType(
  4.   StructField("name", StringType, false) ::
  5.   StructField("age", IntegerType, false) ::
  6.   StructField("height", IntegerType, false) ::
  7.   Nil
  8. )
  9. val schema1 = (new StructType).add("name", "string", false).add("age", "int", false).add("height", "int", false)
  10. val rddToDF = spark.createDataFrame(rdd1, schema)
  11. rddToDF.orderBy(desc("name")).show(false)
复制代码
实行的结果如下图:

RDD转Dataset

Dataset = RDD[case class]
DataFrame = RDD[Row] + Schema
  1. val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
  2. val rdd1 = sc.makeRDD(arr)
  3. val ds2 = spark.createDataset(rdd1)
  4. ds2.show(10)
复制代码
实行的结果如下图:

从文件创建DataFrame

CSV文件

我们生成了一个CSV文件,大致内容如下:

运行测试

  1. val df1 = spark.read.csv("/opt/wzk/data/people1.csv")
  2. df1.printSchema()
  3. df1.show()
复制代码
运行结果如下图所示:

三者转换


Spark SQL 提供了一个领域特定语言(DSL)以方便操纵布局化数据,核心头脑还是SQL,仅仅是一个语法问题。
RDD 与 DataFrame 之间的转换

RDD 转换为 DataFrame

将 RDD 转换为 DataFrame 需要提供数据的模式信息。通常你会利用 toDF() 方法将 RDD 转换为 DataFrame。
这里有两种主要方法:

DataFrame 转换为 RDD:

DataFrame 与 DataSet 之间的转换

DataFrame 转换为 DataSet


DataSet 转换为 DataFrame


RDD 与 DataSet 之间的转换

RDD 转换为 DataSet


DataSet 转换为 RDD


终极汇总



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4