写在前面:Spark仍旧作为当前主流的离线盘算框架,深入浅出钻研一下其内核原理及优化思绪还是很有必要。以下将针对Spark3.x做些连贯的、精简的干货记录,全文拒绝废话。
1、执行流程
1.1、spark on yarn 客户端提交任务后的执行过程
正向通信过程概要:
客户端:submit作业到RM;
RM:关照NM启动AM;
AM:读参数、启动Driver;申请资源、注册、启动Executor;
Driver:执行代码,初始化SparkContext,切分任务、分配至Executor.
在Driver启动后,执行过程中,spark内核核心对象:
通报taskset(tasks)的顺序:DAGScheduler -> Taskscheduler -> SchedulerBackend(通光荣) -> ExecutorBackend(通光荣) -> Executor
1.2、sparksql 的执行计划处理流程
提交sparksql作业后:
生成unresolved logical plan:parser只检查sql语法是否有问题,不检查表名列名;
生成resolved logical plan - Analyzed Logical Plan:通过catalog验证语义、类型、表名列名等;
生成resolved logical plan - Optimized Logical Plan:catalyst优化器举行规则优化;
生成physical plan:物理计划的运算法大抵解释如下:
1)HashAggregate表示数据聚合,一般成对出现,第一个将执行节点本地的数据举行局部聚合,另一个将各个分区的数据进一步举行聚合盘算。
2)Exchange即shuffle,表示必要在集群上移动数据。很多时间HashAggregate 会以 Exchange 分隔开来。
3)Project是 SQL 中的投影操作,即select选择列。
4)BroadcastHashJoin表示通过基于广播方式举行HashJoin。
5)LocalTableScan即全表扫描本地的表。
生成终极代码&rdds:物理计划再经过cbo等策略优化。
2、角色概念
2.1、job, stage, task
application个数:sparkcontext上下文个数,在yarn中一般是容器个数;
job个数:action算子个数,如collect等;
stage个数:job中宽依靠数 + 1;
task个数:每个stage中,最后一个rdd的分区数相加;
执行到action算子->划定一个job->找宽依靠对job切stage->stage按照并发数产生task,被序列化发往executor执行。
2.2、RDD, DataFrame, DateSet
三种差别的分布式数据抽象条理
1、 RDD:对象集合,最基础的数据抽象,适用于低级api,处理复杂操作、细粒度操作场景。类型安全、无优化和管理;
2、DataFrame:行数据集,可以视为DataSet[Row],适用于SQL(高级api)处理布局化数据、数据分析。无类型、有优化;
3、DateSet:类型化数据集,适用于SQL(高级api)处理更高性能的场景。类型安全、有优化;
DataFrame和DateSet都基于sparksql引擎上构建,都有catalyst优化器。RDD可以转化成DataFrame和DateSet,反之亦然,差别抽象条理之间可以机动切换,以使用各自的优势。
3、资源规划
3.1、内存
有关堆内存的一些估算方法
1、executor-memory:NM上的堆内存/NM上的executor数;
2、估算other内存:自定义数据布局* 每个executor的vcore数;
3、估算storage内存:广播变量 + cache/executor数量;(cache巨细可以运行两次查看开启缓存前后的内存占用巨细,节省掉的部分≈cache)
4、估算execution内存:每个executor的vcore数 * (数据集巨细/shuffle partitions(并行度、task数))
- SET spark.executor.memory=2g; ##executor堆内存设置
- SET spark.memory.fraction=0.6; ##executor堆内存中统一内存(storage+execution,即除去other+预留300m的部分)
复制代码 有关堆外内存的一些重点概念
- 在spark3.0中,一个容器最多可申请的资源,由yarn.scheduler.maximum-allocationmb决定,需满足:spark.executor.memoryOverhead + spark.executor.memory + spark.memory.offHeap.size
≤ yarn.scheduler.maximum-allocation-mb
- spark.executor.memoryOverhead 泛指JVM堆内存外的额外内存,覆盖非JVM内存的其他需求。默认开启,巨细=max(堆*0.1, 384mb);
- spark.memory.offHeap.size 特指步调的堆外内存,默认关闭;
- 在spark3.0中,二者没有包含关系,总资源相加得到。在spark2中,额外包含堆外。
开启堆外内存后,可以使用堆外做缓存,减少gc开销,提升内存访问服从。
- ##executor额外内存,yarn模式专用。与spark.executor.memoryOverhead一样
- SET spark.yarn.executor.memoryOverhead=1g;
- ##开启堆外内存
- SET spark.memory.enable.offheap.enable=true;
- SET spark.memory.offHeap.size=1g;
复制代码 3.2、CPU
有关CPU的一些重点概念:
1、核数配置:确定yarn单台节点的线程数,预估一共要几个executor,总线程数/executor数=单个executor核数;
2、并行度:spark.sql.shuffle.partitions. 即shuffle 阶段的分区数、task数;
3、并发度:同时执行的task数,即vcore数。
cpu的服从低下,一般2种场景:
1、并行度过低:会导致数据分片较大,轻易使CPU线程挂起。比如一共10G数据,配置了2个并行度(即每个task有5G数据),假设只有1个executor(含2个核),但是executor内存一共就6G,就无法做到2个核同时运行,有1个在挂起浪费,这10G数据酿成串行执行。如果配置4个并行度(即每个task有2.5G数据),则该executor可以同时运行2个task;
2、并行度过高:每个分片较小,内存没压力,但是分片数(task数)过多,大概会导致cpu频繁调理,以及部分executor在空转,最严肃的场景则是调理时间大于了实际处理数据的时间。
3、比较理想的配置方式是:并行度(分区、task数) = 并发度(核心数)✖️ 2~3。 但前提是明确分片上的内存巨细能扛得住,如果过大 则并行度还必要再上调,但同时核心数(或executor数)同样也要上调。
- SET spark.sql.shuffle.partitions=3000; ##并行度设置
- SET spark.executor.cores=1; ##并发度设置
复制代码 4、sparksql语法优化
4.1、RBO优化
RBO优化基于Catalyst ,总共有 81 条优化规则(Rules),分成 27 组(Batches),根本归纳到以下 3 个范畴。
1、谓词下推:过滤条件提前执行,减少下游处理的数据量。
注意,对于left join,将过滤条件写在on上,只会下推右表,写在where则两表都下推(但是on和where对返回效果大概是不一样的 ,一般对于左表在where过滤,对于右表在on过滤即可)。inner join的join和where无区别。
2、列裁剪:即只读select或关联键相干的字段。
3、常量替换:主动把代码中含常量盘算的表达式转化成效果,如age>12+18 -> age>30.
4.2、CBO优化
CBO优化重要在物理计划层面,盘算并挑选代价最小的物理执行计划。代价包括2部分:执行节点对数据集的影响(输出数据集的巨细与分布)、执行节点操作算子的代价。
1、必要使用特定sql先网络表列的statistic信息
ANALYZE TABLE 表名 COMPUTE STATISTICS 统计表级别信息;
DESC FORMATTED 表名 展示网络到的信息。
2、开启CBO
4.3、AQE优化(核心)
概念:自适应查询执行(Adaptive Query Execution)
背景:CBO存在劣势,执行计划一旦生成,即便执行中发现有更优也不可更改; 必要提宿世成统计信息,如果数据频繁更新则成本较大; 基于静态统计信息,只是估算,不够准确;
策略:依据中间效果的准确统计信息,动态调整执行计划。
1、按照shuffle界限划分 QueryStage;
2、每个 QueryStage执行完之后,网络shuffle map的统计信息(shuffle block巨细和分布),运行时re-optimize调整plan;
3、按照新的plan,继续执行下一个QueryStage.
- ##启用AQE,作为以下所有优化项的前提
- SET spark.sql.adaptive.enabled=true
复制代码 4.3.1、动态调整执行计划:shuffle join转broadcast join
在一次QueryStage后,举行re-optimize更新统计信息,根据shuffle物化后的block统计,如果满足广播阈值,会主动将预计划中的shufflejoin(sortmergejoin)转成broadcast join,从而提高join性能。
- ##开启AQE动态调整join功能
- SET spark.sql.adaptive.join.enabled=true;
- ##转广播join的阈值,默认与spark.sql.autoBroadcastJoinThreshold相等
- SET spark.sql.adaptiveBroadcastJoinThreshold=64m;
- ##AQE同时会有其他的join优化策略,但其中有些可能会增加shuffle,可以配置是否允许,默认false
- SET spark.sql.adaptive.allowAdditionalShuffle=true;
复制代码 4.3.2、自适应调整并行度:动态设置shuffle partition
在一次QueryStage后,举行re-optimize更新统计信息,根据shuffle物化后的block巨细,将多个小block合并至下游一个reducer处理
必要注意的是:
1)即使某几个partition满足合并的阈值条件,但如果其不相临,也不会做合并,跳着读多个partition相当于随机读取,在HDD上性能不高,故只结合相邻的partition提高磁盘IO性能;
2)只会合并多个小的partition,不会将大的partition拆分,由于拆分过程必要引入一轮新的shuffle.
- ##设置每个reducer读取的目标数据量
- SET spark.sql.adaptive.shuffle.targetPostShuffleInputSize=128m;
- ##设置动态合并后最小的分区数
- SET spark.sql.adaptive.coalescePartitions.minPartitionNum=100;
复制代码 4.3.3、主动处理数据倾斜
处理离线数据倾斜范例的解决思绪:
1、保证文件可 Split 从而避免读 HDFS 时数据倾斜;
2、调整并行度或自定义 Partitioner 从而分散分配给同一 Task 的大量差别 Key;
3、使用 BroadcastJoin 代替 ReduceJoin 消除 Shuffle 从而避免 Shuffle 引起的数据倾斜;
4、对倾斜 Key 使用随机前缀或后缀从而分散大量倾斜 Key,同时将参与 Join 的小表扩容,从而保证 Join 效果的正确性。
AQE主动处理数据倾斜的原理:
shuffle物化后,统计map端shuffle block巨细分布,将倾斜的partition数据切成多个task,避免都fetch到一个reducer上处理。
- ##启用AQE的自动处理数据倾斜机制
- SET spark.sql.adaptive.skewedJoin.enabled=true;
- ##控制处理一个倾斜Partition的Task数上限,默认5
- SET spark.sql.adaptive.skewedPartitionMaxSplits=10;
- ##设置了一个Partition被视为倾斜的行数下限,默认1000w
- SET spark.sql.adaptive.skewedPartitionRowCountThreshold=5000000;
- ##设置了一个Partition被视为倾斜的行数下限,默认64m
- SET spark.sql.adaptive.skewedPartitionSizeThreshold=32 * 1024 * 1024;
- ##倾斜因子,大于行数下限且大于各分区行数中位数*因子,或大于大小下限且大于各分区大小中位数*因子,视为倾斜
- SET spark.sql.adaptive.skewedPartitionFactor=1.1;
复制代码 4.4、SMB join
sort merge bucket join
通太过桶+排序,使雷同key在同一分区,并有序,可以或许使用二分法等快速查找关联,大幅提升join服从
- 使用条件1:两表都必要分桶,且分桶个数一致
- 使用条件2:分桶字段(排序字段)=join键
- ##对表A做分桶+排序(clustered=distribute+sort)
- CREATE TABLE bucketed_orders
- USING PARQUET
- CLUSTERED BY (customer_id) INTO 10 BUCKETS
- AS SELECT * FROM orders
- ;
- ##对表B做分桶+排序(clustered=distribute+sort)
- CREATE TABLE bucketed_customers
- USING PARQUET
- CLUSTERED BY (customer_id) INTO 10 BUCKETS
- AS SELECT * FROM customers
- ##两表join
- SELECT *
- FROM bucketed_orders o
- JOIN bucketed_customers c
- ON o.customer_id = c.customer_id
复制代码 4.5、join策略与sql-hints
- 没有sql hints时,spark的5种join策略优先级顺序:(仅考虑等值关联)
①broadcast hash join. 小表够小,在广播阈值以内;
②shuffle hash join. 参数spark.sql.join.preferSortMergeJoin=false(默认true),且一张表足够小(将小表的分区数据构建成一个Hash table) ,按照join key做重分区,随后大表按joinkey去小表hash table匹配;
③sort-merge join. 若join keys是排序的,则选择;
④cartesian join. 内毗连,则选择;
⑤broadcast nested loop join. 若大概发生OOM或无可选策略,则终极选择。
- sql hints:
①Broadcast Hint: /*+ BROADCAST(t1) /、/+ BROADCASTJOIN(t1) /、/+ MAPJOIN(t1) / 如果join类型支持(等值且非full join),则选择broadcast hash join;
②Sort merge hint:/+ SHUFFLE_MERGE(t1) /、/+ MERGE(t1) /、/+ MERGEJOIN(t1) /如果join key是排序的,则选择sort-merge join;
③shuffle hash hint:/+ SHUFFLE_HASH(t1) / 如果join类型支持(等值且非full join), 选择 shuffle hash join;
④shuffle replicate NL hint:/+ SHUFFLE_REPLICATE_NL(t1) */ 如果是内毗连,选择笛卡尔积。
5、数据倾斜优化
缘故原由:涉及重分区的shuffle类算子,如 distinct、groupby、join等,若存在某一或某些key过大,则出现数据倾斜风险。
定位:抽样看key的分布,从概率角度定位倾斜的key
SELECT XX, SUM(1) FROM (SELECT * FROM xxx WHERE RAND() < 0.1) GROUP BY XX
5.1、单表倾斜优化
聚合shuffle场景,存在不匀称的key,可以使用两阶段聚合(加盐局部聚合+去盐全局聚合)
- ##加随机盐值
- WITH salted_transactions AS (
- SELECT
- user_id,
- CONCAT(user_id, '_', CAST(FLOOR(RAND() * 10) AS STRING)) AS salted_user_id,
- amount
- FROM transactions
- ),
- ##带盐值预聚合,使不均匀的key分布更散
- partial_aggregation AS (
- SELECT
- salted_user_id,
- SUM(amount) AS partial_sum
- FROM salted_transactions
- GROUP BY salted_user_id
- )
- ##去掉盐值全局聚合,此时重分区操作的数据是量更小、更均匀的key
- SELECT
- SPLIT(salted_user_id, '_')[0] AS user_id,
- SUM(partial_sum) AS total_amount
- FROM partial_aggregation
- GROUP BY SPLIT(salted_user_id, '_')[0];
复制代码 5.2、join倾斜优化
5.2.1、广播join
适用于小表存在热点key。若表足够小可以加载进driver端,广播到全部executor中,直接规避掉shuffle过程。
- ##设置广播的阈值,经验而言尽量不要超过100M,否则失去意义
- SET spark.sql.autoBroadcastJoinThreshold=26214400
复制代码 5.2.2、打散大表+扩容小表
适用于大表存在热点key。通常需完成如下四个步骤(假设大表order join 小表customer on customerid,大表有热点key)
- ##1. 拆分大 key 和普通 key
- ##将orders表拆分为两个数据集:一个包含倾斜的 customer_id(称为 skew_orders),另一个包含普通的 customer_id(称为 common_orders)。
- CREATE OR REPLACE TEMPORARY VIEW skew_orders AS
- SELECT *
- FROM orders
- WHERE customer_id IN ('big_key1', 'big_key2'); -- 假设 big_key1 和 big_key2 是倾斜的 key
- ;
- CREATE OR REPLACE TEMPORARY VIEW common_orders AS
- SELECT *
- FROM orders
- WHERE customer_id NOT IN ('big_key1', 'big_key2');
复制代码- ##2. 打散大表
- ##对skew_orders表的 customer_id 加上随机前缀,打散数据。
- CREATE OR REPLACE TEMPORARY VIEW dispersed_skew_orders AS
- SELECT
- CONCAT(customer_id, '_', CAST(FLOOR(RAND() * 10) AS STRING)) AS dispersed_customer_id,
- order_id,
- amount
- FROM skew_orders;
复制代码- ##3. 扩容小表
- ##通过关联的方式扩容 customers 表
- ##创建一个辅助表 numbers,用于扩容
- CREATE OR REPLACE TEMPORARY VIEW numbers AS
- SELECT explode(array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)) AS num
- ;
- ##扩容 customers 表
- CREATE OR REPLACE TEMPORARY VIEW expanded_customers AS
- SELECT
- c.customer_id,
- c.other_columns,
- n.num
- FROM customers c
- CROSS JOIN numbers n;
复制代码- ##4. 联接操作
- ##将打散的skew_orders表分别与扩容的expanded_customers和普通的common_orders表进行联接,最后合并结果。
- -- 打散的 skew_orders 表与扩容的 expanded_customers 表联接
- CREATE OR REPLACE TEMPORARY VIEW skew_join AS
- SELECT
- s.order_id,
- s.amount,
- c.customer_id AS fan_id
- FROM dispersed_skew_orders s
- JOIN expanded_customers c
- ON s.dispersed_customer_id = CONCAT(c.customer_id, '_', CAST(c.num AS STRING));
- -- 普通的 common_orders 表与 customers 表联接
- CREATE OR REPLACE TEMPORARY VIEW common_join AS
- SELECT
- o.order_id,
- o.amount,
- c.customer_id AS fan_id
- FROM common_orders o
- JOIN customers c
- ON o.customer_id = c.customer_id;
- -- 合并结果
- SELECT * FROM skew_join
- UNION ALL
- SELECT * FROM common_join;
复制代码 6、job优化
6.1、map端优化
- 读取小文件优化:小文件过多时大量切片信息和task元数据对driver内存压力过大,读取性能差,带来单点问题。可通过2个参数控制读取小文件合并(见代码块):
N 个小文件总巨细 + (N-1)*openCostInBytes <= maxPartitionBytes 时,N个小文件可以合并读取进同一个分区。
以下配置目的都是通过控制分区巨细->控制map端分区数->控制小文件合并
- ##一个分区最大数据大小
- SET spark.sql.files.maxPartitionBytes=128MB
- ##打开一个文件的开销
- SET spark.files.openCostInBytes=4194304
- ##对于hdfs文件,可以直接设置每分区目标读入大小
- SET spark.hadoopRDD.targetBytesInPartition=67108864
复制代码
- 增加溢写输出流buffer
map端Shuffle Write三个关键参数
①缓冲区。决定是否溢写到磁盘,初始阈值5m,资源足够会2倍扩容,申请不到内存则溢写。internal,指定无效
②溢写时使用的输出流缓冲区。默认 32k,该区减少了磁盘搜索和系统调用次数,得当提高可以提升溢写服从。
③Shuffle文件批次。默认1万条。shuffle涉及到序列化,批次太低会导致在序列化时过度复制,internal,指定无效。
综上,优化空间只有溢写输出流buffer
- ##默认32k,可以调整到64k或128k,具体结合机器资源
- SET spark.buffer.size=65536
复制代码 6.2、reduce端优化
- 合理设置reduce个数。即shuffle并行度,参考3.2、CPU章节。
- 输出小文件优化
– join后写入场景:文件数为shuffle并行度。必要通过合理调整并行度来控制输出文件数。必要时通过repartition等算子单独处理。
– 动态分区场景:最坏环境,各分区全部离散,文件数为shuffle并行度*分区数。最理想环境,同一分区字段的数据全部hash到同一分区,即在写入前做distribute by分区字段。但会出现数据倾斜环境,可以将不倾斜的key正常distribute by,对倾斜的key做distribute by随机数
distribute by cast(rand() * 5 as int)
- reduce拉取数据相干优化
– 增大reduce缓冲区。资源充足时可增大缓冲区,减少拉取次数,减少网络io开销;
– 增加reduce拉取重试次数。对于超大数据量的shuffle过程,大概由于网络异常或full gc等缘故原由拉取失败,增大重试次数可以提升稳定性;
– 增加reduce拉取等候时间。拉取失败重试的隔断。增大提升稳定性
- ##reduce缓冲区,默认48mb
- SET spark.reducer.maxSizeInFlight=64MB;
- ##拉取重试次数,默认3
- SET spark.shuffle.io.maxRetries=60;
- ##重试等待时长,默认5s
- SET spark.shuffle.io.retryWait=60s
复制代码
- 合理使用bypass:Shuffle写入中跳过数据举行排序过程,提升性能
适用于不必要排序的任务。设置bypass阈值大于shuffle read tasks(并行度)开启强制bypass
SET spark.shuffle.sort.bypassMergeThreshold=10000;
- 提升毗连等候时长
executor长途网络传输获取其他节点数据时,如果遇到gc(大概由于task创建对象过大等缘故原由)则全部暂停相应,会导致网络超时,如果毗连等候时间过短(默认120s)会导致任务易失败。可考虑得当增加保障稳定性。
- SET spark.core.connection.ack.wait.timeout=300s
复制代码 job优化总结:
– map端偏重于shuffle write环节的优化,涉及磁盘IO开销;
– reduce端偏重于shuffle read环节的优化,涉及网络传输与内存解压反序列化开销。
7、DPP
动态分区裁剪:两表在使用分区字段做join时的优化策略
- 原理:将join一侧作为子查询盘算出来,再将其全部分区用到join另一侧作为过滤条件,减少关联时的数据量
- 使用条件:
– 裁剪左表,只能inner、right、left semi join,反之同理(如果left join,右表无论有无这个分区,左表全量都在,无裁剪意义);
– 被裁剪的表,关联键必要有分区字段;
– 另一张表(充当裁剪条件的表)必须有过滤条件。
- 使用方法:
SET spark.sql.optimizer.dynamicPartitionPruning.enabled=true
7、常规故障处理
7.1、OOM
- reduce端拉取缓冲区过大。前序提到可以通过增大来提升性能,但当资源不充足的环境下,且map端写出速率非常快、数据量大时,因reduce拉取盘算是动态即时的,大概会导致盘算内存不够。此时适量低落缓冲区巨细至默认以下,可以避免OOM,范例的以性能换执行。(参考6.2、reduce端优化)
- cluster模式JVM永世代不敷。client模式JVM永世代默认128m,而cluster模式默认82m。对于特别复杂的sparksql代码,必要解析语义、转换语法树等,重要占用PermGen,大概会导致cluster模式下OOM,可以适量调大。(如下,将默认调到128m,最大调到256m)
SET spark.driver.extraJavaOptions="-XX ermSize=128M -XX:MaxPermSize=256M"
- SQL代码中大量or导致driver栈溢出。大量or在解析时使用递归处理,过多方法层级调用会将栈打满。发起每条sql控制or关键字在100个以内,超出后拆分处理。
7.2、其他
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |