ToB企服应用市场:ToB评测及商务社交产业平台

标题: Spark-RDD的宽窄依赖以及Shuffle优化 [打印本页]

作者: 鼠扑    时间: 2024-10-22 19:34
标题: Spark-RDD的宽窄依赖以及Shuffle优化
目次
RDD宽窄依赖的区别
DAG有向无环图
窄依赖
宽依赖
shuffle过程
 Shuffle介绍
 Spark优化-克制shuffle

RDD宽窄依赖的区别


 ​​​​​​

DAG有向无环图

   
  窄依赖

  1. # 判断宽窄依赖
  2. from pyspark import SparkContext
  3. sc = SparkContext()
  4. rdd = sc.parallelize([1,2,3,4,5])
  5. rdd2 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
  6. # 演示
  7. rdd3 = rdd.map(lambda x:x*2)
  8. rdd4 = rdd2.groupByKey()
  9. # 查看结果
  10. res = rdd3.collect()
  11. print(res)
复制代码
观察汗青服务:192.1168.88.100:18080


宽依赖

  1. # 判断宽窄依赖
  2. from pyspark import SparkContext
  3. sc = SparkContext()
  4. rdd = sc.parallelize([1,2,3,4,5])
  5. rdd2 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
  6. # 演示
  7. rdd3 = rdd.map(lambda x:x*2)
  8. rdd4 = rdd2.groupByKey()
  9. # 查看结果
  10. # res = rdd3.collect()
  11. # print(res)
  12. res = rdd4.collect()
  13. print(res)
复制代码

shuffle过程

无论是Spark照旧MapReduce,shuffle的本质就是通报交换数据
MapReduce

Spark



 

 Shuffle介绍


   
  平凡模式和bypass模式的主要区别在于如何将相同key值的数据放在一起?
  排序 平凡模式采取的计谋
  哈希取余 bypass模式采取的计谋
   Spark优化-克制shuffle

案例一
  1. # 优化计算,减少shuffle
  2. from pyspark import SparkContext
  3. sc = SparkContext()
  4. rdd = sc.parallelize([('男',20),('男',22),('女',19),('女',18)])
  5. # 求不同性别的年龄和
  6. # reduceByKey 是宽依赖算子
  7. rdd2 = rdd.reduceByKey(lambda x,y:x+y)
  8. # 避免shuffle,需要将宽依赖算子计算的过程换成窄依赖
  9. boy = sc.accumulator(0)
  10. girl = sc.accumulator(0)
  11. def func(x):
  12.     if x[0] == '男':
  13.         boy.add(x[1])
  14.     else:
  15.         girl.add(x[1])
  16.     return None
  17. rdd3 = rdd.map(func)
  18. # res  = rdd2.collect()
  19. # print(res)
  20. # 触发rdd3计算
  21. rdd3.collect()
  22. print(boy.value)
  23. print(girl.value)
复制代码
案例二
  1. from pyspark import SparkContext
  2. sc = SparkContext()
  3. rdd_kv1 =sc.parallelize([('a',1),('b',2),('c',2),('d',2),('f',2),('w',2)])
  4. rdd_kv2 =sc.parallelize([('a',1),('c',2),('q',2),('o',2)])
  5. # join关联
  6. rdd_join = rdd_kv1.join(rdd_kv2)
  7. # 将rdd_kv数据量较少转为字典数据,然后用多的rdd数据匹配字典
  8. rdd_dict = rdd_kv2.collectAsMap()
  9. print(rdd_dict)
  10. # 匹配字典
  11. def func(x):
  12.     return (x[0],rdd_dict.get(x[0]))
  13. rdd6 = rdd_kv1.map(func).filter(lambda x:x[1] is not None)
  14. # res = rdd_join.collect()
  15. # print(res)
  16. res = rdd6.collect()
  17. print(res)
复制代码
  可以本身定义函数,克制利用宽依赖的groupByKey,reduceByKey,sortByKey

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4