ToB企服应用市场:ToB评测及商务社交产业平台

标题: Hive大表join大表怎样调优 [打印本页]

作者: 千千梦丶琪    时间: 2024-7-14 02:42
标题: Hive大表join大表怎样调优
在Hive中,优化器会根据统计信息决定是将大表放在前面(Join的左边)还是小表放在前面。通常,优化器会选择数据量较小的表作为驱动表(小表作为左边),由于这样可以淘汰内存消耗并提高效率。
但是,如果你有特定的需求,好比你知道大部分数据能快速过滤掉,渴望淘汰任务的实行时间,那么你可以逼迫指定某个表作为小表。在Hive中,可以使用/*+ MAPJOIN(table_name) */ 注释来逼迫将一个大表作为小表处理。
例如,如果你想要将big_table作为小表:
  1. SELECT /*+ MAPJOIN(big_table) */
  2.   a.column1, a.column2, b.column1, b.column2
  3. FROM
  4.   small_table a
  5. JOIN
  6.   big_table b
  7. ON
  8.   a.common_column = b.common_column;
复制代码
一、调优思路

1、SQL优化

1.1 大小表join

1、mapjoin,小表使用mapjoin,大概逼迫hint
2、将大表放后头,原因:Hive假定查询中末了的一个表是大表。它会将其它表缓存起来,然后扫描末了那个表。因此通常需要将小表放前面,大概标记哪张表是大表:/*streamtable(table_name) */
3、过滤无效值:空值、不使用的字段等。
4、不能过滤的空值,将空值转化为随机数避免数据倾斜。
1.2 大大表join

  1. 1)创建第二张大表
  2. create table bigtable2(
  3.     id bigint,
  4.     t bigint,
  5.     uid string,
  6.     keyword string,
  7.     url_rank int,
  8.     click_num int,
  9.     click_url string)
  10. row format delimited fields terminated by '\t';
  11. load data local inpath '/opt/module/data/bigtable' into table bigtable2;
  12. 2)测试大表直接JOIN
  13. insert overwrite table jointable
  14. select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
  15. from bigtable a
  16. join bigtable2 b
  17. on a.id = b.id;
  18. 测试结果:Time taken: 72.289 seconds
  19. insert overwrite table jointable
  20. select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
  21. from bigtable a
  22. join bigtable2 b
  23. on a.id = b.id;
  24. 3)创建分桶表1
  25. create table bigtable_buck1(
  26.     id bigint,
  27.     t bigint,
  28.     uid string,
  29.     keyword string,
  30.     url_rank int,
  31.     click_num int,
  32.     click_url string)
  33. clustered by(id)
  34. sorted by(id)
  35. into 6 buckets
  36. row format delimited fields terminated by '\t';
  37. load data local inpath '/opt/module/data/bigtable' into table bigtable_buck1;
  38. 4)创建分桶表2,分桶数和第一张表的分桶数为倍数关系
  39. create table bigtable_buck2(
  40.     id bigint,
  41.     t bigint,
  42.     uid string,
  43.     keyword string,
  44.     url_rank int,
  45.     click_num int,
  46.     click_url string)
  47. clustered by(id)
  48. sorted by(id)
  49. into 6 buckets
  50. row format delimited fields terminated by '\t';
  51. load data local inpath '/opt/module/data/bigtable' into table bigtable_buck2;
  52. 5)设置参数
  53. set hive.optimize.bucketmapjoin = true;
  54. set hive.optimize.bucketmapjoin.sortedmerge = true;
  55. set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
  56. 6)测试 Time taken: 34.685 seconds
  57. insert overwrite table jointable
  58. select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
  59. from bigtable_buck1 s
  60. join bigtable_buck2 b
  61. on b.id = s.id;
复制代码
1、使用相同的连接键

2、过滤无效、未使用的数据:淘汰每个阶段的数据量,对于分区表要加分区,同时只选择需要使用到的字段。
  1. 加随机数打散
  2. 1)空值0值 或 关联不上的,用随机数
  3. from a join b
  4. on if(a.key=’’, rand(id)%10, a.key)=b.key
  5. –rand() 0-1之间的小数
  6. (2)都是有用的key,则加随机数后缀
  7. group by concat(key, cast(round(rand()*10) as int))
  8. 缺点是分成10份是提前写好的,数据变更大时,还是会跑得慢。
复制代码
3、逻辑拆分,使用中间表计算

4、列裁剪,避免使用select * 如果查询的是分区表,一定要记得带上分区条件
5、where条件写在子查询中:先过滤再关联(最好使用这种笨办法,虽然hive3.0自带谓词下推)
6、关联条件写在on中,而不是where中

7、数据量小时,用in代替join
8、使用semi join替代in/exists
inner join和left semi join的联系和区别
2、insert into更换union all

如果union all的部分个数大于2,大概每个union部分数据量大,应该拆成多个insert into 语句,效率有提升。
?insert into到不同分区?
3、排序order by换位sort by

order by:对输入做全局排序,因此只有一个reducer(多个reducer无法包管全局有序),只有一个reducer,会导致当输入规模较大时,需要较长的计算时间。
sort by:局部排序,包管每个reducer的输出文件是有序的。
hive order by,sort by, distribute by, cluster by作用以及用法
4、并行实行

Hive会将一个查询转化成一个大概多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、归并阶段、limit阶段。大概Hive实行过程中可能需要的其他阶段。
默认情况下,Hive一次只会实行一个阶段。不外,某个特定的job可能包含众多的阶段,而这些阶段可能并非完全相互依赖的,也就是说有些阶段是可以并行实行的,这样可能使得整个job的实行时间缩短。如果有更多的阶段可以并行实行,那么job可能就越快完成。
通过设置参数hive.exec.parallel值为true,就可以开启并发实行。在共享集群中,需要注意下,如果job中并行阶段增多,那么集群使用率就会增长。
  1. set hive.exec.parallel=true; //打开任务并行执行
  2. set hive.exec.parallel.thread.number=16; //同一个sql允许最大并行度,默认为8。
  3. set hive.exec.parallel=true
复制代码
5、数据倾斜优化

spark-数据倾斜、
hadoop-hive-数据倾斜问题
6、小文件优化

spark调优-小文件问题
参考链接
HiveSQL大表join大表数据倾斜
二、实战

添加链接描述
2.1 场景

【配景】
A表为一个汇总表,汇总的是卖家买家近来N天交易汇总信息,即对于每个卖家近来N天,其每个买家共成交了多少单,总金额是多少,假设N取90天,汇总值仅取成交单数。
A表的字段有:buyer_id、seller_id、pay_cnt_90day。
B表为卖家基本信息表,其字段有seller_id、sale_level,其中sale_levels是卖家的一个分层评级信息,好比吧卖家分为6个级别:S0、S1、S2、S3、S4和S5。
要得到的效果是每个买家在各个级别的卖家的成交比例信息,好比:某买家:S0:10%;S1:20%;S2:20%;S3:10%;S4:20%;S5:10%。
【初始思路】
第一反应是直接join两表并统计:
  1.         select
  2.          m.buyer_id,
  3.         sum(pay_cnt_90day)  as pay_cnt_90day,
  4.         sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,
  5.         sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,
  6.         sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,
  7.         sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
  8.         sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
  9.         sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
  10.       from (
  11.         select  a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day
  12.         from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a
  13.         join
  14.                (select seller_id,  sale_level  from table_B)  b
  15.         on  a.seller_id  = b.seller_id
  16.         )  m
  17.       group by m.buyer_id
复制代码
但是此SQL会引起数据倾斜,原因在于卖家的二八准则,某些卖家90天内会有几百万甚至上万万的买家,但是大部分的卖家90天内买家的数目并不多,join table_A和table_B的时候,ODPS会按照seller_id进行分发,table_A的大卖家引起了数据倾斜。但是数据本身无法用mapjoin table_B解决,由于卖家凌驾万万条,文件大小有几个GB,凌驾了1GB的限定。
2.2 限定所需的字段,间接mapjoin

思路:只看90天内有交易的卖家,不join全部的卖家表
范围:此方案在一些情况可以起作用,但是很多时候还是无法解决上述问题,由于大部分卖家只管90天内买家不多,但还是有一些的,过滤后的B表仍然很多。
  1.  select
  2.          m.buyer_id,
  3.         sum(pay_cnt_90day)  as pay_cnt_90day,
  4.         sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,
  5.         sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,
  6.         sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,
  7.         sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
  8.         sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
  9.         sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
  10.       from (
  11.         select  /*+mapjoin(b)*/
  12.           a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day
  13.         from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a
  14.         join  (
  15.                            select seller_id,  sale_level  from table_B b0
  16.                            join
  17.                            (select seller_id from table_A group by seller_id) a0
  18.                              on b0.seller_id = a0.selller_id
  19.           )  b
  20.         on  a.seller_id  = b.seller_id
  21.         )  m
  22.       group by m.buyer_id   
复制代码
2.2 解决异常值倾斜,如NULL加随机数打散

**思路:**焦点是将这些引起倾斜的值随机分发到Reduce,join时对这些特殊值concat随机数,从而达到随机分发的目的。
**适用于:**倾斜的值是明确的而且数目很少,好比null值引起的倾斜。
**范围:**无法解决本问题场景的倾斜问题,由于倾斜的卖家大量存在而且动态变化。
此方案的焦点逻辑如下:
  1.                 select a.user_id, a.order_id, b.user_id
  2.       from table_a a
  3.       join table_b b
  4.       on (case when a.user_is is null then concat('hive', rand(id)) else a.user_id end) = b.user_id
复制代码
Hive 已对此进行了优化,只需要设置参数skewinfo和skewjoin参数,不修改SQL代码,例如,由于table_B的值“0” 和“1”引起了倾斜,值需要做如下设置:
  1. set hive.optimize.skewinfo=table_B:(selleer_id) [ ( "0") ("1") ) ]
  2. set hive.optimize.skewjoin = true;
复制代码
2.3 扩容解决数据倾斜

推荐”2.3.3 推荐:分而治之:倾斜和非倾斜再union all“,可直接看。若不可,推荐方案2.3.2倾斜key扩容。
2.3.1 客户表扩大N倍

思路:按照seller_id分发会倾斜,那么再人工增长一列进行分发,这样之前倾斜的值的倾斜程度会淘汰到原来的1/10,可以通过设置numbers表改放大倍数来降低倾斜程度。
代码实现:建立一个numbers表,其值只有一列int 行,好比从1到10(具体值可根据倾斜程度确定),然后放大B表10倍,再取模join。
范围性:数据量翻倍,B表也会膨胀N倍。
  1. SELECT m.buyer_id,
  2.          sum(pay_cnt_90day) AS pay_cnt_90day,
  3.          sum(case  WHEN m.sale_level = 0 THEN pay_cnt_90day end) AS pay_cnt_90day_s0,
  4.          sum(case WHEN m.sale_level = 1 THEN pay_cnt_90day end) AS pay_cnt_90day_s1,
  5.          sum(case WHEN m.sale_level = 2 THEN  pay_cnt_90day end) AS pay_cnt_90day_s2,
  6.      sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
  7.      sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
  8.      sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
  9. FROM
  10.     (SELECT a.buer_id,
  11.          a.seller_id,
  12.          b.sale_level,
  13.          a.pay_cnt_90day
  14.     FROM
  15.         (SELECT buyer_id,
  16.          seller_id,
  17.          pay_cnt_90day
  18.         FROM table_A) a
  19.         JOIN -- 将B表扩容N倍
  20.             (SELECT /*+mapjoin(members)*/ seller_id,
  21.          sale_level ,
  22.         member
  23.             FROM table_B
  24.             JOIN members -- 扩容N倍的表
  25.              ) b
  26.           ON a.seller_id = b.seller_id
  27.               AND mod(a.pay_cnt_90day,10)+1 = b.number ) m
  28.         GROUP BY  m.buyer_id
复制代码
2.3.2 部分倾斜key扩容,大卖家扩容

思路:把大卖家放大倍数即可:需要首先知道大卖家的名单,即先建立一个临时表动态存放每天最新的大卖家(好比dim_big_seller),同时此表的大卖家要膨胀预先设定的倍数(1000倍)。
代码实现:在A表和B表分别新建一个join列,其逻辑为:如果是大卖家,那么concat一个随机分配正整数(0到预定义的倍数之间,本例为0~1000);如果不是,保持稳定。
范围性: 相比全部数据扩容,仅倾斜指标扩容的运行效率有提升,但代码复杂性高,必须首先建立大数据表。
  1. SELECT m.buyer_id,
  2.          sum(pay_cnt_90day) AS pay_cnt_90day,
  3.          sum(case  WHEN m.sale_level = 0 THEN pay_cnt_90day end) AS pay_cnt_90day_s0,
  4.          sum(case WHEN m.sale_level = 1 THEN pay_cnt_90day end) AS pay_cnt_90day_s1,
  5.          sum(case WHEN m.sale_level = 2 THEN  pay_cnt_90day end) AS pay_cnt_90day_s2,
  6.      sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
  7.      sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
  8.      sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
  9.     pay_cnt_90day end) AS pay_cnt_90day_s5
  10. FROM
  11.     (SELECT a.buer_id,
  12.          a.seller_id,
  13.          b.sale_level,
  14.          a.pay_cnt_90day
  15.     FROM
  16.         (SELECT /*+mapjoin(big)*/ buyer_id,
  17.          seller_id,
  18.          pay_cnt_90day,
  19.          if(big.seller_id is NOT null,
  20.          concat( table_A.seller_id,
  21.          'rnd', cast( rand() * 1000 AS bigint ), table_A.seller_id) AS seller_id_joinkey
  22.         FROM table_A left outer
  23.         JOIN --big表seller_id有重复,请注意一定要group by 后再join,保证table_A的行数保持不变
  24.         (  SELECT seller_id
  25.              FROM dim_big_seller
  26.              GROUP BY  seller_id
  27.          )big
  28.             ON table_A.seller_id = big.seller_id ) a
  29.         JOIN
  30.             (SELECT /*+mapjoin(big)*/ seller_id,
  31.                      sale_level ,
  32.                   --big表的seller_id_joinkey生成逻辑和上面的生成逻辑一样 coalesce(seller_id_joinkey,
  33.                      table_B.seller_id) AS seller_id_joinkey
  34.             FROM table_B
  35.             left out JOIN --table_B表join大卖家表后大卖家行数扩大1000倍,其它卖家行数保持不变
  36.                 (SELECT seller_id,
  37.                          seller_id_joinkey
  38.                 FROM dim_big_seller
  39.                 ) big
  40.                     ON table_B.seller_id= big.seller_id ) b
  41.                     ON a.seller_id_joinkey= b.seller_id_joinkey
  42.                         AND mod(a.pay_cnt_90day,10)+1 = b.number ) m
  43.                 GROUP BY  m.buyer_id
复制代码
2.3.3 推荐:分而治之:倾斜和非倾斜再union all

思路:对倾斜的键值和不倾斜的键值分开处理,不倾斜的正常join即可,倾斜的把他们找出来做mapjoin,末了union all其效果即可。
代码实现
范围性:较贫苦,代码复杂而且需要一个临时表存放倾斜的键值。
  1. --1、构建临时表,由于数据倾斜,先找出90天买家超过10000的卖家
  2. insert overwrite table temp_table_B
  3. SELECT m.seller_id,
  4.          n.sale_level
  5. FROM
  6.     (SELECT seller_id
  7.     FROM
  8.         (SELECT seller_id,
  9.         count(buyer_id) AS byr_cnt
  10.         FROM table_A
  11.         GROUP BY  seller_id ) a
  12.         WHERE a.byr_cnt >10000 ) m
  13.     LEFT JOIN
  14.     (SELECT seller_id,
  15.          sale_level
  16.     FROM table_B
  17.     ) n
  18.     ON m.seller_id = n.seller_id;
  19.    
  20.    
  21. --2、分而治之,不倾斜union all 倾斜。
  22. --对于90天买家超过10000的卖家直接mapjoin,对其它卖家直接正常join即可
  23. 。SELECT m.buyer_id,
  24.          sum(pay_cnt_90day) AS pay_cnt_90day,
  25.          sum(case  WHEN m.sale_level = 0 THEN pay_cnt_90day end) AS pay_cnt_90day_s0,
  26.          sum(case WHEN m.sale_level = 1 THEN pay_cnt_90day end) AS pay_cnt_90day_s1,
  27.          sum(case WHEN m.sale_level = 2 THEN  pay_cnt_90day end) AS pay_cnt_90day_s2,
  28.      sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,
  29.      sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,
  30.      sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5
  31. FROM
  32.     (SELECT a.buer_id,
  33.          a.seller_id,
  34.          b.sale_level,
  35.          a.pay_cnt_90day
  36.     FROM
  37.         (SELECT buyer_id,
  38.          seller_id,
  39.          pay_cnt_90day
  40.         FROM table_A
  41.         ) a
  42.         JOIN
  43.             (SELECT seller_id,
  44.                     a.sale_level
  45.             FROM table_A a
  46.             LEFT JOIN temp_table_B b
  47.                 ON a.seller_id = b.seller_id
  48.             WHERE b.seller_id is  NULL -- 限制为不倾斜的卖家
  49.             ) b
  50.                 ON a.seller_id = b.seller_id
  51.        UNION all
  52.        SELECT /*+mapjoin(b)*/ a.buer_id,
  53.          a.seller_id,
  54.          b.sale_level,
  55.          a.pay_cnt_90day
  56.             FROM
  57.                 (SELECT buyer_id,
  58.                          seller_id,
  59.                          pay_cnt_90day
  60.                 FROM table_A
  61.                 ) a
  62.                 JOIN
  63.                     (SELECT seller_id,
  64.                             sale_level
  65.                     FROM table_B -- 只看倾斜卖家
  66.                     ) b
  67.                ON a.seller_id = b.seller_id
  68.                ) m
  69.                 GROUP BY  m.buyer_id ) m
  70.             GROUP BY  m.buyer_id
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4