ToB企服应用市场:ToB评测及商务社交产业平台
标题:
Spark-RDD的宽窄依赖以及Shuffle优化
[打印本页]
作者:
鼠扑
时间:
2024-10-22 19:34
标题:
Spark-RDD的宽窄依赖以及Shuffle优化
目次
RDD宽窄依赖的区别
DAG有向无环图
窄依赖
宽依赖
shuffle过程
Shuffle介绍
Spark优化-克制shuffle
RDD宽窄依赖的区别
窄依赖
每个父RDD的一个Partition最多被子RDD的一个Partition所利用
map
flatMap
filter
宽依赖
一个父RDD的Partition会被多个子RDD的Partition所利用
groupbykey
reducebykey
sortBykey
在宽依赖中rdd之间会发生数据交换,这个交换的过程称为rdd的shuffle
只要是宽依赖必然发生shuffle
在宽依赖进行数据交换时,只有等待所有分区数据交换完成后,才能进行后续的盘算,非常影响盘算速度
DAG有向无环图
DAG 管理维护rdd之间依赖关系,保证代码的执行顺序, DAG会根据依赖关系分别stage,每个stage都是一个独立的盘算步调,当发生宽依赖时,会单独拆分一个盘算步调(stage),进行相关数据盘算,可以保证每个单独的stage可以并行执行在发生宽依赖进行shuffle时,会独立的方法执行shuffle盘算,拆分盘算步调的本质是为了保证数据盘算的并行执行.
查看spark的盘算过程,通过DAG判断算子是宽依赖照旧窄依赖
拆分了盘算stage是宽依赖,没有拆分是窄依赖
启动spark的汗青日志
start-history-server.sh
窄依赖
# 判断宽窄依赖
from pyspark import SparkContext
sc = SparkContext()
rdd = sc.parallelize([1,2,3,4,5])
rdd2 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
# 演示
rdd3 = rdd.map(lambda x:x*2)
rdd4 = rdd2.groupByKey()
# 查看结果
res = rdd3.collect()
print(res)
复制代码
观察汗青服务:192.1168.88.100:18080
宽依赖
# 判断宽窄依赖
from pyspark import SparkContext
sc = SparkContext()
rdd = sc.parallelize([1,2,3,4,5])
rdd2 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
# 演示
rdd3 = rdd.map(lambda x:x*2)
rdd4 = rdd2.groupByKey()
# 查看结果
# res = rdd3.collect()
# print(res)
res = rdd4.collect()
print(res)
复制代码
shuffle过程
无论是Spark照旧MapReduce,shuffle的本质就是通报交换数据
MapReduce
mapreduce的shuffle作用: 将map盘算后的数据通报给redue利用
mapreduce的shuffle过程: 分区(将相同key的数据放在一个分区,采取hash),排序,合并(规约)
将map盘算的数据通报给reduce
Spark
spark中也有shuffle
当执行宽依赖的算子就会进行shuffle
将rdd的数据通报给下一个rdd,进行数据交换
Shuffle介绍
spark的shuffle的两个部分
shuffle wirte 写
shuffle read 读
会进行文件的读写,影响spark的盘算速度
spark的shuffle方法类
是spark封装好的处理shuffle的方法
hashshuffle 类
进行的是hash盘算
spark1.2版本前主要利用,之后引入了sortshuffle
spark2.0之后,删除了hashshuffle ,从2.0版本开始利用sortshuffle类
优化的hashshufulle和未优化
sortshuffle类
排序方式将相同key值数据放在一起
sortshuffle类利用时,有两个方法实现shuffle
bypass模式版本和平凡模式版本
bypass模式版本不会排序,会进行hash操作
平凡模式版本会排序进行shuffle
可以通过配置指定按照那种模式执行 根据task数量决定 默认 task数量小于即是200 采取bypass,task数量超过200个则利用平凡模式的方法进行shuffle
一个分区对应一个task,所以task数量由分区数决定
平凡模式和bypass模式的主要区别在于如何将相同key值的数据放在一起?
排序 平凡模式采取的计谋
哈希取余 bypass模式采取的计谋
Spark优化-克制shuffle
案例一
# 优化计算,减少shuffle
from pyspark import SparkContext
sc = SparkContext()
rdd = sc.parallelize([('男',20),('男',22),('女',19),('女',18)])
# 求不同性别的年龄和
# reduceByKey 是宽依赖算子
rdd2 = rdd.reduceByKey(lambda x,y:x+y)
# 避免shuffle,需要将宽依赖算子计算的过程换成窄依赖
boy = sc.accumulator(0)
girl = sc.accumulator(0)
def func(x):
if x[0] == '男':
boy.add(x[1])
else:
girl.add(x[1])
return None
rdd3 = rdd.map(func)
# res = rdd2.collect()
# print(res)
# 触发rdd3计算
rdd3.collect()
print(boy.value)
print(girl.value)
复制代码
案例二
from pyspark import SparkContext
sc = SparkContext()
rdd_kv1 =sc.parallelize([('a',1),('b',2),('c',2),('d',2),('f',2),('w',2)])
rdd_kv2 =sc.parallelize([('a',1),('c',2),('q',2),('o',2)])
# join关联
rdd_join = rdd_kv1.join(rdd_kv2)
# 将rdd_kv数据量较少转为字典数据,然后用多的rdd数据匹配字典
rdd_dict = rdd_kv2.collectAsMap()
print(rdd_dict)
# 匹配字典
def func(x):
return (x[0],rdd_dict.get(x[0]))
rdd6 = rdd_kv1.map(func).filter(lambda x:x[1] is not None)
# res = rdd_join.collect()
# print(res)
res = rdd6.collect()
print(res)
复制代码
可以本身定义函数,克制利用宽依赖的groupByKey,reduceByKey,sortByKey
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4