ToB企服应用市场:ToB评测及商务社交产业平台

标题: 给技术新人的ODPS优化建议 [打印本页]

作者: 南七星之家    时间: 2023-4-12 19:20
标题: 给技术新人的ODPS优化建议
  1. 数据开发基本都是从陌生到熟悉,但是写多了就会发现各种好用的工具/函数,也会发现各种坑,本文分享了作者从拿到数据到数据开发到数据监控的一些实操经验。
复制代码
写在前面
本文档是组内的一份算法ODPS离线开发分享,仅列出了这些年积累下来的一些重要经验和结论,特别是在算法日常数据处理和调度中的技巧和配置方法,至于具体为什么,建议大家去阿里云官网查看底层map reduce以及data flow的工作原理,会有更加深刻的体会:)参考文档:https://help.aliyun.com/document_detail/89993.html各种join的用法篇
输入数据:zhule_a, zhule_b两张测试表,具体schema和数据如下(后续为了说明重复数据带来的问题,会在a和b表中各重复插入一条key=2,ds='20220930'的数据):read zhule_a;
keyds
120220930
220220930
120221001
220221001
320221001
read zhule_b;
keyds
220220930
320220930
120221001
220221001
320221001
420221001
520221001
 
Join/Inner Join用法:Returns the rows that have matching column values in both the left table and the right table based on the join condition。一句话:找出两个表中共同的部分,注意笛卡尔积下面的性能优化1、每张表先选出来subset,然后再join。
  1. -- better way to perform join, select small range of data first.
  2. SELECT A.*, B.*
  3. FROM
  4. (SELECT * FROM A WHERE ds='20180101') A
  5. JOIN
  6. (SELECT * FROM B WHERE ds='20180101') B
  7. ON a.key = b.key;
复制代码
注意:在进行各种jion操作前,一定要自查左右表是否有重复数据,否则最终重复的结果会以笛卡尔积的数量增长,比如左右表各有两条重复数据,那么join后重复数据会多达4条!2、最好的情况下是大表join小表,然后利用mapjoin来实现。官方解释:In the map stage, MAPJOIN loads all data in the specified tables into the memory of the program that performs the JOIN operation. The tables specified for MAPJOIN must be small tables, and the total memory occupied by the table data cannot exceed 512 MB.Limits on JOIN operations in MAPJOIN:
  1. SELECT  /*+ MAPJOIN(b) */
  2.         a.*
  3. FROM    test_a a
  4. JOIN test_b b
  5. ON      a.user_key = b.user_key
  6. ;
  7. //就是在sql语句前加一个标记说这是mapjoin,把小表别名写在括号里
复制代码
一个有趣的点:当我们用mapjoin时,除了正常的等式,mapjoin还支持不等式,如下面的例子: 
Left Join
用法:A LEFT JOIN operation first takes the Cartesian product of the rows in Table A and Table B and returns all the rows of Table A and rows in Table B that meet the join condition. If the join condition finds no matching rows in Table B for a row in Table A, the row in Table A is returned in the result set with NULL values in each column from Table B.一句话:输出左表的所有记录,以及右表中符合关联条件的数据。右表中不符合关联条件的行,输出NULL。 
Left Semi Join
用法:A LEFT SEMI JOIN operation returns only the rows in Table A that have a matching row in Table B. 对于左表中的一条数据,如果右表存在符合关联条件的行,则输出左表,否则不输出
  1. employee (2 columns - e_id and e_name)
  2. 10, Tom
  3. 20, Jerry
  4. 30, Emily
  5. employee_department_mapping (2 columns - e_id and d_name)
  6. 10, IT
  7. 10, Sales
  8. 20, HR
  9. -- inner join results
  10. SELECT e.e_id, e.e_name, d.d_name FROM
  11. employee e INNER JOIN employee_department_mapping d
  12. on e.e_id = d.e_id
  13. -- results
  14. 10, Tom, IT
  15. 10, Tom, Sales
  16. 20, Jerry, HR
  17. -- left semi join results
  18. SELECT e.e_id, e.e_name, d.d_name FROM
  19. employee e LEFT SEMI JOIN employee_department_mapping d
  20. on e.e_id = d.e_id
  21. -- results
  22. 10, Tom, IT
  23. 20, Jerry, HR
复制代码
 
Left Anti Join用法:A LEFT ANTI JOIN operation returns only the rows in Table A that have no matching rows in Table B.一句话:对于左表中的一条数据,如果右表中不存在符合关联条件的数据,则输出左表。
 
Full Join
用法:A FULL JOIN operation takes the Cartesian product of the rows in Table A and Table B and returns all the rows in Table A and Table B, whether the join condition is met or not. In the result set, NULL values are returned in the columns from the table that lacks a matching row in the other table.一句话:输出左表和右表的所有记录,对于不符合关联条件的数据,未关联的另一侧输出NULL举个栗子,其中today_feat是今天新计算的feature table,yest_feat是上一个分区的feature:
  1. SELECT  COALESCE(a.main_image_url,b.main_image_url) AS main_image_url
  2.         ,COALESCE(a.embedding,b.embedding) AS embedding
  3. FROM    today_feat a
  4. FULL JOIN yest_feat b
  5. ON      a.main_image_url = b.main_image_url
复制代码
其中full jion的效果如下,正好满足new,old,updated feature的更新,配合COALESCE很丝滑:
合理设置Mapper和Reducepriority
 
set odps.instance.priority
目前ODPS更新后只能在开发dev空间生效,通过设置优先级能够一定程度提升排队任务的执行优先级,但是目前线上正式环境不会生效了,建议大家优化好自己健康分,同时对于重要的线上调度任务设置好基线,保证产出的时效。 
set odps.sql.mapper.split.size
官方指导:Changes the input data amount of each Map worker, which is the split size of the input file. You can use this property to indirectly control the number of workers at each Map stage (default value: 256, unit: MB)。一句话:如果小文件很多,可以调大split.size的数值,这样可以保证在有限资源下更容易申请到Mapper,提升执行的效率。如果资源丰富,想要更多Mapper资源,那就调小split.size的数值,可以申请到更多的Mapper,提升执行效率。酌情处理哟~举个栗子: 
  1. -- original sql
  2. CREATE TABLE if not EXISTS tmp_zhl_test LIFECYCLE 1 AS
  3. SELECT sig, venture, seller_id, COUNT(product_id) as cnt
  4. FROM sku_main_image_sig
  5. WHERE LENGTH(sig) > 10 --some bad cases may have weird sigs like '#NEXT#'
  6. GROUP BY sig, venture, seller_id
  7. HAVING cnt>2
  8. ;
复制代码
如果是默认设置,553 mappers 和 184 reducers 被分配,大约耗时 3m18s:在资源充沛的情况下,我们设置odps.sql.mapper.split.size=64, 可以申请到更多的Mapper去处理文件的分片,同时更多的reducer也可以被分配到,同样的SQL代码执行时间降为: 2m34s. 同样的,如果你的数据量超大,但是每条数据本身很小,同时空间资源也有限(毕竟现在资源管控比较严格),与其等待9999个Mapper被分配,你可以尝试设置odps.sql.mapper.split.size=2048(甚至更大)去减少需要分配的Mapper数量,让任务能够快速执行:) 
set odps.sql.reducer.instances
显示设置reducer的数量(默认值从0到4000),不设置的话会根据任务动态分配,设置后就会分配设置数量的reducer。同样是上面的例子,当我们设置odps.sql.reducer.instances=1000, 耗时变为2m。 
set odps.sql.mapper(reducer).memory
设置每个Map/Reducer worker的内存(默认值是1024,可以设置为256到12288之间的值)一般我们不需要特别设置这个值,但是当任务报错并说「data exceeds the memory」时,可以根据个人情况来设置这个选项。在Python UDF中使用第三方库
在这部分主要和大家分享下如何在ODPS的python udf安装需要的第三方库(如numpy,opencv等),以及如果有不同依赖库之间的版本不兼容问题时的有效解决方法。 
Upload&Call Package
  1. def include_package_path(res_name, lib_name):   
  2.     archive_files = get_cache_archive(res_name)
  3.     dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
  4.                         if '.dist_info' not in f.name], key=lambda v: len(v))
  5.     for dir_name in dir_names:
  6.         if dir_name.endswith(lib_name):
  7.             sys.path.insert(0, os.path.dirname(dir_name))
  8.             break
  9.         elif os.path.exists(os.path.join(dir_name, lib_name + '.py')):
  10.             sys.path.insert(0, os.path.abspath(dir_name))
  11.             break
  12. class PostProcess(BaseUDTF):
  13.     def __init__(self):
  14.         include_package_path('opencv_python-3.4.0.zip','cv2')
  15.         include_package_path('numpy-1.16.6.zip','numpy')
复制代码

  1. set odps.sql.python.version = cp37; --use python 3.7, default is 2.x version
  2. set odps.isolation.session.enable = true;
复制代码
 
Solve Compatibility Issue
有时候在使用多个库时,可能会出现不同版本之间的冲突问题(比如在使用opencv库的时候,如果对应的numpy版本不兼容的话,就会报错)。所以在上传多个库的资源包前,需要先确认版本间的兼容性。一般非常不建议大家用不同版本去试,而应该先在本地确定版本后再上传。可行的步骤如下:
发布任务时的一些额外建议
  1. --exclude input or output tables (especially those tmp tables)--@exclude_input=lsiqg_iqc_sku_product_detection_result--@exclude_output=lsmp_sku_image_url_bizdate
  2. -- include input or output tables (especially those separate venture tables)--@extra_input=lsiqg_iqc_sku_product_detection_result--@extra_output=lsmp_sku_image_url_bizdate
复制代码
  1. INSERT OVERWRITE TABLE sku_main_image_sig PARTITION (ds = '${bizdate}',venture)
  2. SELECT  id
  3.         ,image_url
  4.         ,venture
  5. FROM    (
  6.             SELECT  id
  7.                     ,image_url
  8.                     ,'ID' AS venture
  9.             FROM    auction_image_id
  10.             WHERE   ds = MAX_PT('auction_image_id')
  11.             UNION
  12.             SELECT  id
  13.                     ,image_url
  14.                     ,'PH' AS venture
  15.             FROM    auction_image_ph
  16.             WHERE   ds = MAX_PT('auction_image_ph')
  17.             UNION
  18.             SELECT  id
  19.                     ,image_url
  20.                     ,'VN' AS venture
  21.             FROM    auction_image_vn
  22.             WHERE   ds = MAX_PT('auction_image_vn')
  23.             UNION
  24.             SELECT  id
  25.                     ,image_url
  26.                     ,'SG' AS venture
  27.             FROM    auction_image_sg
  28.             WHERE   ds = MAX_PT('auction_image_sg')
  29.             UNION
  30.             SELECT  id
  31.                     ,image_url
  32.                     ,'MY' AS venture
  33.             FROM    auction_image_my
  34.             WHERE   ds = MAX_PT('auction_image_my')
  35.             UNION
  36.             SELECT  id
  37.                     ,image_url
  38.                     ,'TH' AS venture
  39.             FROM    auction_image_th
  40.             WHERE   ds = MAX_PT('auction_image_th')
  41.         ) union_table
  42. ;
复制代码

简单的任务可以直接在任务中心查看详情中设置:
对于更加细致的数据层面监控可以通过DQC平台进行配置,比如无数据产出,数据波动,数据最大/最小值监控等。写在最后
数据开发基本都是从陌生到熟悉,但是写多了就会发现各种好用的工具/函数,也会发现各种坑,个人拿到数据到数据开发到数据监控的一些经验是:Enjoy Data Engineering!!
 
作者|周慧玲(逐乐)

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4