目次
编辑
二,Spark使命UI监控
三,Spark调优案例
二,Spark使命UI监控
Spark使命启动后,可以在欣赏器中输入 http://localhost:4040/ 进入到spark web UI 监控 界面。
该界面中可以从多个维度以直观的方式非常细粒度地查察Spark使命的实行情况,包罗使命进度,耗时分析,存储分析,shuffle数据量巨细等。
最常查察的页面是 Stages页面和Excutors页面。
Jobs: 每一个Action操纵对应一个Job,以Job粒度表现Application进度。偶然间轴Timeline。
Stages: Job在碰到shuffle切开Stage,表现每个Stage进度,以及shuffle数据量。可以点击某个Stage进入详情页,查察其下面每个Task的实行情况以及各个partition实行的费时统计。
Storage:
监控cache大概persist导致的数据存储巨细。
Environment:
表现spark和scala版本,依赖的各种jar包及其版本。
Excutors : 监控各个Excutors的存储和shuffle情况。
SQL: 表现各种SQL下令在那些Jobs中被实行。
三,Spark调优案例
下面先容几个调优的范例案例:
1,资源设置优化
2,利用缓存镌汰重复盘算
3,数据倾斜调优
4,broadcast+map代替join
5,reduceByKey/aggregateByKey代替groupByKey
1,资源设置优化
下面是一个资源设置的例子:
优化前:
- #提交python写的任务
- spark-submit --master yarn \
- --deploy-mode cluster \
- --executor-memory 12G \
- --driver-memory 12G \
- --num-executors 100 \
- --executor-cores 8 \
- --conf spark.yarn.maxAppAttempts=2 \
- --conf spark.task.maxFailures=10 \
- --conf spark.stage.maxConsecutiveAttempts=10 \
- --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
- --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python #cluster模式时候设置
- --archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
- --files data.csv,profile.txt
- --py-files pkg.py,tqdm.py
- pyspark_demo.py
复制代码 优化后:这里紧张减小了 executor-cores数目,一样寻常设置为1~4,过大的数目大概会造成每个core盘算和存储资源不敷产生OOM,也会增长GC时间。 别的也将默认分区数调到了1600,并设置了2G的堆外内存。
- #提交python写的任务
- spark-submit --master yarn \
- --deploy-mode cluster \
- --executor-memory 12G \
- --driver-memory 12G \
- --num-executors 100 \
- --executor-cores 2 \
- --conf spark.yarn.maxAppAttempts=2 \
- --conf spark.default.parallelism=1600 \
- --conf spark.sql.shuffle.partitions=1600 \
- --conf spark.memory.offHeap.enabled=true \
- --conf spark.memory.offHeap.size=2g\
- --conf spark.task.maxFailures=10 \
- --conf spark.stage.maxConsecutiveAttempts=10 \
- --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
- --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python #cluster模式时候设置
- --archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
- --files data.csv,profile.txt
- --py-files pkg.py,tqdm.py
- pyspark_demo.py
复制代码 2, 利用缓存镌汰重复盘算
- %%time
- # 优化前:
- import math
- rdd_x = sc.parallelize(range(0,2000000,3),3)
- rdd_y = sc.parallelize(range(2000000,4000000,2),3)
- rdd_z = sc.parallelize(range(4000000,6000000,2),3)
- rdd_data = rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x))
- s = rdd_data.reduce(lambda a,b:a+b+0.0)
- n = rdd_data.count()
- mean = s/n
- print(mean)
- %%time
- # 优化后:
- import math
- from pyspark.storagelevel import StorageLevel
- rdd_x = sc.parallelize(range(0,2000000,3),3)
- rdd_y = sc.parallelize(range(2000000,4000000,2),3)
- rdd_z = sc.parallelize(range(4000000,6000000,2),3)
- rdd_data = rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x)).persist(StorageLevel.MEMORY_AND_DISK)
- s = rdd_data.reduce(lambda a,b:a+b+0.0)
- n = rdd_data.count()
- mean = s/n
- rdd_data.unpersist()
- print(mean)
复制代码 3, 数据倾斜调优
- %%time
- # 优化前:
- rdd_data = sc.parallelize(["hello world"]*1000000+["good morning"]*10000+["I love spark"]*10000)
- rdd_word = rdd_data.flatMap(lambda x:x.split(" "))
- rdd_one = rdd_word.map(lambda x:(x,1))
- rdd_count = rdd_one.reduceByKey(lambda a,b:a+b+0.0)
- print(rdd_count.collect())
- %%time
- # 优化后:
- import random
- rdd_data = sc.parallelize(["hello world"]*1000000+["good morning"]*10000+["I love spark"]*10000)
- rdd_word = rdd_data.flatMap(lambda x:x.split(" "))
- rdd_one = rdd_word.map(lambda x:(x,1))
- rdd_mid_key = rdd_one.map(lambda x:(x[0]+"_"+str(random.randint(0,999)),x[1]))
- rdd_mid_count = rdd_mid_key.reduceByKey(lambda a,b:a+b+0.0)
- rdd_count = rdd_mid_count.map(lambda x:(x[0].split("_")[0],x[1])).reduceByKey(lambda a,b:a+b+0.0)
- print(rdd_count.collect())
- #作者按:此处仅示范原理,单机上该优化方案难以获得性能优势
复制代码 4, broadcast+map代替join
该优化计谋一样寻常限于有一个到场join的rdd的数据量不大的情况。
- %%time
- # 优化前:
- rdd_age = sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])
- rdd_gender = sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")])
- rdd_students = rdd_age.join(rdd_gender).map(lambda x:(x[0],x[1][0],x[1][1]))
- print(rdd_students.collect())
- %%time
- # 优化后:
- rdd_age = sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])
- rdd_gender = sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")],2)
- ages = rdd_age.collect()
- broads = sc.broadcast(ages)
- def get_age(it):
- result = []
- ages = dict(broads.value)
- for x in it:
- name = x[0]
- age = ages.get(name,0)
- result.append((x[0],age,x[1]))
- return iter(result)
- rdd_students = rdd_gender.mapPartitions(get_age)
- print(rdd_students.collect())
复制代码 5,reduceByKey/aggregateByKey代替groupByKey
groupByKey算子是一个低效的算子,其会产生大量的shuffle。其功能可以用reduceByKey和aggreagateByKey代替,通过在每个partition内部先做一次数据的归并操纵,大大镌汰了shuffle的数据量。
- %%time
- # 优化前:
- rdd_students = sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),
- ("class1","Ann"),("class1","Jim"),("class2","Lily")])
- rdd_names = rdd_students.groupByKey().map(lambda t:(t[0],list(t[1])))
- names = rdd_names.collect()
- print(names)
- %%time
- # 优化后:
- rdd_students = sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),
- ("class1","Ann"),("class1","Jim"),("class2","Lily")])
- rdd_names = rdd_students.aggregateByKey([],lambda arr,name:arr+[name],lambda arr1,arr2:arr1+arr2)
- names = rdd_names.collect()
- print(names)
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金 |