ToB企服应用市场:ToB评测及商务社交产业平台

标题: SPARK调优:AQE特性(含脑图总结) [打印本页]

作者: 王國慶    时间: 2024-12-29 14:48
标题: SPARK调优:AQE特性(含脑图总结)
学完AQE需要能够回答如下的几个题目:
    

SparkAQE是spark 3.0引入的一大告急功能,今天我们来聊一聊AQE的实现原理。
了解一个功能,先来了解其面临的题目。当涉及到大型集群中的复杂查询性能时,处理的并行度和精确Join策略选择已被证明是影响性能的关键因素。但Spark SQL在易用性和性能方面仍然存在极具挑衅的题目:
一 应运而生,没有AQE时候的挑衅

SparkSQL只能设置固定的Shuffle 分区数:
在 Spark SQL 中,shuffle 分区数是通过 spark.sql.shuffle.partition 配置的,默认值为 200。它决定了 reduce 任务的数量,对查询性能影响很大。
当我们配置spark.sql.shuffle.partition 后会默认给全部的join或agg过程中的shuffle设置统一的分区数,这是不合适的。相同的shuffle分区数不能适合单个查询的全部stage,因为每个stage都有差别的输出数据大小和分配。
如果分区数太小,则并行度较低,每个reduce任务必须处理更多数据。由于内存可能无法保存全部数据,乃至会发生溢出到磁盘。在最坏的情况下,它可能会导致严峻的 GC 题目或 OOM。此外,如果某些任务处理的数据比其他任务多得多(即数据倾斜),或者由于磁盘较旧、CPU 频率较低等原因运行时间较长,整个集群的资源利用率就会低落,当前阶段的执行时间会更长。这时候就需要通过增加分区数,淘汰某个分区处理的数据量,可以减轻这类题目的影响。
当然,如果分区值太大,可能会增加调理的开销,因为会有太多的小reduce任务和许多小文件生成,也会导致性能题目。
总之,shuffle 分区数既不能太小也不能太大。为了获得最佳性能,我们常常需要在非生产情况中为多次调整 shuffle 分区数(只能在非SQL的DataSet或DataFrame中设置)。
并不能选择出最优执行计划:
在没有开启
CBO
前,Spark默认从逻辑计划生成的物理计划列表中选择第一个作为物理计划。
而CBO的优化并非是全能的,起首CBO 仅支持注册到 Hive Metastore 的数据表的优化。别的开发者还需预先统计网络表信息,存在性能消耗。最告急的是CBO也是一种静态的优化策略,它团结各类统计信息订定执行计划,一旦执行计划交付运行,CBO 的使命就算完成了。换句话说,Spark在启动后不能在对物理计划举行修改。
但其着实Spark运行中会涉及些必须落盘的操作,比方shuffle等操作。通过落盘信息网络更多的运行时信息,可以提供更加精准的Statistics。
从SparkSQL统计信息文章可以看出,SparkSQL默认对数据的评估黑白常不准确的。如果真实表的数据量小于BroadcastHashJoin的阈值,就可以将SortMergeJoin转换为BroadcastHashJoin,它避免了大表在集群中的Shuffle, 这大大提高了查询性能。
数据倾斜严峻影响SparkSQL的稳定性:
数据倾斜指的是某些分区的数据大小远大于其他分区的情况,那些相应的任务可能会运行更长时间,依据木桶理论,整个Query变慢。
   目前有一些处理数据倾斜的方法:
增加 shuffle 分区数,这样数据更有可能被散列到差别的分区。不幸的是,当大量数据共享相同的hash key时,这将无济于事。
增加 BroadcastHashJoin 阈值以将更多先前plan的 SortMergeJoin 转换为 BroadcastHashJoin。这样可以避免在有限的情况下shuffle造成的数据倾斜。
手动过滤倾斜的key并向这些数据添加随机"加盐"处理。在另一个表中,相应的数据也需要复制。但当此中一个连接表取自中间结果,这种解决办法很难实现。
这些题目的原因一方面是SparkSQL并不能准确的评估处理的数据量,另一方面是不能根据运行中的状态动态的调整运行计划和配置。
  为了解决这些题目,Spark社区引入在RDBMS 世界中广泛使用多年基于成本的优化(CBO)。然而,在分布式系统中使用 CBO 是一个“极其复杂的题目”,在Spark中网络和维护一组准确和最新的统计数据是昂贵的。
为了解决这些题目,Spark 社区在 2015 年提出了 Adaptive Execution 的基本头脑,并在 DAGScheduler 中,增加了一个新的 API 来支持提交单个map阶段,以及运行时更改 shuffle 分区数。基于社区工作,英特尔和百度团队重新计划了 Adaptive Execution,实现了更加灵活的AQE框架。
二 AQE是什么?它是什么实现原理?

简单来说,AQE 是 Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会团结这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化
起首,AQE 赖以优化的统计信息与 CBO 差别,这些统计信息并不是关于某张表或是哪个列,而是 Shuffle Map 阶段输出的中间文件。学习过 Shuffle 的工作原理之后,我们知道,每个 Map Task 都会输出以 data 为后缀的数据文件,另有以 index 为结尾的索引文件,这些文件统称为中间文件。每个 data 文件的大小、空文件数量与占比、每个 Reduce Task 对应的分区大小,全部这些基于中间文件的统计值构成了 AQE 举行优化的信息来源。
其次,AQE 从运行时获取统计信息,在条件允许的情况下,优化决定会分别作用到逻辑计划和物理计划。
那么它是怎么实现基于统计信息修改尚未执行的逻辑计划和物理计划的呢?
在非AQE的情况下,Spark会在规划阶段确定了物理执行计划后,根据每个算子的界说生成RDD对应的DAG。然后 Spark DAGScheduler通过shuffle来分别RDD Graph并创建stage,然后提交Stage以供执行。
下面我们来看看在启用AQE后的情况:
起首,会将逻辑计划拆分为QueryStage的独立子图,这可以更早地拆分Stage。通过单独提交mapStage,网络它们的MapOutputStatistics对象。
在AQE的plan中界说了两种范例的QueryStage, 分别为:
Shuffle query stages: 将其输出物化为Shuffle文件。
Broadcast query stages:将其输出物化到Driver内存中的数组。
顾名思义,它们分别是基于Shuffle和broadcast来举行分别的。同时这里分别出来的是QueryStage而非Stage。实现上述这个功能的实现是基于DAGScheduler现在支持提交单个mapStage。
其次,AQE还包罗对物理规划和执行规划的修改。
在物理规划中,会在原执行计划中找到Exchange,并引入了两个新的操作节点。
QueryStage是一个阶段的根节点,负责运行时决定
QueryStageInput是一个stage的叶子节点,告急目的是在物理计划更新后将子stage的结果提供给它的父亲

如上图所示,QueryStages 和 QueryStageInputs 是通过在执行计划中查找 Exchange 来添加的。
在执行阶段中,plan树上的任何 QueryStage 都会引用其子阶段并递归执行它们。在 QueryStage 的全部子节点完成后,将网络运行时 shuffle-write 统计信息并用于进一步细化。然后 Spark 重新启动逻辑优化和物理规划阶段,并根据这些新信息动态更新查询计划。

总之,如图所示,在非AQE的情况下,SparkSQL会转换为DAG图,然后DAGScheduler基于shuffle将其分别为多个stage, 然后再执行stage。
在AQE的情况下,起首会将plan树拆分为多个QueryStages, 在执行时先将它的子 QueryStages 被提交。在全部子节点完成后,网络 shuffle 数据大小。根据网络到的 shuffle 数据统计信息,将当前 QueryStage 的执行计划优化为更好的执行计划。然后转换为DAG图再执行Stage。
三 目前AQE告急有三大特性:

自动分区合并:在 Shuffle 事后,Reduce Task 数据分布乱七八糟,AQE 将自动合并过小的数据分区。
Join 策略调整:如果某张表在过滤之后,尺寸小于广播变量阈值,这张表到场的数据关联就会从 Shuffle Sort Merge Join 降级(Demote)为执行效率更高的 Broadcast Hash Join。
自动倾斜处理:团结配置项,AQE 自动拆分 Reduce 阶段过大的数据分区,低落单个 Reduce Task 的工作负载。
下面我们依次来简单了解下。
1 自动分区合并

分区合并的原理比较简单,在 Reduce 阶段,当 Reduce Task 把数据分片从map端拉回,AQE 按照分区编号的顺序,依次把小于目标尺寸的分区合并在一起。
举个例子,假设 shuffle 分区数为 5,每个 reducer 的目标数据大小为 64MB。map阶段完成后,我们知道每个分区的大小分别为70MB、30MB、20MB、10MB和50MB。为了尽量在不拆分分区的情况下,让每个 post-shuffle 分区的大小小于目标数据大小,确定在运行时使用 3 个 reducer 举行平衡。第一个 reducer 处理分区 0 (70MB)。第二个 reducer 处理 3 个一连的分区(分区 1 到 3,统共 60MB)。第三个 reducer 处理分区 4 (50MB)。
在合并分区时,会按照分区编号依次将 shuffle 分区打包到单个coalesced 分区,直到添加另一个 shuffle 分区会导致 coalesced 分区的大小大于目标大小。
此外,在新的AQE框架中,每个 QueryStage 都知道它的全部子阶段,这使得它可以很好地处理 3 个以上的表join,避免引入更多的shuffle。
调整自动合并分区大小告急有两个参数:

1.spark.sql.adaptive.advisoryPartitionSizeInBytes 合并分区的保举目标大小
2.spark.sql.adaptive.coalescePartitions.minPartitionNum 分区合并后的最小分区数

  1. val maxTargetSize = math.max(  math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
  2. val targetSize = math.min(maxTargetSize, advisoryTargetSize)
复制代码
从上面的代码可以看出,目标分区的大小是由两个参数共同决定。shuffle数据的总大小除以最小分区数与保举目标大小间的最小值即为目标分区的大小。
2 Join 策略调整

在AQE中,shuffle数据的统计信息可以为当前的 QueryStage 的plan提供更好的优化建议。那么当此中一个join表的实际大小小于阈值时,如果使用BroadcastHashJoin而不是SortMergeJoin可以获得更好的性能。
这里有两个优化规则,一个逻辑规则和一个物理策略分别是:DemoteBroadcastHashJoin 和 OptimizeLocalShuffleReader。
DemoteBroadcastHashJoin 规则的作用,是把 Shuffle Sort Merge Joins 降级为 Broadcast Joins。
对于到场 Join 的两张表来说,在它们分别完成 Shuffle Map 阶段的计算之后,DemoteBroadcastHashJoin 会判断中间文件是否满意如下条件:
自动降级为BHJ的参数配置

1.spark.sql.autoBroadcastJoinThreshold 中间文件尺寸总和小于广播阈值
2.spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin 空文件占比小于配置项

只要有任意一张表的统计信息满意这两个条件,Shuffle Sort Merge Join 就会降级为 Broadcast Hash Join。

但是,AQE 依靠的统计信息来自于 Shuffle Map 阶段生成的中间文件。这意味什么呢?
纵然已经将Shuffle Sort Merge Join 就会降级为 Broadcast Hash Join,但join两表都已经按照 Sort Merge Join 的方式走了一半(即大表和小表都举行了shuffle操作)。
在常规的 Shuffle 计算流程中,Reduce 阶段的计算需要跨节点访问中间文件拉取数据分片。如果遵循常规步调,即便 AQE 在运行时把 Shuffle Sort Merge Join 降级为 Broadcast Join,大表的中间文件还是需要通过网络举行分发。这个时候,AQE 的动态 Join 策略调整也就失去了实用价值。原因很简单,负载最重的大表 Shuffle 计算已经完成,再去决定切换到 Broadcast Join 已经没有任何意义。
在这样的背景下,OptimizeLocalShuffleReader 物理策略就非常告急了。既然大表已经完成 Shuffle Map 阶段的计算,这些计算可不能白白浪费掉。采取 OptimizeLocalShuffleReader 策略可以省去 Shuffle 常规步调中的网络分发,Reduce Task 可以就地读取当地节点(Local)的中间文件,完成与广播小表的关联操作。
举个例子:假设 shuffle 分区数为 5。在 map 阶段,任一连接都包罗 2 个 map 任务。在 reduce 阶段,为 SortMergeJoin 启动了 5 个 reducer,每个 reducer 举行远程 shuffle 读取。但是,如果将 BroadcastHashJoin 转换为,则只需要启动 2 个 reducer,并且每个 reducer 在当地读取一个 mapper 的完备 shuffle 输出。

这里有3个优点:
没有数据通过网络传输,因此节流了网络I/O。
顺序读取文件比正常随机读取文件的一小部分要快得多。
防止shuffle,避免数据歪斜。

3 自动倾斜处理

在AQE中,执行子 QueryStages 后,网络每个分区的 shuffle 数据大小和记录数。如果一个分区的数据量或记录数比中位数大N倍,也比预先配置的值大,则判断为倾斜分区,该连接判断为倾斜连接。这是在 Reduce 阶段,当 Reduce Task 所需处理的分区尺寸大于一定阈值时,利用 OptimizeSkewedJoin 策略,AQE 会把大分区拆成多个小分区。
OptimizeSkewedJoin规则使 Spark 通过遵循一个简单的规则来实现更好的负载平衡。此中 F 为倾斜因子,S 为倾斜大小和 R 为倾斜行数,一个分区被认为是倾斜的条件是:
它的大小大于 S 并且大于分区大小中位数乘以 F。
它的行数比 R 大,并且大于分区行数的中位数乘以 F。
假设表 A 和表 B 执行内连接并且表 A 中的分区 0 是倾斜的。对于正常执行,表 A 和 B 的分区 0 都被洗牌到单个 reducer 举行处理。由于这个 reducer 需要通过网络和进程获取大量数据,因此它可能是延伸整个阶段时间的最慢任务

如上图所示,N个task用于处理表A的偏斜分区0,每个task只读取表A的少数mapper的shuffle输出,并与表B的分区0举行join,将这N个task的结果合并得到最终的join结果. 为了实现这一点,我们更新了 shuffle read API 以允许仅从几个映射器而不是全部读取分区。
在处理过程中,我们可以看到表 B 的分区 0 将被多次读取。尽管引入了开销,但性能改进仍然很显着。
不外这种解决数据倾斜的方式针对的是Task级别的数据倾斜,告急是将同一个executor内的倾斜task举行拆分,而对于数据全集中在个别executor内的情况就无济于事了。

对于倾斜的分区集中在某些executor中,我们可以使用两阶段方式:手动加盐复制。
优化数据倾斜的配置参数

  1. spark.sql.adaptive.skewJoin.enabled = true
  2. spark.sql.adaptive.skewJoin.skewedPartitionFactor = 10  ,判定倾斜的膨胀系数
  3. spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256MB ,判定倾斜的最低阈值
  4. spark.sql.adaptive.advisoryPartitionSizeInBytes,以字节为单位,定义拆分粒度
复制代码
  参考文档,嘎嘎好:
    不理解的地方:

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4