基于flink维表join优化浅述

打印 上一主题 下一主题

主题 1023|帖子 1023|积分 3069

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
一、维表join基础操作

      在实际工作场景中,数据流处理往往必要与相干的维度数据举行关联操作。根据不同的业务场景和需求,合理选择相应的缓存策略至关紧张,以确保数据处理的效率和准确性。
   通用缓存策略详解

          在利用维表 Join 时,大部门毗连器都支持缓存策略,但不同毗连器对缓存策略的支持环境可能略有不同。因此,在利用缓存策略之前,请务必查阅对应毗连器的官方文档,以确定具体的支持环境。以下是通用的缓存策略及其具体说明:
  1. None(默认值):无缓存

  形貌:倒霉用任何缓存机制,每次查询都直接从物理维表中获取数据。
  利用场景
  

  • 对数据准确性要求极高,无法容忍任何数据耽误或不准确的环境。
  • 维表数据更新频繁,及时性要求高。
  优点
  

  • 数据及时性强,能够保证查询效果的准确性。
  缺点
  

  • 当数据量较大时,Join 操作的性能压力会明显增加,容易导致反压。
  • 对维表数据库的查询压力较大,可能影响数据库性能。
  2. LRU(Least Recently Used):缓存部门数据

  形貌:缓存维表中的部门数据。系统会根据最近利用环境维护一个缓存,源表的每条数据都会先在缓存中查找,如果未找到,则从物理维表中查询。
  利用场景
  

  • 维表数据更新频率较低,可以容忍少量数据不准确的环境。
  • 对查询性能有肯定要求,但对数据及时性要求不高。
  优点
  

  • 提高 Join 操作的性能,减少对物理维表的查询次数。
  • 缓存机制可以有用缓解数据库的压力。
  缺点
  

  • 当维表数据发生变化时,缓存中的部门数据可能会变得不准确。
  • 缓存巨细有限,可能导致部门数据无法被缓存。
  3. ALL:缓存全部数据

  形貌:缓存维表中的所有数据。在作业运行前,系统会将维表中的所有数据加载到缓存中,之后所有的维表查找操作都会通过缓存举行。如果在缓存中未找到数据,则认为该键不存在。全量缓存有一个过期时间,过期后会重新加载全量缓存。
  利用场景
  

  • 适用于远程表数据量较小,且源表数据与维表 JOIN 时,ON 条件无法关联(MISS KEY)的环境较多。
  • 对查询性能要求极高,且可以接受全量数据加载的耽误。
  优点
  

  • 提供最高的查询性能,由于所有数据都在缓存中。
  • 减少了对物理维表的查询,降低了数据库的压力。
  缺点
  

  • 缓存占用内存较大,可能必要较多的资源。
  • 数据更新后,必要等待缓存过期或手动刷新缓存,可能存在短暂的数据不一致。
          现在维表关联伪代码如下:
  1. --数据源
  2. CREATE TEMPORARY TABLE tmp (
  3.     driver_id bigint,
  4.     lng double precision,
  5.     lat double precision,
  6.     speed double precision,
  7.     proctime as PROCTIME()
  8. )
  9. WITH (
  10.     'connector' = 'mysql', --连接器的标示
  11.     'dbname' = '',
  12.     'tablename' = '',
  13.     'username' = '',
  14.     'password' = '',
  15.     'endpoint' = ''
  16. );
  17. --维表
  18. CREATE TEMPORARY TABLE dim_tmp (
  19.     driver_id bigint,
  20.     driver_value VARCHAR,
  21.     PRIMARY KEY(driver_id) not ENFORCED
  22. )
  23. WITH (
  24.     'connector' = 'redis',
  25.     'host' = '',
  26.     'port' = '6379',
  27.     'password' = '',
  28.     'cache' = 'LRU', --缓存
  29.     'cacheSize' = '100000',--缓存数据量
  30.     'cacheTTLMs' = '0',--时效
  31.     'dbNum' = '4'   
  32. );
  33. --维表join
  34. select
  35. t.driver_id
  36. ,t2.driver_value
  37. FROM tmp t1
  38. LEFT JOIN dim_tmp FOR SYSTEM_TIME AS OF t1.proctime AS t2
  39.   ON  t1.driver_id = t2.driver_id
复制代码
二、维表join 策略

Lookup Hints

场景策略
配置'shuffle' = 'true'选项
利用引擎默认的shuffle策略。
不配置'shuffle' = 'true'选项,且维表毗连器不提供自界说联接策略
配置'shuffle' = 'true' 选项,且维表毗连器不提供自界说联接策略
默认利用SHUFFLE_HASH策略
配置'shuffle' = 'true' 选项,且维表毗连器提供自界说联接策略
利用表毗连器的自界说shuffle策略。
对维表配置联接时的shuffle策略代码示比方下:
  1. --维表join
  2. select /*+ LOOKUP('table'='t2', 'shuffle' = 'true'),LOOKUP('table'='t3', 'shuffle' = 'true') */
  3. t.driver_id
  4. ,t2.driver_value
  5. FROM tmp t1
  6. LEFT JOIN dim_tmp FOR SYSTEM_TIME AS OF t1.proctime AS t2
  7.   ON  t1.driver_id = t2.driver_id
  8. LEFT JOIN dim_tmp_1 FOR SYSTEM_TIME AS OF t1.proctime AS t3
  9.     ON  t1.driver_id = t3.driver_id
复制代码
其他Join Hints

        维表Join Hints仅用于配置维表联接策略,包罗SHUFFLE_HASH、REPLICATED_SHUFFLE_HASH和SKEW。维表Cache策略和联接策略之间的适用场景详情如下表所示。
Cache策略
SHUFFLE_HASH
REPLICATED_SHUFFLE_HASH
(和SKEW等价)
None
不发起利用该联接策略提示,主流会引入额外的网络开销。
不发起利用该联接策略提示,主流会引入额外的网络开销。
LRU
在维表查找IO成为瓶颈时,发起考虑利用该联接策略提示。当主流数据在Join Key上有时间局部性时,可以提高Cache命中率,减少IO哀求数,从而提拔总吞吐。
紧张
主流会引入额外的网络开销,当主流数据在Join Key上有倾斜,碰到性能瓶颈时,发起考虑REPLICATED_SHUFFLE_HASH
在维表查找IO成为瓶颈且主流数据在Join Key上有倾斜时,发起考虑该联接策略提示。当主流数据在Join Key上有时间局部性时,可以提高Cache命中率,减少IO哀求数,从而提拔总吞吐。
ALL
在维表内存利用量成为瓶颈时,发起利用该联接策略提示。内存利用率可降低为1/并发度
紧张
主流会引入额外的网络开销,当主流数据在Join Key上有倾斜,碰到性能瓶颈时,发起考虑REPLICATED_SHUFFLE_HASH
在维表内存利用量成为瓶颈且主流数据在Join Key上有倾斜时,发起利用该联接策略提示。内存利用率降低为分桶数/并发度
SHUFFLE_HASH

   

  • 利用效果
    在维表Join中利用Shuffle Hash策略,可以将主流数据在Join之前根据Join Key做一次shuffle。在利用LRU Cache策略时可以提高Cache命中率,减少IO哀求数;在利用ALL Cache策略时可以减少内存利用量。每个SHUFFLE_HASH联接提示可指定多张维表。
  • 利用限制
    虽然SHUFFLE_HASH可以减少内存开销,但是由于上游数据必要按照Join Key做一次shuffle,引入额外的网络开销,因此下面两种场景不适合利用SHUFFLE_HASH联接策略。
       

    • 主流数据在Join Key上存在严肃的数据倾斜,这种场景下如果利用SHUFFLE_HASH联接,会由于数据倾斜导致Join节点成为性能瓶颈,从而会导致流作业出现严肃反压或是批场景出现严肃长尾,此时发起利用REPLICATED_SHUFFLE_HASH联接。
    • 维表数据较小,ALL Cache策略加载没有内存瓶颈时,如果利用SHUFFLE_HASH联接,节省的内存开销和额外引入的网络开销相比,可能并不划算。

  代码示例:
  1. --维表join
  2. select /*+ SHUFFLE_HASH(t2, t3) */
  3. t.driver_id
  4. ,t2.driver_value
  5. FROM tmp t1
  6. LEFT JOIN dim_tmp FOR SYSTEM_TIME AS OF t1.proctime AS t2
  7.   ON  t1.driver_id = t2.driver_id
  8. LEFT JOIN dim_tmp_1 FOR SYSTEM_TIME AS OF t1.proctime AS t3
  9.     ON  t1.driver_id = t3.driver_id
复制代码
REPLICATED_SHUFFLE_HASH

   
  

  • 利用效果
    在维表Join中利用Replicated Shuffle Hash策略,其效果根本与SHUFFLE_HASH一致,但不同点是其会将主流具有相同key的数据随机打散到指定的N个并发上,可以办理数据倾斜导致的性能瓶颈。每个REPLICATED_SHUFFLE_HASH联接提示中可指定多张维表。
  • 利用限制
       

    • 必要配置倾斜数据分桶数目参数table.exec.skew-join.replicate-num,其默认值为16,取值不能大于维表联接节点的并发。
    • 当前不支持更新流,当主流是更新流时,利用REPLICATED_SHUFFLE_HASH策略会报错。

  代码示例:
  1. --维表join
  2. select /*+ REPLICATED_SHUFFLE_HASH(t2) */
  3. t.driver_id
  4. ,t2.driver_value
  5. FROM tmp t1
  6. LEFT JOIN dim_tmp FOR SYSTEM_TIME AS OF t1.proctime AS t2
  7.   ON  t1.driver_id = t2.driver_id
复制代码
SKEW

   

  • 利用效果
    当指定表存在数据倾斜时,优化器会在维表Join中利用Replicated Shuffle Hash策略(Skew只是一个语法糖,底层的实现是用的Replicated Shuffle Hash策略)。
  • 利用限制
       

    • 每个SKEW提示只能指定1张表。
    • 表名必要为存在数据倾斜的主表名称,而不是维表名称。
    • 当前不支持更新流,当主流是更新流时,利用SKEW策略会报错。

  代码示例:
  1. --维表join
  2. select /*+ SKEW(t1) */  
  3. t.driver_id
  4. ,t2.driver_value
  5. FROM tmp t1
  6. LEFT JOIN dim_tmp FOR SYSTEM_TIME AS OF t1.proctime AS t2
  7.   ON  t1.driver_id = t2.driver_id
复制代码
⚠️紧张
   

  • 当前LOOKUP Hint的shuffle选项已能覆盖 SHUFFLE_HASH hint功能,两者同时利用时,会优先采纳LOOKUP hint的shuffle选项。
  • 当前LOOKUP Hint的shuffle选项还未支持办理数据倾斜的功能,当和REPLICATED_SHUFFLE_HASH、SKEW同时利用时,会优先采纳REPLICATED_SHUFFLE_HASH、SKEW对应的shuffle策略。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

老婆出轨

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表