EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework
作者:周克勇,混名一锤,阿里巴巴计算平台事业部EMR团队技能专家,大数据领域技能爱好者,对Spark有浓厚爱好和一定的相识,目前重要专注于EMR产品中开源计算引擎的优化工作。背景和动机
SparkSQL多年来的性能优化集中在Optimizer和Runtime两个领域。前者的目的是为了获得最优的执行筹划,后者的目的是针对既定的筹划尽大概执行的更快。
相比于Runtime,Optimizer是更加通用的、跟实现无关的优化。无论是Java世界(Spark, Hive)还是C++世界(Impala, MaxCompute),无论是Batch-Based(Spark, Hive)还是MPP-Based(Impala, Presto),甚至无论是大数据领域还是传统数据库领域亦或HTAP领域(HyPer, ADB),在Optimizer层面考虑的都是非常雷同的问题: Stats网络,Cost评估以及筹划选择;采用的优化技能也比较雷同,如JoinReorder, CTE, GroupKey Elimination等。尽管由于上下文差别(如是否有索引)在Cost Model的构造上会有差别,或者特定场景下采用差别的空间搜刮策略(如遗传算法 vs. 动态规划),但方法大体是相同的。
恒久以来,Runtime的优化工作基本聚焦在解决当时的硬件瓶颈。如MapReduce刚出来时网络带宽是瓶颈,以是Google做了很多Locality方面的优化;Spark刚出来时解决的问题是磁盘IO,内存缓存的设计使得性能相比MapReduce有了数目级的提升;厥后CPU成为了新的瓶颈,因此提升CPU性能成了近年来Runtime领域重要的优化方向。
提升CPU性能的两个主流技能是以MonetDB/X100(如今演化为VectorWise)为代表的向量化(Vectorized Processing)技能和以HyPer为代表的代码天生(CodeGen)技能(其中Spark跟进的是CodeGen)。简朴来说,向量化技能沿用了火山模型,但与其让SQL算子每次计算一条Record,向量化技能会积攒一批数据后再执行。逐批计算相比于逐条计算有了更大的优化空间,例如虚函数的开销分摊,SIMD优化,更加Cache友爱等。这个技能的劣势在于算子之间转达的数据从条变成了批,因此增大了中间数据的物化开销。CodeGen技能从别的一个角度解决虚函数开销和中间数据物化问题:算子融合。简朴来说,CodeGen框架通过打破算子之间的界限把火山模型“压平”了,把原来迭代器链压缩成了大的for循环,同时天生语义相同的代码(Java/C++/LLVM),紧接着用对应的工具链编译天生的代码,末了用编译后的class(Java)或so(C++,LLVM)去执行,从而把表明执行转变成了编译执行。此外,尽管还是逐条执行,由于抹去了函数调用,一条Record从(Stage内的)初始算子不绝执行到竣事算子都基本处于寄存器中,不会物化到内存。CodeGen技能的劣势在于难以应用SIMD等优化。
两个门派相爱相杀,在经历了互相发论文验证自家优于对方后两家走向了合作,合作产出了一系列项目和论文,而目前学界的主流看法也是两者融合是最优解,一些采用融合做法的项目也应运而生,如进化版HyPer, Pelonton等。
尽管学界已走到了融合,业界主流却没有很强的动力往融合的路子走,探究其重要缘故原由一是目前融合的做法相比单独的优化并没有质的提升;二是融合技能目前没有一个广为担当的最优做法,还在探索阶段;三是业界在单一的技能上还没有发挥出最大潜力。以SparkSQL为例,从2015年SparkSQL首次露面自带的Expression级别的Codegen,到厥后参考HyPer实现的WholeStage Codegen,再颠末多年的打磨,SparkSQL的Codegen技能已趋成熟,性能也获得了两次数目级的跃升。然而,也许是出于可维护性或开辟者担当度的考虑,SparkSQL的Codegen不绝限定在天生Java代码,并没有尝试过NativeCode(C/C++, LLVM)。尽管Java的性能已经很优,但相比于Native Code还是有一定的Overhead,并缺乏SIMD(Java在做这方面feature),Prefetch等语义,更重要的是,Native Code直接操作裸金属,易于极致压榨硬件性能,对一些加速器(如GPU)或新硬件(如AEP)的支持也更方便。
基于以上动机,EMR团队探索并开辟了SparkSQL Native Codegen框架,为SparkSQL换了引擎,新引擎带来20%左右的性能提升,为EMR再次获取世界第一立下汗马功劳,本文将具体介绍Native Codegen框架。
核心问题
做Native Codegen,核心问题有三个:
1.天生什么?
2.怎么天生?
3.如何集成到Spark?
天生什么
针对天生什么代码,结合调研的效果以及开辟同学的技能栈,有三个候选项:C/C++, LLVM, Weld IR。C/C++的优势是实现相对简朴,只需对照Spark天生的Java代码逻辑改写即可,劣势是编译时间过长,下图是HyPer的测评数据,C++的编译时间比LLVM高了一个数目级。
https://img-blog.csdnimg.cn/img_convert/957411a76175b5380cb4fc0f8901e53e.png
编译时间过长对小query很不友爱,极度case编译时间比运行时间还要长。基于这个考虑,我们清除了C/C++选项。上图看上去LLVM的编译时间非常友爱,而且很多Native CodeGen的引擎,如HyPer, Impala, 以及阿里云自研大数据引擎MaxCompute,ADB等,均采用了LLVM作为目的代码。LLVM对我们来说(对你们则不一定:D)最大的劣势就是过于底层,语法靠近于汇编,试想用汇编重写SparkSQL算子的工作量会有多酸爽。大多数引擎也不会用LLVM写全量代码,如HyPer仅把算子核心逻辑用LLVM天生,其他通用功能(如spill,复杂数据布局管理等)用C++编写并提前编译好。即使LLVM+C++节省了不少工作量,对我们来说依然不可担当,因此我们把眼光转向了第三个选项: Weld IR(Intermediate Representation)。
首先简短介绍以下Weld。Weld的作者Shoumik Palkar是 Matei Zaharia的学生,后者大家一定很认识,Spark的作者。Weld最初想解决的问题是差别lib之间互相调用时数据传输的开销,例如要在pandas里调用numpy的接口,首先pandas把数据写入内存,然后numpy读取内存进行计算,对于极度优化的lib来说,内存的写入和读取的时间大概会远超计算自己。针对这个问题,Weld开辟了Common Runtime并配套提供了一组IR,再加上惰性求值的特性,只需(简朴)修改lib使其符合Weld的规范,便可以做到差别lib共用Weld Runtime,Weld Runtime利用惰性求值实现跨lib的Pipeline,从而省去数据物化的开销。Weld Runtime还做了多少优化,如循环融合,循环睁开,向量化,自顺应执行等。此外,Weld支持调用C代码,可以方便调用三方库。
我们感爱好的是Weld提供的IR和对应的Runtime。Weld IR面向数据分析进行设计,因此语义上跟SQL非常靠近,能较好的表达算子。数据布局层面,Weld IR最核心的数据布局是vec和struct,能较好地表达SparkSQL的UnsafeRow Batch;基于struct和vec可以构造dict,能较好的表达SQL里重度使用的Hash布局。操作层面,Weld IR提供了类函数式语言的语义,如map, filter, iterator等,共同builder语义,能方便的表达Project, Filter, Agg, BroadCastJoin等算子语义。例如,以下IR表达了Filter + Project语义,具体含义是若第二列大于10,则返回第一列:
|v: vec[{i32,i32}]| for(v,appender,|b,i,n| if(n.$1 > 10, merge(b,n.$0), b)) 以下IR表达了groupBy的语义,具体含义是按照第一列做groupBy来计算第二列的sum:
|v: vec[{i32,i32}]| for(v,dictmerger,|b,i,n| merge(b,{n.$0,n.$1})) 具体的语法定义请参考Weld文档(https://github.com/weld-project/weld/blob/master/docs/language.md)。
Weld 开辟者API提供了两个核型接口:
[*] weld_module_compile, 把Weld IR编译成可执行模块(module)。
[*] weld_module_run, 执行编译好的模块。
基本流程如下图所示,最终也是天生LLVM代码。
https://img-blog.csdnimg.cn/img_convert/1eedc4fbf1135c8e6ed4d2b46f7038ac.png
由此,Weld IR的优势就显然易见了,既兼顾了性能(最终天生LLVM代码),又兼顾了易用性(CodeGen Weld IR相比LLVM, C++方便很多)。基于这些考虑,我们最终选择Weld IR作为目的代码。
怎么天生
SparkSQL原有的CodeGen框架之前简朴介绍过了,详见https://developer.aliyun.com/article/727277。我们参考了Spark原有的做法,支持了表达式级别,算子级别,以及WholeStage级别的Codegen。复用Producer-Consumer框架,每个算子负责天生自己的代码,末了由WholeStageCodeGenExec负责组装。
这个过程有两个关键问题:
1.算子之间传输的介质是什么?
2.如那边理Weld不支持的算子?
传输介质
差别于Java,Weld IR不提供循环布局,取而代之的是vec布局和其上的泛迭代器操作,因此Weld IR难以借鉴Java Codegen在Stage外层套个大循环,然后每个算子处置惩罚一条Record的模式,取而代之的做法是每个算子处置惩罚一批数据,IR层面做假物化,然后依赖Weld的Loop-Fusion优化去消除物化。例如前面提到的Filter后接Project,Filter算子天生的IR如下,过滤掉第二列<=10的数据:
|v:vec[{i32,i32}]| let res_fil = for(v,appender,|b,i,n| if(n.$1>10, merge(b,n), b) Project算子天生的IR如下,返回第一列数据:
let res_proj = for(res_fil,appender,|b,i,n| merge(b,n.$0)) 表面上看上去Filter算子会把中间效果做物化,现实上Weld的Loop-Fusion优化器会消除此次物化,优化后代码如下:
|v: vec[{i32,i32}]| for(v,appender,|b,i,n| if(n.$1 > 10, merge(b,n.$0), b)) 尽管依赖Weld的Loop-Fusion优化可以极大简化CodeGen的逻辑,但开辟中我们发现Loop-Fusion过程非常耗时,对于复杂SQL(嵌套3层以上)甚至无法在有限时间给出效果。当时面临两个选择:修改Weld的实现,或者修改CodeGen直接天生Loop-Fusion之后的代码,我们选择了后者。重构后天生的代码如下,其中1,2,11行由Scan算子天生,3,4,5,6,8,9,10行由Filter算子天生,7行由Project算子天生。
|v: vec[{i32,i32}]|
for(v,appender,|b,i,n|
if(
n.$1 > 10,
merge(
b,
n.$0
),
b
)
) 这个优化使得编译时间重回亚秒级别。
Fallback机制
受限于Weld当前的表达能力,一些算子无法用Weld实现,例如SortMergeJoin,Rollup等。即使是原版的Java CodeGen,一些算子如Outter Join也不支持CodeGen,因此如何做好Fallback是保证正确性的前提。我们采用的策略很直观:若当前算子不支持Native CodeGen,则由Java CodeGen接管。这里涉及的关键问题是Fallback的粒度:是算子级别还是Stage级别?
抛去实现难度不谈,固然直观上算子粒度的Fallback更加合理,但现实上却会导致更严重的问题:Stage内部Pipeline的断裂。如上文所述,CodeGen的一个优势是把整个Stage的逻辑Pipeline化,打破算子之间的界限,单条Record从初始算子执行到竣事算子,整个过程不存在物化。而算子粒度的Fallback则会导致Stage内部一部分走Native Runtime,另一部分走Java Runtime,则两者毗连处无可克制存在中间数据物化,这个开销通常会大于Native Runtime带来的收益。
基于以上考虑,我们选择了Stage级别的Fallback,在CodeGen阶段一旦遇到不支持的算子,则整个Stage都Fallback到Java CodeGen。统计显示,整个TPCDS Benchmark,命中Native CodeGen的Stage到达80%。
Spark集成
完成了代码天生和Fallback机制,末了的问题就是如何跟Spark集成了。Spark的WholeStageCodegenExec的执行可以明白为一个黑盒,无论上游是Table Scan,Shuffle Read,还是BroadCast,给到黑盒的输入类型只有两种: RowBatch(上游是Table Scan)或Row Iterator(上游非Table Scan),而黑盒的输出固定为Row Iterator,如下图所示:
https://img-blog.csdnimg.cn/img_convert/8c6a268216eed4103da7ea70cfc46a91.png
上文介绍我们选择了Stage级别的Fallback,也就决定了黑盒要么是Java Runtime,要么是Native Runtime,不存在混淆的情况,因此我们只需要关心如何把Row Batch/Row Iterator转化为Weld认识的内存布局,以及如何把Weld的输出转化成Row Iterator即可。为了进一步简化问题,我们留意到,尽管Shuffle Reader/BroadCast的输入是Row Iterator,但本质上远端序列化的数据布局是Row Batch,只不过Spark反序列化后转换成Row Iterator后再喂给CodeGen Module,RowBatch包装成Row Iterator非常浅易。因此Native Runtime的输入输出可以同一成RowBatch。
解决办法呼之欲出了:把RowBatch转换成Weld vec!但我们更进了一步,何不直接把Row Batch喂给Weld从而省去内存转换呢?本质上Row Batch也是满足某种规范的字节省而已,Spark也提供了OffHeap模式把内存直接存堆外(仅针对Scan Stage。Shuffle数据和Broadcast数据需要读到堆外),Weld可以直接访问。Spark UnsafeRow的内存布局大抵如下:
https://img-blog.csdnimg.cn/img_convert/842c90ebda35cb1ecc81a1a057233c72.png
针对确定的schema,null bitmap和fixed-length data的布局是固定的,可以映射成struct,而针对var-length data我们的做法是把这些数据copy到一连的内存地点中。云云一来,针对无变长数据的RowBatch,我们直接把内存块喂给Weld;针对有变长部分的数据,我们也只需做大粒度的内存拷贝(把定长部分和变长部分分别拷出来),而无需做列级别的细粒度拷贝转换。
继续举前文的Filter+Project的例子,一条Record包罗两个int列,其UnsafeRow的内存布局如下(为了对齐,Spark里定长部分最少使用8字节)。
https://img-blog.csdnimg.cn/img_convert/ef03892406053e325de2fb7208a0f76c.png
显而易见,这个布局可以很方便映射成Weld struct:
{i64,i64,i64} 而整个Row Batch便映射成Weld vec:
vec[{i64,i64,i64}] 云云便解决了Input的问题。而Weld Output转RowBatch本质是以上过程的逆向操作,不再赘述。
解决了Java和Native之间的数据转换问题,剩下的就是如何执行了。首先我们根据当前Stage的Mode来决定走Java Runtime还是Native Runtime。在Native分支,首先会执行StageInit做Stage级别的初始化工作,包括初始化Weld,加载编译好的Weld Module,拉取Broadcast数据(若有)等;接着是一个循环,每个循环读取一个RowBatch(来自Scan或Shuffle Reader)喂给Native Runtime执行,Output转换并喂给Shuffle Writer。如下图所示:
https://img-blog.csdnimg.cn/img_convert/cbb9f306bbd9c15e5f5c42ef5a3192f9.png
总结
本文介绍了EMR团队在Spark Native Codegen方向的探索实践,限于篇幅多少技能点和优化没有睁开,后续可另开文详解,例如:
1.极致Native算子优化
2.数据转换详解
3.Weld Dict优化
大家感爱好的任何内容欢迎沟通: )
Making Sense of Performance in Data Analytics Frameworks. Kay Ousterhout
MonetDB/X100: Hyper-Pipelining Query Execution. Peter Boncz
Vectorwise: a Vectorized Analytical DBMS. Marcin Zukowski
Efficiently Compiling Efficient Query Plans for Modern Hardware. Thomas Neumann
HyPer: A Hybrid OLTP&OLAP Main Memory Database System Based on Virtual Memory Snapshots. Alfons Kemper
Data Blocks: Hybrid OLTP and OLAP on Compressed Storage using both Vectorization and Compilation. Harald Lang
Relaxed Operator Fusion for In-Memory Databases: Making Compilation, Vectorization, and Prefetching Work Together At Last. Prashanth Menon
Vectorization vs. Compilation in Query Execution. Juliusz Sompolski
https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html
查看更多内容,欢迎访问天池技能圈官方地点:EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework_天池技能圈-阿里云天池
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]