使用bitmap实现uv指标多维度自由聚合实时盘算-flink+starrocks
数仓架构计划思路重点难点
需求重点
数据准确性
此需求对数据准确性要求较高。避免自己有较大误差的技能方案。如布隆过滤器,较大几率的hash碰撞等。但是目前可以继承flink的至少一次语义。以后力达精准一次。
查询延迟
查询平台指在聚合平台上即时选取维度和时间条件后点击查询得到数据的延迟时间。目前需求为半个月内的数据5秒延迟。此需求满足主要依赖末了天生数据的查询计划。包罗存储位置,存储格式,查询方式等。
数据延迟
需求端到端延迟10分钟。使用flink盘算可以满足端到端时效性,目前指标需求较少,未举行分层。指标盘算都是由一个flink使命完成。目前正常延迟在一分钟至两分钟。这照旧考虑到写starrocks的压力。攒批写入。
需求难点
需求指标主要分为三种范例。一种是计数,如点击数(nlogAid=clickAd)。一种是去重计数比如UV(活跃用户),一种是比例盘算,如点击率(click_rate=click_pv/show_pv)。
其中计数比较好满足查询延迟,我们可以提前把指标算好,查询时只必要简单的聚合。查询速度很快。目前需求必要的最小时间粒度是天。我们的结果数据存放在starrocks中。结果表使用starrocks的OLAP的预聚合引擎以天举行分区,维度作为主键举行预聚合。即可满足查询延迟要求。
去重计数的难度相对较大。由于此需求无法用简单的预聚合的方式加速查询速度。如果不做额外的计划。必要在查询的时候举行明细数据的实时盘算。量非常大。远不能满足查询时效的需求。不能预聚合缘故原由主要在于两点。
第一是维度不固定。查询时可以自由组合。每个维度又有多个可能数值,因此维度互相组合的数目是大量且不确定的。
第二是时间不固定。可以查询最少一天最多半年甚至更长的时间数据。而盘算去重必须在选定时间范围内去重。不能先算好每一天的,然后如果时间范围有多天,就把这多天的值直接累加。那结果是错误的。
现有数仓架构方案
满足查询延迟方案:
使用bitmap存储去重计数指标
bitmap算法,中文又叫位图算法。这里的位图指的是内存中一连的二进制位(bit)。bitmap算法适用于大量整型数据做去重和查询。starrocks很好的支持了bitmap,直接提供了bitmap数据范例。
bitmap算法详解 参考bitmap解决uv盘算头脑-CSDN博客
编号系统(发号器)的使用缘故原由
在使用bitmap时除了要注意保持数据的紧凑还必要注意一点。就是bitmap只支持数值范例数据的存储。但是我们的cuid(装备编号)是字符串范例。因此我们必要先把cuid转为数字范例。最简单的方式就是hash。但是我们实践中发现了一个问题。就是的cuid数量太大了。目前日活是千万级别。存量用户是亿级别。cuid转数字如果使用32位hash的话。cuid举行hash的时候已经会出现hash碰撞了。就是多个cuid颠末hash盘算得到了同一个值。导致数据不准。如何解决hash碰撞呢?如果使用64位hash的话。hash碰撞可以避免。但是对于bitmap来说存储量级,离散性和盘算难度都大大增加。查询时长大大增加。
据上所诉。为了解决这个问题。我们决定不使用hash对cuid举行数值转换。而是使用发号器举行发号。发号器逻辑上就是对cuid举行编号,从0开始。如果当前cuid我们的编号系统中没有存,说明是新装备。就举行发号,发的号是当前号码系统中最大值加一。如果当前cuid在编号系统中有值。直接返回编号系统中的编号即可。
这里必要考虑一个问题。我们的编号系统(发号器)与flink步伐如何交互?
是靠flink自己实现发号照旧借助外部存储?
如果使用外部系统作为编号系统。那么要考虑两点。
第一,flink是多线程的。如果使用外部系统作为编号系统。要处理并发的问题。要考虑锁的问题。
第二,flink是基于事件触发的。也就是每一条日志会触发一次操作。如果使用外部系统,那么就产生大量的请求。qps压力和时效性都必要考虑
如果不使用外部系统,flink支持有状态编程。那么我们可以使用flink的内部状态。这样又必要考虑一个问题,如何保证编号的一连和不重复?
这里的关键就是键控状态。数据颠末keyby算子后会举行逻辑分组。同一个逻辑分组内的编号是可控的。那key如何选取呢?如果使用cuid作为keyby的key,那么将会把每个cuid的数据分为一个逻辑分组,分组数是不确定的。如果分组数不确定,如何给下一位新用户赋予编号呢?又如何获取编号系统中当前最大的编号呢?
所以我们使用cuid hash对4096取模来作为key。这样就确定了下来一共会有4096个逻辑分组。那这样又有什么用呢?用处就是每个逻辑分组现在不必关心编号系统中当前最大的编号了,只必要关心本逻辑分组内当前最大编号就可以了。如果来新装备。那么给它赋予的编号就是本逻辑分组内当前最大的编号加步长(4096)就可以了。简单示范一下。
组别
一
二
三
四
五
来了一批数据
1
2
3
4
5
又来一批数据
6
8
9
10
又来一批
7
13
14
15
又一批
11
12
19
20
如上图。在步伐运行过程中一定会存在“缺数据”不连贯的环境。但是是临时的。只要数据分布均匀,最终都会补齐。只有最新的一两批会不连贯。而且这种方式可以保证数据不会重复。
冷启动的处理
到这里,对于bitmap和发号器的计划可以说完成了。但是这基于一个条件。就是我的步伐永远正常的运行下去。但是这险些是不可能得。比如现在有新需求,必要修改指标逻辑。那么当步伐修改完再次启动。此步伐是作为新使命启动的。状态是空的。但是用户照旧老的。而cuid对应的编号也是不应该改变的。所以flink状态必要加载之前的发号记载。
想要加载离线数据,起首必要保存离线数据。这个没什么问题。在处理新cuid时将cuid与编码的信息写入外部编码表中即可。
那在步伐启动时如何读取冷数据呢?
方案一:不加载,热缓存。在数据到来时,先查状态,查不到去查编码表。再查不到就分配编码。但是分配编码时也要慎重考虑。由于已经不知道当前组的最大编码了。只知道所以编码的最大值了。缺点非常显着。要多次查询外部数据。理论最大要访问六千多万次。
方案二:预加载冷数据。在步伐启动时读取所有的历史数据。都加载到缓存中。cuid来临时查询状态和缓存即可确定是否发号过。那这里照旧有一个问题。为什么要全部加载呢?由于不能确定cuid会发到哪个taskmanager上去实验。那有没有办法知道呢?有办法。flink的分区也就是并行度。是物理上的区分。flink的keyby是逻辑上的分组。而这两个分组是有联系的。不是随机分的。
flink有一个keygroup的概念。我们为了清晰分辨就称为桶。flink在使用keyby算子时把所有数据按照key分成多个逻辑组。再把所有的key(逻辑组)分别到多个桶中。再把桶分别到每个taskmanager中。
salt(桶号) = MathUtils.murmurHash(currentKey.hashCode()) % maxParallelism(最大并行度)
taskmanager = salt(桶号) * parallelism(当前并行度) / maxParallelism(最大并行度);
每次启动时。每个桶应该分配给哪个taskmanager是根据其时的并行度和最大并行度即时盘算出来的。
在flink步伐的keyby后在process方法中举行逻辑处理。process方法继承keyedprocess function类。内里有个open方法,此方法在步伐启动时,每个并行度运行一次。在这里我们能拿到当前subtask号,也就是taskmanager号和当前并行度。最大并行度我们写死了512。所以在步伐启动时我们能知道哪些桶,哪些key是属于我们当前taskmanager的。我们就可以取外部的编码表中去取当前taskmanager的所有桶数据加载到内存就可以了。
dwd_ad_cui2int表预览
https://i-blog.csdnimg.cn/direct/53ad77b3bce648b9b8082379ba70d571.png
starrocks建表
表以日期,小时,广告应用id,广告位id,代码位id,广告位范例,代码位范例,广告网络id,广告sdk版本,操作系统范例,手机品牌,app版本,流量分组id,实验分组id,模板id,媒体代码位id为维度(主键)
统计指标有:广告触达用户,活跃用户,展示用户数,代码位点击量。
flink使命思路:
https://i-blog.csdnimg.cn/direct/6d834ccef409435f9a90656348c5ba4a.png
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]