魏晓东 发表于 2024-12-1 23:16:35

【互联网大厂练习履历】Spark SQL数据倾斜调优圣经(含实战分享)

国科大学习生存(期末复习资料、课程大作业解析、大厂练习履历心得等): 文章专栏(点击跳转)
大数据开发学习文档(分布式文件体系的实现,大数据生态圈学习文档等): 文章专栏(点击跳转)


1. Spark 理论基石:RDD

RDD概念:
RDD,学名弹性分布式数据集(Resilient Distributed Dataset)。是一种对数据集形态的抽象,基于此抽象,使用者可以在集群中实行一系列盘算,而不用将中间效果落盘。而这正是之前 MR 抽象的一个紧张痛点,每一个步骤都必要落盘,使得不必要的开销很高。
RDD特性:
分布式:RDD可以分布在集群的多个节点上存储和处理。
弹性:RDD能够从节点故障中恢复,确保数据不丢失。
不可变性:RDD是只读的,一旦创建就不能修改。要改变RDD,只能通过转换操作(如map、filter等)来创建一个新的RDD。
并行化:Spark可以并行地处理RDD,以提高性能。
容错性:通过使用数据的多个副本和盘算的依赖关系即:血缘谱系(lineage),RDD可以在节点失败时重新盘算丢失的数据。一样平常来说,重建的粒度是分区(Partition)而非整个数据集,一来代价更小,二来差异分区大概在差异呆板上。
RDD提供了两种主要的操作:


[*]窄依赖(Narrow Dependencies):如果子RDD的每个分区只依赖于父RDD的一个分区,这种依赖关系称为窄依赖。窄依赖操作包括map、filter和union等,它们可以并行实行,因为每个输出分区只依赖于一个输入分区。
[*]宽依赖(Wide Dependencies):如果子RDD的每个分区依赖于父RDD的多个分区,这种依赖关系称为宽依赖。宽依赖操作包括groupByKey、reduceByKey和join等,它们必要进行数据的shuffle(重新分区)操作。
RDD的创建可以通过以下几种方式:

[*]从HDFS、S3、本地文件体系等存储体系中读取数据。
[*]并行化集合:在Driver程序中创建的集合可以通过parallelize方法转换为RDD。
[*]外部数据源:通过输入格式读取外部数据源,如Hive表、HBase等。
RDD的变换(Transformations)算子包括:
   map:对RDD的每个元素应用函数,返回一个新的RDD。
filter:根据条件选择RDD中的元素,返回一个新的RDD。
groupByKey:根据键对数据进行分组。
reduceByKey:对每个键对应的值进行聚合。
join:与其他RDD进行join操作。
RDD的动作(Action)算子包括:
   count:盘算RDD中的元素数量。
collect:收集所有元素到Driver程序中。
take:获取RDD中的前n个元素。
saveAsTextFile:将RDD的元素以文本情势保存到文件。
2. Spark Shuffle:调优路中的性能杀手

2.1 Shuffle概念

Apache Spark中的Shuffle是一个涉及数据重新分配的过程,它通常发生在宽依赖操作中,如repartition、groupByKey、reduceByKey和join等操作。Shuffle是大数据中的性能杀手,其来源于大数据中的元老级的组件Hadoop。


[*]Shuffle过程涉及将数据在差异节点和分区之间重新分布,以确保雷同键的数据被聚集在一起。这个过程通常伴随着大量的磁盘I/O、网络I/O和序列化/反序列化操作,因此会成为分布式盘算任务的性能瓶颈 。
2.2 Spark SQL中的Shuffle

在Spark SQL中,Shuffle操作通常发生在必要跨分区进行数据重组的场景。以下是一些常见的大概触发Shuffle的Spark SQL操作:


[*]聚合操作:如GROUP BY、COUNT、DISTINCT等,这些操作必要对雷同键的数据进行汇总,因此必要将具有雷同键的数据拉取到同一个分区中进行处理。
[*]连接操作:特殊是宽连接(如LEFT JOIN、RIGHT JOIN、FULL OUTER JOIN),当两个表的连接键在差异分区上存在时,必要Shuffle数据以确保连接操作的精确性。
[*]排序操作:全局排序(如ORDER BY)大概必要Shuffle,因为必要将所有数据收集到一起以进行全局排序。
[*]分布式团结:使用UNION ALL时,如果涉及的表分区不在同一节点上,大概必要Shuffle数据以确保团结操作的精确性。
[*]窗口函数:某些窗口函数大概必要对数据进行全局排序,这大概触发Shuffle操作。
[*]广播连接:当使用广播连接时,如果广播变量很大,或者参与连接的数据量很大,也大概触发Shuffle。
2.3 Spark SQL中的数据倾斜题目

在Spark SQL中,数据倾斜是常见的性能题目,通常发生在数据在差异节点或分区之间分布不匀称时,由于Shuffle操作的存在,可以说也直接导致了数据倾斜题目的出现。因此上文中提到的 大概触发Shuffle的Spark SQL操作 同样也会导致数据倾斜题目。
下面详细介绍最容易出现数据倾斜的两种场景:聚合操作、连接操作:


[*]聚合操作:如GROUP BY、COUNT、DISTINCT等,如果某些key的数据量特殊大,会导致数据倾斜。可以通过使用repartition()或coalesce()重新分配数据,或者使用reduceByKey局部聚合结合groupBy全局聚合的方式来优化
[*]连接操作(⭐紧张⭐):特殊是当两个表通过JOIN操作连接时,如果连接键在数据中分布不匀称,也很容易导致数据倾斜。优化方法包括使用Broadcast Join来处理小表,或者通过调解spark.sql.shuffle.partitions参数来增加Shuffle过程中的分区数,从而减轻单个分区的数据压力。
2.3.1 常见的Join策略解析

Spark将参与Join的两张表抽象为流式遍历表(streamIter)和查找表(buildIter),通常streamIter为大表,buildIter为小表,我们不用担心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。
https://i-blog.csdnimg.cn/direct/d2506be6e227433baf6e788cfc67b648.png
大表Join小表:


[*] Shuffle Hash Join:把大表和小表按照雷同的分区算法和分区数进行分区(Join 的 keys 进行分区),包管了 hash 值一样(雷同key)的数据都分发到同一个分区中(分区内不排序),然后在同一个 Executor 中两张表 hash 值一样的分区就可以在本地进行 hash Join 。
https://i-blog.csdnimg.cn/direct/eb620cef6ae8453fb417907320af0254.png
       注意,和broadcast hash join的区别,这里并没有广播小表,在双方shuffle后的分区内,小表转成Hash桶与大表进行hash join。
    特点:
仅支持等值连接,join key不必要排序;
支持除了全外连接(full outer joins)之外的所有join范例;
必要对小表构建Hash map,属于内存密集型的操作,如果构建Hash表的一侧数据比力大,大概会造成OOM,不得当严重倾斜的join;
对于FullOuter Join,必要建立双向hash表,代价太大。因此FullOuterJoin默认都是基于SortJoin来实现。
[*] Broadcast Hash Join:将小表的数据广播到 Spark 所有的 Executor 端,只能用于等值连接。避免了 Shuffle 操作。一样平常而言,Broadcast Hash Join 会比其他 Join 策略实行的要快。因为他直接在一个map中完成了,也称之为map join。
https://i-blog.csdnimg.cn/direct/e3b066f5998a4788aa806dd135842c1b.png
[*] Broadcast Nested Loop Join:该方式是在没有符合的JOIN机制可供选择时,最终会选择该种join策略。优先级为:Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > Cartesian Join > Broadcast Nested Loop Join.
最小的数据集被广播到另一个数据集的每个分区上,实行一个嵌套循环来实行join, 也就是说数据集1的每条记载都尝试join数据集2的每条记载(最笨的方法),服从比力低。既可以做等值join也可以做非等值join,而且是非等值join的默认策略。
没有排序,就是广播小表到每个分区上,尝试join每条记载,服从低!
大表之间Join:


[*] Sort Merge Join:先hash到同一个分区且排好序,然后再在分区内顺序查找比对
对表的大小没有条件,不管分区多大,SortMergeJoin 都不用把一侧的数据全部加载到内存中,而是即用即丢;两个序列都有序。从头遍历,碰到 key 雷同的就输出,如果差异,左边小就继续取左边,反之取右边,由于排序的特性,每次处理完一条记载后只必要从上一次竣事的位置开始查找,SortMergeJoinExec实行时就能够避免大量无用的操作,提高了大数据量下sql join 的稳定性。
https://i-blog.csdnimg.cn/direct/7b431d8ba2f24a2a8e4e129400db7ce6.png
Join步骤:
       shuffle: 将两张表按照 join key 进行shuffle,包管join key值雷同的记载会被分在相应的分区
sort: 对每个分区内的数据进行排序
merge: 排序后再对相应的分区内的记载进行连接
[*] Cartesian Join:笛卡尔积
如果左表有n个分区,右表有m个分区,那么笛卡尔积后的分区数是K = n * m个;并且这K个分区中,第K(i)个分区获取的左表分区为 kn=i / m,获取的右表分区为 km=i % m,然后kn和km这两个分区做笛卡尔积;由于是以分区为单位,所以不会触发shuffle。
Join策略选择:
https://i-blog.csdnimg.cn/direct/60d2b7f2b3354fe5b39c63765113a5bc.png
2.3.2 解决数据倾斜常见方法



[*] Map端

[*]Map端文件不支持切片;由于文件采取了不支持splittable的压缩算法或者文件大小不一致,导致map端的数据倾斜。此时必要做数据预处理,切换数据存储格式以使源数据文件可切片。

[*] Reduce端

[*] 异常数据导致的数据倾斜,包括null(空值)、无效数据(对效果影响不大的有效数据或是大量重复的测试数据)、正常数据(业务导致的数据分布)。对于前两种环境,只必要对数据进行预处理,过滤掉空值或是按一定规则淘汰数据量;对于最后一种环境下文会介绍解决方法。
[*] 大小表关联(Join) 时,某些小表数据key大量出现在大表中,导致join操作在对数据进行Shuffle时发生数据倾斜题目。此时我们要清楚,数据倾斜发生的原因是由于Shuffle造成的,那么有没有一种方式可以避免Shuffle操作呢?我们可以使用广播操作(Broadcast),来将小表广播到大表所在的节点上,此时就可以将Reduce side join变革为Map side join,消除了Shuffle过程,即消除了数据倾斜题目。
[*] 大表与大表关联时,此时不可以使用广播操作,因为广播操作仅得当广播较小的表,表格太大大概导致内存压力(OOM),且存在大量的网络I/O,反而会低落作业性能。此时我们应该能想到通过增加Reduce节点来淘汰Reduce端的盘算压力,即改变Reduce的并行度从而改变Shuffle过程中数据的去向,进而消除数据倾斜。具体可以通过设置SET spark.sql.shuffle.partitions=300参数或者使用repartition或coalesce来指定Shuffle的分区数来提高作业的并行处理能力。
[*] 业务导致的数据倾斜,此时数据倾斜是由少数key的数据量大造成的:少数key雷同的数据,数据量大,会集在部门分区。此时我们可以通过加盐操作(salting)+两阶段聚合来解决题目。具体操作:首先,加盐打散key。给每个key都加一个随机数(如10以内的随机数)【加盐】,此时 key 被打散。局部聚合:对打上随机数的key,实行一次聚合操作,得到效果。全局聚合:将 key 的前缀/后缀去掉,再进行一次聚合操作,得到最闭幕果。
https://i-blog.csdnimg.cn/direct/582a100f2ff2406da82144e4ae673c28.png
从上图可以看到,如果不进行两阶段聚合的话,所有源数据均会被Shuffle到同一个节点进行盘算,但通过加盐处理后使得数据在三个节点上盘算,数据量减小了3倍,第二阶段聚合的数据量也减小了一倍。
[*] 中间层建立(通用),在进行长血缘谱系盘算时,通常会建立一些中间层(物化视图),目标就是通过预盘算,提高查询性能,以淘汰任务盘算压力。但在建立中间层是必要考虑的题目不仅仅是这单个的需求,必须从宏观角度上来分析这个中间层是否有建立的意义,即后续是否可以被其他需求所使用等等。

[*]物化视图(Materialized View) 是一个包括查询效果的数据库对像,可以用于预先盘算并保存表连接或聚集等耗时较多的操作的效果。在实行查询时,就可以避免进行这些耗时的操作,而从快速的得到效果。
 

具体操作在下文 Spark调优实战案例 另有详细介绍。

3. Spark调优实战案例

前言:在滴滴练习期间所做的一个b补弹性运力 数据需求,由于数据量较大存在数据倾斜题目,于是接触了Spark调优实战,现总结如下。

[*] 调优效果
o 确保代码逻辑精确的条件下运行代码,耗时5h+
o 加盐操作处理后,耗时2h+
o 广播操作处理后,耗时1h+
o 中加层提取处理后,耗时1h内
经过上述三个处理,调度任务的运行时间从340min左右缩小到55min左右(约6倍)。
https://i-blog.csdnimg.cn/direct/433c67688ed44a9c8754ae7fa45253c2.png
[*] 业务简述
要盘算某都会某天活动期间司机在线时长,就要用活动表与司机在线表进行连接,由于活动表数据量较小(几千数量级)但司机在线表数据量巨大(十亿数量级:小时增量表,且每分钟记载一条数据),导致出现大小表join操作环境,这样就很有大概会造成数据倾斜。
且真实业务逻辑正是两个表通过city_id连接,但是北京上海两个都会会在工作日下发大量的 <门路包> 活动,就导致这两个都会的city_id在大表中反复大量出现,于是,数据倾斜题目发生了。
[*] 题目及解决方案


[*] 在确保代码逻辑精确的环境下运行,频繁出现executor内存溢出

[*]原因排查:
当数据分布不匀称时,某些Executor大概处理的数据量远大于其他Executor,从而导致内存负载过大。特殊是当连接操作(如join)涉及到大表和小表时,如果小表中的某些值在大表中频繁出现,即数据倾斜,就大概引起严重的内存压力。
[*]题目分析:
数据倾斜不是过多的无效数据或null值造成而是业务逻辑导致的数据分布,所以无法对数据进行预处理和过滤来解决题目。经分析可知两表正常做join操作时,由于某些city_id对应了大量的待连接数据,势必会造成数据倾斜。
[*]解决方案:
**加盐(Salting操作)**打散key(city_id),给两个表要进行连接操作的每个key都加上一个随机数(0~100)作new_city_id,此时key被打散,消除了数据倾斜的题目;对打散后的数据通过新key进行局部聚合;对聚合完成后的数据通过原有的key再进行一次全局聚合即解决了数据倾斜的题目。
https://i-blog.csdnimg.cn/direct/42db3a9b9d7a46458e9d616c153874f9.png

[*] 修改完后可正常运行,但观察运行进程发现调度会在某些task处实行迟钝

[*] 原因排查:
大表和小表正常做join操作时,会对数据进行分区,不可避免的带来了shuffle,而且业务代码中会对活动表(小表)进行多次表连接操作,每次都会有大量数据的网络传输和shuffle开销,导致task运行迟钝。 https://i-blog.csdnimg.cn/direct/003809a9ae464840bb229715e5ceb188.png
[*] 题目分析:
做连接操作的另一张表数据量庞大依然大概会导致数据倾斜的题目(即使是加盐处理后),所以考虑怎样能淘汰大量的网络传输和shuffle开销,如果网络带宽有限或者数据传输量过大,就大概导致数据传输迟钝,使task运行迟钝,而且会使得Executor内存占用持续上升,最终引发内存溢出。
[*] 解决方案:
舍弃reduce join ,将数据在map阶段就进行join操作,这样就避免了大量数据的网络传输与shuffle开销。实现方法是通过Spark的一种用于优化查询性能的技能:广播操作,通过将小型数据集复制到必要进行与之连接操作的各个盘算节点,以淘汰网络IO、避免reduce join。具体来说,广播变量会将Driver端的变量的值事先广播到每一个Worker端,这样在以后的盘算过程中只必要从本地拿取该值即可,避免网络IO,提高盘算服从。
https://i-blog.csdnimg.cn/direct/694f5c987ce14072bf7769bf0a3d96f0.png

[*] 司机在线表数据精确到分钟粒度,数据量极其庞大(十亿数量级)

[*]题目分析:
活动表数据与司机在线表做join操作时必要先将司机在线表聚合成半小时粒度的数据,否则直接做join会是数量级扩大越30倍;且考虑到战区 部门业务许多涉及不到分钟粒度,每次使用到司机在线表数据时都会再进行聚合操作(以缩小数据量级),所以考虑将半小时粒度的司机在线数据抽取出dwm表到中间层(此处考虑不处理成一小时粒度的原因是许多活动是精确到半小时粒度的)。
[*]解决方案:
抽取司机在线30min时间围栏在线数据dwm表到中间层,然后在代码中只必要直接关联中间层dwm表即可。


[*]调优实战总结
对于大表join小表的数据操作,常常会因为业务或者null值等原因产生数据倾斜题目。而处理数据倾斜题目,并没有一劳永逸的方法,许多时间产生数据倾斜的原因不是单个业务造成的,这时间就要求我们通过上游数据表的特点、业务逻辑的特点等来分析数据倾斜产生的原因对症下药,将多种方案组合起来使用,实在不可的话…无脑加资源叭 (更改资源设置这种方法的优先级最低,实在没办法了才考虑使用)!
参考文章:
Spark 理论基石 —— RDD
面试题目 之 Spark Shuffle概述
【Spark的五种Join策略解析】
【实战干货】解决Spark数据倾斜之消除数据倾斜的六大秘籍
Hive视图与物化视图使用详解
17点23分 2024年8月29日
总结整理内容,如有错误,欢迎大家评论区指正!
不积跬步无以至千里!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 【互联网大厂练习履历】Spark SQL数据倾斜调优圣经(含实战分享)