ToB企服应用市场:ToB评测及商务社交产业平台

标题: Spark [打印本页]

作者: 千千梦丶琪    时间: 2023-7-28 08:47
标题: Spark
SparkCore

RDD基础

定义

​        在 Spark 的编程接口中,每一个数据集都被表示为一个对象,称为 RDD。RDD 是 Resillient Distributed Dataset(弹性分布式数据集)的简称,是一个只读的(不可变的)、分区的(分布式的)、容错的、延迟计算的、类型推断的和可缓存的记录集合。
结构

​        RDD 由以下五部分组成:
特性

SC与SS

​        SparkContext 从 Spark 1.x 引入的,在 2.0 中引入 SparkSession之前,用来作为 Spark 和 PySpark 的入口点。使用 RDD 编程和连接到 Spark Cluster 的第一步就是创建SparkContext。SparkContext 是在 org.apache.spark 包中定义的,它用于在集群中通过编程方式创建 SparkRDD、累加器和广播变量。
​        虽然 SparkContext 是 2.0 之前一个入口点,但它并没有被 SparkSession 完全取代,SparkContext 的许多特性仍然可用,并在 Spark 2.0 和以后的版本中使用。
​        SparkContext 构造函数在 2.0 中已经弃用,因此建议使用静态方法 getOrCreate()来创建 SparkContext。该函数用于获取或实例SparkContext,并将其注册为一个单例对象。
  1. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
  2. //val sc = new SparkContext(sparkConf)
  3. val sc = SparkContext.getOrCreate(sparkConf)
复制代码
创建

  1. val rdd1 = sc.parallelize(
  2. List(1,2,3,4)
  3. )
  4. val rdd2 = sc.makeRDD(
  5. List(1,2,3,4)
  6. )
复制代码
  1. val fileRDD: RDD[String] = sc.textFile("input")
复制代码
  1. // 字符转为大写,得到一个新的 RDD
  2. val rdd5 = rdd4.map(line => line.toUpperCase)
复制代码
并行度与分区

​        默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。分区的数量等于任务的数量,不一定等于并行度。分区数量可以在构建 RDD 时指定。
  1. val dataRDD: RDD[Int] =
  2. sc.makeRDD(
  3. List(1,2,3,4),
  4. 4)
复制代码
​        读取内存数据时,数据分区规则的Spark 核心源码如下。i为第几个分区(从零开始),length为集合长度,numSlices为分区数量。start和end为某一个分区内要存储的集合元素的范围,左闭右开。
  1. val start = ((i*length) / numSlices).toInt
  2. val end = (((i + 1)*length) / numSlices).toInt
复制代码
​        读取文件时,spark读取文件的方式和hadoop类似,一行一行读取。数据读取时以偏移量为单位,区分内存分区中以一个元素为单位,数据读取是全闭。偏移量不会重复读取。如果数据源为多个文件,那么计算分区时以文件为单位进行分区。具体Spark核心源码如下。totalSize表示读取文件的字节数的总和,goalSize表示每个分区应该存放多少个字节。如果除不尽的话,比较剩余字节数占goalSize的多少,如果大于%10,则需要产生新的分区,参考Hadoop。
  1. long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits)
复制代码
操作RDD

​        RDD 操作分为两种类型:转换(Transformation)和动作(action)。转换(Transformation)是用来创建RDD的方法,而动作(action)是使用RDD的方法。
转换算子

map

​        将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
  1. val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
  2. val dataRDD1: RDD[Int] = dataRDD.map(
  3. num => {
  4.         num * 2
  5. }
  6. )
  7. val dataRDD2: RDD[String] = dataRDD1.map(
  8. num => {
  9.         "" + num
  10. }
  11. )     
复制代码
​        map 和 foreach 的区别?
mapPartitions

​        将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
  1. val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
  2. datas => {
  3. datas.filter(x => x!=2)
  4. }
  5. )
复制代码
​        思考一个问题:map 和 mapPartitions 的区别?
​        Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。
​        Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
​        Map 算子因为类似于串行操作,所以性能比较低,而mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。
mapPartitionsWithIndex

​        将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
  1. val mpiRDD = rdd.mapPartitionsWithIndex(
  2.         (index, iter) => {
  3.                 if ( index == 1 ) {
  4.                         iter
  5.                 } else {
  6.                         Nil.iterator
  7.                 }
  8.         }
  9. )
复制代码
flatMap

​        对集合中每个元素进行操作然后再扁平化。,所以算子也称之为扁平映射。
  1. val dataRDD = sparkContext.makeRDD(List(
  2. List(1,2),List(3,4)
  3. ),1)
  4. val dataRDD1 = dataRDD.flatMap(
  5. list => list
  6. )
复制代码
glom

​        将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变 。
  1. val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
  2. val glomRDD: RDD[Array[Int]] = rdd.glom()
  3. val maxRDD: RDD[Int] = glomRDD.map(
  4.         array => {
  5.                 array.max
  6.         }
  7. )
复制代码
groupBy

​        将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中,一个组的数据在一个分区中,但是并不是说一个分区中只有一个组。
  1. val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
  2. val groupRDD = rdd.groupBy(num => {num%2})
复制代码
filter

​        将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
  1. val rdd = sc.makeRDD(List(1,2,3,4))
  2. val filterRDD: RDD[Int] = rdd.filter(num=>num%2!=0)
复制代码
sample

​        根据指定的规则从数据集中抽取数据。
  1. val dataRDD = sparkContext.makeRDD(List(
  2. 1,2,3,4
  3. ),1)
  4. // 抽取数据不放回(伯努利算法)
  5. // 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
  6. // 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
  7. // 第一个参数:抽取的数据是否放回,false:不放回
  8. // 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
  9. // 第三个参数:随机数种子
  10. val dataRDD1 = dataRDD.sample(false, 0.5)
  11. // 抽取数据放回(泊松算法)
  12. // 第一个参数:抽取的数据是否放回,true:放回;false:不放回
  13. // 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
  14. // 第三个参数:随机数种子
  15. val dataRDD2 = dataRDD.sample(true, 2)
复制代码
distinct

​        将数据集中重复的数据去重。
  1. val dataRDD = sparkContext.makeRDD(List(
  2. 1,2,3,4,1,2
  3. ),1)
  4. val dataRDD1 = dataRDD.distinct()
复制代码
coalesce

​        根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率,当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本。
  1. val dataRDD = sparkContext.makeRDD(List(
  2. 1,2,3,4,1,2
  3. ),6)
  4. val dataRDD1 = dataRDD.coalesce(2)
复制代码
repartition

​        该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。repartition常用于增加分区,coalesce常用于减小分区。
  1. val dataRDD = sparkContext.makeRDD(List(
  2. 1,2,3,4,1,2
  3. ),2)
  4. val dataRDD1 = dataRDD.repartition(4)
复制代码
sortBy

​        该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程。
  1. val rdd = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2)
  2. val newRDD = rdd.sortBy(t=>t._1.toInt, false)
复制代码
intersection

​        对源 RDD 和参数 RDD 求交集后返回一个新的 RDD。
  1. val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
  2. val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
  3. val dataRDD = dataRDD1.intersection(dataRDD2)
复制代码
union

​        对源 RDD 和参数 RDD 求并集后返回一个新的 RDD。
  1. val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
  2. val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
  3. val dataRDD = dataRDD1.union(dataRDD2)
复制代码
subtract

​        以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集。
  1. val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
  2. val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
  3. val dataRDD = dataRDD1.subtract(dataRDD2)
复制代码
zip

​        将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD中的元素,Value 为第 2 个 RDD 中的相同位置的元素。
  1. val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
  2. val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
  3. val dataRDD = dataRDD1.subtract(dataRDD2)
复制代码
partitionBy

​        将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner。
  1. val rdd: RDD[(Int, String)] =
  2. sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
  3. val rdd2: RDD[(Int, String)] =
  4. rdd.partitionBy(new HashPartitioner(2))
复制代码
reduceByKey

​        可以将数据按照相同的 Key 对 Value 进行聚合。
  1. val rdd = sc.makeRDD(List(
  2.         ("a", 1), ("a", 2), ("a", 3), ("b", 4)
  3. ))
  4. val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey( (x:Int, y:Int) => {
  5.         x + y
  6. } )
复制代码
groupByKey

​        将数据源的数据根据 key 对 value 进行分组。
  1. val rdd = sc.makeRDD(List(
  2.         ("a", 1), ("a", 2), ("a", 3), ("b", 4)
  3. ))
  4. val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
  5. //打印
  6. //(a,CompactBuffer(1, 2, 3))
  7. //(b,CompactBuffer(4))
复制代码

​        reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
​        reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey。
​                (2)后者的返回值是一个迭代器,且迭代器中的元素是value,而前者迭代器中的元素没有改变。
​                (3)后者只能针对Key进行分组,前者没有限制。
aggregateByKey

​        将数据根据不同的规则进行分区内计算和分区间计算。
  1. val rdd = sc.makeRDD(List(
  2.         ("a", 1), ("a", 2), ("a", 3), ("a", 4)
  3. ),2)
  4. // 第一个参数列表,需要传递一个参数,表示为初始值
  5. //             主要用于当碰见第一个key的时候,和value进行分区内计算
  6. // 第二个参数列表需要传递2个参数
  7. //      第一个参数表示分区内计算规则
  8. //      第二个参数表示分区间计算规则
  9. rdd.aggregateByKey(10)(
  10.         (x, y) => math.max(x, y),
  11.         (x, y) => x + y
  12. ).collect.foreach(println)
复制代码
foldByKey

​        当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey。
  1. val rdd = sc.makeRDD(List(
  2.         ("a", 1), ("a", 2), ("b", 3),
  3.         ("b", 4), ("b", 5), ("a", 6)
  4. ),2)
  5. rdd.foldByKey(0)((x, y) => x+y).collect.foreach(println)
复制代码
combineByKey

​        最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。
  1. val rdd = sc.makeRDD(List(
  2.         ("a", 1), ("a", 2), ("b", 3),
  3.         ("b", 4), ("b", 5), ("a", 6)
  4. ),2)
  5. // combineByKey : 方法需要三个参数
  6. // 第一个参数表示:将相同key的第一个数据进行结构的转换,实现操作
  7. // 第二个参数表示:分区内的计算规则
  8. // 第三个参数表示:分区间的计算规则
  9. val newRDD : RDD[(String, (Int, Int))] = rdd.combineByKey(
  10.         v => (v, 1),
  11.         ( t:(Int, Int), v ) => {
  12.                 (t._1 + v, t._2 + 1)
  13.         },
  14.         (t1:(Int, Int), t2:(Int, Int)) => {
  15.                 (t1._1 + t2._1, t1._2 + t2._2)
  16.         }
  17. )
复制代码
​        思考一个问题:reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别?
​        相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同。
​        相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同 。
​        相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同。
​        当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。
sortByKey

​        在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的。
  1. val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
  2. val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
  3. val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)
复制代码
join

​        在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的。
  1. val rdd1 = sc.makeRDD(List(
  2.         ("a", 1), ("a", 2), ("c", 3)
  3. ))
  4. val rdd2 = sc.makeRDD(List(
  5.         ("a", 5), ("c", 6),("a", 4)
  6. ))
  7. val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
  8. joinRDD.collect().foreach(println)
  9. // (a,(1,5))
  10. // (a,(1,4))
  11. // (a,(2,5))
  12. // (a,(2,4))
  13. // (c,(3,6))
复制代码
leftOuterJoin

​        类似于 SQL 语句的左外连接 。
  1. val rdd1 = sc.makeRDD(List(
  2.         ("a", 1), ("b", 2)
  3. ))
  4. val rdd2 = sc.makeRDD(List(
  5.         ("a", 4), ("b", 5), ("b", 3), ("c", 6)
  6. ))
  7. val rightJoinRDD = rdd1.rightOuterJoin(rdd2)
  8. rightJoinRDD.collect().foreach(println)
  9. // (a,(Some(1),4))
  10. // (b,(Some(2),5))
  11. // (b,(Some(2),3))
  12. // (c,(None,6))
复制代码
cogroup

​        在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD。
  1. val rdd1 = sc.makeRDD(List(
  2.         ("a", 1), ("b", 2)
  3. ))
  4. val rdd2 = sc.makeRDD(List(
  5.         ("a", 4), ("b", 5), ("c", 7), ("c", 6)
  6. ))
  7. val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
  8. cgRDD.collect().foreach(println)
  9. // (a,(CompactBuffer(1),CompactBuffer(4)))
  10. // (b,(CompactBuffer(2),CompactBuffer(5)))
  11. // (c,(CompactBuffer(),CompactBuffer(6, 7)))
复制代码
动作算子

reduce

​        聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。
  1. val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
  2. // 聚合数据
  3. val reduceResult: Int = rdd.reduce((x, y) => x+y)
复制代码
collect

​        在驱动程序中,以数组 Array 的形式返回数据集的所有元素。
  1. val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
  2. rdd.collect().foreach(println)
复制代码
count

​        返回 RDD 中元素的个数。
  1. val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
  2. val countResult: Long = rdd.count()
复制代码
first

​        返回 RDD 中的第一个元素。
  1. val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
  2. val firstResult: Int = rdd.first()
  3. println(firstResult)
复制代码
take

​        返回一个由 RDD 的前 n 个元素组成的数组。
  1. vval rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
  2. val takeResult: Array[Int] = rdd.take(2)
  3. println(takeResult.mkString(","))
复制代码
takeOrdered

​        返回该 RDD 排序后的前 n 个元素组成的数组。
  1. import org.apache.spark.{SparkConf, SparkContext}
  2. object RddTakeOrderedDemo {
  3.   def main(args: Array[String]): Unit = {
  4.     val conf = new SparkConf().setMaster("local").setAppName("RddTakeOrderedDemo")
  5.     val sc = new SparkContext(conf)
  6.     val rdd = sc.makeRDD(1 to 10)
  7.     val ord = new MyOrdering()
  8.     rdd.takeOrdered(3)(ord).foreach(println)
  9.     sc.stop()
  10.   }
  11.   class MyOrdering extends Ordering[Int] {
  12.     override def compare(x: Int, y: Int): Int = {
  13.       if (x < y) 1 else if (x == y) 0 else -1
  14.     }
  15.   }
  16. }
复制代码
aggregate

​        分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。
  1. val rdd = sc.makeRDD(List(1,2,3,4),2)
  2. val result = rdd.aggregate(10)((x, y) => x+y, (x, y) => x+y)
  3. // 40
复制代码
fold

​        折叠操作,aggregate 的简化版操作。
  1. val rdd = sc.makeRDD(List(1,2,3,4),2)
  2. val result = rdd.fold(10)((x, y) => x+y)
  3. // 40
复制代码
countByKey

​        统计每种 key 的个数。
  1. val rdd = sc.makeRDD(List(
  2.         ("a", 1),("a", 2),("a", 3)
  3. ))
  4. val stringToLong: collection.Map[String, Long] =rdd.countByKey()
  5. println(stringToLong)
复制代码
countByValue

​        和countByKey类似。
  1. def wordcount8(sc : SparkContext): Unit = {
  2.     val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
  3.     val words = rdd.flatMap(_.split(" "))
  4.     val wordCount: collection.Map[String, Long] = words.countByValue()
  5. }
复制代码
mapValues

​        不管Key,直接计算value。
  1. val resultRDD: RDD[(String, Int)] = newRDD.mapValues {
  2.     case (num, cnt) => {
  3.         num / cnt
  4.     }
  5. }
复制代码
RDD序列化

​        从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。
  1. import org.apache.spark.{SparkConf, SparkContext}
  2. object Spark07_RDD_Operator_Action {
  3.     def main(args: Array[String]): Unit = {
  4.         val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  5.         val sc = new SparkContext(sparkConf)
  6.         val rdd = sc.makeRDD(List[Int](1,2,3))
  7.         val user = new User()
  8.         // SparkException: Task not serializable
  9.         // NotSerializableException: com.atguigu.bigdata.spark.core.rdd.operator.action.Spark07_RDD_Operator_Action$User
  10.         // RDD算子中传递的函数是会包含闭包操作,那么就会进行检测功能
  11.         // 闭包检测
  12.         rdd.foreach(
  13.             num => {
  14.                 println("age = " + (user.age + num))
  15.             }
  16.         )
  17.         sc.stop()
  18.     }
  19.     //class User extends Serializable {
  20.     // 样例类在编译时,会自动混入序列化特质(实现可序列化接口)
  21.     //case class User() {
  22.     class User {
  23.         var age : Int = 30
  24.     }
  25. }
复制代码
​        Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。注意:即使使用 Kryo 序列化,也要继承 Serializable 接口。
  1. object serializable_Kryo {
  2. def main(args: Array[String]): Unit = {
  3. val conf: SparkConf = new SparkConf()
  4. .setAppName("SerDemo")
  5. .setMaster("local[*]")
  6. // 替换默认的序列化机制
  7. .set("spark.serializer",
  8. "org.apache.spark.serializer.KryoSerializer")
  9. // 注册需要使用 kryo 序列化的自定义类
  10. .registerKryoClasses(Array(classOf[Searcher]))
  11. val sc = new SparkContext(conf)
  12. val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu",
  13. "atguigu", "hahah"), 2)
  14. val searcher = new Searcher("hello")
  15. val result: RDD[String] = searcher.getMatchedRDD1(rdd)
  16. result.collect.foreach(println)
  17. }
  18. }
  19. case class Searcher(val query: String) {
  20. def isMatch(s: String) = {
  21. s.contains(query)
  22. }
  23. def getMatchedRDD1(rdd: RDD[String]) = {
  24. rdd.filter(isMatch)
  25. }
  26. def getMatchedRDD2(rdd: RDD[String]) = {
  27. val q = query
  28. rdd.filter(_.contains(q))
  29. }
  30. }
复制代码
RDD依赖关系

​        相邻的两个RDD的关系称之为依赖关系,多个连续的RDD的依赖关系,称之为血缘关系。
​        宽依赖:宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle,总结:宽依赖我们形象的比喻为多生。
​        窄依赖:窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。
​        RDD阶段划分:DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG 记录了 RDD 的转换过程和任务的阶段。

​        任务划分:RDD 任务切分中间分为:Application、Job、Stage 和 Task
​        注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
RDD持久化

Cache、Persist缓存

​        RDD的对象可以重用,但是数据无法重用,如果一个RDD需要重复使用,那么根据血缘关系,需要从头再次执行来获取数据。
​        RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。
  1. val list = List("Hello Scala", "Hello Spark")
  2. val rdd = sc.makeRDD(list)
  3. val flatRDD = rdd.flatMap(_.split(" "))
  4. val mapRDD = flatRDD.map(word=>{
  5.         (word,1)
  6. })
  7. mapRDD.persist(StorageLevel.DISK_ONLY)
  8. val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
  9. reduceRDD.collect().foreach(println)
  10. println("**************************************")
  11. val groupRDD = mapRDD.groupByKey()
  12. groupRDD.collect().foreach(println)
复制代码
​        存储级别如下如所示。

​        缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部 Partition。
​        Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用 persist 或 cache。
CheckPoint检查点

​        所谓的检查点其实就是通过将 RDD 中间结果写入磁盘。由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。对 RDD 进行checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。
  1. // 设置检查点路径
  2. sc.setCheckpointDir("./checkpoint1")
  3. // 创建一个 RDD,读取指定位置文件:hello atguigu atguigu
  4. val lineRdd: RDD[String] = sc.textFile("input/1.txt")
  5. // 业务逻辑
  6. val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
  7. val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
  8. word => {
  9. (word, System.currentTimeMillis())
  10. }
  11. }
  12. // 增加缓存,避免再重新跑一个 job 做 checkpoint
  13. wordToOneRdd.cache()
  14. // 数据检查点:针对 wordToOneRdd 做检查点计算
  15. wordToOneRdd.checkpoint()
  16. // 触发执行逻辑
  17. wordToOneRdd.collect().foreach(println)
复制代码
区别



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4