ToB企服应用市场:ToB评测及商务社交产业平台

标题: Flink解决延迟数据题目 [打印本页]

作者: 一给    时间: 2024-11-28 02:43
标题: Flink解决延迟数据题目
总结:
水印:对于迟到数据不长
allowedLateness: 迟到时间很长
侧道输出:对于迟到时间特别长

对于延迟数据的明白:

水印机制(水位线、watermark)机制可以资助我们在短期延迟下,允许乱序数据的到来。
这个机制很好的处理惩罚了那些由于网络等环境短期延迟的数据,让窗口等它们一会儿。
但是水印机制无法长期的等待下去,由于水印机制简朴说就是让窗口不停等在那里,等达到水印时间才会触发计算和关闭窗口
这个等待不能不停等,由于会不停缓着数据不计算。
一样平常水印也就是几秒钟最多几分钟而已(看业务)

那么,在实际天下中,延迟数据除了有短期延迟外,长期延迟也是很常见的。
好比:
l 客户端断网,等了好几个小时才规复
l 车联网体系进入隧道后没有信号无法上报数据
l 手机欠费没有网
等等,这些场景下数据的迟到就不是简朴的网络堵塞造成的几秒延迟了
而是小时、天级别的延迟

对于水印来说,这样的长期延迟数据是无法很好处理惩罚的。
那么有没有什么办法去处理惩罚这些长期延迟的数据呢?让其可以找到其所属的窗口正常完成计算,哪怕晚了几个小时。
这个场景的解决方式就是:延迟数据处理惩罚机制(allowedLateness方法)

水印:乱序数据处理惩罚(时间很短的延迟)
延迟处理惩罚:长期延迟数据的处理惩罚机制。

延迟数据的处理惩罚:

waterMark和Window机制解决了流式数据的乱序题目,对于由于延迟而顺序有误的数据,可以根据eventTime进行业务处理惩罚,对于延迟的数据Flink也有自己的解决办法,
紧张的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理惩罚延迟数据
设置允许延迟的时间是通过allowedLateness(lateness: Time)设置
生存延迟数据则是通过sideOutputLateData(outputTag: OutputTag[T])生存
获取延迟数据是通过DataStream.getSideOutput(tag: OutputTag[X])获取
1)allowedLateness(lateness: Time)

当我们对流设置窗口后得到的WindowedSteam对象就可以使用allowedLateness方法
该方法传入一个Time值,设置允许的长期延迟(迟到)的时间。
和watermark不同。
未设置allowedLateness(为0),当watermark满足条件,会触发窗口的 执行 + 关闭
当设置了allowedLateness,当watermark满足条件后,只会触发窗口的执行,不会触发窗口关闭
也就是,watermark满足条件后会正常触发窗口计算,将已有的数据完成计算。
但是,不会关闭窗口。假如在allowedLateness允许的时间内仍有这个窗口的数据进来,那么每进来一条,会和已经计算过的(被watermark触发的)数据一起在计算一次
水印:短期延迟,达到条件后触发计算而且关闭窗口(触发+关闭同时进行)
水印+allowedLateness : 短期延迟+ 等待长期延迟结果, 达到水印条件后,会触发窗口计算,但是不关闭窗口。事件时间延迟达到水印+allowedLateness之和后会关闭窗口。
2) 侧道输出-SideOutput

Flink 通过watermark在短时间内允许了乱序到来的数据
通过延迟数据处理惩罚机制,可以处理惩罚长期迟到的数据。
但总有那么些数据来的晚的太久了。允许迟到1天的设置,它迟到了2天才来。
对于这样的迟到数据,水印无能为力,设置allowedLateness也无能为力,那对于这样的数据Flink就只能任其丢掉了吗?
不会,Flink的两个迟到机制尽量确保了数据不会错过了属于他们的窗口,但是真的迟到太久了,Flink也有一个机制将这些数据网络起来
生存成为一个DataStream,然后,交由开辟人员自行处理惩罚。
那么这个机制就叫做侧输出机制(Side Output)
侧输出机制:可以将错过水印又错过allowedLateness允许的时间的数据,单独的存放到一个DataStream中,然后开辟人员可以自定逻辑对这些超级迟到数据进行处理惩罚。

处理惩罚紧张使用两个方式:
对窗口对象调用sideOutputLateData(OutputTag outputTag)方法,将数据存储到一个地方
对DataStream对象调用getSideOutput(OutputTag outputTag)方法,取出这些被单独处理惩罚的数据的DataStream
注意,取到的是一个DataStream,这意味着你可以对这些超级迟到数据继承写 如keyBy, window等处理惩罚逻辑。
sideOutputLateData方法:
  1. 使用方式:
  2. 先定义OutputTag对象(注意,必须new一个匿名内部类形式的OutputTag对象的实例)
  3. 然后调用sideOutputLateData方法
  4. // side output   OutputTag对象必须是匿名内部类的形式创建出来, 本质上得到的是OutputTag对象的一个匿名子类
  5. OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("side output"){};
  6. WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> sideOutputLateData =
  7.         allowedLateness.sideOutputLateData(outputTag);
复制代码
DataStream.getSideOutput方法:
  1. 用法:
  2. DataStream<Tuple2<String, Long>> sideOutput = result.getSideOutput(outputTag);
  3. // 对得到的保存超级迟到数据的DataStream进行处理
  4. sideOutput.print("late>>>");
复制代码
代码演示

使用Watermark + AllowedLateness + SideOutput ,即使用侧道输出机制来单独网络延迟/迟到/乱序严重的数据,避免数据丢失!
  1. package com.bigdata.day05;
  2. import com.bigdata.day04.OrderInfo;
  3. import org.apache.commons.lang3.time.DateFormatUtils;
  4. import org.apache.flink.api.common.RuntimeExecutionMode;
  5. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  6. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  7. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  10. import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
  11. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  12. import org.apache.flink.streaming.api.windowing.time.Time;
  13. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  14. import org.apache.flink.util.Collector;
  15. import org.apache.flink.util.OutputTag;
  16. import java.text.SimpleDateFormat;
  17. import java.time.Duration;
  18. import java.util.Date;
  19. import java.util.Random;
  20. import java.util.UUID;
  21. class MyOrderSource2 implements SourceFunction<OrderInfo> {
  22.     @Override
  23.     public void run(SourceContext<OrderInfo> ctx) throws Exception {
  24.         Random random = new Random();
  25.         while(true){
  26.             OrderInfo orderInfo = new OrderInfo();
  27.             orderInfo.setOrderId(UUID.randomUUID().toString().replace("-",""));
  28.             // 在这个地方可以模拟迟到数据
  29.             long orderTime = System.currentTimeMillis() - 1000*random.nextInt(100);
  30.             orderInfo.setOrdertime(orderTime);
  31.             int money = random.nextInt(10);
  32.             System.out.println("订单产生的时间:"+ DateFormatUtils.format(orderTime,"yyyy-MM-dd HH:mm:ss")+",金额:"+money);
  33.             orderInfo.setMoney(money);
  34.             orderInfo.setUserId(random.nextInt(2));
  35.             ctx.collect(orderInfo);
  36.             Thread.sleep(500);
  37.         }
  38.     }
  39.     @Override
  40.     public void cancel() {
  41.     }
  42. }
  43. public class Demo01 {
  44.     public static void main(String[] args) throws Exception {
  45.         //1. env-准备环境
  46.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  47.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  48.         env.setParallelism(1);
  49.         // 每隔五秒统计每个用户的前面5秒的订单的总金额
  50.         //2. source-加载数据
  51.         DataStreamSource<OrderInfo> streamSource = env.addSource(new MyOrderSource2());
  52.         //-2.告诉Flink最大允许的延迟时间/乱序时间为多少
  53.         SingleOutputStreamOperator<OrderInfo> orderDSWithWatermark = streamSource.assignTimestampsAndWatermarks(
  54.                 WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  55.                         //-3.告诉Flink哪一列是事件时间
  56.                         .withTimestampAssigner((order, time) -> order.getOrdertime())
  57.         );
  58.         OutputTag<OrderInfo> outputTag = new OutputTag<OrderInfo>("side output"){};
  59.         //3. transformation-数据处理转换
  60.         SingleOutputStreamOperator<String> result = orderDSWithWatermark.keyBy(orderInfo -> orderInfo.getUserId()).
  61.                 window(TumblingEventTimeWindows.of(Time.seconds(5)))
  62.                 .allowedLateness(Time.seconds(4))
  63.                 .sideOutputLateData(outputTag)
  64.                 .apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {
  65.                     @Override
  66.                     public void apply(Integer key,  // 代表分组key值     五旬老太守国门
  67.                                       TimeWindow window, // 代表窗口对象
  68.                                       Iterable<OrderInfo> input, // 分组过之后的数据 [1,1,1,1,1]
  69.                                       Collector<String> out  // 用于输出的对象
  70.                     ) throws Exception {
  71.                         SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  72.                         long start = window.getStart();
  73.                         long end = window.getEnd();
  74.                         int sum = 0;
  75.                         // 专门存放迟到的订单时间
  76.                         for (OrderInfo orderInfo : input) {
  77.                             sum += orderInfo.getMoney();
  78.                         }
  79.                         out.collect(key + ",窗口开始:" + dateFormat.format(new Date(start)) + ",结束时间:" + dateFormat.format(new Date(end)) + "," + sum);
  80.                         //out.collect(key+",窗口开始:"+start+",结束时间:"+end+","+sum);
  81.                     }
  82.                 });
  83.         result.print("流中的数据,包含迟到的数据:");
  84.         result.getSideOutput(outputTag).print("严重迟到的数据:");
  85.         //4. sink-数据输出
  86.         //5. execute-执行
  87.         env.execute();
  88.     }
  89. }
复制代码
虽然我们添加了延迟的结果,就是说侧道输出,侧道输出不能触发窗口的执行,窗口的执行只能通过水印时间触发 ,允许迟到的数据,不放入到当前窗口中,而是作为一个触发条件看到,它需要放入到它对应的窗口中。
只考虑 1 个并行度的题目
订单发生的真实事件:窗口5秒,隔断5秒,允许迟到 3秒 最晚允许迟到4秒
10:44:00 第一个区间就应该是10:44:00 10:44:05
10:44:01
10:44:02
10:44:03
10:44:04
10:44:05
10:44:07 第一个区间就应该是10:44:05 10:44:10
10:44:22 第一个区间就应该是10:44:20 10:44:25
10:44:30
10:44:28
10:44:20
通过上面这个图可以知道,44:07没有办法触发00~05的执行,但是07不放入00~05区间,而是放入10:44:05 10:44:10
44:22 一个数据触发了两个区间的执行 00~05 05~10
假如有一个订单时44:10产生的,放入哪个区间?应该放入10~15这个区间
 


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4