Spark核心之02:RDD、算子分类、常用算子

[复制链接]
发表于 2025-10-21 08:10:10 | 显示全部楼层 |阅读模式
spark内存盘算框架


一、目的


  • 深入明白RDD弹性分布式数据集底层原理
  • 把握RDD弹性分布式数据集的常用算子操纵
二、要点

⭐️1. RDD是什么


  • RDD(Resilient Distributed Dataset)叫做**弹性分布式数据集,是Spark中最根本的数据抽象,它代表一个不可变、可分区、内里的元素可并行盘算**的聚集.

    • Dataset:          就是一个聚集,存储许多数据.
    • Distributed:它内部的元素举行了分布式存储,方便于后期举行分布式盘算.
    • Resilient:     体现弹性,rdd的数据是可以生存在内存大概是磁盘中.

⭐️2. RDD的五大属性



  • (1)A list of partitions

    • 一个分区(Partition)列表,数据集的根本构成单位。

  1.         这里表示一个rdd有很多分区,每一个分区内部是包含了该rdd的部分数据,
  2. spark中任务是以task线程的方式运行, 一个分区就对应一个task线程。
  3.         用户可以在创建RDD时指定RDD的分区个数,如果没有指定,那么就会采用默认值。
  4.     val rdd=sparkContext.textFile("/words.txt")
  5.     如果该文件的block块个数小于等于2,这里生产的RDD分区数就为2
  6.     如果该文件的block块个数大于2,这里生产的RDD分区数就与block块个数保持一致
  7.            
复制代码

  • (2)A function for computing each split

    • 一个盘算每个分区的函数

  1.         Spark中RDD的计算是以分区为单位的,每个RDD都会实现compute计算函数以达到这个目的.
复制代码

  • (3)A list of dependencies on other RDDs

    • 一个rdd会依靠于其他多个rdd

  1.   这里就涉及到rdd与rdd之间的依赖关系,spark任务的容错机制就是根据这个特性(血统)而来。
  2.   
复制代码

  • (4)Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

    • 一个Partitioner,即RDD的分区函数(可选项)

  1. 当前Spark中实现了两种类型的分区函数,
  2. 一个是基于哈希的HashPartitioner,(key.hashcode % 分区数= 分区号)
  3. 另外一个是基于范围的RangePartitioner。
  4. 只有对于key-value的RDD,并且产生shuffle,才会有Partitioner,
  5. 非key-value的RDD的Parititioner的值是None。
复制代码

  • (5)Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

    • 一个列表,存储每个Partition的优先位置(可选项)

  1. 这里涉及到数据的本地性,数据块位置最优。
  2. spark任务在调度的时候会优先考虑存有数据的节点开启计算任务,减少数据的网络传输,提升计算效率。
复制代码
3. 基于spark的单词统计步伐分析rdd的五大属性


  • 需求
    1. HDFS上有一个大小为300M的文件,通过spark实现文件单词统计,最后把结果数据保存到HDFS上
    复制代码
  • 代码
    1. sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/out")
    复制代码
  • 流程分析

4. RDD的创建方式


  • 1、通过已经存在的scala聚集去构建
    1. val rdd1=sc.parallelize(List(1,2,3,4,5))
    2. val rdd2=sc.parallelize(Array("hadoop","hive","spark"))
    3. val rdd3=sc.makeRDD(List(1,2,3,4))
    复制代码
  • 2、加载外部的数据源去构建
    1. val rdd1=sc.textFile("/words.txt")
    复制代码
  • 3、从已经存在的rdd举行转换天生一个新的rdd
    1. val rdd2=rdd1.flatMap(_.split(" "))
    2. val rdd3=rdd2.map((_,1))
    复制代码
⭐️5. RDD的算子分类


  • 1、transformation(转换)

    • 根据已经存在的rdd转换天生一个新的rdd,  它是延伸加载,它不会立刻实验
    • 比方

      • map / flatMap / reduceByKey 等


  • 2、action (动作)

    • 它会真正触发任务的运行

      • 将rdd的盘算的效果数据返回给Driver端,大概是生存效果数据到外部存储介质中

    • 比方

      • collect / saveAsTextFile 等


6. RDD常见的算子操纵分析

6.1 transformation算子

转换寄义map(func)返回一个新的RDD,该RDD由每一个输入元素颠末func函数转换后构成filter(func)返回一个新的RDD,该RDD由颠末func函数盘算后返回值为true的输入元素构成flatMap(func)雷同于map,但是每一个输入元素可以被映射为0或多个输出元素(以是func应该返回一个序列,而不是单一元素)mapPartitions(func)雷同于map,但独立地在RDD的每一个分片上运行,因此在范例为T的RDD上运行时,func的函数范例必须是Iterator[T] => Iterator[U]mapPartitionsWithIndex(func)雷同于mapPartitions,但func带有一个整数参数体现分片的索引值,因此在范例为T的RDD上运行时,func的函数范例必须是(Int, Interator[T]) => Iterator[U]union(otherDataset)对源RDD和参数RDD求并集后返回一个新的RDDintersection(otherDataset)对源RDD和参数RDD求交集后返回一个新的RDDdistinct([numTasks]))对源RDD举行去重后返回一个新的RDDgroupByKey([numTasks])在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDDreduceByKey(func, [numTasks])在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,利用指定的reduce函数,将雷同key的值聚合到一起,与groupByKey雷同,reduce任务的个数可以通过第二个可选的参数来设置sortByKey([ascending], [numTasks])在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key举行排序的(K,V)的RDDsortBy(func,[ascending], [numTasks])与sortByKey雷同,但是更机动join(otherDataset, [numTasks])在范例为(K,V)和(K,W)的RDD上调用,返回一个雷同key对应的全部元素对在一起的(K,(V,W))的RDDcogroup(otherDataset, [numTasks])在范例为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))范例的RDDcoalesce(numPartitions)镌汰 RDD 的分区数到指定值。repartition(numPartitions)重新给 RDD 分区repartitionAndSortWithinPartitions(partitioner)重新给 RDD 分区,而且每个分区内以记录的 key 排序6.2 action算子

动作寄义reduce(func)reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)构成两个元素,再被传给输入函数,直到末了只有一个值为止。collect()在驱动步伐中,以数组的情势返回数据集的全部元素count()返回RDD的元素个数first()返回RDD的第一个元素(雷同于take(1))take(n)返回一个由数据集的前n个元素构成的数组takeOrdered(n, [ordering])返回天然次序大概自界说次序的前 n 个元素saveAsTextFile(path)将数据集的元素以textfile的情势生存到HDFS文件体系大概其他支持的文件体系,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本saveAsSequenceFile(path)将数据会合的元素以Hadoop sequencefile的格式生存到指定的目次下,可以使HDFS大概其他Hadoop支持的文件体系。saveAsObjectFile(path)将数据集的元素,以 Java 序列化的方式生存到指定的目次下countByKey()针对(K,V)范例的RDD,返回一个(K,Int)的map,体现每一个key对应的元素个数。⭐️foreach(func)在数据集的每一个元素上,运行函数func⭐️foreachPartition(func)在数据集的每一个分区上,运行函数func7. RDD常用的算子操纵演示


  • 为了方便前期的测试和学习,可以利用spark-shell举行演示
    1. spark-shell --master local[2]
    复制代码
7.1 map(Trans转换算子)

**map(func)**返回一个新的RDD,该RDD由每一个输入元素颠末func函数转换后构成
  1. val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
  2. //把rdd1中每一个元素乘以10
  3. rdd1.map(_*10).collect
复制代码
7.2 filter(Trans转换算子)

**filter(func)**返回一个新的RDD,该RDD由颠末func函数盘算后返回值为true的输入元素构成
  1. val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
  2. //把rdd1中大于5的元素进行过滤
  3. rdd1.filter(x => x >5).collect
复制代码
7.3 flatMap(Trans转换算子)

flatMap(func)  雷同于map,但是每一个输入元素可以被映射为0或多个输出元素(以是func应该返回一个序列,而不是单一元素)
  1. val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
  2. //获取rdd1中元素的每一个字母
  3. rdd1.flatMap(_.split(" ")).collect
复制代码
7.4 intersection、union(Trans转换算子)

union(otherDataset)  对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset)  对源RDD和参数RDD求交集后返回一个新的RDD
  1. val rdd1 = sc.parallelize(List(5, 6, 4, 3))
  2. val rdd2 = sc.parallelize(List(1, 2, 3, 4))
  3. //求交集
  4. rdd1.intersection(rdd2).collect
  5. //求并集
  6. rdd1.union(rdd2).collect
复制代码
7.5 distinct(Trans转换算子)

distinct([numTasks]))  对源RDD举行去重后返回一个新的RDD
  1. val rdd1 = sc.parallelize(List(1,1,2,3,3,4,5,6,7))
  2. //去重
  3. rdd1.distinct
复制代码
7.6 join、groupByKey(Trans转换算子)

join(otherDataset, [numTasks])  在范例为(K,V)和(K,W)的RDD上调用,返回一个雷同key对应的全部元素对在一起的(K,(V,W))的RDD
groupByKey([numTasks])  在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
  1. val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
  2. val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
  3. //求join
  4. val rdd3 = rdd1.join(rdd2)
  5. rdd3.collect
  6. //求并集
  7. val rdd4 = rdd1 union rdd2
  8. rdd4.groupByKey.collect
复制代码
7.7 cogroup(Trans转换算子)

cogroup(otherDataset, [numTasks])  在范例为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))范例的RDD
collect()  在驱动步伐中,以数组的情势返回数据集的全部元素
  1. val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
  2. val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("jim", 2)))
  3. //分组
  4. val rdd3 = rdd1.cogroup(rdd2)
  5. rdd3.collect
  6. //
  7. //res0: Array[(String, (Iterable[Int], Iterable[Int]))] =
  8. //Array(
  9. //    (jim,(CompactBuffer(),CompactBuffer(2))),
  10. //    (tom,(CompactBuffer(1, 2),CompactBuffer(1))),
  11. //    (jerry,(CompactBuffer(3),CompactBuffer(2))),
  12. //    (kitty,(CompactBuffer(2),CompactBuffer()))
  13. //  )
复制代码
7.8 reduce (Action动作算子)

reduce(func)  reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)构成两个元素,再被传给输入函数,直到末了只有一个值为止。
  1. val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
  2. //reduce聚合
  3. val rdd2 = rdd1.reduce(_ + _)
  4. rdd2.collect
  5. val rdd3 = sc.parallelize(List("1","2","3","4","5"))
  6. rdd3.reduce(_+_)
  7. 这里可能会出现多个不同的结果,由于元素在不同的分区中,每一个分区都是一个独立的task线程去运行。这些task运行有先后关系
复制代码
7.9 reduceByKey、sortByKey(Trans转换算子)

groupByKey([numTasks])  在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks])  在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,利用指定的reduce函数,将雷同key的值聚合到一起,与groupByKey雷同,reduce任务的个数可以通过第二个可选的参数来设置 ,差别于groupByKey(),reduceByKey会在map端join
sortByKey([ascending], [numTasks])  在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key举行排序的(K,V)的RDD
  1. val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1)))
  2. val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
  3. val rdd3 = rdd1.union(rdd2)
  4. //按key进行聚合
  5. val rdd4 = rdd3.reduceByKey(_ + _)
  6. rdd4.collect
  7. //按value的降序排序
  8. val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
  9. rdd5.collect
复制代码
7.10 repartition、coalesce(Trans转换算子)

coalesce(numPartitions)  镌汰 RDD 的分区数到指定值,默认不会产生shuffle,传入true开启shuffle
repartition(numPartitions)  重新给 RDD 分区,会产生shuffle 相当于coalesce(numPatitions,true)**
  1. val rdd1 = sc.parallelize(1 to 10,3)
  2. //打印rdd1的分区数
  3. rdd1.partitions.size
  4. //利用repartition改变rdd1分区数
  5. //减少分区
  6. rdd1.repartition(2).partitions.size
  7. //增加分区
  8. rdd1.repartition(4).partitions.size
  9. //利用coalesce改变rdd1分区数
  10. //减少分区
  11. rdd1.coalesce(2).partitions.size
  12. //repartition:  重新分区, 有shuffle
  13. //coalesce:     合并分区 / 减少分区         默认不shuffle   
  14. //默认 coalesce 不能扩大分区数量。除非添加true的参数,或者使用repartition。
  15. //适用场景:
  16.     //1、如果要shuffle,都用 repartition
  17.     //2、不需要shuffle,仅仅是做分区的合并,coalesce
  18.     //3、repartition常用于扩大分区。
复制代码
⭐️7.11 map、mapPartitions   、mapPartitionsWithIndex(Trans转换算子)

map(func)  返回一个新的RDD,该RDD由每一个输入元素颠末func函数转换后构成
mapPartitions(func)  雷同于map,但独立地在RDD的每一个分片上运行,因此在范例为T的RDD上运行时,func的函数范例必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func)  雷同于mapPartitions,但func带有一个整数参数体现分片的索引值,因此在范例为T的RDD上运行时,func的函数范例必须是(Int, Interator[T]) => Iterator[U]
  1. val rdd1=sc.parallelize(1 to 10,5)
  2. rdd1.map(x => x*10)).collect
  3. rdd1.mapPartitions(iter => iter.map(x=>x*10)).collect
  4. //index表示分区号  可以获取得到每一个元素属于哪一个分区
  5. rdd1.mapPartitionsWithIndex((index,iter)=>iter.map(x=>(index,x)))
  6. map:用于遍历RDD,将函数f应用于每一个元素,返回新的RDD(transformation算子)。
  7. mapPartitions:用于遍历操作RDD中的每一个分区,返回生成一个新的RDD(transformation算子)。
  8. 总结:
  9. 如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效
  10. 比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection。
复制代码
⭐️7.12 foreach、foreachPartition (Action动作算子)

foreach(func)  在数据集的每一个元素上,运行函数func
foreachPartition(func)  在数据集的每一个分区上,运行函数func
  1. val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
  2. //foreach实现对rdd1里的每一个元素乘10然后打印输出
  3. rdd1.foreach(x=>println(x * 10))
  4. //foreachPartition实现对rdd1里的每一个元素乘10然后打印输出
  5. rdd1.foreachPartition(iter => iter.foreach(x=>println(x * 10)))
  6. foreach:用于遍历RDD,将函数f应用于每一个元素,无返回值(action算子)。
  7. foreachPartition: 用于遍历操作RDD中的每一个分区。无返回值(action算子)。
  8. 总结:
  9. 一般使用mapPartitions或者foreachPartition算子比map和foreach更加高效,推荐使用。
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表