【大数据】基于Spark WebUI定位数据倾斜与Spark调优
- 文章脉络
- 数据倾斜现象
- 数据倾斜出现原因
- 数据倾斜问题排查与定位
- 基本思路
- (首选尝试)解决方案一:进步Shuffle操作的并行度
- (其次尝试) 解决方案二:利用 Hive ETL 预处置惩罚数据
- (数据非常时)解决方案二:过滤少数导致倾斜的key
- (聚合Shuffle倾斜时)解决方案四:两阶段聚合(局部聚合+全局聚合)
- (大表Join小表)解决方案五:将reduce join转为map join 【广播】
- (大量倾斜Key)解决方案六:利用随机前缀和扩容RDD进行join【加盐】
作为一名算法工程师,把握Spark框架的调优与原理是必备技能之一,而Spark任务的调优通常与**“数据倾斜”**这个概念相关。
在阅读本文之前,各人最好具备以下知识:
Spark基本原理:相识Spark的运行机制、核心概念(如RDD、DataFrame、Dataset)、基本操作(如transformation和action)。
Spark集群架构:认识Spark集群的组件(如Driver、Executor、Master和Worker)及其相互关系。
Spark WebUI:相识Spark WebUI的基本功能和界面结构,能够通过WebUI监控和调试Spark作业。
【留意】Spark与Spark WebUI相关知识可以参考以下两篇文章:
《MapReduce的“内存增强版”——Spark》、《Spark–一文相识WebUI》
文章脉络
图1 文章脉络
数据倾斜现象
首先要明确一点:一个Spark任务,会拆分为多个Job,一个Job会拆分为多个Stage,一个Stage又会由若干Task来实行。我们所说的数据倾斜现象,都是在Task层面观测与优化的。
在实行Spark任务时,任务流通常由多个SQL查询或代码模块组成,这些模块会被划分为独立的作业(Jobs)。比方,数据的读取和写入操作通常分别属于不同的Jobs。由于数据在分布式文件体系中是以分区(Partition)的情势存储的,Shuffle操作涉及到跨分区的“宽依赖”关系。因此,任务的实行计划通常以Shuffle操作为界,将任务划分为不同的实行阶段(Stages)。在每个实行阶段内,会并发实行多个任务(Tasks),这些任务是并行处置惩罚数据的基本工作单位。
对 Spark/Hadoop 如许的大数据体系来讲,数据量大并不可骇,可骇的是数据倾斜。何谓数据倾斜?数据倾斜指的是,并行处置惩罚的数据集中,某一部分(如 Spark 或 Kafka的一个 Partition)的数据显着多于其它部分,从而使得该部分的处置惩罚速度成为整个数据集处置惩罚的瓶颈。
【留意】可以理解为少数Task被分配了更多的数据以待处置惩罚,导致其他Task早早实行完盘算资源空闲了下来。
若不解决数据倾斜问题,任何性能优化步伐都将是徒劳,乃至可被视为无效的尝试。数据倾斜的解决能力最能体现大数据工程师在Spark领域的技术深度和调优技巧。
数据倾斜可能带来以下两个直接且严重的结果:
1. 内存溢出(OOM):由于某个任务处置惩罚的数据量过大,可能导致实行任务的节点内存不足,进而引发任务崩溃。
2. 非常性能下降:数据处置惩罚速度将变得极其缓慢,远低于预期,达到不可担当的低服从水平,严重影响整体作业的完成时间。
数据倾斜出现原因
我们首先来先容Spark中RDD的几种依赖模式:
图2 Spark中RDD的几种依赖模式
参照图2,将上三种称之为窄依赖,下两种为宽依赖。
我们可以观察到,窄依赖的每个父RDD的分区都至多被一个子RDD的分区使操作,不涉及Shuffle操作。如map、filter、union和co-partitioned join在RDD之间创建了简朴的一对一(One-to-One)依赖关系,因此不会引起数据倾斜。
然而,宽依赖的多个子RDD的分区依赖一个父RDD的分区,涉及Shuffle操作。如图2所示的其他操作,比方groupBy、reduceByKey或join with not co-partitioned,会创建更为复杂的依赖关系,其中单个父RDD的分区可能被多个子RDD的分区所依赖。这种情况下,如果某个Task对应的父RDD分区中的数据量显着大于其他分区,就会触发数据倾斜现象。
图3 某个Job划分Stage的图例
如图3,一个Job被划分为3个Stage。
参考图3中的Stage 0与Stage 1,它们内部没有Shuffle操作,Task可以并行地实行,因此并不会发生数据倾斜。数据倾斜只会在Shuffle时才可能发生。
也正是由于Shuffle操作会跨分区处置惩罚数据,它没法简朴的像窄依赖一样简朴地并行实行,因此在划分Stage时才会根据Shuffle操作为界限。
【留意】其实宽依赖窄依赖都是较为简朴的概念,业界有许多对RDD依赖处置惩罚的论文,实现都很复杂,各人对RDD依赖有个初步认识、能看懂Spark WebUI里面的DAG即可。除此之外,到底什么样的写法会产生Shuffle,照旧以Spark WebUI生成的DAG图为准。
截至目前,我们先容了数据倾斜只可能发生在Shuffle时,那么Shuffle时导致数据倾斜的原理是什么呢?
其实很简朴:在进行Shuffle的时候,必须将各个节点上相同的 key 拉取到某个节点上的一个 task 来进行处置惩罚,比如按照key进行聚合或 join 等操作。此时如果某个 key对应的数据量特殊大的话,就会发生数据倾斜。比如大部分 key 对应 10 条数据,但是个别key却对应了 100 万条数据,那么大部分 task 可能就只会分配到 10 条数据,然后 1 秒钟就运行完了;但是个别 task 可能分配到了 100 万数据,要运行一两个小时。因此,整个 Spark 作业的运行进度是由运行时间最长的那个 task 决定的。
因此出现数据倾斜的时候,Spark 作业看起来会运行得非常缓慢,乃至可能由于某个 task处置惩罚的数据量过大导致内存溢出。
数据倾斜问题排查与定位
初步认知
数据倾斜现象重要出现在涉及shuffle过程的盘算任务中。以下是一些常见且可能触发shuffle操作的Spark算子列表:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup以及repartition等。当Spark应用程序遭遇数据倾斜问题时,极有可能是由于这些算子中的某一个在代码逻辑中被不当利用所致。
问题定位
简而言之,多数 task 都实行得非常快,但个别 task 实行非常慢,导致整个任务不能结束。乃至用 SparkStreaming 做及时算法时候,可能不停会有 Executor 出现 OOM 的错误,但是其余的 Executor 内存利用率却很低。
如何定位,可以 Spark 的 web UI 检察,如果看到类似这种,那就基本上可以确定是数据倾斜了。
图4 疑似数据倾斜Stage实行样例
也可以详细的看到如下的盘算4分位数。
图5 疑似数据倾斜Stage4分位数样例
通过UI乃至可以直接定位到是哪个过程发生的数据倾斜。为了进一步定位数据倾斜是有哪些 key 导致的,可以对key进行统计,如果数据量大,可以进行抽样的情势。如果发现多数数据分布都较为均匀,而个别数据比其他数据大上若干个数量级,则说明发生了数据倾斜。计数top就是造成数据倾斜的元凶。
【留意】发生数据倾斜后,我们先定位到Stage,再去看其对应的DAG图与实行时间图,能够很快定位到问题所在(不管你写的是Java、Python照旧Spark SQL等)。
检察key的数据分布情况
在辨认了数据倾斜发生的详细位置后,通常必要对实行了Shuffle操作并引发数据倾斜的RDD或Hive表进行深入分析,以考察其中键(key)的分布状况。目的是为后续选择得当的解决方案提供决策依据。鉴于不同的键分布特性以及与各种shuffle算子的组合可能,解决数据倾斜的策略需因地制宜。根据实行的操作类型,有多种方法可用于调查键的分布:
- 如果数据倾斜是由Spark SQL中的GROUP BY或JOIN语句引起,则应分析相关SQL语句中涉及的表的键分布情况。
- 对于由Spark RDD上实行的shuffle算子导致的数据倾斜,可以在Spark作业中嵌入代码以查抄键的分布,比方利用RDD.countByKey()方法。随后,可以将统计得到的键出现频次通过collect或take操作提取至客户端,以便打印并观测Key的分布状况。
解决数据倾斜(Spark调优)
基本思路
既然数据倾斜是由于相同Key的值被分配到少数几个节点上造成的单点问题,那么尽可能的的让Key均匀分配,问题就解决了。
【留意】以下解决方案参考了知乎文章 《万字详解 Spark 数据倾斜及解决方案》
(首选尝试)解决方案一:进步Shuffle操作的并行度
方案实用场景:如果我们必须要对数据倾斜迎难而上,那么建议优先利用这种方案,由于这是处置惩罚数据倾斜最简朴的一种方案。
方案实现思路:在对RDD实行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子实行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,比如group by、join等,必要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于许多场景来说都有点过小。
方案实现原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处置惩罚比原来更少的数据。举例来说,如果原本有5个key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处置惩罚50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处置惩罚10条数据,那么自然每个task的实行时间都会变短了。
方案优点:实现起来比较简朴,可以有效缓解和减轻数据倾斜的影响。
方案缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。
方案实践经验:该方案通常无法彻底解决数据倾斜,由于如果出现一些非常情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定照旧会分配到一个task中行止理,因此注定照旧会发生数据倾斜的。以是这种方案只能说是在发现数据倾斜时尝试利用的第一种手段,尝试去用最简朴的方法缓解数据倾斜而已,或者是和其他方案团结起来利用。
(其次尝试) 解决方案二:利用 Hive ETL 预处置惩罚数据
方案实用场景:导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不匀称(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景必要频繁利用Spark对Hive表实行某个分析操作,那么比较得当利用这种技术方案。
方案实现思路:此时可以评估一下,是否可以通过Hive来进行数据预处置惩罚(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处置惩罚后的Hive表。此时由于数据已经预先辈行过聚合或join操作了,那么在Spark作业中也就不必要利用原先的shuffle类算子实行这类操作了。
方案实现原理:这种方案从根源上解决了数据倾斜,由于彻底制止了在Spark中实行shuffle类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下各人,这种方式属于治标不治本。由于究竟数据本身就存在分布不匀称的问题,以是Hive ETL中进行group by或者join等shuffle操作时,照旧会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,制止Spark程序发生数据倾斜而已。
方案优点:实现起来简朴便捷,效果还非常好,完全规避掉了数据倾斜,Spark作业的性能会大幅度提拔。
方案缺点:治标不治本,Hive ETL中照旧会发生数据倾斜。
方案实践经验:在一些Java体系与Spark团结利用的项目中,会出现Java代码频繁调用Spark作业的场景,而且对Spark作业的实行性能要求很高,就比较得当利用这种方案。将数据倾斜提前到上游的Hive ETL,每天仅实行一次,只有那一次是比较慢的,而之后每次Java调用Spark作业时,实行速度都会很快,能够提供更好的用户体验。
项目实践经验:在美团·点评的交互式用户行为分析体系中利用了这种方案,该体系重要是答应用户通过Java Web体系提交数据分析统计任务,后端通过Java提交Spark作业进行数据分析统计。要求Spark作业速度必须要快,尽量在10分钟以内,否则速度太慢,用户体验会很差。以是我们将有些Spark作业的shuffle操作提前到了Hive ETL中,从而让Spark直接利用预处置惩罚的Hive中央表,尽可能地减少Spark的shuffle操作,大幅度提拔了性能,将部分作业的性能提拔了6倍以上。
(数据非常时)解决方案二:过滤少数导致倾斜的key
方案实用场景:如果发现导致倾斜的key就少数几个,而且对盘算本身的影响并不大的话,那么很得当利用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。
方案实现思路:如果我们判断那少数几个数据量特殊多的key,对作业的实行和盘算结果不是特殊紧张的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以利用where子句过滤掉这些key或者在Spark Core中对RDD实行filter算子过滤掉这些key。如果必要每次作业实行时,动态判断哪些key的数据量最多然后再进行过滤,那么可以利用sample算子对RDD进行采样,然后盘算出每个key的数量,取数据量最多的key过滤掉即可。
方案实现原理:将导致数据倾斜的key给过滤掉之后,这些key就不会参与盘算了,自然不可能产生数据倾斜。
方案优点:实现简朴,而且效果也很好,可以完全规避掉数据倾斜。
方案缺点:实用场景不多,大多数情况下,导致倾斜的key照旧许多的,并不是只有少数几个。
方案实践经验:在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天Spark作业在运行的时候突然OOM了,追查之后发现,是Hive表中的某一个key在那天数据非常,导致数据量暴增。因此就接纳每次实行前先辈行采样,盘算出样本中数据量最大的几个key之后,直接在程序中将那些key给过滤掉。
(聚合Shuffle倾斜时)解决方案四:两阶段聚合(局部聚合+全局聚合)
方案实用场景:对RDD实行reduceByKey等聚合类shuffle算子或者在Spark SQL中利用group by语句进行分组聚合时,比较实用这种方案。
方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就酿成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会酿成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,实行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会酿成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会酿成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最闭幕果了,比如(hello, 4)。
方案实现原理:将原本相同的key通过附加随机前缀的方式,酿成多个不同的key,就可以让原本被一个task处置惩罚的数据分散到多个task上去做局部聚合,进而解决单个task处置惩罚数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。详细原理见下图。
方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提拔数倍以上。
方案缺点:仅仅实用于聚合类的shuffle操作,实用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。
(大表Join小表)解决方案五:将reduce join转为map join 【广播】
方案实用场景:在对RDD利用join类操作,或者是在Spark SQL中利用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M或者一两G),比较实用此方案。
方案实现思路:倒霉用join算子进行毗连操作,而利用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底制止数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD实行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照毗连key进行比对,如果毗连key相同的话,那么就将两个RDD的数据用你必要的方式毗连起来必要设置spark.sql.autoBroadcastJoinThreshold,将 Broadcast 的阈值设置得充足大。
方案实现原理:平凡的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。详细原理如下图所示。
方案优点:对join操作导致的数据倾斜,效果非常好,由于根本就不会发生shuffle,也就根本不会发生数据倾斜。
方案缺点:实用场景较少,由于这个方案只实用于一个大表和一个小表的情况。究竟我们必要将小表进行广播,此时会比较斲丧内存资源,driver和每个Executor内存中都会驻留一份小RDD的全量数据。如果我们广播出去的RDD数据比较大,比如10G以上,那么就可能发生内存溢出了。因此并不得当两个都是大表的情况。
(大量倾斜Key)解决方案六:利用随机前缀和扩容RDD进行join【加盐】
方案实用场景:如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能利用最后一种方案来解决问题了。
方案实现思路:首先检察RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。然后将该RDD的每条数据都打上一个n以内的随机前缀。同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条-数据都依次打上一个0~n的前缀。最后将两个处置惩罚后的RDD进行join即可。
方案实现原理:将原先一样的key通过附加随机前缀酿成不一样的key,然后就可以将这些处置惩罚后的“不同key”分散到多个task中行止理,而不是让一个task处置惩罚大量的相同key。这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处置惩罚,因此只能对整个RDD进行数据扩容,对内存资源要求很高。
方案优点:对join类型的数据倾斜基本都可以处置惩罚,而且效果也相对比较显着,性能提拔效果非常不错。
方案缺点:该方案更多的是缓解数据倾斜,而不是彻底制止数据倾斜。而且必要对整个RDD进行扩容,对内存资源要求很高。
方案实践经验:曾经开辟一个数据需求的时候,发现一个join导致了数据倾斜。优化之前,作业的实行时间大约是60分钟左右;利用该方案优化之后,实行时间紧缩到10分钟左右,性能提拔了6倍。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |