【pyspark速成专家】11_Spark性能调优方法2

[复制链接]
发表于 2026-2-8 05:55:27 | 显示全部楼层 |阅读模式


目次
​编辑

二,Spark使命UI监控监控
三,Spark调优案例


二,Spark使命UI监控监控

Spark使命启动后,可以在欣赏器中输入 http://localhost:4040/ 进入到spark web UI 监控监控界面。
该界面中可以从多个维度以直观的方式非常细粒度地查察Spark使命的实行情况,包罗使命进度,耗时分析,存储分析,shuffle数据量巨细等。
最常查察的页面是 Stages页面和Excutors页面。
Jobs: 每一个Action操纵对应一个Job,以Job粒度表现Application进度。偶然间轴Timeline。
Stages: Job在碰到shuffle切开Stage,表现每个Stage进度,以及shuffle数据量。
可以点击某个Stage进入详情页,查察其下面每个Task的实行情况以及各个partition实行的费时统计。

Storage:
监控cache大概persist导致的数据存储巨细。
Environment:
表现spark和scala版本,依赖的各种jar包及其版本
Excutors : 监控各个Excutors的存储和shuffle情况。
SQL: 表现各种SQL下令在那些Jobs中被实行。
三,Spark调优案例

下面先容几个调优的范例案例:
1,资源设置优化
2,利用缓存镌汰重复盘算
3,数据倾斜调优
4,broadcast+map代替join
5,reduceByKey/aggregateByKey代替groupByKey
1,资源设置优化
下面是一个资源设置的例子:
优化前:
  1. #提交python写的任务
  2. spark-submit --master yarn \
  3. --deploy-mode cluster \
  4. --executor-memory 12G \
  5. --driver-memory 12G \
  6. --num-executors 100 \
  7. --executor-cores 8 \
  8. --conf spark.yarn.maxAppAttempts=2 \
  9. --conf spark.task.maxFailures=10 \
  10. --conf spark.stage.maxConsecutiveAttempts=10 \
  11. --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
  12. --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python  #cluster模式时候设置
  13. --archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
  14. --files  data.csv,profile.txt
  15. --py-files  pkg.py,tqdm.py
  16. pyspark_demo.py
复制代码
优化后:这里紧张减小了 executor-cores数目,一样寻常设置为1~4,过大的数目大概会造成每个core盘算和存储资源不敷产生OOM,也会增长GC时间。 别的也将默认分区数调到了1600,并设置了2G的堆外内存。
  1. #提交python写的任务
  2. spark-submit --master yarn \
  3. --deploy-mode cluster \
  4. --executor-memory 12G \
  5. --driver-memory 12G \
  6. --num-executors 100 \
  7. --executor-cores 2 \
  8. --conf spark.yarn.maxAppAttempts=2 \
  9. --conf spark.default.parallelism=1600 \
  10. --conf spark.sql.shuffle.partitions=1600 \
  11. --conf spark.memory.offHeap.enabled=true \
  12. --conf spark.memory.offHeap.size=2g\
  13. --conf spark.task.maxFailures=10 \
  14. --conf spark.stage.maxConsecutiveAttempts=10 \
  15. --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
  16. --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python  #cluster模式时候设置
  17. --archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
  18. --files  data.csv,profile.txt
  19. --py-files  pkg.py,tqdm.py
  20. pyspark_demo.py
复制代码
2, 利用缓存镌汰重复盘算
  1. %%time
  2. # 优化前:
  3. import math
  4. rdd_x = sc.parallelize(range(0,2000000,3),3)
  5. rdd_y = sc.parallelize(range(2000000,4000000,2),3)
  6. rdd_z = sc.parallelize(range(4000000,6000000,2),3)
  7. rdd_data = rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x))
  8. s = rdd_data.reduce(lambda a,b:a+b+0.0)
  9. n = rdd_data.count()
  10. mean = s/n
  11. print(mean)
  12. %%time
  13. # 优化后:
  14. import math
  15. from  pyspark.storagelevel import StorageLevel
  16. rdd_x = sc.parallelize(range(0,2000000,3),3)
  17. rdd_y = sc.parallelize(range(2000000,4000000,2),3)
  18. rdd_z = sc.parallelize(range(4000000,6000000,2),3)
  19. rdd_data = rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x)).persist(StorageLevel.MEMORY_AND_DISK)
  20. s = rdd_data.reduce(lambda a,b:a+b+0.0)
  21. n = rdd_data.count()
  22. mean = s/n
  23. rdd_data.unpersist()
  24. print(mean)
复制代码
3, 数据倾斜调优
  1. %%time
  2. # 优化前:
  3. rdd_data = sc.parallelize(["hello world"]*1000000+["good morning"]*10000+["I love spark"]*10000)
  4. rdd_word = rdd_data.flatMap(lambda x:x.split(" "))
  5. rdd_one = rdd_word.map(lambda x:(x,1))
  6. rdd_count = rdd_one.reduceByKey(lambda a,b:a+b+0.0)
  7. print(rdd_count.collect())
  8. %%time
  9. # 优化后:
  10. import random
  11. rdd_data = sc.parallelize(["hello world"]*1000000+["good morning"]*10000+["I love spark"]*10000)
  12. rdd_word = rdd_data.flatMap(lambda x:x.split(" "))
  13. rdd_one = rdd_word.map(lambda x:(x,1))
  14. rdd_mid_key = rdd_one.map(lambda x:(x[0]+"_"+str(random.randint(0,999)),x[1]))
  15. rdd_mid_count = rdd_mid_key.reduceByKey(lambda a,b:a+b+0.0)
  16. rdd_count = rdd_mid_count.map(lambda x:(x[0].split("_")[0],x[1])).reduceByKey(lambda a,b:a+b+0.0)
  17. print(rdd_count.collect())  
  18. #作者按:此处仅示范原理,单机上该优化方案难以获得性能优势
复制代码
4, broadcast+map代替join
该优化计谋一样寻常限于有一个到场join的rdd的数据量不大的情况。
  1. %%time
  2. # 优化前:
  3. rdd_age = sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])
  4. rdd_gender = sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")])
  5. rdd_students = rdd_age.join(rdd_gender).map(lambda x:(x[0],x[1][0],x[1][1]))
  6. print(rdd_students.collect())
  7. %%time
  8. # 优化后:
  9. rdd_age = sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])
  10. rdd_gender = sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")],2)
  11. ages = rdd_age.collect()
  12. broads = sc.broadcast(ages)
  13. def get_age(it):
  14.     result = []
  15.     ages = dict(broads.value)
  16.     for x in it:
  17.         name = x[0]
  18.         age = ages.get(name,0)
  19.         result.append((x[0],age,x[1]))
  20.     return iter(result)
  21. rdd_students = rdd_gender.mapPartitions(get_age)
  22. print(rdd_students.collect())
复制代码
5,reduceByKey/aggregateByKey代替groupByKey
groupByKey算子是一个低效的算子,其会产生大量的shuffle。其功能可以用reduceByKey和aggreagateByKey代替,通过在每个partition内部先做一次数据的归并操纵,大大镌汰了shuffle的数据量。

  1. %%time
  2. # 优化前:
  3. rdd_students = sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),
  4.                                ("class1","Ann"),("class1","Jim"),("class2","Lily")])
  5. rdd_names = rdd_students.groupByKey().map(lambda t:(t[0],list(t[1])))
  6. names = rdd_names.collect()
  7. print(names)
  8. %%time
  9. # 优化后:
  10. rdd_students = sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),
  11.                                ("class1","Ann"),("class1","Jim"),("class2","Lily")])
  12. rdd_names = rdd_students.aggregateByKey([],lambda arr,name:arr+[name],lambda arr1,arr2:arr1+arr2)
  13. names = rdd_names.collect()
  14. print(names)
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表