点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(正在更新!)
章节内容
上节完成的内容如下:
- SparkSQL介绍
- SparkSQL特点
- SparkSQL数据抽象
- SparkSQL数据类型
SparkSession
在 Spark2.0 之前
- SQLContext 是创建 DataFrame 和 实行SQL的入口
- HiveContext 通过HiveSQL语句操纵Hive数据,兼Hive操纵,HiveContext继承自SQLContext
在 Spark2.0 后
- 这些入口点统一到了SparkSession,SparkSession封装了SQLContext及HiveContext
- 实现了SQLContext即HiveContext所有功能
- 通过SparkSession可以获取到SparkContext
RDD(Resilient Distributed Dataset,弹性分布式数据集)
RDD 是 Spark 的基础抽象,它表示一个不可变的、分布式的数据集。
特点:
- 不可变性:RDD 是不可变的,一旦创建就不能修改。任何对 RDD 的操纵都会生成一个新的 RDD。
- 弹性:RDD 可以自动从节点失败中恢复数据,通过将计算逻辑重新应用到原始数据来重修丢失的数据。
- 分布式:RDD 可以分布在多个节点上实行操纵,充分利用集群的计算能力。
- 延迟计算:RDD 的操纵是延迟实行的(lazy evaluation),即只有在触发行动操纵(如 count()、collect())时,Spark 才会实际实行计算。
- 类型安全:RDD 是类型化的,但它的 API 是松散类型(loosely typed)的,这意味着编译器不会在编译时查抄数据的类型,而是在运行时才会发现类型错误。
DataFrame
DataFrame 是一种基于 RDD 的分布式数据集,它具有定名的列。
特点:
- 布局化数据:DataFrame 是一个二维表格,具有定名的列和行,类似于关系数据库中的表或 Pandas 的 DataFrame。
- 优化引擎:DataFrame 受益于 Spark SQL 引擎的优化,如 Catalyst 优化器,可以自动优化查询并生成高效的实行计划。
- 丰富的 API:DataFrame 提供了一个高层次的 API,支持复杂的查询、过滤、聚合和连接操纵。
- 类型不安全:与 RDD 差别,DataFrame 是动态类型(dynamic typing)的,数据类型查抄是在运行时进行的,因此它在编译时不进行类型查抄。
DataSet
DataSet 是 Spark 1.6 引入的一个新的数据抽象,它结合了 RDD 的强类型优势和 DataFrame 的优化能力。
特点:
- 类型安全:DataSet 是强类型的,它利用编译时类型查抄,确保在编译时检测类型错误。
- 优化和性能:DataSet 受益于 Catalyst 优化器和 Tungsten 实行引擎,提供与 DataFrame 相同的优化能力,同时保留了类型安全性。
- 更丰富的 API:DataSet 提供了 RDD 的大部门 API,如 map、filter 等,同时也支持 SQL 查询。
- 统一 API:DataSet API 统一了 RDD 和 DataFrame,提供了一种更具表现力和安全性的编程模型。
DataFrame & Dataset 创建
不要刻意区分: DF & DS,DF是一种特殊的DS:ds.transformation => ds
由 Range 生成 Dataset
在 spark-shell 中进行测试
- val numDS = spark.range(5, 100, 5)
- // orderBy 转换操作
- numDS.orderBy(desc("id")).show(5)
- // 统计信息
- numDS.describe().show
- // 显示 Schema 信息
- numDS.printSchema
- // 使用RDD执行同样的操作
- numDS.rdd.map(_.toInt).stats
- // 检查分区数
- numDS.rdd.getNumPartitions
复制代码 运行测试的过程如下图所示:
有聚集生成Dataset
Dataset = RDD[case class],在 spark-shell 中进行测试
- case class Person(name: String, age: Int, height: Int)
- // 注意 Seq 中元素的类型
- val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165))
- val ds1 = spark.createDataset(seq1)
- ds1.printSchema
- ds1.show
复制代码 实行的结果:
再来一个测试:
- val seq2 = Seq(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
- val ds2 = spark.createDataset(seq2)
- ds2.printSchema
- ds2.show
复制代码 实行的结果:
由聚集生成DataFrame
DataFrame = RDD[Row] + Schema
继承进行测试:
- val lst = List(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
- val df1 = spark.createDataFrame(lst).withColumnRenamed("_1", "name1").withColumnRenamed("_2", "age1").withColumnRenamed("_3", "height1")
- df1.orderBy("age1").show(10)
复制代码 实行的结果如下图所示:
RDD转成DataFrame
DataFrame = RDD[Row] + Schema
- val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
- val rdd1 = sc.makeRDD(arr).map(f => Row(f._1, f._2, f._3))
- val schema = StructType(
- StructField("name", StringType, false) ::
- StructField("age", IntegerType, false) ::
- StructField("height", IntegerType, false) ::
- Nil
- )
- val schema1 = (new StructType).add("name", "string", false).add("age", "int", false).add("height", "int", false)
- val rddToDF = spark.createDataFrame(rdd1, schema)
- rddToDF.orderBy(desc("name")).show(false)
复制代码 实行的结果如下图:
RDD转Dataset
Dataset = RDD[case class]
DataFrame = RDD[Row] + Schema
- val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
- val rdd1 = sc.makeRDD(arr)
- val ds2 = spark.createDataset(rdd1)
- ds2.show(10)
复制代码 实行的结果如下图:
从文件创建DataFrame
CSV文件
我们生成了一个CSV文件,大致内容如下:
运行测试
- val df1 = spark.read.csv("/opt/wzk/data/people1.csv")
- df1.printSchema()
- df1.show()
复制代码 运行结果如下图所示:
三者转换
Spark SQL 提供了一个领域特定语言(DSL)以方便操纵布局化数据,核心头脑还是SQL,仅仅是一个语法问题。
RDD 与 DataFrame 之间的转换
RDD 转换为 DataFrame
将 RDD 转换为 DataFrame 需要提供数据的模式信息。通常你会利用 toDF() 方法将 RDD 转换为 DataFrame。
这里有两种主要方法:
- 利用隐式转换:需要导入 spark.implicits._,这允许你在不显式提供模式的情况下将常见的 RDD(如元组)转换为 DataFrame。
- 利用 StructType 定义模式:假如 RDD 的数据布局比较复杂,大概你需要精确控制 DataFrame 的模式,可以利用 StructType 和 Row。
DataFrame 转换为 RDD:
- 将 DataFrame 转换为 RDD 非常简单,只需调用 rdd 方法即可
DataFrame 与 DataSet 之间的转换
DataFrame 转换为 DataSet
- DataFrame 是无类型的,而 DataSet 是类型化的。为了将 DataFrame 转换为 DataSet,你需要定义一个对应的数据类型(通常是一个 case class)并利用 as[T] 方法
DataSet 转换为 DataFrame
- 将 DataSet 转换为 DataFrame 非常简单,只需调用 toDF() 方法即可
RDD 与 DataSet 之间的转换
RDD 转换为 DataSet
- 将 RDD 转换为 DataSet 需要将 RDD 的元素类型与 DataSet 的类型同等。与将 RDD 转换为 DataFrame 类似,通常利用隐式转换或显式提供模式信息
DataSet 转换为 RDD
- DataSet 本质上是类型化的 RDD,因此转换为 RDD 非常直接,只需调用 rdd 方法
终极汇总
- RDD 转换为 DataFrame:利用 toDF(),或利用 createDataFrame() 提供模式。
- DataFrame 转换为 RDD:利用 rdd 方法,转换后元素类型为 Row。
- DataFrame 转换为 DataSet:利用 as[T] 方法,需提供对应的 case class。
- DataSet 转换为 DataFrame:利用 toDF() 方法。
- RDD 转换为 DataSet:利用 toDS(),需提供对应的 case class。
- DataSet 转换为 RDD:利用 rdd 方法。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |