FlinkSQL Regular Join之 Left Join

海哥  金牌会员 | 2024-8-22 19:58:36 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 534|帖子 534|积分 1602

一、Regular Join

        通例连接(Regular Join)是SQL中原生界说的Join方式,是最通用的一类连接操作。它的具体语法与标准SQL完全雷同,通过关键字JOIN来连接两个表,后面用关键字ON来指明条件。
      Flink SQL的Regular Join也可以分为Inner Join 和 Outer Join,区别在于结果中是否包罗不符合联结条件的行。Regular Join 包罗以下几种(以 L 作为左流中的数据标识, R 作为右流中的数据标识):

  • Inner Join:在流任务中,只有当两条流乐成连接时才会输出结果,格式为 +[L, R]。
  • Left Join:在流任务中,当左流数据到达时,无论是否与右流数据乐成连接,均会输出结果。如果连接乐成,输特别式为 +[L, R];如果连接失败,则输出 +[L, null]。若后续右流数据到达并与之前未连接的左流数据匹配乐成,则会先发起一次回撤操作,输出 -[L, null],随后输出新的连接结果 +[L, R]。
  • Right Join:与 Left Join 相似,但左右流的角色互换,即右流数据到达时,无论是否与左流数据乐成连接,均会输出结果。
  • Full Join:在流任务中,无论左流或右流的数据到达,都会实验输出结果。对于未乐成连接的情况,如果是右流数据,则输出 +[null, R];如果是左流数据,则输出 +[L, null]。若后续另一条流的数据到达并与之前未连接的数据匹配乐成,则会先举行一次回撤操作 (对于左流数据到达,回撤 -[null, R];对于右流数据到达,回撤 -[L, null]),随后输出新的连接结果 +[L, R]。

二、案例

        接下来不玩虚的,直接上case,我司有一张评论表 `comment`,此中包罗一级回复、二级回复数据,先要求计算每篇物料(动态、帖子)中状态正常的一级回复量、二级回复量,并将结果插入Redis中,表布局如下:
  1. CREATE TABLE `comment` (
  2.   `id` int(20) NOT NULL AUTO_INCREMENT,
  3.   `entry_id` int NOT NULL COMMENT '被评论的实体id' ,一级评论对应的是物料ID,非一级评论对应的是其所评论的一级评论ID,
  4.   `content` text NOT NULL,
  5.   `create_time` timestamp NOT NULL ,
  6.   `entry_type` int(11) NOT NULL COMMENT '被评论的实体类型',2 评论 8 帖子 73 博客 74 动态,
  7.   `status` int(11) NOT NULL ,
  8.   PRIMARY KEY (`id`),
  9. );
复制代码
我的初版代码如下:
  1. CREATE TEMPORARY TABLE redis_statistic
  2. (
  3.   entity_id           string
  4.   ,first_comment_cnt  string
  5.   ,second_comment_cnt string
  6.   ,PRIMARY KEY (entity_id) NOT ENFORCED
  7. )
  8. WITH (
  9.   'connector' = 'redis'
  10.   ,'host' = ''
  11.   ,'port' = ''
  12.   ,'password' = ''
  13.   ,'mode' = 'HASHMAP'  
  14.   ,'flattenHash' = 'true'
  15.   ,'key-prefix' = 'comment_stat'
  16.   ,'key-prefix-delimiter' = ':'
  17.   ,'expiration' = '5184000000'
  18. )
  19. ;
  20. insert into redis_comment_statistic
  21. select
  22.    cast(t1.entry_type * 10000000000 + t1.entry_id as string) as entity_id
  23.   ,cast(count(distinct t1.id) as string) as first_comment_cnt
  24.   ,cast(count(distinct t2.id) as string) as second_comment_cnt
  25. from (
  26.   select
  27.      entry_type
  28.     ,entry_id
  29.     ,id
  30.   from `comment`
  31.   where `status` in (0,3)
  32.     and entry_type <> 2
  33. ) t1
  34.   left join (
  35.   select
  36.     entry_type
  37.     ,entry_id
  38.     ,id
  39.   from `comment`
  40.   where `status` in (0,3)
  41.     and entry_type = 2
  42. ) t2
  43.     on t1.id = t2.entry_id
  44. group by
  45.   cast(t1.entry_type * 10000000000 + t1.entry_id as string)
  46. having cast(t1.entry_type * 10000000000 + t1.entry_id as string) SIMILAR to '^74.*|^73.*|^80.*'
  47. ;
复制代码
        由代码可知,我的数据通过左连接、聚合、筛选之后将结果以多值模式写入到 Redis 中的 HashMap中,然后通过下面命令查询:
  1. HMGET comment_stat:74000xxxxx first_comment_cnt second_comment_cnt
复制代码
        任务提交并且完成初始化之后,我试着在某物料下新增一条二级评论,但是Redis中数据并未发生更新,但是新增一条一级评论后Redis中的结果却发生了更新,明显同样的代码在MySQL中就没这种问题,问题到底出在哪里?
        因为生产上数据太多,并且对阿里云的flink还不认识,所以在线上探查问题照旧不太方便,索性之前搭建过 Flink HA 集群,具体可看 Flink实战 - 搭建HA高可用集群,于是在此中fake数据查找问题,我先在MySQL中创建一张`dept`表并且插入相应的数据:

两版统计代码分别如下:
V1:错:
  1. CREATE TABLE src_dept (
  2.   id INT,
  3.   entity_id int not null,
  4.   entity_type int not null,
  5.   content STRING,
  6.   PRIMARY KEY (id) NOT ENFORCED
  7. ) WITH (
  8.     'connector' = 'mysql-cdc',
  9.     'hostname' = '',
  10.     'port' = '',
  11.     'username' = '',
  12.     'password' = '',
  13.     'database-name' = '',
  14.     'table-name' = 'dept'
  15. );
  16. select count(distinct t1.id) as first_comment_cnt
  17.       ,count(distinct t2.id) as second_comment_cnt
  18. from
  19.     (select *
  20.     from src_dept  
  21.     where entity_type <> 2
  22.     )  t1
  23. left join
  24.     (select *
  25.     from src_dept  
  26.     where entity_type = 2
  27.     ) t2
  28. on t1.id = t2.entity_id
  29. ;
复制代码
V2:对:
  1. CREATE TABLE src_dept (
  2.   id INT,
  3.   entity_id int not null,
  4.   entity_type int not null,
  5.   content STRING,
  6.   PRIMARY KEY (id) NOT ENFORCED
  7. ) WITH (
  8.     'connector' = 'mysql-cdc',
  9.     'hostname' = '',
  10.     'port' = '',
  11.     'username' = '',
  12.     'password' = '',
  13.     'database-name' = '',
  14.     'table-name' = 'dept'
  15. );
  16. select  count(distinct t1.id) as first_comment_cnt
  17.        ,count(distinct t2.id) as second_comment_cnt
  18. from  src_dept t1
  19. left join  src_dept t2
  20. on t1.id = t2.entity_id
  21. and t2.entity_type = 2
  22. where t1.entity_type <> 2
  23. ;
复制代码
V1、V2 初始化结果一样:

先在V1逻辑下向表中插入插入二级评论:
  1. insert into dept values(7,3,2,'nihao');
复制代码
结果没有变革

但是当我插入一级评论后,数据发生了更新
  1. insert into dept values(8,224,74,'CIA');
复制代码

接下来测试V2逻辑,分别向目的中插入1条一级二级评论,目的表中皆触发更新操作,结果如下:
  1. insert into dept values(7,3,2,'nihao'),(8,224,74,'CIA');
复制代码

三、总结

    尽管采用的代码基础相似,离线计算得出的数据结果并无差别,但在流式JOIN处理中却出现出显著不同的表现。在我看来,关键在于V1版本实验LEFT JOIN操作时,尽管主表与从表源自雷同的原始数据,它们各安闲JOIN前经过了独立的筛选过程,且依据的筛选条件大相径庭。这一过程等同于构建了两张布局与内容完全独立的临时表。值得注意的是,此场景中左表饰演驱动角色,意指左表数据的任何变动都将引发结果集的更新。然而,当仅向从表增加新的二级评论而主表未见新增时,由于驱动表(主表)未发生变革,最终的查询结果就不会反映出这次更新。
        转至V2版本,其差别性体如今处理逻辑次序调整为先JOIN后FILTER,并且在此配置下,主表包罗了全部数据记载。因此,不论是新增一级照旧二级评论,由于初始就完成了与全量主表的JOIN操作,任何评论层级的新增都会立即在通过Redis存储的结果集中体现出来,触发相应的更新。简而言之,V2设计下的SQL实验流程及全量主表的利用共同确保了对各类新增评论的即时响应与数据同步。












免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

海哥

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表