Flink Watermark详解

打印 上一主题 下一主题

主题 539|帖子 539|积分 1617

Watermark 是用于处理惩罚流数据中变乱时间(event time)乱序环境的重要机制。在流处理惩罚中,数据往往不是按照它们现实发生的时间次序到达的,这大概是由于网络延迟、体系处理惩罚延迟或其他因素导致的。为了能够在这种乱序环境中正确地执行基于时间的操纵(如时间窗口聚合),Flink 引入了 Watermark 的概念。
Watermark 是一个特别的标记,它表现“在此时间戳之前的数据应该都已经到达了”。当 Flink 的算子(operator)处理惩罚到 Watermark 时,它会认为该 Watermark 时间戳之前的全部数据都已经到达了,并可以安全地关闭或处理惩罚任何基于该时间戳的窗口。
概念



  • **定义:**Watermark是一个特别的时间戳,代表了某个时间点之前的数据理论上应该都已经到达了体系,即“最多允许的延迟”。
  • **作用:**用于处理惩罚乱序变乱,确保在某个时间窗口内完成全部相干的变乱处理惩罚。
原理



  • **乱序问题:**在流处理惩罚中,由于网络延迟等因素,变乱大概会乱序到达。Watermark机制就是用来解决这种乱序问题。
  • **工作原理:**当数据源在确认全部小于某个时间戳的消息都已输出到Flink流处理惩罚体系后,会生成一个包罗该时间戳的Watermark,插入到消息流中。Flink operator算子按照时间窗口缓存全部流入的消息,当操纵符处理惩罚到Watermark时,它会对全部小于该Watermark时间戳的时间窗口的数据举行处理惩罚并发送到下一个操纵符节点,然后也将Watermark发送到下一个操纵符节点。
用途



  • **确保窗口盘算的正确性:**Watermark团结窗口机制,可以确保在特定的时间后触发窗口去盘算,从而避免由于乱序变乱导致的窗口盘算错误。
  • **处理惩罚延迟数据:**Watermark提供了一个“最多允许的延迟”机制,对于延迟到达的数据,Flink可以根据Watermark来决定是否将其纳入当前窗口的盘算。
样例

  1. package com.wfg.flink.example.watermark;
  2. import com.wfg.flink.example.watermark.data.Event;
  3. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  4. import org.apache.flink.api.common.functions.MapFunction;
  5. import org.apache.flink.streaming.api.datastream.DataStream;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import java.time.Duration;
  8. import java.time.Instant;
  9. public class FlinkWatermarkDemo {
  10.     public static void main(String[] args) throws Exception {
  11.         // 设置执行环境
  12.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  13.         // 假设我们有一个数据源,这里使用 fromElements 模拟
  14.         DataStream<Event> eventStream = env.fromElements(
  15.                 new Event(1L, Instant.now().minusSeconds(10).toEpochMilli()),
  16.                 new Event(2L, Instant.now().minusSeconds(8).toEpochMilli()),
  17.                 new Event(3L, Instant.now().minusSeconds(12).toEpochMilli()),
  18.                 new Event(4L, Instant.now().minusSeconds(15).toEpochMilli()),
  19.                 new Event(5L, Instant.now().minusSeconds(19).toEpochMilli()),
  20.                 new Event(6L, Instant.now().minusSeconds(18).toEpochMilli()),
  21.                 new Event(7L, Instant.now().minusSeconds(22).toEpochMilli())
  22.         );
  23.         // 定义 Watermark 策略,允许 5 秒的乱序
  24.         WatermarkStrategy<Event> watermarkStrategy = WatermarkStrategy
  25.                 .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
  26.                 .withTimestampAssigner((event, timestamp) -> {
  27.                     // 从事件中提取时间戳
  28.                     return event.getTimestamp();
  29. //                    timestamp.assignTimestamp(event.getTimestamp());
  30.                 });
  31.         // 应用 Watermark 策略,并处理数据流
  32.         DataStream<String> resultStream = eventStream
  33.                 .assignTimestampsAndWatermarks(watermarkStrategy)
  34.                 // 根据事件 ID 进行分区(这只是一个示例,实际可能根据业务需求分区)
  35.                 .keyBy(Event::getId)
  36.                 // 接下来可以进行窗口操作、时间聚合等操作
  37.                 .map(new MapFunction<Event, String>() {
  38.                     @Override
  39.                     public String map(Event event) throws Exception {
  40.                         return "Event ID: " + event.getId() + ", Timestamp: " + Instant.ofEpochMilli(event.getTimestamp());
  41.                     }
  42.                 });
  43.         // 输出结果
  44.         resultStream.print();
  45.         // 执行任务
  46.         env.execute("Flink Watermark Demo");
  47.     }
  48. }
复制代码
  若运行出错,可设置启动环境:–add-opens java.base/java.util=ALL-UNNAMED
数据类
  1. package com.wfg.flink.example.watermark.data;
  2. import lombok.Data;
  3. /**
  4. * @author wfg
  5. */
  6. @Data
  7. public class Event {
  8.     private final long id;
  9.     private final long timestamp;
  10.     public Event(long id, long timestamp) {
  11.         this.id = id;
  12.         this.timestamp = timestamp;
  13.     }
  14. }
复制代码
WatermarkStrategy

atermarkStrategy是用于处理惩罚基于变乱时间(event time)的流盘算体系中大概出现的数据乱序环境的机制。
Watermark是数据流中的一种特别数据,由Flink内部周期(可自定义)产生。它的重要作用是指示某个时间点之前的数据已经到达Flink体系,从而允许Flink开始处理惩罚这些数据。Watermark的生成计谋可以实现数据乱序的兼容。
利用

atermarkStrategy在Flink中有两种重要的利用方式:

  • 直接在数据源上利用: 这种方式下,WatermarkStrategy会在数据源处被指定,并应用于从数据源读取的数据流。这种方式可以更精准地跟踪Watermark,由于数据源可以利用watermark生成逻辑中有关分片/分区的信息。
  • 直接在非数据源的操纵之后利用: 假如无法直接在数据源上设置WatermarkStrategy,可以在数据流的其他位置(如经过某个操纵后)设置。但这种方式通常不如第一种方式精准。
设置

WatermarkStrategy的设置重要涉及到Watermark的生成计谋和主动发送周期。比方,可以利用WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))来设置一个允许数据乱序程度不超过20秒的WatermarkStrategy。此外,还可以通过修改Flink的设置文件(如flink-conf.yaml)或调用相干API方法来设置Watermark的主动发送周期。
应用

基于Flink 1.16+版本的Java API,可以利用WatermarkStrategy类配合TimestampAssigner和TimestampExtractor接口来实现Watermark的生成器。具体实现方式可以参考相干文档和示例代码。
详情

WatermarkStrategy 是一个接口,它定义了怎样为流中的变乱生成 Watermarks。由于 Flink 是一个开源项目,我们可以直接检察其源代码来了解 WatermarkStrategy 的具体实现。
WatermarkStrategy 接口定义在 Flink 的 org.apache.flink.streaming.api.functions.timestamps 包中。这个接口定义了两个方法:

  • TimestampAssigner createTimestampAssigner(SerializedValue<TypeInformation> typeInfo): 用于创建一个 TimestampAssigner,该 TimestampAssigner 负责为流中的每个元素分配时间戳。
  • WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context): 用于创建一个 WatermarkGenerator,该 WatermarkGenerator 负责基于流中的元素生成 Watermarks。
  1. // 定义 Watermark 策略,允许 5 秒的乱序
  2.         WatermarkStrategy<Event> watermarkStrategy = WatermarkStrategy
  3.                 .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
  4.                 .withTimestampAssigner((event, timestamp) -> {
  5.                     // 从事件中提取时间戳
  6.                     return event.getTimestamp();
  7. //                    timestamp.assignTimestamp(event.getTimestamp());
  8.                 });
复制代码
  通常,不会直接实现 WatermarkStrategy 接口,而是利用 Flink 提供的静态工厂方法来创建计谋。比方,WatermarkStrategy.forBoundedOutOfOrderness(Duration maxOutOfOrderness) 方法就是一个常用的计谋,它允许一定程度的乱序。
  assignTimestampsAndWatermarks

assignTimestampsAndWatermarks 方法是用于为数据流中的变乱分配时间戳和 Watermarks 的。这个方法通常与 WatermarkStrategy 一起利用,以定义怎样为流中的每个元素分配时间戳以及何时生成 Watermarks。
  1. // ...  
  2.   
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  4.   
  5. // 假设有一个名为 eventStream 的 DataStream,其中包含具有时间戳的事件  
  6. DataStream<MyEvent> eventStream = ...; // 获取或创建事件流  
  7.   
  8. // 创建一个 WatermarkStrategy,这里使用了一个允许一定乱序的 BoundedOutOfOrdernessTimestampExtractor  
  9. WatermarkStrategy<MyEvent> watermarkStrategy = WatermarkStrategy  
  10.     .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))  
  11.     .withTimestampAssigner(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>() {  
  12.         @Override  
  13.         public long extractTimestamp(MyEvent element) {  
  14.             return element.getTimestamp(); // 假设 MyEvent 有一个 getTimestamp() 方法返回事件的时间戳  
  15.         }  
  16.   
  17.         @Override  
  18.         public long getMaxAllowedLatency(MyEvent element) {  
  19.             return Duration.ofSeconds(10).toMillis(); // 最大允许乱序时间为10秒  
  20.         }  
  21.     });  
  22.   
  23. // 为事件流分配时间戳和 Watermarks  
  24. DataStream<MyEvent> timestampedStream = eventStream.assignTimestampsAndWatermarks(watermarkStrategy);  
  25.   
  26. // 现在可以基于事件时间进行窗口操作或其他时间感知的操作了  
  27. timestampedStream  
  28.     .keyBy(event -> event.getKey()) // 假设 MyEvent 有一个 getKey() 方法  
  29.     .timeWindow(Time.seconds(30)) // 使用基于事件时间的30秒窗口  
  30.     .apply(new WindowFunction<MyEvent, String, String, TimeWindow>() {  
  31.         // ... 实现 WindowFunction  
  32.     })  
  33.     .print(); // 打印结果或其他后续操作  
  34.   
  35. // ...
复制代码
  WatermarkStrategy 利用了 BoundedOutOfOrdernessTimestampExtractor,它允许一定程度的数据乱序(在这个例子中是10秒)。extractTimestamp 方法用于为变乱分配时间戳,而 getMaxAllowedLatency 方法定义了乱序时间的上限。然后,我们利用 assignTimestampsAndWatermarks 方法将这个计谋应用到变乱流上,从而得到一个带偶然间戳和 Watermarks 的新流,可以在其上执行基于变乱时间的操纵。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

千千梦丶琪

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表