【Flink 核心篇】详解 Flink 中的 WaterMark

打印 上一主题 下一主题

主题 838|帖子 838|积分 2514

1.底子概念

1.1 流处理

流处理,最本质的是在处理数据的时候,接受一条处理一条数据。
批处理,则是累积数据到一定程度在处理。这是他们本质的区别。
在设计上 Flink 认为数据是流式的,批处理只是流处理的特例。同时对数据分为有界数据和无界数据。


  • 有界数据对应批处理,API 对应 DateSet。
  • 无界数据对应流处理,API 对应 DataStream。
1.2 乱序

什么是乱序呢?
可以明白为数据到达的次序和实在际产生时间的排序不一致。导致这的原因有很多,比如延伸,消息积存,重试等等。
我们知道,流处理从事件产生,到流经 source,再到 operator,中心是有一个过程和时间的。固然大部分情况下,流到 operator 的数据都是按照事件产生的时间次序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order 或者说 late element)。
   ✅ 某数据源中的某些数据由于某种原因(如:网络原因,外部存储自身原因)会有 5 秒的延时,也就是在实际时间的第 1 秒产生的数据有可能在第 5 秒中产生的数据之后到来(比如到 Window 处理节点)。例如,有 1 ~ 10 个事件,乱序到达的序列是:2, 3, 4, 5, 1, 6, 3, 8, 9, 10, 7。
  1.3 窗口及其生命周期

对于 Flink,假如来一条消息盘算一条,这样是可以的,但是这样盘算是非常频繁而且消耗资源,假如想做一些统计这是不可能的。以是对于 Spark 和 Flink 都产生了窗口盘算。
比如,因为我们想看到过去一分钟或过去半小时的访问数据,这时候我们就需要窗口。


  • Window:Window 是处理无界流的关键,Window 将流拆分为一个个有限大小的 buckets,可以在每一个 buckets 中举行盘算。
  • 当 Window 是时间窗口的时候,每个 Window 都会有一个开始时间(start_time)和结束时间(end_time)(左闭右开),这个时间是体系时间。
简而言之,只要属于此窗口的第一个元素到达,就会创建一个窗口,当时间(事件或处理时间)凌驾其结束时间戳加上用户指定的允许延伸时,窗口将被完全删除。
窗口有如下组件:


  • Window Assigner:用来决定某个元素被分配到哪个或哪些窗口中去。
  • Trigger:触发器。决定了一个窗口何时可以或许被盘算或清除。触发战略可能雷同于 “当窗口中的元素数量大于 4” 时,或 “当水位线通过窗口结束时”。
  • Evictor:驱逐器。Evictor 提供了在利用 WindowFunction 之前或者之后从窗口中删除元素的本领。
窗口还拥有函数,比如 ProcessWindowFunction,ReduceFunction,AggregateFunction 或 FoldFunction。该函数将包含要应用于窗口内容的盘算,而触发器指定窗口被认为准备好应用该函数的条件。
1.4 Keyed vs Non-Keyed

在界说窗口之前,要指定的第一件事是流是否需要 Keyed,利用 keyBy(...) 将无界流分成逻辑的 keyed stream。假如未调用 keyBy(...),则表现流不是 keyed stream。


  • 对于 Keyed 流,可以将传入事件的任何属性用作 key。拥有 keyed stream 将允许窗口盘算由多个任务并行实验,因为每个逻辑 Keyed 流可以独立于别的任务举行处理。相同 Key 的所有元素将被发送到同一个任务。
  • 在 Non-Keyed 流的情况下,原始流将不会被分成多个逻辑流,而且所有窗口逻辑将由单个任务实验,即并行性为 1。
1.5 Flink 中的时间

Flink 在流处理程序支持差别的时间概念。分别为是 事件时间(Event Time)、处理时间(Processing Time)、提取时间(Ingestion Time)。
从时间序列角度来说,发生的先后次序是:事件时间提取时间处理时间


  • Event Time 是事件在现实天下中发生的时间,它通常由事件中的时间戳描述。
  • Ingestion Time 是数据进入 Apache Flink 流处理体系的时间,也就是 Flink 读取数据源时间。
  • Processing Time 是数据流入到详细某个算子 (消息被盘算处理) 时候相应的体系时间。也就是 Flink 程序处理该事件时当前体系时间。
2.Watermark

Watermark 是 Apache Flink 为了处理 EventTime 窗口盘算提出的一种机制,本质上也是一种时间戳。Watermark 是用于处理乱序事件或延伸数据的,这通常用 Watermark 机制结合 Window 来实现(Watermarks 用来触发 Window 窗口盘算)。
2.1 案例一

  1. public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
  2.     private final long maxOutOfOrderness = 3000; // 3.0 seconds
  3.     private long currentMaxTimestamp;
  4.     @Override
  5.     public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
  6.         long timestamp = element.getCreationTime();
  7.         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
  8.         return timestamp;
  9.     }
  10.     @Override
  11.     public Watermark getCurrentWatermark() {
  12.         // return the watermark as current highest timestamp minus the out-of-orderness bound
  13.         // 生成 watermark
  14.         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
  15.     }
  16. }
复制代码

上图中是一个 10s 大小的窗口,10000~ 20000 为一个窗口。当 EventTime 为 23000 的数据到来,生成的 WaterMark 的时间戳为 20000,大于即是 window_end_time,会触发窗口盘算。
2.2 案例二

  1. public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
  2.     private final long maxTimeLag = 3000; // 3 seconds
  3.     @Override
  4.     public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
  5.         return element.getCreationTime();
  6.     }
  7.     @Override
  8.     public Watermark getCurrentWatermark() {
  9.         // return the watermark as current time minus the maximum time lag
  10.         return new Watermark(System.currentTimeMillis() - maxTimeLag);
  11.     }
  12. }
复制代码

只是简朴的用当前体系时间减去最大延伸时间生成 Watermark ,当 WaterMark 为 20000 时,大于即是窗口的结束时间,会触发 10000 ~ 20000 窗口盘算。再当 EventTime 为 19500 的数据到来,它本应该是属于窗口 10000 ~ 20000 窗口的,但这个窗口已经触发盘算了,以是此数据会被抛弃。
2.3 如何设置最大乱序时间

虽说水位线表明着早于它的事件不应该再出现,接收到水位线以前的的消息是不可避免的,这就是所谓的 迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件差别的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。
迟到事件出现时窗口已经关闭并产出了盘算结果,因此处理的方法有 3 种:


  • 重新激活已经关闭的窗口并重新盘算以修正结果。将迟到事件收集起来别的处理。将迟到事件视为错误消息并抛弃。Flink 默认的处理方式是直接抛弃,其他两种方式分别利用 Side Output 和 Allowed Lateness。
  • Side Output 机制 可以将迟到事件单独放入一个数据流分支,这会作为 Window 盘算结果的副产物,以便用户获取并对其举行特别处理。
  • Allowed Lateness 机制 允许用户设置一个允许的最大迟到时长。Flink 会在窗口关闭后不停保存窗口的状态直至凌驾允许迟到时长,这期间迟到的事件不会被抛弃,而是默认会触发窗口重新盘算。因为保存窗口状态需要额外内存,而且假如窗口盘算利用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量盘算,代价比较大,以是允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该思量降低水位线进步的速率或者调整算法。
这里总结机制为:


  • 窗口 Window 的作用是为了周期性的获取数据。
  • WaterMark 的作用是防止数据出现乱序(常常),事件时间内获取不到指定的全部数据,而做的一种保险方法。
  • allowLateNess 是将窗口关闭时间再延伸一段时间。
  • sideOutPut 是末了兜底操作,所有过期延伸数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。
  1. public class TumblingEventWindowExample {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  5.         env.setParallelism(1);
  6. //        env.getConfig().setAutoWatermarkInterval(100);
  7.         DataStream<String> socketStream = env.socketTextStream("localhost", 9999);
  8.         DataStream<Tuple2<String, Long>> resultStream = socketStream
  9.                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(3)) {
  10.                     @Override
  11.                     public long extractTimestamp(String element) {
  12.                         long eventTime = Long.parseLong(element.split(" ")[0]);
  13.                         System.out.println(eventTime);
  14.                         return eventTime;
  15.                     }
  16.                 })
  17.                 .map(new MapFunction<String, Tuple2<String, Long>>() {
  18.                     @Override
  19.                     public Tuple2<String, Long> map(String value) throws Exception {
  20.                         return Tuple2.of(value.split(" ")[1], 1L);
  21.                     }
  22.                 })
  23.                 .keyBy(0)
  24.                 .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  25.                 .allowedLateness(Time.seconds(2)) // 允许延迟处理2秒
  26.                 .reduce(new ReduceFunction<Tuple2<String, Long>>() {
  27.                     @Override
  28.                     public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
  29.                         return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
  30.                     }
  31.                 });
  32.         resultStream.print();
  33.         env.execute();
  34.     }
  35. }
复制代码

当 watermark 为 21000 时,触发了 [10000, 20000) 窗口盘算,由于设置了 allowedLateness(Time.seconds(2)),即允许两秒延伸处理,watermark < window_end_time + lateTime 公式得到满足,因此随后 10000 和 12000 进入窗口时,依然能触发窗口盘算;随后 watermark 增长到 22000,watermark < window_end_time + lateTime 不再满足,因此 11000 再次进入窗口时,窗口不再举行盘算。
2.4 延伸数据重定向

流的返回值必须是 SingleOutputStreamOperator,其是 DataStream 的子类。通过 getSideOutput 方法获取延伸数据。可以将延伸数据重定向到其他流或者举行输出。
  1. public class TumblingEventWindowExample {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  5.         env.setParallelism(1);
  6.         DataStream<String> socketStream = env.socketTextStream("localhost", 9999);
  7.         //保存被丢弃的数据
  8.         OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};
  9.         //注意,由于getSideOutput方法是SingleOutputStreamOperator子类中的特有方法,所以这里的类型,不能使用它的父类dataStream。
  10.         SingleOutputStreamOperator<Tuple2<String, Long>> resultStream = socketStream
  11.                 // Time.seconds(3)有序的情况修改为0
  12.                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(3)) {
  13.                     @Override
  14.                     public long extractTimestamp(String element) {
  15.                         long eventTime = Long.parseLong(element.split(" ")[0]);
  16.                         System.out.println(eventTime);
  17.                         return eventTime;
  18.                     }
  19.                 })
  20.                 .map(new MapFunction<String, Tuple2<String, Long>>() {
  21.                     @Override
  22.                     public Tuple2<String, Long> map(String value) throws Exception {
  23.                         return Tuple2.of(value.split(" ")[1], 1L);
  24.                     }
  25.                 })
  26.                 .keyBy(0)
  27.                 .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  28.                 .sideOutputLateData(outputTag) // 收集延迟大于2s的数据
  29.                 .allowedLateness(Time.seconds(2)) //允许2s延迟
  30.                 .reduce(new ReduceFunction<Tuple2<String, Long>>() {
  31.                     @Override
  32.                     public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
  33.                         return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
  34.                     }
  35.                 });
  36.         resultStream.print();
  37.         //把迟到的数据暂时打印到控制台,实际中可以保存到其他存储介质中
  38.         DataStream<Tuple2<String, Long>> sideOutput = resultStream.getSideOutput(outputTag);
  39.         sideOutput.print();
  40.         env.execute();
  41.     }
  42. }
复制代码
3.在 DDL 中的界说

3.1 事件时间

事件时间属性是通过 CREATE TABLE DDL 语句中的 WATERMARK 语句界说的。水印语句在现有事件时间字段上界说 水印生成表达式,将事件时间字段标记为事件时间属性。
Flink SQL 支持在 TIMESTAMP 和 TIMESTAMP_LTZ 列上界说事件时间属性。假如源中的时间戳数据以 年-月-日-时-分-秒 表现,通常是不含时区信息的字符串值,例如 2020-04-15 20:13:40.564,发起将事件-时间属性界说为 TIMESTAMP 列。
  1. CREATE TABLE user_actions (
  2.   user_name STRING,
  3.   data STRING,
  4.   user_action_time TIMESTAMP(3),
  5.   -- Declare the user_action_time column as an event-time attribute
  6.   -- and use a 5-seconds-delayed watermark strategy.
  7.   WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
  8. ) WITH (
  9.   ...
  10. );
  11. SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
  12. FROM user_actions
  13. GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
复制代码
假如数据源中的时间戳数据以纪元时间表现,通常是一个长值,例如 1618989564564,发起将事件时间属性界说为 TIMESTAMP_LTZ 列。
  1. CREATE TABLE user_actions (
  2.   user_name STRING,
  3.   data STRING,
  4.   ts BIGINT,
  5.   time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  6.   -- Declare the time_ltz column as an event-time attribute
  7.   -- and use a 5-seconds-delayed watermark strategy.
  8.   WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
  9. ) WITH (
  10.   ...
  11. );
  12. SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
  13. FROM user_actions
  14. GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
复制代码
3.2 处理时间

处理时间能让表格程序根据本地呆板的时间产生结果。这是最简朴的时间概念,但会产生非确定性结果。处理时间不需要提取时间戳或生成水印。
在 CREATE TABLE DDL 语句中,利用体系 PROCTIME() 函数将处理时间属性界说为盘算列。函数返回范例为 TIMESTAMP_LTZ。
  1. CREATE TABLE user_actions (
  2.   user_name STRING,
  3.   data STRING,
  4.   -- Declare an additional field as a processing-time attribute.
  5.   user_action_time AS PROCTIME()
  6. ) WITH (
  7.   ...
  8. );
  9. SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
  10. FROM user_actions
  11. GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

祗疼妳一个

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表