SparkSql oom原因以及参数调优+数据倾斜解决方案

打印 上一主题 下一主题

主题 889|帖子 889|积分 2677

1、Spark历史版本对比

spark1 vs spark2 vs spark3 
1、spark1引入内存盘算的理念,解决中央结果落盘导致的服从低下。在理想状态下性能可达到MR的100倍。虽然提高了一定的盘算服从,但也带来了大量的内存管理问题,典型的如内存oom问题频发。
2、spark2引入了Tungsten引擎,关键算子服从上比Spark1提拔了10倍。启用“统一内存管理”,不再使用“静态内存管理”,不再使用“静态内存管理”,oom问题大幅下降
3、spark3启用自顺应查询(Adaptive Query Execution)


  • 1.动态合并shuffle partitions

    可以简化甚至避免调整shuffle分区的数目。用户可以在开始时设置相对较多的shuffle分区数,AQE会在运行时将相邻的小分区合并为较大的分区。
  • 2.动态调整join策略

    在一定水平上避免由于缺少统计信息或着错误估计巨细(当然也可能两种环境同时存在),而导致执行次优筹划的环境。这种自顺应优化可以在运行时sort merge join转换成broadcast hash join,从而进一步提拔性能
  • 3.动态优化倾斜的join(skew joins)

    skew joins可能导致负载的极端不均衡,并严肃降低性能。在AQE从shuffle文件统计信息中检测到任何倾斜后,它可以将倾斜的分区分割成更小的分区,并将它们与另一侧的相应分区连接起来。这种优化可以并行化倾斜处理,获得更好的整体性能。
Spark内存管理:https://zhuanlan.zhihu.com/p/642722131?utm_id=0
2、SparkSql oom的因素

sparksql oom的因素是多样的,一个程序,从输入、处理2个方面进行考量
2.1 输入数据因素

1、文件格式&压缩

现在我司新建表默认使用parquet格式

 对于snappy压缩,hivesql参考表的“parquet.compression”属性决定是否压缩,采用什么压缩方法,sparksql遇到parquet表默认snappy压缩。


  • parquet相比于text、csv格式广泛有3-4倍的压缩比
  • snappy相比于非压缩广泛有3-4倍的压缩比
  • parquet+snappy相比于原始的text、csv达到6-8倍的压缩比,甚至更高
2、数据文件的巨细

  数仓集群默认的文件巨细是256M,大数据是分布式盘算,广泛存在倾斜不均的情形,加之程序自身的控制,造成了数据文件的巨细不一。
  尽管spark有输入数据的拆分能力,但扔存在拆分不均匀的可能。例如:原始文件的巨细为300M,将拆分为256M+44M 2个文件,他们的数据规模相差5.8倍。按照1个task处理一个文件的设定,分得256M文件的task将更容易出现oom
3、数据的型态分布

  如字段中包含大量的null、枚举字段大量的相同值等,parquet+snappy将导致实际压缩比进一步提拔。
4、大字段如xml,json

  string字段范例,如此中存储的是xml、json等字段(字符长度达千、万字符数级的称为大字段),parquet+snappy导致实际压缩比进一步提拔
2.2 运行环境因素

  环境因素也是oom的紧张原因,表现在spark默认参数、yarn集群稳固性
1、spark默认参数

  定义在集群的spark.defaults中,每一个提交到集群的spark作业都默认使用这套参数。影响oom的主要参数:


  • spark.executor.cores=4 --executor 4cpu,可并行运行4 task
  • spark.executor.memory=12g --executor 12g内存,task间共享(不发起调整)
  • spark.executor.memoryOverhead=2g --executor 2g堆外内存,task间共享(不发起调整)
  • spark.sql.files.maxPartitionBytes=268435456(256m) --读取数据文件时,单个partition或task最大256m、文件超过256m即拆分
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum=none --aqe动态盘算shuffle阶段partitions的最大值,none时向下取spark.sql.shuffle.partitions
  • spark.sql.shuffle.partitions=400 --shuffle阶段最大400个partition,数据规模很大发起调整
  在spark ui 中可以观察任务的具体参数值

2、yarn集群稳固性

  yarn集群稳固性包罗yarn资源调度、yarn节点负载高低等。
3、Spark sql的参数调优

1、input阶段

  input阶段将表或数据文件加载到executor,先辈行解压(如果有,比如snappy)、展开(比如parquet、orc格式),这些都是在内存中进行,内存不敷时是不可溢写磁盘的。


  • 数据文件(拆分后巨细差别导致oom)



  • 数据文件(拆分后)巨细相似,大JSON字段、高压缩比导致oom

 2、shuffle阶段

  当sql出现出现distinct join、groupby等关键字时,spark须要进行shuffle操作。shuffle操作将前一个或多个阶段的分片数据拉取到同一个excutor进行聚合处理(即我们通常说的reduce),如shuffle partition或者说task数过小,oom是大概率时间。


  • shuffle阶段概要:多少个task执行读/写操作



  •  shuffle阶段详情:读/写是否均匀、是否倾斜

 3.1 参数调优

    现在executor默认设置的是12G+2G内存、4核CPU,最大可以有4个task同时并行(每颗CPU执行一个task),这4个task共享14G内存。一样平常调优可以如下:
  1、spark.executors.cores
    不论在哪个阶段发生oom,降低executor的负载总是可行的。可调整spark.executor.cores=2,即单个executor同时执行2个task,如许每个task约莫可使用7G内存。
  2、spark.sql.adaptive.coalescePartitions.initialPartitionNum/spark.sql.shuffle.partitions
    shuffle阶段的oom,说明shuffle partitions数比较小,导致上阶段的“大规模数据”在本阶段“更聚集”。可调整该参数为更大的值,使大规模数据更分散。


  • spark1,没有spark.sql.adaptive.coalescePartitions.initialPartitionNum参数,固定partitions值为400,调大会造成碎片化
  • spark2,没有spark.sql.adaptive.coalescePartitions.initialPartitionNum参数,可调整spark.sql.shuffle.partitions(配合AQE结果不错)
  • spark3,优先调整spark.sql.adaptive.coalescePartitions.initialPartitionNumtions(是AQE的系列参数之一,默认none时向下取spark.sql.shuffle.partitions)
    *注意:更大的shuffle partitions数固然能减轻shuffle阶段的oom概率,但须要向集群申请更多的服务器资源,不可无限调大(发起控制在2000以内)
方案实现原理:增长shuffle read task的数目,可以让原天职配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个 key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增长了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么天然每个task的执行时间都会变短了。
3、spark.sql.files.maxPartitionBytes
  对于input阶段出现的oom,通常原因是输入数据的超高压缩比,如:json、xml大字段、null值占比高、枚举字段多等。
  spark.executor.cores值仍不起作用时,可渐渐调小spark.sql.files.maxPartitionBytes=134217728(128m)/67108864(64m)
  调小spark.executor.cores、spark.sql.files.maxPartitionBytes值的本质都是减小executor的负载,spark.executor.cores淘汰了task的并行度,而spark.sql.files.maxPartitionBytes减小了单个task处理的数据规模。以是:


  •   优先调整spark.executor.cores,验证是否扔有oom
  •   确认不起作用后,再调整spark.sql.files.maxPartitionBytes,此时发起恢复spark.executor.cores的值(如不恢复,相当于2*2=4倍的负载降低、对于单executor14g的内存资源是巨大浪费)
  *注意:spark.sql.files.maxPartitionBytes不发起调得过小(比如32m、16m),过小须要向集群申请更多executor及盘算资源、集群负载增大,也可能导致数据文件碎片化
4、数据倾斜解决方案

https://download.csdn.net/blog/column/8735546/111667492

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

篮之新喜

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表