Spark-RDD的宽窄依赖以及Shuffle优化

鼠扑  金牌会员 | 2024-10-22 19:34:29 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 928|帖子 928|积分 2784

目次
RDD宽窄依赖的区别
DAG有向无环图
窄依赖
宽依赖
shuffle过程
 Shuffle介绍
 Spark优化-克制shuffle

RDD宽窄依赖的区别



  • 窄依赖

    • 每个父RDD的一个Partition最多被子RDD的一个Partition所利用

      • map
      • flatMap
      • filter


  • 宽依赖

    • 一个父RDD的Partition会被多个子RDD的Partition所利用

      • groupbykey
      • reducebykey
      • sortBykey

    • 在宽依赖中rdd之间会发生数据交换,这个交换的过程称为rdd的shuffle

      • 只要是宽依赖必然发生shuffle
      • 在宽依赖进行数据交换时,只有等待所有分区数据交换完成后,才能进行后续的盘算,非常影响盘算速度


 ​​​​​​

DAG有向无环图

   

  • DAG 管理维护rdd之间依赖关系,保证代码的执行顺序, DAG会根据依赖关系分别stage,每个stage都是一个独立的盘算步调,当发生宽依赖时,会单独拆分一个盘算步调(stage),进行相关数据盘算,可以保证每个单独的stage可以并行执行在发生宽依赖进行shuffle时,会独立的方法执行shuffle盘算,拆分盘算步调的本质是为了保证数据盘算的并行执行.
  • 查看spark的盘算过程,通过DAG判断算子是宽依赖照旧窄依赖
  • 拆分了盘算stage是宽依赖,没有拆分是窄依赖
  • 启动spark的汗青日志
  • start-history-server.sh
  窄依赖

  1. # 判断宽窄依赖
  2. from pyspark import SparkContext
  3. sc = SparkContext()
  4. rdd = sc.parallelize([1,2,3,4,5])
  5. rdd2 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
  6. # 演示
  7. rdd3 = rdd.map(lambda x:x*2)
  8. rdd4 = rdd2.groupByKey()
  9. # 查看结果
  10. res = rdd3.collect()
  11. print(res)
复制代码
观察汗青服务:192.1168.88.100:18080


宽依赖

  1. # 判断宽窄依赖
  2. from pyspark import SparkContext
  3. sc = SparkContext()
  4. rdd = sc.parallelize([1,2,3,4,5])
  5. rdd2 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
  6. # 演示
  7. rdd3 = rdd.map(lambda x:x*2)
  8. rdd4 = rdd2.groupByKey()
  9. # 查看结果
  10. # res = rdd3.collect()
  11. # print(res)
  12. res = rdd4.collect()
  13. print(res)
复制代码

shuffle过程

无论是Spark照旧MapReduce,shuffle的本质就是通报交换数据
MapReduce


  • mapreduce的shuffle作用: 将map盘算后的数据通报给redue利用
  • mapreduce的shuffle过程: 分区(将相同key的数据放在一个分区,采取hash),排序,合并(规约)

    • 将map盘算的数据通报给reduce

Spark


  • spark中也有shuffle

    • 当执行宽依赖的算子就会进行shuffle
    • 将rdd的数据通报给下一个rdd,进行数据交换



 

 Shuffle介绍



  • spark的shuffle的两个部分

    • shuffle wirte 写
    • shuffle read 读
    • 会进行文件的读写,影响spark的盘算速度

  • spark的shuffle方法类

    • 是spark封装好的处理shuffle的方法
    • hashshuffle 类

      • 进行的是hash盘算
      • spark1.2版本前主要利用,之后引入了sortshuffle
      • spark2.0之后,删除了hashshuffle ,从2.0版本开始利用sortshuffle类
      • 优化的hashshufulle和未优化

    • sortshuffle类

      • 排序方式将相同key值数据放在一起
      • sortshuffle类利用时,有两个方法实现shuffle

        • bypass模式版本和平凡模式版本
        • bypass模式版本不会排序,会进行hash操作
        • 平凡模式版本会排序进行shuffle

      • 可以通过配置指定按照那种模式执行 根据task数量决定 默认 task数量小于即是200 采取bypass,task数量超过200个则利用平凡模式的方法进行shuffle
      • 一个分区对应一个task,所以task数量由分区数决定


   
  平凡模式和bypass模式的主要区别在于如何将相同key值的数据放在一起?
  排序 平凡模式采取的计谋
  哈希取余 bypass模式采取的计谋
   Spark优化-克制shuffle

案例一
  1. # 优化计算,减少shuffle
  2. from pyspark import SparkContext
  3. sc = SparkContext()
  4. rdd = sc.parallelize([('男',20),('男',22),('女',19),('女',18)])
  5. # 求不同性别的年龄和
  6. # reduceByKey 是宽依赖算子
  7. rdd2 = rdd.reduceByKey(lambda x,y:x+y)
  8. # 避免shuffle,需要将宽依赖算子计算的过程换成窄依赖
  9. boy = sc.accumulator(0)
  10. girl = sc.accumulator(0)
  11. def func(x):
  12.     if x[0] == '男':
  13.         boy.add(x[1])
  14.     else:
  15.         girl.add(x[1])
  16.     return None
  17. rdd3 = rdd.map(func)
  18. # res  = rdd2.collect()
  19. # print(res)
  20. # 触发rdd3计算
  21. rdd3.collect()
  22. print(boy.value)
  23. print(girl.value)
复制代码
案例二
  1. from pyspark import SparkContext
  2. sc = SparkContext()
  3. rdd_kv1 =sc.parallelize([('a',1),('b',2),('c',2),('d',2),('f',2),('w',2)])
  4. rdd_kv2 =sc.parallelize([('a',1),('c',2),('q',2),('o',2)])
  5. # join关联
  6. rdd_join = rdd_kv1.join(rdd_kv2)
  7. # 将rdd_kv数据量较少转为字典数据,然后用多的rdd数据匹配字典
  8. rdd_dict = rdd_kv2.collectAsMap()
  9. print(rdd_dict)
  10. # 匹配字典
  11. def func(x):
  12.     return (x[0],rdd_dict.get(x[0]))
  13. rdd6 = rdd_kv1.map(func).filter(lambda x:x[1] is not None)
  14. # res = rdd_join.collect()
  15. # print(res)
  16. res = rdd6.collect()
  17. print(res)
复制代码
  可以本身定义函数,克制利用宽依赖的groupByKey,reduceByKey,sortByKey

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

鼠扑

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表