ODPS开发大全:进阶篇

打印 上一主题 下一主题

主题 530|帖子 530|积分 1590



  本文旨在网络整理ODPS开发中入门及进阶层知识,尽可能涵盖大多数ODPS开发问题,成为一本mini百科全书,后续也会一连更新。希望通过笔者的梳理和明白,资助刚接触ODPS开发的同学快速上手。

  本系列分为两部分:入门篇进阶篇
     ODPS开发大全:入门篇
   

  
常用参数设置

  常用的调整无外乎调整map、join、reduce的个数,map、join、reduce的内存大小。
  以ODPS的参数设置为例,参数可能因版本不同而略有差异。
  
参数类型

详细使用

      

  • Map设置
set odps.sql.mapper.cpu=100
作用:设置处理Map Task每个Instance的CPU数目,默认为100,在[50,800]之间调整。场景:某些使命如果特别耗计算资源的话,可以适当调整Cpu数目。对于大多数Sql使命来说,一般不需要调整Cpu个数的。
set odps.sql.mapper.memory=1024
作用:设定Map Task每个Instance的Memory大小,单元M,默认1024M,在[256,12288]之间调整。场景:当Map阶段的Instance有Writer Dumps时,可以适当的增加内存大小,淘汰Dumps所花的时间。
set odps.sql.mapper.merge.limit.size=64
作用:设定控制文件被合并的最大阈值,单元M,默认64M,在[0,Integer.MAX_VALUE]之间调整。场景:当Map端每个Instance读入的数据量不匀称时,可以通过设置这个变量值举行小文件的合并,使得每个Instance的读入文件匀称。一般会和odps.sql.mapper.split.size这个参数团结使用。
set odps.sql.mapper.split.size=256
作用:设定一个Map的最大数据输入量,可以通过设置这个变量达到对Map端输入的控制,单元M,默认256M,在[1,Integer.MAX_VALUE]之间调整。场景:当每个Map Instance处理的数据量比力大,时间比力长,而且没有发生长尾时,可以适当调小这个参数。如果有发生长尾,则团结odps.sql.mapper.merge.limit.size这个参数设置每个Map的输入数量。
2. Join设置
set odps.sql.joiner.instances=-1
作用: 设定Join Task的Instance数量,默认为-1,在[0,2000]之间调整。不走HBO优化时,ODPS可以或许自动设定的最大值为1111,手动设定的最大值为2000,走HBO时可以凌驾2000。场景:每个Join Instance处理的数据量比力大,耗时较长,没有发生长尾,可以考虑增大使用这个参数。
set odps.sql.joiner.cpu=100
作用: 设定Join Task每个Instance的CPU数目,默认为100,在[50,800]之间调整。场景:某些使命如果特别耗计算资源的话,可以适当调整CPU数目。对于大多数SQL使命来说,一般不需要调整CPU。
set odps.sql.joiner.memory=1024
作用:设定Join Task每个Instance的Memory大小,单元为M,默认为1024M,在[256,12288]之间调整。场景:当Join阶段的Instance有Writer Dumps时,可以适当的增加内存大小,淘汰Dumps所花的时间。
3. Reduce设置
set odps.sql.reducer.instances=-1
作用: 设定Reduce Task的Instance数量,手动设置区间在[1,99999]之间调整。不走HBO优化时,ODPS可以或许自动设定的最大值为1111,手动设定的最大值为99999,走HBO优化时可以凌驾99999。场景:每个Join Instance处理的数据量比力大,耗时较长,没有发生长尾,可以考虑增大使用这个参数。
set odps.sql.reducer.cpu=100
作用:设定处理Reduce Task每个Instance的Cpu数目,默认为100,在[50,800]之间调整。场景:某些使命如果特别耗计算资源的话,可以适当调整Cpu数目。对于大多数Sql使命来说,一般不需要调整Cpu。
set odps.sql.reducer.memory=1024
作用:设定Reduce Task每个Instance的Memory大小,单元M,默认1024M,在[256,12288]之间调整。场景:当Reduce阶段的Instance有Writer Dumps时,可以适当的增加内存的大小,淘汰Dumps所花的时间。
上面这些参数虽然好用,但是也过于简朴暴力,可能会对集群产生一定的压力。特别是在集群整体资源紧张的情况下,增加资源的方法可能得不到应有的结果,随着资源的增大,等待资源的时间变长的风险也随之增加,导致结果不好!因此请合理的使用资源参数!
4. 小文件合并参数
set odps.merge.cross.paths=true|false
作用:设置是否跨路径合并,对于表下面有多个分区的情况,合并过程会将多个分区天生独立的Merge Action举行合并,所以对于odps.merge.cross.paths设置为true,并不会改变路径个数,只是分别去合并每个路径下的小文件。
set odps.merge.smallfile.filesize.threshold = 64
作用:设置合并文件的小文件大小阀值,文件大小凌驾该阀值,则不举行合并,单元为M,可以不设,不设时,则使用全局变量odps_g_merge_filesize_threshold,该值默认为32M,设置时必须大于32M。
set odps.merge.maxmerged.filesize.threshold = 256
作用:设置合并输出文件量的大小,输出文件大于该阀值,则创建新的输出文件,单元为M,可以不设,不设时,则使用全局变odps_g_max_merged_filesize_threshold,该值默认为256M,设置时必须大于256M。
set odps.merge.max.filenumber.per.instance = 10000
作用:设置合并Fuxi Job的单个Instance允许合并的小文件个数,控制合并并行的Fuxi Instance数,可以不设,不设时,则使用全局变量odps_g_merge_files_per_instance,该值默认为100,在一个Merge使掷中,需要的Fuxi Instance个数至少为该目次下面的总文件个数除以该限制。
set odps.merge.max.filenumber.per.job = 10000
作用:设置合并最大的小文件个数,小文件数量凌驾该限制,则凌驾限制部分的文件忽略,不举行合并,可以不设,不设时,则使用全局变量odps_g_max_merge_files,该值默认为10000。
5. UDF相干参数
set odps.sql.udf.jvm.memory=1024
作用: 设定UDF JVM Heap使用的最大内存,单元M,默认1024M,在[256,12288]之间调整。场景:某些UDF在内存计算、排序的数据量比力大时,会报内存溢出错误,这时候可以调大该参数,不外这个方法只能暂时缓解,还是需要从业务上去优化。
set odps.sql.udf.timeout=1800
作用:设置UDF超时时间,默认为1800秒,单元秒。[0,3600]之间调整。
set odps.sql.udf.python.memory=256
作用:设定UDF python 使用的最大内存,单元M,默认256M。[64,3072]之间调整。
set odps.sql.udf.optimize.reuse=true/false
作用:开启后,相同的UDF函数表达式,只计算一次,可以提高性能,默认为True。
set odps.sql.udf.strict.mode=false/true
作用:True为金融模式,False为淘宝模式,控制有些函数在遇到脏数据时是返回NULL还是抛非常,True是抛出非常,False是返回null。
6. Mapjoin设置
set odps.sql.mapjoin.memory.max=512
作用:设置Mapjoin时小表的最大内存,默认512,单元M,[128,2048]之间调整。
7. 动态分区设置
set odps.sql.reshuffle.dynamicpt=true/false
作用:默认true,用于避免拆分动态分区时产生过多小文件。如果天生的动态分区个数只会是很少几个,设为false避免数据倾斜。
8. 数据倾斜设置
set odps.sql.groupby.skewindata=true/false
作用:开启Group By优化。
set odps.sql.skewjoin=true/false
作用:开启Join优化,必须设置odps.sql.skewinfo 才有效。
  1. 常用内建函数
复制代码
常用内建函数大概分为这几类,这边我们挑选一些重点的函数举行阐明。
  
函数类型
阐明
日期函数
支持处理DATE、DATETIME、TIMESTAMP等日期类型数据,实现加减日期、计算日期差值、提取日期字段、获取当前时间、转换日期格式等业务处理本领。
数学函数
支持处理BIGINT、DOUBLE、DECIMAL、FLOAT等数值类型数据,实现转换进制、数学运算、四舍五入、获取随机数等业务处理本领。
窗口函数
支持在指定的开窗列中,实现求和、求最大最小值、求匀称值、求中央值、数值排序、数值偏移、抽样等业务处理本领。
聚合函数
支持将多条输入纪录聚合成一条输出值,实现求和、求匀称值、求最大最小值、求匀称值、参数聚合、字符勾通接等业务处理本领。
字符串函数
支持处理STRING类型字符串,实现截取字符串、替换字符串、查找字符串、转换大小写、转换字符串格式等业务处理本领。
复杂类型函数
支持处理MAP、ARRAY、STRUCT及JSON类型数据,实现去重元素、聚合元素、元素排序、合并元素等业务处理本领。
加密函数
支持处理STRING、BINARY类型的表数据,实现加密、解密等业务处理本领。
其他函数
除上述函数之外,提供支持其他业务场景的函数。
    日期函数

  
函数名

详细操作

      

  • 获取当前日期7天内的日期:
SELECTDATEADD(GETDATE(),-7,'dd');
TO_CHAR(DATEADD(GETDATE(),-7,'dd'),'yyyymmdd');
2. DATEADD(指定日期加减):
SELECT DATEADD(GETDATE(),1,"dd"); //2021-01-09 10:48:40 GETDATE()返回值是2021-01-08 10:48:40
SELECT DATEADD(GETDATE(),1,"mm");//2021-02-08 10:49:24 GETDATE()返回值是2021-01-08 10:49:24
3. DATEDIFF(计算两个日期的差值):
datediff(end, start, 'dd') = 1
datediff(end, start, 'mm') = 1
datediff(end, start, 'yyyy') = 1
datediff(end, start, 'hh') = 1
4. DATEPART(返回指定日期的年/月/日):
SELECT DATEPART(GETDATE(),'mm');
SELECT DATEPART(GETDATE(),'yyyy');
SELECT DATEPART(GETDATE(),'dd');
SELECT DATEPART(GETDATE(),'hh');
5. DATETRUNC(截取时间):
datetrunc(datetime '2011-12-07 16:28:46', 'yyyy') = 2011-01-01 00:00:00
datetrunc(datetime '2011-12-07 16:28:46', 'month') = 2011-12-01 00:00:00
datetrunc(datetime '2011-12-07 16:28:46', 'DD') = 2011-12-07 00:00:00
6. TO_CHAR 函数
使用方式 to_char(要处理的日期,日期格式)
推荐用法:2018-01-11 10:00:00 格式 ,处理为指定格式的日期字符串
结果:处理为yyyymmdd的日期格式,类型为字符串
to_char('2018-01-11 10:00:00','yyyymmdd') as date_3 
to_char('2018-01-11 10:00:00','yyyymmdd hh:mi:ss') as date_5 
to_char('2018-01-11 10:00:00','yyyy-mm-dd hh:mi:ss') as date_6
to_char('2018-01-11 10:00:00','yyyy-mm-dd 00:00:00') as date_8 
to_char('2018-01-11 10:00:00','yyyy-mm-01 23:59:59') as date_9 
7. TO_DATE函数
使用方式:to_date(datetime,format)
推荐用法:根据时间的格式,适当调整format的模版
结果:处理20180111、2018-01-11、2018-01-11 10:00:00
to_char('2018-01-11 10:00:00','yyyymmdd') as date_3
to_char('2018-01-11 10:00:00','yyyymmdd hh:mi:ss') as date_5
to_char('2018-01-11 10:00:00','yyyy-mm-dd hh:mi:ss') as date_6
to_char('2018-01-11 10:00:00','yyyy-mm-dd 00:00:00') as date_8
8. UNIX时间戳转换
函数:datetime from_unixtime(bigint unixtime) 支持秒
from_unixtime(123456789) = 1973-11-30 05:33:09
函数: from_utc_timestamp(bigint unixtime,string timezone) 支持毫秒
SELECT from_utc_timestamp(1501557840000 ,'GMT') ;
--返回:2017-08-01 04:24:00
9. DATE转UNIX时间戳
函数:bigint unix_timestamp(datetime date)
select unix_timestamp(datetime '2019-09-20 01:00:00'); 
--返回1568912400
select unix_timestamp('2019-09-20 01:00:00'); 
--返回1568912400
    字符串函数

  函数1. SPLIT
  场景:将字符串列按照分隔符(支持正则表达式)分割后返回数组。
  1. split(str, pat)
复制代码
例子:
  split("浙江省-杭州市-余杭区", "-") 返回["浙江省", "杭州市", "余杭区"]
  如果需要解析出杭州市,加上index即可,split("浙江省-杭州市-余杭区", "-")[1],下标从0开始计数。
  函数2. SPLIT_PART
  场景:将字符串按照分隔符分割,返回指定的子串。
  1. string split_part(string str, string separator, bigint start[, bigint end])
复制代码
例子:
  split_part("浙江省-杭州市-余杭区", "-", 2) 返回"杭州市", start从1开始
  split_part("浙江省-杭州市-余杭区", "-", 1, 2) 返回"浙江省-杭州市"
  函数3. KEYVALUE
  场景:将字符串按照key、value分隔符分割,返回指定key的value
  1. KEYVALUE(STRING srcStr,STRING split1,STRING split2, STRING key)
  2. KEYVALUE(STRING srcStr, STRING key) //默认split1 = ";",默认split2 = ":"
复制代码
例子:
  keyvalue("sendFlag_pass:20,sendFlag_benefit:30", "," , ":", "sendFlag_pass") 返回20
  函数4. STR_TO_MAP
  场景:跟KEYVALUE类似,将字符串按照key、value分隔符分割,返回一个map
  1. str_to_map(text [, delimiter1 [, delimiter2]])
  2. delimiter1 默认为 ","
  3. delimiter2 默认为 "="
复制代码
例子:
  str_to_map("sendFlag_pass:20,sendFlag_benefit:30", "," , ":") 返回map
  str_to_map("sendFlag_pass:20,sendFlag_benefit:30", "," , ":")["sendFlag_pass"], 返回sendFlag_pass的值20
  函数5. REGEXP_REPLACE
  场景:将字符串中的某些字符根据正则表达式举行替换,替换成""则相称于删除。
  1. string regexp_replace(string source, string pattern, string replace_string[, bigint occurrence])
复制代码
例子:需要将msg_id中的方括号去掉,然后再一列转多行处理
  regexp_replace(msg_id, "\\[|\\]", "") as msg_id
  函数6. GET_JSON_OBJECT
  场景:解析json格式字符串,返回指定key对应的value
  1. STRING GET_JSON_OBJECT(STRING json,STRING path)
复制代码
例子:
  解析bizSuccess,则使用get_json_object(json, "$.bizSuccess"),返回true
  解析totalCount,则使用get_json_object(json, "$.module.totalCount"),返回1,多层嵌套用.隔开
  函数7. JSON_TUPLE
  场景:get_json_object增强版,解析json格式字符串,返回指定多个key对应的value
  1. STRING JSON_TUPLE(STRING json,STRING key1,STRING key2,...)
复制代码
例子:
  json_tuple(json, "module.object
  • .activityId") 返回"[1310, 1314]"数组,然后可以使用regexp_replace去掉方括号举行处理。
      函数8. CONCAT
      场景:将多个字符串合并成一个字符串,如果有参数为NULL,则返回NULL
    1. string concat(string a, string b...)
    复制代码
    函数9. TRIM/LTRIM/RTRIM
      场景:字符串预处理,去除字符串两边的空格,LTRIM去除左边的空格,RTRIM去除右边的空格
    1. string trim(string str)
    复制代码
    函数10. TOUPPER/TOLOWER
      场景:字符串预处理,转大写/小写
    1. string tolower(string source)
    2. string toupper(string source)
    复制代码
    1. 自定义UDF开发
    复制代码
    这一章节主要讲Java UDF的开发流程,大概分为如许几个步调:
      

      详细流程

        1. 安装MaxCompute Studio idea插件

      在IDEA中,打开settings设置,找到Plugins
      

      点开Mange Plugin Repositories,如图
      

      点击➕号,添加 http://odps.alibaba.net:8080/studio/updatePlugins.xml
      

      在Plugins marketplace里搜索MaxCompute Studio插件,安装并重启idea
      

        2. 创建MaxCompute Java项目

      

      按默认继承创建,定好自己的project名
      

      创建好的project如图:
      

        3. 创建并编写MaxCompute Java类

      

      在project中,选择目次src->java右击添加新类,选择MaxCompute Java
      

      选择UDF
      

      在此,可看到已经建好的UDF类,对此中的evaluate方法举行自定义编写(定义入参出参),并验证方法的正确性(添加Main方法举行自测)
      

        4. 将UDF函数发布到对应ODPS工作空间

      

      首先,在idea上登陆个人账号
      

      登陆乐成后,可见到自己的花名
      

      接下来导入project建立链接:
      首先打开 Project Explorer, View->Tool Windows->roject Explorer
      

      

      一般来说,选择添加开发环境即可;添加生产环境可能导致后续步调没权限的问题。
      

      接着,我们将自己的项目打包成jar包
      右击我们写好的类,选择Delply to server
      

      填写好函数名,再点击 ok 即可
      

      打包完成会有success的提示
      

        5. 上传jar包至MaxCompute资源

      

      在自己项目ODPS空间中,MaxCompute -> 资源 -> JAR
      

      可自定义资源名称,并点击实行文件,举行上传
      

      这里我们选择项目中 targert->xxx.jar 文件
      

      末了,点击左上角提交资源,再点击右上角发布资源,即可
      

        6. 上传对应函数

      在自己项目ODPS空间中,MaxCompute->函数->新建函数
      

      取好函数名称,点击新建
      

      先从资源列表中,选择我们刚发布的资源,再点击提交➕发布
      

      至此,UDF上传已经乐成,我们就可以在自己的SQL中直接输入函数名称举行调用啦
      

        7. 留意maven依赖问题

      当我们UDF需要依赖其他jar包时,好比fastJSON,需要把其他依赖包一起打包进来,否则上传至ODPS时会报错:java.lang.NoClassDefFoundError
      此时需要在项目的pom.xml文件中,参加一段代码,即可将所有依赖一起打包,jar-with-dependencies
    1. <build>
    2.   <plugins>
    3.     <plugin>
    4.       <artifactId>maven-assembly-plugin</artifactId>
    5.       <configuration>
    6.         <appendAssemblyId>false</appendAssemblyId>
    7.         <descriptorRefs>
    8.           <descriptorRef>jar-with-dependencies</descriptorRef>
    9.         </descriptorRefs>
    10.         <archive>
    11.           <manifest>
    12.             <!-- 此处需改成自己定义的类名路径 -->
    13.             <mainClass>com.alibaba.mkt.odps.complete.CheckCompleteInfo</mainClass>
    14.           </manifest>
    15.         </archive>
    16.       </configuration>
    17.       <executions>
    18.         <execution>
    19.           <id>make-assembly</id>
    20.           <phase>package</phase>
    21.           <goals>
    22.             <goal>single</goal>
    23.           </goals>
    24.         </execution>
    25.       </executions>
    26.     </plugin>
    27.   </plugins>
    28. </build>
    复制代码

      

      
    拓展功能

        ODPS + SQL function 

      在跑SQL时,我们可以将一些重复繁琐的过程抽象成函数。明确好入参和出参,写好方法后可举行验证。
      例1:
      问题背景:
      字符串类型:对于所有的信息都存放在一个json串中,需要根据不同的key举行解析
      初始代码
    1. REPLACE(REPLACE(REPLACE(GET_JSON_OBJECT(json_data,'$.checkboxField_l1d6qn51'),'[\\"',''),'\\"]',''),'\\"','')
    复制代码
    改造成SQL函数
    1. CREATE SQL FUNCTION if not exists get_json_object_checkboxField(@a STRING,@b STRING )
    2. AS REPLACE(REPLACE(REPLACE(GET_JSON_OBJECT(@a,@b),'[\\"',''),'\\"]',''),'\\"','');
    复制代码
    改造后代码
    1. get_json_object_checkboxField(json_data,'$.checkboxField_l1d6qn51')
    复制代码
    例2:
      问题背景:
      时间类型:计算自然周或者自然月维度的指标
      初始代码
    1. TO_CHAR(DATEADD(TO_DATE('${bizdate}','yyyymmdd'), - 1 * IF(WEEKDAY(TO_DATE('${bizdate}','yyyymmdd')) == 0,7,WEEKDAY(TO_DATE('${bizdate}','yyyymmdd'))), 'dd'),'yyyymmdd')
    复制代码
    改造成SQL函数
    1. CREATE SQL FUNCTION if not exists natural_week(@a STRING)
    2. AS TO_CHAR(DATEADD(TO_DATE(@a,'yyyymmdd'), - 1 * IF(WEEKDAY(TO_DATE(@a,'yyyymmdd')) == 0,7,WEEKDAY(TO_DATE(@a,'yyyymmdd'))), 'dd'),'yyyymmdd');
    复制代码
    改造后代码

    1. natural_week('${bizdate}')
    复制代码
      ODPS + UDF

      通过对自定义的MAX_UDF函数的推出,仅通过申请一个UDF函数即可调用所有函数,操作轻巧,达到淘汰申请时间本钱及重复开发本钱的目的。
      本部分与上文自定义UDF开发篇密切相干,下面举一个简朴的例子:
      问题背景:
      字符串加密。
      入参:第一个参数是字符串加密的序列号;第二个参数是要加密的字符;第三个是加密的开始位数;第四个是要加密几位;第五个参数是加密的字符内容。
      出参:针对字符串加密处理。
    1. select process_string('{"clazzNo":"011","methodNo":"01"}',"123411412341","2","7","*");
    复制代码
    注:这里process_string,是我们自己写的UDF方法。
      

      
    问题处理

        性能分析

      

      

    • 编译阶段
      据 logview 的子状态(SubStatusHistory)可以进一步细分为调理、优化、天生物理实行筹划、数据跨集群复制等子阶段。
      
    阶段

    特性

    原因

    解决方案

    调理阶段
    子状态为“Waiting for cluster resource”,作业列队等待被编译。
    1.计算集群资源紧缺。
    查察计算集群的状态,需要等待计算集群的资源。
    2. 编译资源池资源不够
    优化阶段
    子状态为“SQLTask is optimizing query”,优化器正在优化实行筹划。
    1.实行筹划复杂,需要等待较长时间做优化。
    一般可担当10分钟以内,如果真的太长时间不退出,基本可认为是 odps 的 bug。
    天生物理实行筹划阶段
    子状态为“SQLTask is generating execution plan”。
    1.读取的分区太多。每个分区需要去根据分区信息来决定处理方式,决定 split,而且会写到天生的实行筹划中。
    需要好好设计 SQL,淘汰分区的数量,包罗:分区裁剪、筛除不需要读的分区、把大作业拆成小作业。
    2.小文件太多(万级别),ODPS 会根据文件大小决定 split,小文件多了会导致计算 split 的过程耗时增加。
    使用TunnelBufferedWriter接口,可以更简朴的举行上传功能,同时避免小文件。
    实行一次 alter table merge smallfiles; 让 odps 把小文件 merge 起来,
    数据跨集群复制阶段
    子状态列表内里出现多次“Task rerun”,result 里有错误信息“FAILED: ODPS-0110141ata version exception”。
    1.project 刚做集群迁移,每每前一两天有大量需要跨集群复制的作业。
    这种情况是预期中的跨集群复制,需要用户等待。
    2.可能是作业提交错集群,或者是中央 project 做过迁移,分区过滤没做好,读取了一些比力老的分区。
    检查作业提交的集群是否正确, Logview2.0使命详情页左侧的 BasicInfo 查察作业提交的集群。
      

    • 实行阶段
      logview 的 detail 界面有实行筹划(实行筹划没有全都绿掉),且作业状态还是 Running。

      实行阶段卡住或实行时间比预期长的主要原因有等待资源,数据倾斜,UDF 实行低效,数据膨胀等。
      
    阶段

    特性

    解决方案

    等待资源
    一些instance处于Ready状态,部分instance处于Running状态。
    确定列队状态是否正常。可以通过 logview 的列队信息“Queue”看作业在队列的位置。
    数据倾斜
    task 中大多数 instance 都已经结束了,但有某几个 instance 却迟迟不结束(长尾)。
          

    • 使用 MaxCompute Studio 的作业实行图及作业详情功能来分析作业运行情况,定位到长尾实例,找到导致长尾的数据泉源。
    • 使用 Logveiw2.0 查察使命实行图和 instance 运行情况来定位长尾实例。
    UDF实行低效
    某个 task 实行服从低,且该 task 中有效户自定义的扩展。
          

    • 检查 UDF 是否有 bug。
    有时候 bug 是由于某些特定的数据值引起的,好比出现某个值的时候会引起死循环。
          

    • 检查 UDF 函数是否与内置函数同名。
    内置函数是有可能被同名 UDF 覆盖的,当看到一个函数像是内置函数时,需要确定是否有同名 UDF 覆盖了内置函数。
          

    • 使用内置函数代替 UDF。
    evaluate 中只做与参数相干的必要操作。
    数据膨胀
    task 的输出数据量比输入数据量大许多。
          

    • 检查代码是否有 bug:JOIN 条件是不是写错,变成笛卡尔积了;UDTF是不是有问题,输出太多数据。
    • 检查 Aggregation 引起的数据膨胀。
    • 避免join引起的数据膨胀。
    • 由于grouping set 导致的数据膨胀。
    在线业务压抑
    ODPS集群中的一部分是离线集群,另一部分是在线集群。
    如果是弹内环境,可通过fuxi sensor确认是否存在在线业务压抑。
      UDF实行:
    1. set odps.sql.udf.jvm.memory=
    2. -- 设定UDF JVM Heap使用的最大内存,单位M,默认1024M
    3. -- 可手动调整区间[256,12288]
    复制代码


    • 结束阶段
      有时 Fuxi 作业结束时,作业总体进度仍旧处于运行状态。原因有两种:
      

    • 单 SQL 作业可能包含多个 Fuxi 作业
    • Fuxi 作业结束后,SQL 在结束阶段运行于控制集群的逻辑占用时间较长
      
    阶段特性解决方案
    子查询多阶段实行
    MaxCompute SQL 的子查询会被编译进同一个 Fuxi DAG,即所有子查询和主查询都通过一个 Fuxi 作业完成。
    但也有一些特别子查询需要先将子查询单独实行
    子查询 SELECT DISTINCT ds FROM t_ds_set 先实行,其结果需要被用来做分区裁剪,来优化主查询需要读取的分区数。
    过多小文件
    存储方面:小文件过多会给 Pangu 文件系统带来一定的压力,且影响空间的有效使用。
    计算方面:ODPS 处理单个大文件比处理多个小文件更有服从,小文件过多会影响整体的计算实行性能。
    为了避免系统产生过多小文件,SQL作业会在结束时自动触发合并小文件的操作。
    根据参数odps.merge.smallfile.filesize.threshold来判定小文件,默认阈值为32MB。
    可通过logview查察作业是否触发了自动合并小文件。
    动态分区元数据更新
    Fuxi 作业实行完后,有可能另有一些元数据操作。
    对分区表 sales 使用 insert into ... values命令新增 2000 个分区:
    INSERT INTO TABLE sales partition (ds)(ds, product, price)
    VALUES ('20170101','a',1),('20170102','b',2),('20170103','c',3), ...;
    输出文件size变大
    在输入输出条数相差不大的情况,结果膨胀几倍。
    一般是数据分布变化导致的,在写表的过程中,会对数据举行压缩,而压缩算法对于重复数据的压缩率是最高的。
      子查询:
    1. SELECT
    2.      product,
    3.     sum(price)
    4. FROM
    5.      sales
    6. WHERE
    7.      ds in (SELECT DISTINCT ds FROM t_ds_set)
    8. GROUP BY product;
    复制代码
      性能优化

      

      

    • 优化运行时间
      在优化运行时间这个维度上,我们重点关注时间上的加速,单元时间内可能会斲丧更多的计算资源。总本钱有可能上升,也可能低落。
      
    优化类型

    详细类型

    优化步伐

    调整并行度
    instance数量的增加会对实行速度产生影响:
          

    • 更多的instance意味着更长的等待资源和列队次数。
    • 每个instance的初始化需要一定时间,并行度越高,总初始化时间越长,有效实行时间占比越低。
    需要逼迫 1 个 instance 实行
    用户需要检查这些操作是否必要,能否去掉,尽量取消掉这些操作:读表的 task + 非读表的 task
    影响单个task并行度主要因素:
          

    • 某些操作逼迫必须 1 个 instance 来实行
    • 读表的 task
    • 非读表的 task
    • HBO会在上面的底子上根据历史作业的实行情况做调整
    对于读表的 task,一个 instance 读取 256M的数据,一些常见出问题的情况:
          

    • 数据压缩比很高
    • Task 中实行了一些很 heavy 的操作,特别是存在 UDF
    • 读取 256M 数据太少,导致 instance 的实行时间太短
    可以通过调整flag实现:
    set odps.sql.mapper.split.size= xxx
    非读表的 task,主要有三种方式调整并行度:
          

    • 调整 odps.sql.mapper.split.size
    • 通过 odps.sql.reducer.instances 逼迫设置 reducer 并行度
    • 通过 odps.sql.joiner.instances 逼迫设置 joiner 并行度
    set odps.sql.reducer.instances= xxx
    -- 设定Reduce task的instance数量
    set odps.sql.joiner.instances= xxx
    -- 设定Join task的instance数量
    HBO
    HBO (History-Based Optimization) 会根据对历史作业的分析来优化当前作业的。
    包罗内存、并行度等一系列参数,它能让你的周期作业越跑越快。
    为了尽可能解决HBO失效这个顽疾,我们在HBO中增加了若干新的功能,包罗:
          

    • realtime hbo
    • task-wise hbo
    • new signature
    优化实行筹划
    CBO优化器会基于统计信息、SQL语义、实行引擎本领、丰富的优化本领,自动天生最优的实行筹划,而且在一连提拔优化本领。
    Map Join Hint
    用户可以手动添加map join hint,使得原本的Sort-Merge Join变成Map Join,避免大表数据shuffle从而提拔性能。
    Distributed Map Join Hint
    Distributed MapJoin是MapJoin的升级版,实用于实用于大表Join中表的场景 的场景,二者的焦点目的都是为了淘汰大表侧的Shuffle和排序。
    Dynamic Filter Hint
    基于JOIN等值连接的特性,MaxCompute可以通过表A的数据天生一个过滤器,在Shuffle或JOIN之前提前过滤表B的数据。
    物化视图
    物化视图(Materialized View)本质是一种预计算,即把某些耗时的操作(如JOIN/AGGREGATE)的结果保存下来。
    以便在查询时直接复用,从而避免这些耗时的操作,终极达到加速查询的目的。
    数据倾斜
    数据Shuffle导致的数据倾斜 1
    数据倾斜大多数是由于数据的 reshuffle 引起的,由于按照某个 key 来做 shuffle,同一个 key 值的数据会逼迫集中在一个 instance 处理。
          

    • 去掉 shuffle
    • 换别的 shuffle key
    • 将热点数据特别处理
    数据Shuffle导致的数据倾斜 2
    特性:读表并写动态分区作业,M task 读入大量数据,但是只会写出少量的 动态分区。
    解决方法:set odps.sql.reshuffle.dynamicpt =false; 去掉reshuffle 过程。
      

    • 优化资源斲丧
      
    优化类型

    详细类型

    优化步伐

    SQL的新语法、新功能
    GROUPING SETS:对 SELECT 语句中 GROUP BY 子句的扩展。
    SQL 运行*时物理实行筹划做了 3 次聚合,然后再 UNION 起来。

    脚本模式:
    脚本模式能让用户以脚本的形式提交多条语句同时实行
    脚本模式的性能上风,现实上是“将分散的业务逻辑合并成一个作业来运行“的性能上风:
          

    • 合并重复的公共操作。
    • 避免中央数据写表,淘汰暂时表。
    • 更好的发挥 optimizer 的作用。
    • 淘汰了作业调理的开销。
    MR典型场景用SQL实现
          

    • 使用 SQL-聚合函数
    select k, WM_CONCAT(';',concat(v,":",c)) from 
    ( select k, v, count(v) c from t group by k,v) t2 group by k;
          

    • 用 SQL-窗口/分析函数
    rows between x preceding|following and y preceding|following
          

    • 使用 SQL-UDJ (User Defined Join)
    MapReduce 实现的 JOIN 逻辑。
          

    • 使用 SQL-TRANSFORM
    实用场景:MapReduce Streaming 作业。
    合理设置资源参数
    Map设置
    set odps.sql.mapper.cpu=100
    作用:设置处理Map Task每个Instance的CPU数目
    set odps.sql.mapper.memory=1024
    作用:设定Map Task每个Instance的Memory大小
    set odps.sql.mapper.merge.limit.size=64
    作用:设定控制文件被合并的最大阈值
    set odps.sql.mapper.split.size=256
    作用:设定一个Map的最大数据输入量
    Join设置
    set odps.sql.joiner.instances=-1
    作用: 设定Join Task的Instance数量
    set odps.sql.joiner.cpu=100
    作用: 设定Join Task每个Instance的CPU数目
    set odps.sql.joiner.memory=1024
    作用:设定Join Task每个Instance的Memory大小
    Reduce设置
    set odps.sql.reducer.instances=-1
    作用: 设定Reduce Task的Instance数量
    set odps.sql.reducer.cpu=100
    作用:设定处理Reduce Task每个Instance的Cpu数目
    set odps.sql.reducer.memory=1024
    作用:设定Reduce Task每个Instance的Memory大小
      GROUPING SETS 优化步伐*:
    1. SELECT NULL, NULL, NULL, COUNT(*)
    2. FROM requests
    3. UNION ALL
    4. SELECT os, device, NULL, COUNT(*)
    5. FROM requests GROUP BY os, device
    6. UNION ALL
    7. SELECT NULL, NULL, city, COUNT(*)
    8. FROM requests GROUP BY city;
    复制代码
    上述 SQL 运行时物理实行筹划做了 3 次聚合,然后再 UNION 起来。

    1. SELECT os, device, city, COUNT(*)
    2. FROM requests
    3. GROUP BY os, device, city GROUPING SETS((os, device), (city), ());
    复制代码
    物理实行筹划只包含一个 Reduce 阶段,无需举行 UNION 操作,使用更少代码的同时斲丧更少的集群资源。
        恢复已删

      

      

      
    总结

      颠末一个多月的整理和总结,终于完成了《ODPS开发大全》的这个底子版本。在这过程中我不停地接触到新的知识点,学到之前未曾掌握的技能,也感叹ODPS功能之丰富强大。渴望在将来工作中,自己可以多沉淀好的技能文档,这不但让我更加深刻地温习过往学习的技能,也可以把知识共享给更多求知若渴的技能人,建设更开放的CS技能社区。
      
    ¤ 拓展阅读 ¤

      
    3DXR技能 | 终端技能 | 音视频技能

      
    服务端技能 | 技能质量 | 数据算法


    免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
  • 本帖子中包含更多资源

    您需要 登录 才可以下载或查看,没有账号?立即注册

    x
    回复

    使用道具 举报

    0 个回复

    倒序浏览

    快速回复

    您需要登录后才可以回帖 登录 or 立即注册

    本版积分规则

    王海鱼

    金牌会员
    这个人很懒什么都没写!

    标签云

    快速回复 返回顶部 返回列表