以flink实时流的方式实现OneId
前言
oneid相关概念,及其当前离线实现方式介绍请见以下链接及其系列其他文章:
用户标签(一):图计算实现ID_Mapping、Oneid打通数据孤岛
读完上述链接的相关文章,将理解oneid的需求及其实现方式.
背景
之前公司是做电商数据分析,可以接入多个数据源的数据(美团,饿了么,京东等),在我们系统中,我们将为每个用户统一打上在我们系统内部的user_id,即oneid.当时已经有了基于spark图计算实现的id-mapping来达成oneid.
到这里,我们已有的系统跟前言中链接文章提到的内容相似.
后来,我们有了新需求:实时化改造.实时为每个用户统计相关数据.
但是,实时计算的数据最后要归纳到用户上,那么我们的id-mapping也需要实时出现结果.
思考
本人不具备机器学习领域的知识,在看了几遍前言中的文章后,勉强搞懂了该图计算的原理,本质上是求最大连通图,研究后发现需要拿到全量数据进行迭代,不能改造为实时计算.
后来,基于"最大连通图"的算法,变种出了一种方法:
- 输入两个数据关联关系,例如 美团id1-饿了么id1,即输入两个点一个线.
- 拿到该关系,分别拿两个数据去与已有的id-mapping结果表对比.如,我们结果id-mapping表中,有关系
- 美团id1 - oneid1
- 饿了么id1 - oneid2
复制代码 - 通过第一步中传入的关系,可以得出 美团id1 与 饿了么id1 在我们系统中应该识别为同一个人,对应同一个oneid,可以得到
- 美团id1 - oneid1
- 饿了么id1 - oneid1
复制代码 或者,都对应为另一个oneid- 美团id1 - oneid2
- 饿了么id1 - oneid2
复制代码 即,我们通过传入关联关系,将 美团id1 与 饿了么id1 在我们系统中重新更新为关联到同一个oneid.
- 后续,某个用户id是 美团id1,那么它关联这个id-mapping结果表,可以得到它在我们系统中的id是oneid1(或者oneid2,此处根据第三步如何取值)
- 根据不同的对比结果,进行相应的替换或者新增,我们变相实现了"最大连通图"的算法,并且这个算法可以用flink实时计算实现
详细步骤
0. id-mapping结果表设计
结果表可以有多个描述字段,但是核心应该是以下两个字段:1. 输入数据采集
我们在采集数据的时候,需要将数据解析成两两的关系对.如原始数据:需要将这条消息拆分为:- 手机号1 - 美团id1
- 手机号1 - 设备id1
- 美团id1 - 设备id1
复制代码 再将这三组关联关系传给后续对比计算.
2. 对比计算
假设我们得到关系对:我们拿到此关系对到结果表中进行对比将有以下几种情况:
- x,y都没有对应oneid: 直接对结果表插入计算得出的新oneid(可以使用uuid)
- x已有对应oneid为 XXoneid,y没有:将y的oneid赋值为 XXoneid,并插入,得到
- x没有,y有oneid为 YYoneid :同第二种情况,得到
- x,y都有oneid,且一致,都为 ZZoneid: 不更新
- x,y都有oneid,且不一致,分别为 XXoneid,YYoneid :将 x,y更新为同一个oneid(XXoneid或者YYoneid),或者重新生成一个.此处看个人选择.
- 并且!!!!!!
- 将结果表中所有oneid为 XXoneid,YYoneid的相关数据,oneid都重设为新选择的oneid
- 这是为了将相关联的其他数据一起指向新的oneid
复制代码 至此,通过以上几种情况.我们复现了id-mapping中求最大连通图的算法.
实现程序设计
1. 数据源
kafka
2. 实时计算程序
flink
3. 对比中如何取数
redis:将结果表以k-v的形式放在内存中,这样flink可以快速取值并对比计算
4. 结果表存放
hbase:此处可以换为mysql,doris等支持更新的存储即可.并且还有以下原因:- 对比计算中,第五种情况,需要从这里取所有oneid为 XXoneid,YYoneid的相关数据
- 而redis中没法根据value来取得key,所以第五种情况,需要查询此处存储得到相关数据
复制代码 5. 结果更新
结果不但要更新hbase,还要更新redis中存放的k-v对!!!建议先更新redis,因为比较快.
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |