ToB企服应用市场:ToB评测及商务社交产业平台

标题: 大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓 [打印本页]

作者: tsx81429    时间: 2024-8-24 14:05
标题: 大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓
点一下关注吧!!!非常感谢!!持续更新!!!

现在已经更新到了:


章节内容

上节完成的内容如下:


配景介绍

这涉及到历程通讯,是必要序列化的,可以简单的以为:SparkContext代表Drive
在实际的开辟过程中会自定一些RDD的操纵,此时必要留意的是:

测试代码

碰到题目

  1. class MyClass1(x: Int) {
  2.   val num = x
  3. }
  4. object SerializableDemo {
  5.   def main (args: Array[String]): Unit = {
  6.     val conf = new SparkConf()
  7.       .setAppName("SerializableDemo")
  8.       .setMaster("local[*]")
  9.     val sc = new SparkContext(conf)
  10.     sc.setLogLevel("WARN")
  11.     val rdd1 = sc.makeRDD(1 to 20)
  12.     def add1(x: Int) = x + 10
  13.     val add2 = add1 _
  14.     // 过程和方法 都具备序列化的能力
  15.     rdd1.map(add1(_)).foreach(println)
  16.     rdd1.map(add2(_)).foreach(println)
  17.     // 普通的类不具备序列化能力
  18.     val object1 = new MyClass1(10)
  19.     // 报错 提示无法序列化
  20.     // rdd1.map(x => object1.num + x).foreach(println)
  21.   }
  22. }
复制代码
办理方案1

  1. case class MyClass2(num: Int)
  2. val object2 = MyClass2(20)
  3. rdd1.map(x => object2.num + x).foreach(println)
复制代码
办理方案2

  1. class MyClass3(x: Int) extends Serializable {
  2.   val num = x
  3. }
  4. val object3 = new MyClass3(30)
  5. rdd1.map(x => object3.num + x).foreach(println)
复制代码
办理方案3

  1. class MyClass1(x: Int) {
  2.   val num = x
  3. }
  4. lazy val object4 = new MyClass1(40)
  5. rdd1.map(x => object4.num + x).foreach(println)
复制代码
完整代码

  1. package icu.wzk
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. class MyClass1(x: Int) {
  4.   val num = x
  5. }
  6. case class MyClass2(num: Int)
  7. class MyClass3(x: Int) extends Serializable {
  8.   val num = x
  9. }
  10. object SerializableDemo {
  11.   def main (args: Array[String]): Unit = {
  12.     val conf = new SparkConf()
  13.       .setAppName("SerializableDemo")
  14.       .setMaster("local[*]")
  15.     val sc = new SparkContext(conf)
  16.     sc.setLogLevel("WARN")
  17.     val rdd1 = sc.makeRDD(1 to 20)
  18.     def add1(x: Int) = x + 10
  19.     val add2 = add1 _
  20.     // 过程和方法 都具备序列化的能力
  21.     rdd1.map(add1(_)).foreach(println)
  22.     rdd1.map(add2(_)).foreach(println)
  23.     // 普通的类不具备序列化能力
  24.     val object1 = new MyClass1(10)
  25.     // 报错 提示无法序列化
  26.     // rdd1.map(x => object1.num + x).foreach(println)
  27.     // 解决方案1 使用 case class
  28.     val object2 = MyClass2(20)
  29.     rdd1.map(x => object2.num + x).foreach(println)
  30.     // 解决方案2 实现 Serializable
  31.     val object3 = new MyClass3(30)
  32.     rdd1.map(x => object3.num + x).foreach(println)
  33.     // 解决方法3 延迟创建
  34.     lazy val object4 = new MyClass1(40)
  35.     rdd1.map(x => object4.num + x).foreach(println)
  36.     sc.stop()
  37.   }
  38. }
复制代码
留意事项


RDD依赖关系

基本概念

RDD 只支持粗粒度转换,即在大量纪录上实验的单个操纵。将创建RDD的一系列Lineage(血统)纪录下来,以便恢复丢失的分区。
RDD的Lineage会纪录RDD的元数据信息和转换举动,当该RDD的部分分区数据丢失时,可根据这些信息来重新运算和恢复丢失的数据分区。

RDD和它的依赖的父RDDs的关系有两种差别的类型:


RDD任务切分中心分为:Driver program、Job、Stage(TaskSet) 和 Task

再回WordCount

代码部分

你可以用代码实验,也可以在 SparkShell 中实验。
  1. package icu.wzk
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object ReWordCount {
  4.   def main(args: Array[String]): Unit = {
  5.     val conf = new SparkConf()
  6.       .setAppName("SparkFindFriends")
  7.       .setMaster("local[*]")
  8.     val sc = new SparkContext(conf)
  9.     sc.setLogLevel("WARN")
  10.     val rdd1 = sc.textFile("goodtbl.java")
  11.     val rdd2 = rdd1.flatMap(_.split("\\+"))
  12.     val rdd3 = rdd2.map((_, 1))
  13.     val rdd4 = rdd3.reduceByKey(_ + _)
  14.     val rdd5 = rdd4.sortByKey()
  15.     rdd5.count()
  16.     // 查看RDD的血缘关系
  17.     rdd1.toDebugString
  18.     rdd5.toDebugString
  19.     // 查看依赖
  20.     rdd1.dependencies
  21.     rdd1.dependencies(0).rdd
  22.     rdd5.dependencies
  23.     rdd5.dependencies(0).rdd
  24.    
  25.     sc.stop()
  26.   }
  27. }
复制代码
提出题目

上面的WordCount中,一共有几个Job,几个Stage,几个Task?

答案:1个Job,3个Stage,6个Task
RDD持久化/缓存

基本概念

涉及到的算子:persis、cache、unpersist 都是 Transformation

持久化级别

使用 cache() 方法时,会调用 persist(MEMORY_ONLY),即
  1. cache() == persist(StorageLevel.Memory.ONLY)
复制代码
对于其他的存储级别,如下图:



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4