海哥 发表于 2024-8-22 19:58:36

FlinkSQL Regular Join之 Left Join

一、Regular Join

        通例连接(Regular Join)是SQL中原生界说的Join方式,是最通用的一类连接操作。它的具体语法与标准SQL完全雷同,通过关键字JOIN来连接两个表,后面用关键字ON来指明条件。
      Flink SQL的Regular Join也可以分为Inner Join 和 Outer Join,区别在于结果中是否包罗不符合联结条件的行。Regular Join 包罗以下几种(以 L 作为左流中的数据标识, R 作为右流中的数据标识):

[*]Inner Join:在流任务中,只有当两条流乐成连接时才会输出结果,格式为 +。
[*]Left Join:在流任务中,当左流数据到达时,无论是否与右流数据乐成连接,均会输出结果。如果连接乐成,输特别式为 +;如果连接失败,则输出 +。若后续右流数据到达并与之前未连接的左流数据匹配乐成,则会先发起一次回撤操作,输出 -,随后输出新的连接结果 +。
[*]Right Join:与 Left Join 相似,但左右流的角色互换,即右流数据到达时,无论是否与左流数据乐成连接,均会输出结果。
[*]Full Join:在流任务中,无论左流或右流的数据到达,都会实验输出结果。对于未乐成连接的情况,如果是右流数据,则输出 +;如果是左流数据,则输出 +。若后续另一条流的数据到达并与之前未连接的数据匹配乐成,则会先举行一次回撤操作 (对于左流数据到达,回撤 -;对于右流数据到达,回撤 -),随后输出新的连接结果 +。

二、案例

        接下来不玩虚的,直接上case,我司有一张评论表 `comment`,此中包罗一级回复、二级回复数据,先要求计算每篇物料(动态、帖子)中状态正常的一级回复量、二级回复量,并将结果插入Redis中,表布局如下:
CREATE TABLE `comment` (
`id` int(20) NOT NULL AUTO_INCREMENT,
`entry_id` int NOT NULL COMMENT '被评论的实体id' ,一级评论对应的是物料ID,非一级评论对应的是其所评论的一级评论ID,
`content` text NOT NULL,
`create_time` timestamp NOT NULL ,
`entry_type` int(11) NOT NULL COMMENT '被评论的实体类型',2 评论 8 帖子 73 博客 74 动态,
`status` int(11) NOT NULL ,
PRIMARY KEY (`id`),
); 我的初版代码如下:
CREATE TEMPORARY TABLE redis_statistic
(
entity_id         string
,first_comment_cntstring
,second_comment_cnt string
,PRIMARY KEY (entity_id) NOT ENFORCED
)
WITH (
'connector' = 'redis'
,'host' = ''
,'port' = ''
,'password' = ''
,'mode' = 'HASHMAP'
,'flattenHash' = 'true'
,'key-prefix' = 'comment_stat'
,'key-prefix-delimiter' = ':'
,'expiration' = '5184000000'
)
;


insert into redis_comment_statistic
select
   cast(t1.entry_type * 10000000000 + t1.entry_id as string) as entity_id
,cast(count(distinct t1.id) as string) as first_comment_cnt
,cast(count(distinct t2.id) as string) as second_comment_cnt
from (
select
   entry_type
    ,entry_id
    ,id
from `comment`
where `status` in (0,3)
    and entry_type <> 2
) t1
left join (
select
    entry_type
    ,entry_id
    ,id
from `comment`
where `status` in (0,3)
    and entry_type = 2
) t2
    on t1.id = t2.entry_id
group by
cast(t1.entry_type * 10000000000 + t1.entry_id as string)
having cast(t1.entry_type * 10000000000 + t1.entry_id as string) SIMILAR to '^74.*|^73.*|^80.*'
;         由代码可知,我的数据通过左连接、聚合、筛选之后将结果以多值模式写入到 Redis 中的 HashMap中,然后通过下面命令查询:
HMGET comment_stat:74000xxxxx first_comment_cnt second_comment_cnt         任务提交并且完成初始化之后,我试着在某物料下新增一条二级评论,但是Redis中数据并未发生更新,但是新增一条一级评论后Redis中的结果却发生了更新,明显同样的代码在MySQL中就没这种问题,问题到底出在哪里?
        因为生产上数据太多,并且对阿里云的flink还不认识,所以在线上探查问题照旧不太方便,索性之前搭建过 Flink HA 集群,具体可看 Flink实战 - 搭建HA高可用集群,于是在此中fake数据查找问题,我先在MySQL中创建一张`dept`表并且插入相应的数据:
https://i-blog.csdnimg.cn/direct/bfdc95d42fb14fc0a2b6236d42ba38e1.png
两版统计代码分别如下:
V1:错:
CREATE TABLE src_dept (
id INT,
entity_id int not null,
entity_type int not null,
content STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '',
    'port' = '',
    'username' = '',
    'password' = '',
    'database-name' = '',
    'table-name' = 'dept'
);


select count(distinct t1.id) as first_comment_cnt
      ,count(distinct t2.id) as second_comment_cnt
from
    (select *
    from src_dept
    where entity_type <> 2
    )t1
left join
    (select *
    from src_dept
    where entity_type = 2
    ) t2
on t1.id = t2.entity_id
; V2:对:
CREATE TABLE src_dept (
id INT,
entity_id int not null,
entity_type int not null,
content STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '',
    'port' = '',
    'username' = '',
    'password' = '',
    'database-name' = '',
    'table-name' = 'dept'
);


selectcount(distinct t1.id) as first_comment_cnt
       ,count(distinct t2.id) as second_comment_cnt
fromsrc_dept t1
left joinsrc_dept t2
on t1.id = t2.entity_id
and t2.entity_type = 2
where t1.entity_type <> 2
; V1、V2 初始化结果一样:
https://i-blog.csdnimg.cn/direct/6237ea43b8fe4b64b7f5163e78a85757.png
先在V1逻辑下向表中插入插入二级评论:
insert into dept values(7,3,2,'nihao'); 结果没有变革
https://i-blog.csdnimg.cn/direct/2e5efcf90b754a3e9f152b84d4d3d7b5.png
但是当我插入一级评论后,数据发生了更新
insert into dept values(8,224,74,'CIA'); https://i-blog.csdnimg.cn/direct/871cef3c6b6f407b8045fa71aa337eb8.png
接下来测试V2逻辑,分别向目的中插入1条一级二级评论,目的表中皆触发更新操作,结果如下:
insert into dept values(7,3,2,'nihao'),(8,224,74,'CIA'); https://i-blog.csdnimg.cn/direct/b7307ed50d014abfa5bb469569f7cdd8.png
三、总结

    尽管采用的代码基础相似,离线计算得出的数据结果并无差别,但在流式JOIN处理中却出现出显著不同的表现。在我看来,关键在于V1版本实验LEFT JOIN操作时,尽管主表与从表源自雷同的原始数据,它们各安闲JOIN前经过了独立的筛选过程,且依据的筛选条件大相径庭。这一过程等同于构建了两张布局与内容完全独立的临时表。值得注意的是,此场景中左表饰演驱动角色,意指左表数据的任何变动都将引发结果集的更新。然而,当仅向从表增加新的二级评论而主表未见新增时,由于驱动表(主表)未发生变革,最终的查询结果就不会反映出这次更新。
        转至V2版本,其差别性体如今处理逻辑次序调整为先JOIN后FILTER,并且在此配置下,主表包罗了全部数据记载。因此,不论是新增一级照旧二级评论,由于初始就完成了与全量主表的JOIN操作,任何评论层级的新增都会立即在通过Redis存储的结果集中体现出来,触发相应的更新。简而言之,V2设计下的SQL实验流程及全量主表的利用共同确保了对各类新增评论的即时响应与数据同步。












免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: FlinkSQL Regular Join之 Left Join