Flink 实时数仓(四)【DWD 层搭建(二)流量域事实表】 ...

飞不高  论坛元老 | 2024-8-6 18:02:32 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1005|帖子 1005|积分 3015

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
前言

        昨天刚搬到新校区,新校区小的可怜,幸亏之后出去实习交通可以方便点;待在学院太受限了,早点离开!
        本日开始完成 DWD 层剩余的需求,上一节我们把日志数据根据不同类型分流写入到了不同的主题;
1、流量域独立访客变乱事实表

        独立访客指的实在就是我们 web 端日志分析指标中常说的 UV,上一节我们已经把页面日志写入到 dwd_page_traffic_log 主题当中了,以是这里我们直接对这个主题进行消耗处理;
1.1、实现思绪

        既然是独立访客,就必须对日志中的数据做去重(独立访客数一般用来做日活指标,因为我们的机器一般都是 24 小时全年无休的,以是我们实时数仓也可以做这种日级别的指标需求,通过状态来存储历史就可以实现),而怎么判断访客是否重复?这就又用到了 Flink 中的状态编程(状态就是历史);和上一节我们判断新老访客一样,我们这里也可以给每个 mid 维护一个名为 lastVisitDate 的 ValueState(对 mid 进行 keyby),存储上一次访问的日期(留意是日期,只准确到天),每来一条数据就判断它的 lastVisitDate:


  • 如果 lastVisitDate 为 null 或者 不是本日,则保留数据,否则抛弃
一旦进入第二天,lastVisitDate 状态就应该被清空(设置状态 TTL 为 1 天)
此外,对于 0 点的数据我们这里需要明白统计规则:


  • 独立访客数据对应的页面必然是会话起始页面,last_page_id 必为 null;以是对于跨天的访问不能盘算在内(昨天到本日访问了多个页面,而本日页面的 last_page_id 必然不为 null),我们需要在消耗数据后的第一步就需要进行过滤;
1.2、代码实现 

  1. public class DwdTrafficUniqueVisitorDetail {
  2.     public static void main(String[] args) throws Exception {
  3.         // TODO 1. 获取执行环境
  4.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5.         env.setParallelism(1); // 生产环境中设置为kafka主题的分区数
  6.         // 1.1 开启checkpoint
  7.         env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
  8.         env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/s/ck");
  9.         env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
  10.         env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // 设置最大共存的checkpoint数量
  11.         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L)); // 固定频率重启: 尝试3次重启,每5s重启一次
  12.         // 1.2 设置状态后端
  13.         env.setStateBackend(new HashMapStateBackend());
  14.         // TODO 2. 消费 kafka dwd_traffic_page_log 主题
  15.         String topic = "dwd_traffic_page_log";
  16.         String groupId = "uvDetail";
  17.         DataStreamSource<String> pageDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));
  18.         // TODO 3. 过滤 last_page_id != Null 的数据
  19.         // 使用 flatMap 而没用 filter,因为 flatMap 可以把过滤和转json 两步都一起完成
  20.         SingleOutputStreamOperator<JSONObject> jsonDS = pageDS.flatMap(new FlatMapFunction<String, JSONObject>() {
  21.             @Override
  22.             public void flatMap(String value, Collector<JSONObject> out) throws Exception {
  23.                 try {
  24.                     JSONObject jsonObject = JSONObject.parseObject(value);
  25.                     // 获取 last_page_id
  26.                     String last_page_id = jsonObject.getJSONObject("page").getString("last_page_id");
  27.                     if (last_page_id == null) {
  28.                         out.collect(jsonObject);
  29.                     }
  30.                 } catch (Exception e) {
  31.                     e.printStackTrace();
  32.                 }
  33.             }
  34.         });
  35.         // TODO 4. 按照 mid 分组
  36.         KeyedStream<JSONObject, String> keyedStream = jsonDS.keyBy(json -> json.getJSONObject("common").getString("mid"));
  37.         // TODO 5. 使用状态编程实现按照 mid 的日期进行去重
  38.         // 使用富函数,因为富函数提供更多的信息如上下文等
  39.         SingleOutputStreamOperator<JSONObject> uvDS = keyedStream.filter(new RichFilterFunction<JSONObject>() {
  40.             private ValueState<String> lastVisitDate = null;
  41.             @Override
  42.             public void open(Configuration parameters) throws Exception {
  43.                 ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("lastVisit", String.class);
  44.                 lastVisitDate = getRuntimeContext().getState(stateDescriptor);
  45.             }
  46.             @Override
  47.             public boolean filter(JSONObject value) throws Exception {
  48.                 // 获取状态数据 & 当前数据中的时间并转为日期
  49.                 String lastDate = lastVisitDate.value();
  50.                 Long ts = value.getLong("ts");
  51.                 String curDate = DateFormatUtil.toDate(ts);
  52.                 if (lastDate == null || !lastDate.equals(curDate)) {
  53.                     // 更新状态
  54.                     lastVisitDate.update(curDate);
  55.                     return true;
  56.                 }
  57.                 return false;
  58.             }
  59.         });
  60.         // TODO 6. 数据写入 kafka
  61.         String targetTopic = "dwd_traffic_unique_visitor_detail";
  62.         uvDS.map(data -> data.toJSONString()).addSink(MyKafkaUtil.getFlinkKafkaProducer(targetTopic));
  63.         // TODO 7. 执行任务
  64.         env.execute("DwdTrafficUniqueVisitorDetail");
  65.     }
  66. }
复制代码
1.3、TTL 优化

        上面我们的代码逻辑看起来已经没什么问题了,但是我们可以设想:假设一个用户,2024-01-01 初次登录之后,它的 lastVisitDate 状态会不停存储 2024-01-01,如果他下一次登录是在 2024-12-31,那么期间的 364 天我们依然要不停存储它的状态;而我们判断用户是否已经登录的逻辑是:lastVisitDate 是否为null 或者 lastVisitDate<本日,以是我们完全可以在一天之后把该用户的 lastVisitDate 状态清空,来淘汰状态的生存开销!
TTL 是给状态描述器设置的,而状态描述器是构造状态对象的必须参数!
TTL 是状态的一个属性,当我们修改状态值的时候,TTL 本身并不会更新!这里,我们需要在状态描述器中设置 TTL 的更新计谋为创建或更新状态值的时候就更新 TTL ,重新开始逾期倒计时;
我们只需要修改上面第 5 步,在初始化状态时,在状态描述器中给状态添加 TTL 属性:
  1.             @Override
  2.             public void open(Configuration parameters) throws Exception {
  3.                 ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("lastVisit", String.class);
  4.                 // 给状态添加 TTL
  5.                 stateDescriptor.enableTimeToLive(new StateTtlConfig.Builder(Time.days(1))
  6.                         // 设置 TTL 可更新,并且在创建或更新状态的时候更新
  7.                         .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  8.                         .build()
  9.                 );
  10.                 lastVisitDate = getRuntimeContext().getState(stateDescriptor);
  11.             }
复制代码
2、流量域用户跳失变乱事实表

        跳出的概念:跳出指的是用户在一次会话中只访问了一个页面的环境(留意:粒度是会话),我们在之前做离线数仓的时候做过跳出率的指标,对于离线数仓,我们可以在 DWS 层构建一张流量域近1日会话粒度页面浏览表(dws_traffic_session_page_view_1d),通过下面的 SQL 就可以统计出该指标:
  1. SELECT
  2. CAST(SUM(IF(page_count=1,1,0))/COUNT(*)) AS DECIMAL(16,2) AS bounce_rate
  3. FROM dws_traffic_session_page_view_1d
复制代码
        在这里的实时数仓中,我们不大概等到一天结束最后才去盘算跳出率;但是我们这里又没有 session_id,以是我们只能换一种思绪:
思绪1(会话窗口)


  • 利用会话窗口,为每个 mid 开启一个会话窗口并指定间隔为 10 s;一旦到了 10s 触发窗口关闭,盘算窗口内的数据条数,> 1 条则说明这次会话没有发生跳出;
        这种思绪的问题很显着:① 如果我短时间(10s内)发生多个跳出,但是恰好这些跳出都在一个会话,这会导致窗口结束时误以为这不是跳出,毕竟窗口内有多条数据;② 大概我的一次正常的会话,被会话窗口切分到两个不同的会话窗口,效果把一个非跳出访问盘算为 2 个跳出访问;
思绪2(状态编程)
        在离线数仓中,当我们没有 session_id 时,我们可以一天的数据按照 mid 进行分组,然后根据时间戳字段进行排序,这样来盘算一个 session;但是这里是实时数仓,我们不知道什么时候一个 session 会结束,以是我们可以设置一个定时器,定时器时间范围内的数据如果没数据来就视作一个会话结束,触发盘算;并结合状态编程,把新会话的首页存入状态


  • 遇到 last_page 为 null 的数据就试着取出状态

    • 如果状态为 null,则该页面是新的会话起始页,开启定时器将数据自身写入状态
    • 如果状态不为 null,说明刚跳出一次,并且在定时器时间范围内又进来一次;这种环境需要将第一条数据(跳出的数据,也就是写入状态中的数据)输出,然后将自身写入状态,定时器依然存在,等时间到了触发盘算

  • 如果 last_page 不为 null,则状态中的数据和该条数据都抛弃
这种思绪同样存在问题,当数据是乱序的时候一切都就乱套了;
思绪3(Flink CEP)
Flink CEP 实在就是利用 状态编程 + within 开窗 来处理这种复杂变乱

Flink CEP 界说的规则之间的连续计谋


  • 严格连续: 盼望所有匹配的变乱严格的一个接一个出现,中间没有任何不匹配的变乱。对应方法为 next();
  • 疏松连续: 忽略匹配的变乱之间的不匹配的变乱。对应方法为followedBy();
  • 不确定的疏松连续: 更进一步的疏松连续,允许忽略掉一些匹配变乱的附加匹配。对应方法为followedByAny()。
界说模式之前的代码
        这里需要留意:因为我们反面要保证数据有序,以是我们最好指定变乱时间的提取字段,并添加水位线设置公道的超时时间(理论上可以保证数据绝对有序):
  1. public class DwdTrafficUserJumpDetail {
  2.     public static void main(String[] args) {
  3.         // TODO 1. 获取执行环境
  4.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5.         env.setParallelism(1); // 生产环境中设置为kafka主题的分区数
  6.         // 1.1 开启checkpoint
  7.         env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
  8.         env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/s/ck");
  9.         env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
  10.         env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // 设置最大共存的checkpoint数量
  11.         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L)); // 固定频率重启: 尝试3次重启,每5s重启一次
  12.         // 1.2 设置状态后端
  13.         env.setStateBackend(new HashMapStateBackend());
  14.         // TODO 2. 消费 kafka dwd_traffic_page_log 主题
  15.         String topic = "dwd_traffic_page_log";
  16.         String groupId = "user_jump_detail";
  17.         DataStreamSource<String> pageDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));
  18.         // TODO 3. 将数据转为 JSON
  19.         SingleOutputStreamOperator<JSONObject> jsonDS = pageDS.map(JSON::parseObject);
  20.         // TODO 4. 提取事件时间 & 按照 mid 分组
  21.         KeyedStream<JSONObject, String> keyedStream = jsonDS
  22.                 .assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
  23.                     .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
  24.                         @Override
  25.                         public long extractTimestamp(JSONObject element, long recordTimestamp) {
  26.                             return element.getLong("ts");
  27.                         }
  28.                     }))
  29.                 .keyBy(json -> json.getJSONObject("common").getString("mid"));
复制代码
接下来是核心的界说 CEP 模式的代码:
  1.         // TODO 5. 定义 CEP 模式序列
  2.         // 泛型方法类型指的是流的类型(下面的 start 和 next 作为提取事件的 key)
  3.         Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("start").where(new SimpleCondition<JSONObject>() {
  4.             @Override
  5.             public boolean filter(JSONObject value) throws Exception {
  6.                 return value.getJSONObject("page").getString("last_page_id") == null;
  7.             }
  8.         }).next("next").where(new SimpleCondition<JSONObject>() {
  9.             @Override
  10.             public boolean filter(JSONObject value) throws Exception {
  11.                 return value.getJSONObject("page").getString("last_page_id") == null;
  12.             }
  13.         }).within(Time.seconds(10L));
  14.         // 等价于 循环模式 共用一个 key: start
  15.         Pattern.<JSONObject>begin("start").where(new SimpleCondition<JSONObject>() {
  16.             @Override
  17.             public boolean filter(JSONObject value) throws Exception {
  18.                 return value.getJSONObject("page").getString("last_page_id") == null;
  19.             }
  20.         })
  21.                 .times(2) // 默认是宽松近邻 followedBy
  22.                 .consecutive() // 严格近邻 next
  23.                 .within(Time.seconds(10L));
  24.         // TODO 6. 建模式序列作用到流上
  25.         PatternStream<JSONObject> patternStream = CEP.pattern(keyedStream, pattern);
  26.         // TODO 7. 提取事件(匹配上的时间 和 超时时间)
  27.         OutputTag<String> timeoutTag = new OutputTag<>("timeout");
  28.         SingleOutputStreamOperator<String> selectDS = patternStream.select(timeoutTag,
  29.                 // 超时数据
  30.                 new PatternTimeoutFunction<JSONObject, String>() {
  31.                     // 对于超时数据来说,当前的数据第一个规则匹配上了,第二个没有匹配上导致超时,那么我们要提取的就是当前数据(第一个数据,第二个数据没来)
  32.                     // 这里的 Map 的 v 是 List 数据类型,因为考虑到我们可能使用的是循环模式(只有一个key)
  33.                     @Override
  34.                     public String timeout(Map<String, List<JSONObject>> map, long l) throws Exception {
  35.                         return map.get("start").get(0).toJSONString();
  36.                     }
  37.                 },
  38.                     // 匹配上的数据
  39.                     new PatternSelectFunction<JSONObject, String>() {
  40.                     // 匹配上的数据,我们只要第一个数据,因为只能证明第一个数据是跳出数据
  41.                     @Override
  42.                     public String select(Map<String, List<JSONObject>> map) throws Exception {
  43.                         return map.get("start").get(0).toJSONString();
  44.                     }
  45.                 });
  46.         DataStream<String> timeoutDS = selectDS.getSideOutput(timeoutTag);
  47.         // TODO 8. 合并两种事件
  48.         DataStream<String> unionDS = selectDS.union(timeoutDS);
  49.         // TODO 9. 合并后的数据写入 kafka
  50.         String targetTopic = "dwd_traffic_user_jump_detail";
  51.         unionDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(targetTopic));
  52.         // TODO 10. 启动任务
  53.         env.execute("DwdTrafficUserJumpDetail");
  54.     }
  55. }
复制代码
上面我们界说了两种匹配规则:

  • 第一条数据的 last_page_id 为 null ,且超时没有收到第二条数据,认定该条数据为跳出数据
  • 第二条数据的 last_page_id 为 null ,则认定第一条数据是跳出数据
        超时时间内规则一被满意,未等到第二条数据则会被判断为超时数据。以是我们只要把超时数据和 满意连续两条数据的 last_page_id 均为 null 中的第一条数据 union 起来,得到的即为答案所需数据;
总结

        至此,流量域的三个需求都已经完成;

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

飞不高

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表