以flink实时流的方式实现OneId

打印 上一主题 下一主题

主题 532|帖子 532|积分 1596

以flink实时流的方式实现OneId

前言

oneid相关概念,及其当前离线实现方式介绍请见以下链接及其系列其他文章:
用户标签(一):图计算实现ID_Mapping、Oneid打通数据孤岛
读完上述链接的相关文章,将理解oneid的需求及其实现方式.
背景

之前公司是做电商数据分析,可以接入多个数据源的数据(美团,饿了么,京东等),在我们系统中,我们将为每个用户统一打上在我们系统内部的user_id,即oneid.当时已经有了基于spark图计算实现的id-mapping来达成oneid.
到这里,我们已有的系统跟前言中链接文章提到的内容相似.
后来,我们有了新需求:实时化改造.实时为每个用户统计相关数据.
但是,实时计算的数据最后要归纳到用户上,那么我们的id-mapping也需要实时出现结果.
思考

本人不具备机器学习领域的知识,在看了几遍前言中的文章后,勉强搞懂了该图计算的原理,本质上是求最大连通图,研究后发现需要拿到全量数据进行迭代,不能改造为实时计算.
后来,基于"最大连通图"的算法,变种出了一种方法:

  • 输入两个数据关联关系,例如 美团id1-饿了么id1,即输入两个点一个线.
  • 拿到该关系,分别拿两个数据去与已有的id-mapping结果表对比.如,我们结果id-mapping表中,有关系
    1. 美团id1 - oneid1
    2. 饿了么id1 - oneid2
    复制代码
  • 通过第一步中传入的关系,可以得出 美团id1 与 饿了么id1 在我们系统中应该识别为同一个人,对应同一个oneid,可以得到
    1. 美团id1 - oneid1
    2. 饿了么id1 - oneid1
    复制代码
    或者,都对应为另一个oneid
    1. 美团id1 - oneid2
    2. 饿了么id1 - oneid2
    复制代码
    即,我们通过传入关联关系,将 美团id1 与 饿了么id1 在我们系统中重新更新为关联到同一个oneid.
  • 后续,某个用户id是 美团id1,那么它关联这个id-mapping结果表,可以得到它在我们系统中的id是oneid1(或者oneid2,此处根据第三步如何取值)
  • 根据不同的对比结果,进行相应的替换或者新增,我们变相实现了"最大连通图"的算法,并且这个算法可以用flink实时计算实现
详细步骤

0. id-mapping结果表设计

结果表可以有多个描述字段,但是核心应该是以下两个字段:
  1. 原id , 计算出的oneid
复制代码
1. 输入数据采集

我们在采集数据的时候,需要将数据解析成两两的关系对.如原始数据:
  1. 手机号1,美团id1,设备id1
复制代码
需要将这条消息拆分为:
  1. 手机号1 - 美团id1
  2. 手机号1 - 设备id1
  3. 美团id1 - 设备id1
复制代码
再将这三组关联关系传给后续对比计算.
2. 对比计算

假设我们得到关系对:
  1. x - y
复制代码
我们拿到此关系对到结果表中进行对比将有以下几种情况:

  • x,y都没有对应oneid: 直接对结果表插入计算得出的新oneid(可以使用uuid)
    1. x - 新oneid
    2. y - 新oneid
    复制代码
  • x已有对应oneid为 XXoneid,y没有:将y的oneid赋值为 XXoneid,并插入,得到
    1. x - XXoneid
    2. y - XXoneid
    复制代码
  • x没有,y有oneid为 YYoneid :同第二种情况,得到
    1. x - YYoneid
    2. y - YYoneid
    复制代码
  • x,y都有oneid,且一致,都为 ZZoneid: 不更新
  • x,y都有oneid,且不一致,分别为 XXoneid,YYoneid :将 x,y更新为同一个oneid(XXoneid或者YYoneid),或者重新生成一个.此处看个人选择.
    1. 并且!!!!!!
    2. 将结果表中所有oneid为 XXoneid,YYoneid的相关数据,oneid都重设为新选择的oneid
    3. 这是为了将相关联的其他数据一起指向新的oneid
    复制代码
至此,通过以上几种情况.我们复现了id-mapping中求最大连通图的算法.
实现程序设计

1. 数据源

kafka
2. 实时计算程序

flink
3. 对比中如何取数

redis:将结果表以k-v的形式放在内存中,这样flink可以快速取值并对比计算
4. 结果表存放

hbase:此处可以换为mysql,doris等支持更新的存储即可.并且还有以下原因:
  1. 对比计算中,第五种情况,需要从这里取所有oneid为 XXoneid,YYoneid的相关数据
  2. 而redis中没法根据value来取得key,所以第五种情况,需要查询此处存储得到相关数据
复制代码
5. 结果更新

结果不但要更新hbase,还要更新redis中存放的k-v对!!!建议先更新redis,因为比较快.

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

钜形不锈钢水箱

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表