ToB企服应用市场:ToB评测及商务社交产业平台
标题:
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓
[打印本页]
作者:
tsx81429
时间:
2024-8-24 14:05
标题:
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓
点一下关注吧!!!非常感谢!!持续更新!!!
现在已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(已更完)
Spark(正在更新!)
章节内容
上节完成的内容如下:
Spark Super Word Count 步伐 Scala语言编写
将数据写入MySQL、不写入MySQL等编码方式
代码的具体解释与效果
配景介绍
这涉及到历程通讯,是必要序列化的,可以简单的以为:SparkContext代表Drive
在实际的开辟过程中会自定一些RDD的操纵,此时必要留意的是:
初始化工作是Driver端进行的
实际运行步伐是Executor端进行的
测试代码
碰到题目
class MyClass1(x: Int) {
val num = x
}
object SerializableDemo {
def main (args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("SerializableDemo")
.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val rdd1 = sc.makeRDD(1 to 20)
def add1(x: Int) = x + 10
val add2 = add1 _
// 过程和方法 都具备序列化的能力
rdd1.map(add1(_)).foreach(println)
rdd1.map(add2(_)).foreach(println)
// 普通的类不具备序列化能力
val object1 = new MyClass1(10)
// 报错 提示无法序列化
// rdd1.map(x => object1.num + x).foreach(println)
}
}
复制代码
办理方案1
case class MyClass2(num: Int)
val object2 = MyClass2(20)
rdd1.map(x => object2.num + x).foreach(println)
复制代码
办理方案2
class MyClass3(x: Int) extends Serializable {
val num = x
}
val object3 = new MyClass3(30)
rdd1.map(x => object3.num + x).foreach(println)
复制代码
办理方案3
class MyClass1(x: Int) {
val num = x
}
lazy val object4 = new MyClass1(40)
rdd1.map(x => object4.num + x).foreach(println)
复制代码
完整代码
package icu.wzk
import org.apache.spark.{SparkConf, SparkContext}
class MyClass1(x: Int) {
val num = x
}
case class MyClass2(num: Int)
class MyClass3(x: Int) extends Serializable {
val num = x
}
object SerializableDemo {
def main (args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("SerializableDemo")
.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val rdd1 = sc.makeRDD(1 to 20)
def add1(x: Int) = x + 10
val add2 = add1 _
// 过程和方法 都具备序列化的能力
rdd1.map(add1(_)).foreach(println)
rdd1.map(add2(_)).foreach(println)
// 普通的类不具备序列化能力
val object1 = new MyClass1(10)
// 报错 提示无法序列化
// rdd1.map(x => object1.num + x).foreach(println)
// 解决方案1 使用 case class
val object2 = MyClass2(20)
rdd1.map(x => object2.num + x).foreach(println)
// 解决方案2 实现 Serializable
val object3 = new MyClass3(30)
rdd1.map(x => object3.num + x).foreach(println)
// 解决方法3 延迟创建
lazy val object4 = new MyClass1(40)
rdd1.map(x => object4.num + x).foreach(println)
sc.stop()
}
}
复制代码
留意事项
假如在方法、函数的定义中引入了不可序列化的对象,也会导致任务不能够序列化
延迟创建的办理方案比力简单,且实用性广
RDD依赖关系
基本概念
RDD 只支持粗粒度转换,即在大量纪录上实验的单个操纵。将创建RDD的一系列Lineage(血统)纪录下来,以便恢复丢失的分区。
RDD的Lineage会纪录RDD的元数据信息和转换举动,当该RDD的部分分区数据丢失时,可根据这些信息来重新运算和恢复丢失的数据分区。
RDD和它的依赖的父RDDs的关系有两种差别的类型:
窄依赖(narrow dependency):1:1或n:1
宽依赖(wide dependency):n:m 意味着有 shuflle
RDD任务切分中心分为:Driver program、Job、Stage(TaskSet) 和 Task
Driver program:初始化一个SparkContext即天生一个Spark应用
Job:一个Action算子就会天生一个Job
Stage:根据RDD之间的依赖关系差别将Job划分成差别的Stage,碰到一个宽依赖则划分一个Stage
Task:Stage是一个TaskSet,将Stage划分的效果发送到差别的Executor实验即为一个Task
Task是Spark中任务调治的最小单位,每个Stage包罗很多Task,这些Task实验的盘算逻辑是相同的,盘算的数据是差别的
DriverProgram -> Job -> Stage -> Task 每一层都是 1 对 N 的关系
再回WordCount
代码部分
你可以用代码实验,也可以在 SparkShell 中实验。
package icu.wzk
import org.apache.spark.{SparkConf, SparkContext}
object ReWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("SparkFindFriends")
.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val rdd1 = sc.textFile("goodtbl.java")
val rdd2 = rdd1.flatMap(_.split("\\+"))
val rdd3 = rdd2.map((_, 1))
val rdd4 = rdd3.reduceByKey(_ + _)
val rdd5 = rdd4.sortByKey()
rdd5.count()
// 查看RDD的血缘关系
rdd1.toDebugString
rdd5.toDebugString
// 查看依赖
rdd1.dependencies
rdd1.dependencies(0).rdd
rdd5.dependencies
rdd5.dependencies(0).rdd
sc.stop()
}
}
复制代码
提出题目
上面的WordCount中,一共有几个Job,几个Stage,几个Task?
答案:1个Job,3个Stage,6个Task
RDD持久化/缓存
基本概念
涉及到的算子:persis、cache、unpersist 都是 Transformation
缓存是将盘算效果写入差别的介质,用户定义可定义存储级别(存储级别定义了缓存存储的介质,现在支持内存、堆外内存、磁盘)
通过缓存,Spark避免了RDD上的重复盘算,能够极大提升盘算的速率。
RDD持久化或缓存,是Spark最告急的特征之一,Spark构建迭代算法和快速交互式查询的关键所在
Spark非常快的原因之一就是在内存、缓存中持久化,当持久化一个RDD后,每一个节点都将把盘算的分片效果保存在内存中,并在对此数据集进行其他动作(Action),这使得后续更加迅速
使用 persist() 方法将一个RDD标记为持久化,之以是说“标记持久化”是因为使用persist()的地方,并不会马上盘算天生RDD并把它持久化,而是要等碰到第一个行动操纵出发真正盘算后,才会把盘算效果进行持久化。
一样平常情况下,假如多个动作必要用到某个RDD,而它的盘算代价又比力高,那么就应该把这个RDD缓存起来
缓存有大概丢失,大概存储在内存由于空间不敷而被删除,RDD的缓存的容错机制保证了即使缓存丢失也可以完成盘算。通过基于RDD的一系列的转换,丢失的数据会被重算。
RDD各个Partition是相对独立的,因为只必要盘算丢失的部分即可,而不是必要重算全部的Partition
持久化级别
使用 cache() 方法时,会调用 persist(MEMORY_ONLY),即
cache() == persist(StorageLevel.Memory.ONLY)
复制代码
对于其他的存储级别,如下图:
MEMORY_ONLY
MEMORY_AND_DISK
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
DISK_ONLY
DISK_ONLY_2
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4