本文旨在网络整理ODPS开发中入门及进阶层知识,尽可能涵盖大多数ODPS开发问题,成为一本mini百科全书,后续也会一连更新。希望通过笔者的梳理和明白,资助刚接触ODPS开发的同学快速上手。
本系列分为两部分:入门篇和进阶篇。
ODPS开发大全:入门篇
常用参数设置
常用的调整无外乎调整map、join、reduce的个数,map、join、reduce的内存大小。
以ODPS的参数设置为例,参数可能因版本不同而略有差异。
参数类型
| 详细使用
| | set odps.sql.mapper.cpu=100
作用:设置处理Map Task每个Instance的CPU数目,默认为100,在[50,800]之间调整。场景:某些使命如果特别耗计算资源的话,可以适当调整Cpu数目。对于大多数Sql使命来说,一般不需要调整Cpu个数的。
set odps.sql.mapper.memory=1024
作用:设定Map Task每个Instance的Memory大小,单元M,默认1024M,在[256,12288]之间调整。场景:当Map阶段的Instance有Writer Dumps时,可以适当的增加内存大小,淘汰Dumps所花的时间。
set odps.sql.mapper.merge.limit.size=64
作用:设定控制文件被合并的最大阈值,单元M,默认64M,在[0,Integer.MAX_VALUE]之间调整。场景:当Map端每个Instance读入的数据量不匀称时,可以通过设置这个变量值举行小文件的合并,使得每个Instance的读入文件匀称。一般会和odps.sql.mapper.split.size这个参数团结使用。
set odps.sql.mapper.split.size=256
作用:设定一个Map的最大数据输入量,可以通过设置这个变量达到对Map端输入的控制,单元M,默认256M,在[1,Integer.MAX_VALUE]之间调整。场景:当每个Map Instance处理的数据量比力大,时间比力长,而且没有发生长尾时,可以适当调小这个参数。如果有发生长尾,则团结odps.sql.mapper.merge.limit.size这个参数设置每个Map的输入数量。
| 2. Join设置
| set odps.sql.joiner.instances=-1
作用: 设定Join Task的Instance数量,默认为-1,在[0,2000]之间调整。不走HBO优化时,ODPS可以或许自动设定的最大值为1111,手动设定的最大值为2000,走HBO时可以凌驾2000。场景:每个Join Instance处理的数据量比力大,耗时较长,没有发生长尾,可以考虑增大使用这个参数。
set odps.sql.joiner.cpu=100
作用: 设定Join Task每个Instance的CPU数目,默认为100,在[50,800]之间调整。场景:某些使命如果特别耗计算资源的话,可以适当调整CPU数目。对于大多数SQL使命来说,一般不需要调整CPU。
set odps.sql.joiner.memory=1024
作用:设定Join Task每个Instance的Memory大小,单元为M,默认为1024M,在[256,12288]之间调整。场景:当Join阶段的Instance有Writer Dumps时,可以适当的增加内存大小,淘汰Dumps所花的时间。
| 3. Reduce设置
| set odps.sql.reducer.instances=-1
作用: 设定Reduce Task的Instance数量,手动设置区间在[1,99999]之间调整。不走HBO优化时,ODPS可以或许自动设定的最大值为1111,手动设定的最大值为99999,走HBO优化时可以凌驾99999。场景:每个Join Instance处理的数据量比力大,耗时较长,没有发生长尾,可以考虑增大使用这个参数。
set odps.sql.reducer.cpu=100
作用:设定处理Reduce Task每个Instance的Cpu数目,默认为100,在[50,800]之间调整。场景:某些使命如果特别耗计算资源的话,可以适当调整Cpu数目。对于大多数Sql使命来说,一般不需要调整Cpu。
set odps.sql.reducer.memory=1024
作用:设定Reduce Task每个Instance的Memory大小,单元M,默认1024M,在[256,12288]之间调整。场景:当Reduce阶段的Instance有Writer Dumps时,可以适当的增加内存的大小,淘汰Dumps所花的时间。
上面这些参数虽然好用,但是也过于简朴暴力,可能会对集群产生一定的压力。特别是在集群整体资源紧张的情况下,增加资源的方法可能得不到应有的结果,随着资源的增大,等待资源的时间变长的风险也随之增加,导致结果不好!因此请合理的使用资源参数!
| 4. 小文件合并参数
| set odps.merge.cross.paths=true|false
作用:设置是否跨路径合并,对于表下面有多个分区的情况,合并过程会将多个分区天生独立的Merge Action举行合并,所以对于odps.merge.cross.paths设置为true,并不会改变路径个数,只是分别去合并每个路径下的小文件。
set odps.merge.smallfile.filesize.threshold = 64
作用:设置合并文件的小文件大小阀值,文件大小凌驾该阀值,则不举行合并,单元为M,可以不设,不设时,则使用全局变量odps_g_merge_filesize_threshold,该值默认为32M,设置时必须大于32M。
set odps.merge.maxmerged.filesize.threshold = 256
作用:设置合并输出文件量的大小,输出文件大于该阀值,则创建新的输出文件,单元为M,可以不设,不设时,则使用全局变odps_g_max_merged_filesize_threshold,该值默认为256M,设置时必须大于256M。
set odps.merge.max.filenumber.per.instance = 10000
作用:设置合并Fuxi Job的单个Instance允许合并的小文件个数,控制合并并行的Fuxi Instance数,可以不设,不设时,则使用全局变量odps_g_merge_files_per_instance,该值默认为100,在一个Merge使掷中,需要的Fuxi Instance个数至少为该目次下面的总文件个数除以该限制。
set odps.merge.max.filenumber.per.job = 10000
作用:设置合并最大的小文件个数,小文件数量凌驾该限制,则凌驾限制部分的文件忽略,不举行合并,可以不设,不设时,则使用全局变量odps_g_max_merge_files,该值默认为10000。
| 5. UDF相干参数
| set odps.sql.udf.jvm.memory=1024
作用: 设定UDF JVM Heap使用的最大内存,单元M,默认1024M,在[256,12288]之间调整。场景:某些UDF在内存计算、排序的数据量比力大时,会报内存溢出错误,这时候可以调大该参数,不外这个方法只能暂时缓解,还是需要从业务上去优化。
set odps.sql.udf.timeout=1800
作用:设置UDF超时时间,默认为1800秒,单元秒。[0,3600]之间调整。
set odps.sql.udf.python.memory=256
作用:设定UDF python 使用的最大内存,单元M,默认256M。[64,3072]之间调整。
set odps.sql.udf.optimize.reuse=true/false
作用:开启后,相同的UDF函数表达式,只计算一次,可以提高性能,默认为True。
set odps.sql.udf.strict.mode=false/true
作用:True为金融模式,False为淘宝模式,控制有些函数在遇到脏数据时是返回NULL还是抛非常,True是抛出非常,False是返回null。
| 6. Mapjoin设置
| set odps.sql.mapjoin.memory.max=512
作用:设置Mapjoin时小表的最大内存,默认512,单元M,[128,2048]之间调整。
| 7. 动态分区设置
| set odps.sql.reshuffle.dynamicpt=true/false
作用:默认true,用于避免拆分动态分区时产生过多小文件。如果天生的动态分区个数只会是很少几个,设为false避免数据倾斜。
| 8. 数据倾斜设置
| set odps.sql.groupby.skewindata=true/false
作用:开启Group By优化。
set odps.sql.skewjoin=true/false
作用:开启Join优化,必须设置odps.sql.skewinfo 才有效。
| 常用内建函数大概分为这几类,这边我们挑选一些重点的函数举行阐明。
函数类型
| 阐明
| 日期函数
| 支持处理DATE、DATETIME、TIMESTAMP等日期类型数据,实现加减日期、计算日期差值、提取日期字段、获取当前时间、转换日期格式等业务处理本领。
| 数学函数
| 支持处理BIGINT、DOUBLE、DECIMAL、FLOAT等数值类型数据,实现转换进制、数学运算、四舍五入、获取随机数等业务处理本领。
| 窗口函数
| 支持在指定的开窗列中,实现求和、求最大最小值、求匀称值、求中央值、数值排序、数值偏移、抽样等业务处理本领。
| 聚合函数
| 支持将多条输入纪录聚合成一条输出值,实现求和、求匀称值、求最大最小值、求匀称值、参数聚合、字符勾通接等业务处理本领。
| 字符串函数
| 支持处理STRING类型字符串,实现截取字符串、替换字符串、查找字符串、转换大小写、转换字符串格式等业务处理本领。
| 复杂类型函数
| 支持处理MAP、ARRAY、STRUCT及JSON类型数据,实现去重元素、聚合元素、元素排序、合并元素等业务处理本领。
| 加密函数
| 支持处理STRING、BINARY类型的表数据,实现加密、解密等业务处理本领。
| 其他函数
| 除上述函数之外,提供支持其他业务场景的函数。
| ▐ 日期函数
函数名
| 详细操作
| | SELECTDATEADD(GETDATE(),-7,'dd');
TO_CHAR(DATEADD(GETDATE(),-7,'dd'),'yyyymmdd');
| 2. DATEADD(指定日期加减):
| SELECT DATEADD(GETDATE(),1,"dd"); //2021-01-09 10:48:40 GETDATE()返回值是2021-01-08 10:48:40
SELECT DATEADD(GETDATE(),1,"mm");//2021-02-08 10:49:24 GETDATE()返回值是2021-01-08 10:49:24
| 3. DATEDIFF(计算两个日期的差值):
| datediff(end, start, 'dd') = 1
datediff(end, start, 'mm') = 1
datediff(end, start, 'yyyy') = 1
datediff(end, start, 'hh') = 1
| 4. DATEPART(返回指定日期的年/月/日):
| SELECT DATEPART(GETDATE(),'mm');
SELECT DATEPART(GETDATE(),'yyyy');
SELECT DATEPART(GETDATE(),'dd');
SELECT DATEPART(GETDATE(),'hh');
| 5. DATETRUNC(截取时间):
| datetrunc(datetime '2011-12-07 16:28:46', 'yyyy') = 2011-01-01 00:00:00
datetrunc(datetime '2011-12-07 16:28:46', 'month') = 2011-12-01 00:00:00
datetrunc(datetime '2011-12-07 16:28:46', 'DD') = 2011-12-07 00:00:00
| 6. TO_CHAR 函数
| 使用方式 to_char(要处理的日期,日期格式)
推荐用法:2018-01-11 10:00:00 格式 ,处理为指定格式的日期字符串
结果:处理为yyyymmdd的日期格式,类型为字符串
to_char('2018-01-11 10:00:00','yyyymmdd') as date_3
to_char('2018-01-11 10:00:00','yyyymmdd hh:mi:ss') as date_5
to_char('2018-01-11 10:00:00','yyyy-mm-dd hh:mi:ss') as date_6
to_char('2018-01-11 10:00:00','yyyy-mm-dd 00:00:00') as date_8
to_char('2018-01-11 10:00:00','yyyy-mm-01 23:59:59') as date_9
| 7. TO_DATE函数
| 使用方式:to_date(datetime,format)
推荐用法:根据时间的格式,适当调整format的模版
结果:处理20180111、2018-01-11、2018-01-11 10:00:00
to_char('2018-01-11 10:00:00','yyyymmdd') as date_3
to_char('2018-01-11 10:00:00','yyyymmdd hh:mi:ss') as date_5
to_char('2018-01-11 10:00:00','yyyy-mm-dd hh:mi:ss') as date_6
to_char('2018-01-11 10:00:00','yyyy-mm-dd 00:00:00') as date_8
| 8. UNIX时间戳转换
| 函数:datetime from_unixtime(bigint unixtime) 支持秒
from_unixtime(123456789) = 1973-11-30 05:33:09
函数: from_utc_timestamp(bigint unixtime,string timezone) 支持毫秒
SELECT from_utc_timestamp(1501557840000 ,'GMT') ;
--返回:2017-08-01 04:24:00
| 9. DATE转UNIX时间戳
| 函数:bigint unix_timestamp(datetime date)
select unix_timestamp(datetime '2019-09-20 01:00:00');
--返回1568912400
select unix_timestamp('2019-09-20 01:00:00');
--返回1568912400
| ▐ 字符串函数
函数1. SPLIT
场景:将字符串列按照分隔符(支持正则表达式)分割后返回数组。
例子:
split("浙江省-杭州市-余杭区", "-") 返回["浙江省", "杭州市", "余杭区"]
如果需要解析出杭州市,加上index即可,split("浙江省-杭州市-余杭区", "-")[1],下标从0开始计数。
函数2. SPLIT_PART
场景:将字符串按照分隔符分割,返回指定的子串。
- string split_part(string str, string separator, bigint start[, bigint end])
复制代码 例子:
split_part("浙江省-杭州市-余杭区", "-", 2) 返回"杭州市", start从1开始
split_part("浙江省-杭州市-余杭区", "-", 1, 2) 返回"浙江省-杭州市"
函数3. KEYVALUE
场景:将字符串按照key、value分隔符分割,返回指定key的value
- KEYVALUE(STRING srcStr,STRING split1,STRING split2, STRING key)
- KEYVALUE(STRING srcStr, STRING key) //默认split1 = ";",默认split2 = ":"
复制代码 例子:
keyvalue("sendFlag_pass:20,sendFlag_benefit:30", "," , ":", "sendFlag_pass") 返回20
函数4. STR_TO_MAP
场景:跟KEYVALUE类似,将字符串按照key、value分隔符分割,返回一个map
- str_to_map(text [, delimiter1 [, delimiter2]])
- delimiter1 默认为 ","
- delimiter2 默认为 "="
复制代码 例子:
str_to_map("sendFlag_pass:20,sendFlag_benefit:30", "," , ":") 返回map
str_to_map("sendFlag_pass:20,sendFlag_benefit:30", "," , ":")["sendFlag_pass"], 返回sendFlag_pass的值20
函数5. REGEXP_REPLACE
场景:将字符串中的某些字符根据正则表达式举行替换,替换成""则相称于删除。
- string regexp_replace(string source, string pattern, string replace_string[, bigint occurrence])
复制代码 例子:需要将msg_id中的方括号去掉,然后再一列转多行处理
regexp_replace(msg_id, "\\[|\\]", "") as msg_id
函数6. GET_JSON_OBJECT
场景:解析json格式字符串,返回指定key对应的value
- STRING GET_JSON_OBJECT(STRING json,STRING path)
复制代码 例子:
解析bizSuccess,则使用get_json_object(json, "$.bizSuccess"),返回true
解析totalCount,则使用get_json_object(json, "$.module.totalCount"),返回1,多层嵌套用.隔开
函数7. JSON_TUPLE
场景:get_json_object增强版,解析json格式字符串,返回指定多个key对应的value
- STRING JSON_TUPLE(STRING json,STRING key1,STRING key2,...)
复制代码 例子:
json_tuple(json, "module.object
.activityId") 返回"[1310, 1314]"数组,然后可以使用regexp_replace去掉方括号举行处理。
函数8. CONCAT
场景:将多个字符串合并成一个字符串,如果有参数为NULL,则返回NULL
- string concat(string a, string b...)
复制代码 函数9. TRIM/LTRIM/RTRIM
场景:字符串预处理,去除字符串两边的空格,LTRIM去除左边的空格,RTRIM去除右边的空格
函数10. TOUPPER/TOLOWER
场景:字符串预处理,转大写/小写
- string tolower(string source)
- string toupper(string source)
复制代码 这一章节主要讲Java UDF的开发流程,大概分为如许几个步调:
详细流程:
▐ 1. 安装MaxCompute Studio idea插件
在IDEA中,打开settings设置,找到Plugins
点开Mange Plugin Repositories,如图
点击➕号,添加 http://odps.alibaba.net:8080/studio/updatePlugins.xml
在Plugins marketplace里搜索MaxCompute Studio插件,安装并重启idea
▐ 2. 创建MaxCompute Java项目
按默认继承创建,定好自己的project名
创建好的project如图:
▐ 3. 创建并编写MaxCompute Java类
在project中,选择目次src->java右击添加新类,选择MaxCompute Java
选择UDF
在此,可看到已经建好的UDF类,对此中的evaluate方法举行自定义编写(定义入参出参),并验证方法的正确性(添加Main方法举行自测)
▐ 4. 将UDF函数发布到对应ODPS工作空间
首先,在idea上登陆个人账号
登陆乐成后,可见到自己的花名
接下来导入project建立链接:
首先打开 Project Explorer, View->Tool Windows->roject Explorer
一般来说,选择添加开发环境即可;添加生产环境可能导致后续步调没权限的问题。
接着,我们将自己的项目打包成jar包
右击我们写好的类,选择Delply to server
填写好函数名,再点击 ok 即可
打包完成会有success的提示
▐ 5. 上传jar包至MaxCompute资源
在自己项目ODPS空间中,MaxCompute -> 资源 -> JAR
可自定义资源名称,并点击实行文件,举行上传
这里我们选择项目中 targert->xxx.jar 文件
末了,点击左上角提交资源,再点击右上角发布资源,即可
▐ 6. 上传对应函数
在自己项目ODPS空间中,MaxCompute->函数->新建函数
取好函数名称,点击新建
先从资源列表中,选择我们刚发布的资源,再点击提交➕发布
至此,UDF上传已经乐成,我们就可以在自己的SQL中直接输入函数名称举行调用啦
▐ 7. 留意maven依赖问题
当我们UDF需要依赖其他jar包时,好比fastJSON,需要把其他依赖包一起打包进来,否则上传至ODPS时会报错:java.lang.NoClassDefFoundError
此时需要在项目的pom.xml文件中,参加一段代码,即可将所有依赖一起打包,jar-with-dependencies
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <appendAssemblyId>false</appendAssemblyId>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- <archive>
- <manifest>
- <!-- 此处需改成自己定义的类名路径 -->
- <mainClass>com.alibaba.mkt.odps.complete.CheckCompleteInfo</mainClass>
- </manifest>
- </archive>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
复制代码
拓展功能
▐ ODPS + SQL function
在跑SQL时,我们可以将一些重复繁琐的过程抽象成函数。明确好入参和出参,写好方法后可举行验证。
例1:
问题背景:
字符串类型:对于所有的信息都存放在一个json串中,需要根据不同的key举行解析
初始代码
- REPLACE(REPLACE(REPLACE(GET_JSON_OBJECT(json_data,'$.checkboxField_l1d6qn51'),'[\\"',''),'\\"]',''),'\\"','')
复制代码 改造成SQL函数
- CREATE SQL FUNCTION if not exists get_json_object_checkboxField(@a STRING,@b STRING )
- AS REPLACE(REPLACE(REPLACE(GET_JSON_OBJECT(@a,@b),'[\\"',''),'\\"]',''),'\\"','');
复制代码 改造后代码
- get_json_object_checkboxField(json_data,'$.checkboxField_l1d6qn51')
复制代码 例2:
问题背景:
时间类型:计算自然周或者自然月维度的指标
初始代码
- TO_CHAR(DATEADD(TO_DATE('${bizdate}','yyyymmdd'), - 1 * IF(WEEKDAY(TO_DATE('${bizdate}','yyyymmdd')) == 0,7,WEEKDAY(TO_DATE('${bizdate}','yyyymmdd'))), 'dd'),'yyyymmdd')
复制代码 改造成SQL函数
- CREATE SQL FUNCTION if not exists natural_week(@a STRING)
- AS TO_CHAR(DATEADD(TO_DATE(@a,'yyyymmdd'), - 1 * IF(WEEKDAY(TO_DATE(@a,'yyyymmdd')) == 0,7,WEEKDAY(TO_DATE(@a,'yyyymmdd'))), 'dd'),'yyyymmdd');
复制代码 改造后代码
- natural_week('${bizdate}')
复制代码 ▐ ODPS + UDF
通过对自定义的MAX_UDF函数的推出,仅通过申请一个UDF函数即可调用所有函数,操作轻巧,达到淘汰申请时间本钱及重复开发本钱的目的。
本部分与上文自定义UDF开发篇密切相干,下面举一个简朴的例子:
问题背景:
字符串加密。
入参:第一个参数是字符串加密的序列号;第二个参数是要加密的字符;第三个是加密的开始位数;第四个是要加密几位;第五个参数是加密的字符内容。
出参:针对字符串加密处理。
- select process_string('{"clazzNo":"011","methodNo":"01"}',"123411412341","2","7","*");
复制代码 注:这里process_string,是我们自己写的UDF方法。
问题处理
▐ 性能分析
据 logview 的子状态(SubStatusHistory)可以进一步细分为调理、优化、天生物理实行筹划、数据跨集群复制等子阶段。
阶段
| 特性
| 原因
| 解决方案
| 调理阶段
| 子状态为“Waiting for cluster resource”,作业列队等待被编译。
| 1.计算集群资源紧缺。
| 查察计算集群的状态,需要等待计算集群的资源。
| 2. 编译资源池资源不够
| 优化阶段
| 子状态为“SQLTask is optimizing query”,优化器正在优化实行筹划。
| 1.实行筹划复杂,需要等待较长时间做优化。
| 一般可担当10分钟以内,如果真的太长时间不退出,基本可认为是 odps 的 bug。
| 天生物理实行筹划阶段
| 子状态为“SQLTask is generating execution plan”。
| 1.读取的分区太多。每个分区需要去根据分区信息来决定处理方式,决定 split,而且会写到天生的实行筹划中。
| 需要好好设计 SQL,淘汰分区的数量,包罗:分区裁剪、筛除不需要读的分区、把大作业拆成小作业。
| 2.小文件太多(万级别),ODPS 会根据文件大小决定 split,小文件多了会导致计算 split 的过程耗时增加。
| 使用TunnelBufferedWriter接口,可以更简朴的举行上传功能,同时避免小文件。
实行一次 alter table merge smallfiles; 让 odps 把小文件 merge 起来,
| 数据跨集群复制阶段
| 子状态列表内里出现多次“Task rerun”,result 里有错误信息“FAILED: ODPS-0110141ata version exception”。
| 1.project 刚做集群迁移,每每前一两天有大量需要跨集群复制的作业。
| 这种情况是预期中的跨集群复制,需要用户等待。
| 2.可能是作业提交错集群,或者是中央 project 做过迁移,分区过滤没做好,读取了一些比力老的分区。
| 检查作业提交的集群是否正确, Logview2.0使命详情页左侧的 BasicInfo 查察作业提交的集群。
|
logview 的 detail 界面有实行筹划(实行筹划没有全都绿掉),且作业状态还是 Running。
实行阶段卡住或实行时间比预期长的主要原因有等待资源,数据倾斜,UDF 实行低效,数据膨胀等。
阶段
| 特性
| 解决方案
| 等待资源
| 一些instance处于Ready状态,部分instance处于Running状态。
| 确定列队状态是否正常。可以通过 logview 的列队信息“Queue”看作业在队列的位置。
| 数据倾斜
| task 中大多数 instance 都已经结束了,但有某几个 instance 却迟迟不结束(长尾)。
|
- 使用 MaxCompute Studio 的作业实行图及作业详情功能来分析作业运行情况,定位到长尾实例,找到导致长尾的数据泉源。
- 使用 Logveiw2.0 查察使命实行图和 instance 运行情况来定位长尾实例。
| UDF实行低效
| 某个 task 实行服从低,且该 task 中有效户自定义的扩展。
| 有时候 bug 是由于某些特定的数据值引起的,好比出现某个值的时候会引起死循环。
内置函数是有可能被同名 UDF 覆盖的,当看到一个函数像是内置函数时,需要确定是否有同名 UDF 覆盖了内置函数。
evaluate 中只做与参数相干的必要操作。
| 数据膨胀
| task 的输出数据量比输入数据量大许多。
|
- 检查代码是否有 bug:JOIN 条件是不是写错,变成笛卡尔积了;UDTF是不是有问题,输出太多数据。
- 检查 Aggregation 引起的数据膨胀。
- 避免join引起的数据膨胀。
- 由于grouping set 导致的数据膨胀。
| 在线业务压抑
| ODPS集群中的一部分是离线集群,另一部分是在线集群。
| 如果是弹内环境,可通过fuxi sensor确认是否存在在线业务压抑。
| UDF实行:
- set odps.sql.udf.jvm.memory=
- -- 设定UDF JVM Heap使用的最大内存,单位M,默认1024M
- -- 可手动调整区间[256,12288]
复制代码
有时 Fuxi 作业结束时,作业总体进度仍旧处于运行状态。原因有两种:
- 单 SQL 作业可能包含多个 Fuxi 作业
- Fuxi 作业结束后,SQL 在结束阶段运行于控制集群的逻辑占用时间较长
阶段 | 特性 | 解决方案 | 子查询多阶段实行
| MaxCompute SQL 的子查询会被编译进同一个 Fuxi DAG,即所有子查询和主查询都通过一个 Fuxi 作业完成。
但也有一些特别子查询需要先将子查询单独实行。
| 子查询 SELECT DISTINCT ds FROM t_ds_set 先实行,其结果需要被用来做分区裁剪,来优化主查询需要读取的分区数。
| 过多小文件
| 存储方面:小文件过多会给 Pangu 文件系统带来一定的压力,且影响空间的有效使用。
计算方面:ODPS 处理单个大文件比处理多个小文件更有服从,小文件过多会影响整体的计算实行性能。
| 为了避免系统产生过多小文件,SQL作业会在结束时自动触发合并小文件的操作。
根据参数odps.merge.smallfile.filesize.threshold来判定小文件,默认阈值为32MB。
可通过logview查察作业是否触发了自动合并小文件。
| 动态分区元数据更新
| Fuxi 作业实行完后,有可能另有一些元数据操作。
| 对分区表 sales 使用 insert into ... values命令新增 2000 个分区:
INSERT INTO TABLE sales partition (ds)(ds, product, price)
VALUES ('20170101','a',1),('20170102','b',2),('20170103','c',3), ...;
| 输出文件size变大
| 在输入输出条数相差不大的情况,结果膨胀几倍。
| 一般是数据分布变化导致的,在写表的过程中,会对数据举行压缩,而压缩算法对于重复数据的压缩率是最高的。
| 子查询:
- SELECT
- product,
- sum(price)
- FROM
- sales
- WHERE
- ds in (SELECT DISTINCT ds FROM t_ds_set)
- GROUP BY product;
复制代码 ▐ 性能优化
在优化运行时间这个维度上,我们重点关注时间上的加速,单元时间内可能会斲丧更多的计算资源。总本钱有可能上升,也可能低落。
优化类型
| 详细类型
| 优化步伐
| 调整并行度
| instance数量的增加会对实行速度产生影响:
- 更多的instance意味着更长的等待资源和列队次数。
- 每个instance的初始化需要一定时间,并行度越高,总初始化时间越长,有效实行时间占比越低。
| 需要逼迫 1 个 instance 实行
用户需要检查这些操作是否必要,能否去掉,尽量取消掉这些操作:读表的 task + 非读表的 task
| 影响单个task并行度主要因素:
- 某些操作逼迫必须 1 个 instance 来实行
- 读表的 task
- 非读表的 task
- HBO会在上面的底子上根据历史作业的实行情况做调整
| 对于读表的 task,一个 instance 读取 256M的数据,一些常见出问题的情况:
- 数据压缩比很高
- Task 中实行了一些很 heavy 的操作,特别是存在 UDF
- 读取 256M 数据太少,导致 instance 的实行时间太短
可以通过调整flag实现:
set odps.sql.mapper.split.size= xxx
| 非读表的 task,主要有三种方式调整并行度:
- 调整 odps.sql.mapper.split.size
- 通过 odps.sql.reducer.instances 逼迫设置 reducer 并行度
- 通过 odps.sql.joiner.instances 逼迫设置 joiner 并行度
| set odps.sql.reducer.instances= xxx
-- 设定Reduce task的instance数量
set odps.sql.joiner.instances= xxx
-- 设定Join task的instance数量
| HBO
| HBO (History-Based Optimization) 会根据对历史作业的分析来优化当前作业的。
包罗内存、并行度等一系列参数,它能让你的周期作业越跑越快。
| 为了尽可能解决HBO失效这个顽疾,我们在HBO中增加了若干新的功能,包罗:
- realtime hbo
- task-wise hbo
- new signature
| 优化实行筹划
| CBO优化器会基于统计信息、SQL语义、实行引擎本领、丰富的优化本领,自动天生最优的实行筹划,而且在一连提拔优化本领。
| Map Join Hint
用户可以手动添加map join hint,使得原本的Sort-Merge Join变成Map Join,避免大表数据shuffle从而提拔性能。
| Distributed Map Join Hint
Distributed MapJoin是MapJoin的升级版,实用于实用于大表Join中表的场景 的场景,二者的焦点目的都是为了淘汰大表侧的Shuffle和排序。
| Dynamic Filter Hint
基于JOIN等值连接的特性,MaxCompute可以通过表A的数据天生一个过滤器,在Shuffle或JOIN之前提前过滤表B的数据。
| 物化视图
物化视图(Materialized View)本质是一种预计算,即把某些耗时的操作(如JOIN/AGGREGATE)的结果保存下来。
以便在查询时直接复用,从而避免这些耗时的操作,终极达到加速查询的目的。
| 数据倾斜
| 数据Shuffle导致的数据倾斜 1
| 数据倾斜大多数是由于数据的 reshuffle 引起的,由于按照某个 key 来做 shuffle,同一个 key 值的数据会逼迫集中在一个 instance 处理。
- 去掉 shuffle
- 换别的 shuffle key
- 将热点数据特别处理
| 数据Shuffle导致的数据倾斜 2
| 特性:读表并写动态分区作业,M task 读入大量数据,但是只会写出少量的 动态分区。
解决方法:set odps.sql.reshuffle.dynamicpt =false; 去掉reshuffle 过程。
|
优化类型
| 详细类型
| 优化步伐
| SQL的新语法、新功能
| GROUPING SETS:对 SELECT 语句中 GROUP BY 子句的扩展。
| SQL 运行*时物理实行筹划做了 3 次聚合,然后再 UNION 起来。
| 脚本模式:
脚本模式能让用户以脚本的形式提交多条语句同时实行。
| 脚本模式的性能上风,现实上是“将分散的业务逻辑合并成一个作业来运行“的性能上风:
- 合并重复的公共操作。
- 避免中央数据写表,淘汰暂时表。
- 更好的发挥 optimizer 的作用。
- 淘汰了作业调理的开销。
| MR典型场景用SQL实现
| select k, WM_CONCAT(';',concat(v,":",c)) from
( select k, v, count(v) c from t group by k,v) t2 group by k;
rows between x preceding|following and y preceding|following
- 使用 SQL-UDJ (User Defined Join)
MapReduce 实现的 JOIN 逻辑。
实用场景:MapReduce Streaming 作业。
| 合理设置资源参数
| Map设置
| set odps.sql.mapper.cpu=100
作用:设置处理Map Task每个Instance的CPU数目
set odps.sql.mapper.memory=1024
作用:设定Map Task每个Instance的Memory大小
set odps.sql.mapper.merge.limit.size=64
作用:设定控制文件被合并的最大阈值
set odps.sql.mapper.split.size=256
作用:设定一个Map的最大数据输入量
| Join设置
| set odps.sql.joiner.instances=-1
作用: 设定Join Task的Instance数量
set odps.sql.joiner.cpu=100
作用: 设定Join Task每个Instance的CPU数目
set odps.sql.joiner.memory=1024
作用:设定Join Task每个Instance的Memory大小
| Reduce设置
| set odps.sql.reducer.instances=-1
作用: 设定Reduce Task的Instance数量
set odps.sql.reducer.cpu=100
作用:设定处理Reduce Task每个Instance的Cpu数目
set odps.sql.reducer.memory=1024
作用:设定Reduce Task每个Instance的Memory大小
| GROUPING SETS 优化步伐*:
- SELECT NULL, NULL, NULL, COUNT(*)
- FROM requests
- UNION ALL
- SELECT os, device, NULL, COUNT(*)
- FROM requests GROUP BY os, device
- UNION ALL
- SELECT NULL, NULL, city, COUNT(*)
- FROM requests GROUP BY city;
复制代码 上述 SQL 运行时物理实行筹划做了 3 次聚合,然后再 UNION 起来。
- SELECT os, device, city, COUNT(*)
- FROM requests
- GROUP BY os, device, city GROUPING SETS((os, device), (city), ());
复制代码 物理实行筹划只包含一个 Reduce 阶段,无需举行 UNION 操作,使用更少代码的同时斲丧更少的集群资源。
▐ 恢复已删
总结
颠末一个多月的整理和总结,终于完成了《ODPS开发大全》的这个底子版本。在这过程中我不停地接触到新的知识点,学到之前未曾掌握的技能,也感叹ODPS功能之丰富强大。渴望在将来工作中,自己可以多沉淀好的技能文档,这不但让我更加深刻地温习过往学习的技能,也可以把知识共享给更多求知若渴的技能人,建设更开放的CS技能社区。
¤ 拓展阅读 ¤
3DXR技能 | 终端技能 | 音视频技能
服务端技能 | 技能质量 | 数据算法
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |