ToB企服应用市场:ToB评测及商务社交产业平台

标题: 【FLink】水位线(Watermark) [打印本页]

作者: 兜兜零元    时间: 2024-7-20 08:02
标题: 【FLink】水位线(Watermark)
目录
1、关于时间语义
1.1事件时间
1.2处理时间​编辑
2、什么是水位线
2.1 顺序流和乱序流
2.2乱序数据的处理
2.3 水位线的特性
3 、水位线的生成
3.1 生成水位线的总体原则
3.2 水位线生成计谋
3.3 Flink内置水位线
3.3.1 有序流中内置水位线设置
3.4.2 断点式水位线生成器(Punctuated Generator)
3.4.3 在数据源中发送水位线
4、水位线的传递
5、迟到数据的处理


1、关于时间语义

1.1事件时间

        一般情况下,业务日志数据中都会记载数据生成的时间戳(timestamp),它就可以作为事件时间的判定基础。从Flink1.12版本开始,Flink已经将事件时间作为默认的时间语义了。
1.2处理时间


2、什么是水位线

在Flink中,用来权衡事件时间希望的标志,就被称作“水位线”(Watermark)。说白了就是事件时间戳。
2.1 顺序流和乱序流

有序流就是指数据按照生成的先后顺序,每条数据产生一个有先后顺序的水位线

这是一种理想的状态(数据量较小),而在实际中,我们产生的数据量往往非常庞大,而数据之间的时间隔断非常之小,以是为了提高服从,一般会每隔一段时间生成一个水位线。

在实际生产中,由于多服务之间网络传输等的因素,往往我们的数据流,并不是我们所想的顺序结果,而是数据先后错乱,这就是乱序流。

2.2乱序数据的处理

由于数据是乱序的,我们无法正确处理“迟到”的数据,为了让窗口可以或许正确的收集到迟到的数据,我们也可以让窗口等上一段时间,比如2秒。也就是说,我们可以在数据的时间戳基础上加上一些延迟来尽量包管不丢数据。

2.3 水位线的特性

3
3 、水位线的生成

3.1 生成水位线的总体原则

美满的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不外如果要包管绝对正确,就必须等足够长的时间,这会带来更高的延迟。
如果我们希望处理得更快、及时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,盘算结果禁绝确。当然,如果我们对准确性完全不思量、一味地寻求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。
以是Flink中的水位线,实在是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权利交给了步伐员,我们可以在代码中界说水位线的生成计谋。
3.2 水位线生成计谋

在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它重要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。
  1. DataStream<Event> stream = env.addSource(new ClickSource());
  2. DataStream<Event> withTimestampsAndWatermarks =
  3. stream.assignTimestampsAndWatermarks(<watermark strategy>);
复制代码
WatermarkStrategy作为参数,这就是所谓的“水位线生成计谋”。WatermarkStrategy是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator。
  1. public interface WatermarkStrategy<T>
  2.     extends TimestampAssignerSupplier<T>,
  3.             WatermarkGeneratorSupplier<T>{
  4.     // 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
  5.     @Override
  6.     TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
  7.     // 主要负责按照既定的方式,基于时间戳生成水位线
  8.     @Override
  9.     WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
  10. }
复制代码
3.3 Flink内置水位线

3.3.1 有序流中内置水位线设置

对于有序流,重要特点就是时间戳单调增长,以是永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。
  1. public class WatermarkMonoDemo {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         env.setParallelism(1);
  5.         SingleOutputStreamOperator<WaterSensor> sensorDS = env
  6.                 .socketTextStream("hadoop102", 7777)
  7.                 .map(new WaterSensorMapFunction());
  8.         // TODO 1.定义Watermark策略
  9.         WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
  10.                 // 1.1 指定watermark生成:升序的watermark,没有等待时间
  11.                 .<WaterSensor>forMonotonousTimestamps()
  12.                 // 1.2 指定 时间戳分配器,从数据中提取
  13.                 .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
  14.                     @Override
  15.                     public long extractTimestamp(WaterSensor element, long recordTimestamp) {
  16.                         // 返回的时间戳,要 毫秒
  17.                         System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
  18.                         return element.getTs() * 1000L;
  19.                     }
  20.                 });
  21.         // TODO 2. 指定 watermark策略
  22.         SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
  23.         sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
  24.                 // TODO 3.使用 事件时间语义 的窗口
  25.                 .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  26.                 .process(
  27.                         new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
  28.                             @Override
  29.                             public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
  30.                                 long startTs = context.window().getStart();
  31.                                 long endTs = context.window().getEnd();
  32.                                 String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
  33.                                 String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
  34.                                 long count = elements.spliterator().estimateSize();
  35.                                 out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
  36.                             }
  37.                         }
  38.                 )
  39.                 .print();
  40.         env.execute();
  41.     }
  42. }
复制代码
3.3.2 乱序流中内置水位线设置
调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。
这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值
  1. public class WatermarkOutOfOrdernessDemo {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         env.setParallelism(1);
  5.         SingleOutputStreamOperator<WaterSensor> sensorDS = env
  6.                 .socketTextStream("hadoop102", 7777)
  7.                 .map(new WaterSensorMapFunction());
  8.         // TODO 1.定义Watermark策略
  9.         WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
  10.                 // 1.1 指定watermark生成:乱序的,等待3s
  11.                 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  12.                 // 1.2 指定 时间戳分配器,从数据中提取
  13.                 .withTimestampAssigner(
  14.                         (element, recordTimestamp) -> {
  15.                             // 返回的时间戳,要 毫秒
  16.                             System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
  17.                             return element.getTs() * 1000L;
  18.                         });
  19.         // TODO 2. 指定 watermark策略
  20.         SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
  21.         sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
  22.                 // TODO 3.使用 事件时间语义 的窗口
  23.                 .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  24.                 .process(
  25.                         new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
  26.                             @Override
  27.                             public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
  28.                                 long startTs = context.window().getStart();
  29.                                 long endTs = context.window().getEnd();
  30.                                 String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
  31.                                 String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
  32.                                 long count = elements.spliterator().estimateSize();
  33.                                 out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
  34.                             }
  35.                         }
  36.                 )
  37.                 .print();
  38.         env.execute();
  39.     }
  40. }
复制代码
3.4 自界说水位线生成器
3.4.1 周期性水位线生成器(Periodic Generator)
周期性生成器一般是通过onEvent()观察判定输入的事件,而在onPeriodicEmit()里发出水位线。
  1. import com.atguigu.bean.Event;
  2. import org.apache.flink.api.common.eventtime.*;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. // 自定义水位线的产生
  5. public class CustomPeriodicWatermarkExample {
  6.     public static void main(String[] args) throws Exception {
  7.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  8.         env
  9.                 .addSource(new ClickSource())
  10.                 .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
  11.                 .print();
  12.         env.execute();
  13.     }
  14.     public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
  15.         @Override
  16.         public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
  17.             return new SerializableTimestampAssigner<Event>() {
  18.                 @Override
  19.                 public long extractTimestamp(Event element,long recordTimestamp) {
  20.                     return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段
  21.                 }
  22.             };
  23.         }
  24.         @Override
  25.         public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
  26.             return new CustomBoundedOutOfOrdernessGenerator();
  27.         }
  28.     }
  29.     public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {
  30.         private Long delayTime = 5000L; // 延迟时间
  31.         private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳
  32.         @Override
  33.         public void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {
  34.             // 每来一条数据就调用一次
  35.             maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳
  36.         }
  37.         @Override
  38.         public void onPeriodicEmit(WatermarkOutput output) {
  39.             // 发射水位线,默认200ms调用一次
  40.             output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
  41.         }
  42.     }
  43. }
复制代码
如果想修改默认周期时间,可以通过下面方法修改。
  1. //修改默认周期为400ms
  2. env.getConfig().setAutoWatermarkInterval(400L);
复制代码
3.4.2 断点式水位线生成器(Punctuated Generator

断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。我们把发射水位线的逻辑写在onEvent方法当中即可。
3.4.3 在数据源中发送水位线

我们也可以在自界说的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自界说数据源中发送了水位线以后,就不能再在步伐中使用assignTimestampsAndWatermarks方法来生成水位线了。在自界说数据源中生成水位线和在步伐中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。
  1. env.fromSource(
  2. kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
  3. )
复制代码
4、水位线的传递


在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。
水位线在上下游任务之间的传递,非常奥妙地克制了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。
也就是说:水位线的传递是以最小事件时间为准则。
5、迟到数据的处理

5.1 推迟水印推进
在水印产生时,设置一个乱序容忍度,推迟系统时间的推进,包管窗口盘算被延迟执行,为乱序的数据夺取更多的时间进入窗口。
  1. WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
复制代码
5.2 设置窗口延迟关闭
当触发了窗口盘算后,会先盘算当前的结果,但是此时并不会关闭窗口。直到wartermark 凌驾了窗口结束时间+推迟时间,此时窗口会真正关闭。
  1. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  2. .allowedLateness(Time.seconds(3))
复制代码
5.3 使用侧流接收迟到的数据
  1. .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
  2. .allowedLateness(Time.seconds(3))
  3. .sideOutputLateData(lateWS)
复制代码
完整示例:
  1. public class WatermarkLateDemo {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         env.setParallelism(1);
  5.         SingleOutputStreamOperator<WaterSensor> sensorDS = env
  6.                 .socketTextStream("hadoop102", 7777)
  7.                 .map(new WaterSensorMapFunction());
  8.         WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
  9.                 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  10.                 .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L);
  11.         SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
  12.         OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));
  13.         SingleOutputStreamOperator<String> process = sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
  14.                 .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  15.                 .allowedLateness(Time.seconds(2)) // 推迟2s关窗
  16.                 .sideOutputLateData(lateTag) // 关窗后的迟到数据,放入侧输出流
  17.                 .process(
  18.                         new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
  19.                             @Override
  20.                             public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
  21.                                 long startTs = context.window().getStart();
  22.                                 long endTs = context.window().getEnd();
  23.                                 String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
  24.                                 String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
  25.                                 long count = elements.spliterator().estimateSize();
  26.                                 out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
  27.                             }
  28.                         }
  29.                 );
  30.         process.print();
  31.         // 从主流获取侧输出流,打印
  32.         process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");
  33.         env.execute();
  34.     }
  35. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4