Join/Inner Join用法:Returns the rows that have matching column values in both the left table and the right table based on the join condition。一句话:找出两个表中共同的部分,注意笛卡尔积下面的性能优化1、每张表先选出来subset,然后再join。
-- better way to perform join, select small range of data first.
SELECT A.*, B.*
FROM
(SELECT * FROM A WHERE ds='20180101') A
JOIN
(SELECT * FROM B WHERE ds='20180101') B
ON a.key = b.key;
复制代码
注意:在进行各种jion操作前,一定要自查左右表是否有重复数据,否则最终重复的结果会以笛卡尔积的数量增长,比如左右表各有两条重复数据,那么join后重复数据会多达4条!2、最好的情况下是大表join小表,然后利用mapjoin来实现。官方解释:In the map stage, MAPJOIN loads all data in the specified tables into the memory of the program that performs the JOIN operation. The tables specified for MAPJOIN must be small tables, and the total memory occupied by the table data cannot exceed 512 MB.Limits on JOIN operations in MAPJOIN:
The left table in a LEFT OUTER JOIN operation must be a large table.
The right table in a RIGHT OUTER JOIN operation must be a large table.
MAPJOIN cannot be used in a FULL OUTER JOIN operation.
The left or right table in an INNER JOIN operation can be a large table.
SELECT /*+ MAPJOIN(b) */
a.*
FROM test_a a
JOIN test_b b
ON a.user_key = b.user_key
;
//就是在sql语句前加一个标记说这是mapjoin,把小表别名写在括号里
复制代码
一个有趣的点:当我们用mapjoin时,除了正常的等式,mapjoin还支持不等式,如下面的例子: Left Join
用法:A LEFT JOIN operation first takes the Cartesian product of the rows in Table A and Table B and returns all the rows of Table A and rows in Table B that meet the join condition. If the join condition finds no matching rows in Table B for a row in Table A, the row in Table A is returned in the result set with NULL values in each column from Table B.一句话:输出左表的所有记录,以及右表中符合关联条件的数据。右表中不符合关联条件的行,输出NULL。
一定要保留左表的内容是,可以选择用left join,例如加入key_attrs
Right Join和Left Join没有本质区别,建议定义好左表后都利用Left Join来执行
如果右表有重复数据的情况,那么最终left join的结果会有重复
Left Semi Join
用法:A LEFT SEMI JOIN operation returns only the rows in Table A that have a matching row in Table B. 对于左表中的一条数据,如果右表存在符合关联条件的行,则输出左表,否则不输出
当右表没有重复数据时,和Join是一致的,只会保留相同的列下来
left semi join并不会返回右表B中的任何数据,所以你没法在where条件中指定关于右表B的任何筛选条件,下面得例子能够有更加清晰的对比(例子引用于开源论坛):
employee (2 columns - e_id and e_name)
10, Tom
20, Jerry
30, Emily
employee_department_mapping (2 columns - e_id and d_name)
10, IT
10, Sales
20, HR
-- inner join results
SELECT e.e_id, e.e_name, d.d_name FROM
employee e INNER JOIN employee_department_mapping d
on e.e_id = d.e_id
-- results
10, Tom, IT
10, Tom, Sales
20, Jerry, HR
-- left semi join results
SELECT e.e_id, e.e_name, d.d_name FROM
employee e LEFT SEMI JOIN employee_department_mapping d
on e.e_id = d.e_id
-- results
10, Tom, IT
20, Jerry, HR
复制代码
Left Anti Join用法:A LEFT ANTI JOIN operation returns only the rows in Table A that have no matching rows in Table B.一句话:对于左表中的一条数据,如果右表中不存在符合关联条件的数据,则输出左表。
最好用的场景就是找出两表的差异部分;
算法日常调度时可以用于每日新增修改商品的提取,将关键字段放到ON条件中就行
Full Join
用法:A FULL JOIN operation takes the Cartesian product of the rows in Table A and Table B and returns all the rows in Table A and Table B, whether the join condition is met or not. In the result set, NULL values are returned in the columns from the table that lacks a matching row in the other table.一句话:输出左表和右表的所有记录,对于不符合关联条件的数据,未关联的另一侧输出NULL
set odps.instance.priority
目前ODPS更新后只能在开发dev空间生效,通过设置优先级能够一定程度提升排队任务的执行优先级,但是目前线上正式环境不会生效了,建议大家优化好自己健康分,同时对于重要的线上调度任务设置好基线,保证产出的时效。 set odps.sql.mapper.split.size
官方指导:Changes the input data amount of each Map worker, which is the split size of the input file. You can use this property to indirectly control the number of workers at each Map stage (default value: 256, unit: MB)。一句话:如果小文件很多,可以调大split.size的数值,这样可以保证在有限资源下更容易申请到Mapper,提升执行的效率。如果资源丰富,想要更多Mapper资源,那就调小split.size的数值,可以申请到更多的Mapper,提升执行效率。酌情处理哟~举个栗子:
-- original sql
CREATE TABLE if not EXISTS tmp_zhl_test LIFECYCLE 1 AS
SELECT sig, venture, seller_id, COUNT(product_id) as cnt
FROM sku_main_image_sig
WHERE LENGTH(sig) > 10 --some bad cases may have weird sigs like '#NEXT#'
GROUP BY sig, venture, seller_id
HAVING cnt>2
;
复制代码
如果是默认设置,553 mappers 和 184 reducers 被分配,大约耗时 3m18s:在资源充沛的情况下,我们设置odps.sql.mapper.split.size=64, 可以申请到更多的Mapper去处理文件的分片,同时更多的reducer也可以被分配到,同样的SQL代码执行时间降为: 2m34s. 同样的,如果你的数据量超大,但是每条数据本身很小,同时空间资源也有限(毕竟现在资源管控比较严格),与其等待9999个Mapper被分配,你可以尝试设置odps.sql.mapper.split.size=2048(甚至更大)去减少需要分配的Mapper数量,让任务能够快速执行:) set odps.sql.reducer.instances
显示设置reducer的数量(默认值从0到4000),不设置的话会根据任务动态分配,设置后就会分配设置数量的reducer。同样是上面的例子,当我们设置odps.sql.reducer.instances=1000, 耗时变为2m。 set odps.sql.mapper(reducer).memory
设置每个Map/Reducer worker的内存(默认值是1024,可以设置为256到12288之间的值)一般我们不需要特别设置这个值,但是当任务报错并说「data exceeds the memory」时,可以根据个人情况来设置这个选项。在Python UDF中使用第三方库
在这部分主要和大家分享下如何在ODPS的python udf安装需要的第三方库(如numpy,opencv等),以及如果有不同依赖库之间的版本不兼容问题时的有效解决方法。 Upload&Call Package
需要下载第三方库的安装包xxx.whl,可以直接下载到自己的电脑上面,这样可以在离线环境验证多个版本的一致性(下面介绍)。一般来说我们需要去看安装包需要的python版本号以及兼容机器环境,一般来说都是cp37-cp37m or py2.py3-none-any在中间,然后末尾是x86_64的安装包;
本地直接将xxx.whl转换为xxx.zip,利用命令「mv xxx.whl xxx.zip」就行
将zip资源文件上传到ODPS对应的环境
在你的UDF中,利用下面的代码指定资源包的路径和引用(直接copy就行)
def include_package_path(res_name, lib_name):
archive_files = get_cache_archive(res_name)
dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
if '.dist_info' not in f.name], key=lambda v: len(v))
--exclude input or output tables (especially those tmp tables)--@exclude_input=lsiqg_iqc_sku_product_detection_result--@exclude_output=lsmp_sku_image_url_bizdate
-- include input or output tables (especially those separate venture tables)--@extra_input=lsiqg_iqc_sku_product_detection_result--@extra_output=lsmp_sku_image_url_bizdate