ToB企服应用市场:ToB评测及商务社交产业平台

标题: 大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window [打印本页]

作者: 慢吞云雾缓吐愁    时间: 2024-9-14 01:39
标题: 大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
点一下关注吧!!!非常感谢!!持续更新!!!

现在已经更新到了:


章节内容

上节我们完成了如下的内容:


Watermark

Watermark 在窗口计算中的作用

在利用基于变乱时间的窗口时,Flink 依赖 Watermark 来决定何时触发窗口计算。比方,如果你有一个每 10 秒的滚动窗口,当 Watermark 达到某个窗口的竣事时间后,Flink 才会触发该窗口的计算。
假设有一个 10 秒的窗口,并且 Watermark 达到 12:00:10,此时 Flink 会触发 12:00:00 - 12:00:10 的窗口计算。
怎样处置惩罚迟到变乱

尽管 Watermark 能有用解决乱序题目,但总有可能会出现变乱在天生 Watermark 之后才到达的环境(即“迟到变乱”)。为此,Flink 提供了处置惩罚迟到变乱的机制:

  1. DataStream<Tuple2<String, Integer>> mainStream =
  2.   stream.keyBy(t -> t.f0)
  3.         .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  4.         .allowedLateness(Time.seconds(5))
  5.         .sideOutputLateData(lateOutputTag);
复制代码
代码实现

数据格式

  1. 01,1586489566000
  2. 01,1586489567000
  3. 01,1586489568000
  4. 01,1586489569000
  5. 01,1586489570000
  6. 01,1586489571000
  7. 01,1586489572000
  8. 01,1586489573000
  9. 01,1586489574000
  10. 01,1586489575000
  11. 01,1586489576000
  12. 01,1586489577000
  13. 01,1586489578000
  14. 01,1586489579000
复制代码
编写代码

这段代码实现了:

此中,这里对流数据举行了按 key(变乱的第一个字段)分组,并且利用了 滚动窗口(Tumbling Window),窗口长度为 5 秒。
在 apply 方法中,你收集窗口中的全部变乱,并根据变乱时间戳举行排序,然后输出每个窗口的开始和竣事时间,以及窗口中最早和最晚变乱的时间戳。
  1. SingleOutputStreamOperator<String> res = waterMark
  2.     .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
  3.         @Override
  4.         public String getKey(Tuple2<String, Long> value) throws Exception {
  5.             return value.f0;
  6.         }
  7.     })
  8.     .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  9.     .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
  10.         @Override
  11.         public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
  12.             List<Long> list = new ArrayList<>();
  13.             for (Tuple2<String, Long> next : input) {
  14.                 list.add(next.f1);
  15.             }
  16.             Collections.sort(list);
  17.             SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  18.             String result = "key: " + s + ", list.size(): " + list.size() + ", list.get(0): " + sdf.format(list.get(0)) + ", list.get(last): " + sdf.format(list.get(list.size() - 1))
  19.                     + ", start: " + sdf.format(window.getStart()) + ", end: " + sdf.format(window.getEnd());
  20.             out.collect(result);
  21.         }
  22.     });
复制代码
水印的计谋,定义了一个Bounded Out-of-Orderness 的水印计谋,答应最多 5 秒的变乱乱序,在 extractTimestamp 中,提取了变乱的时间戳,并打印出每个变乱的 key 和对应的变乱时间。还维护了一个 currentMaxTimestamp 来记录当前最大的变乱时间戳:
  1. WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
  2.     .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
  3.     .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
  4.         Long currentMaxTimestamp = 0L;
  5.         final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  6.         @Override
  7.         public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
  8.             long timestamp = element.f1;
  9.             currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
  10.             System.out.println("Key:" + element.f0 + ", EventTime: " + element.f1 + ", " + format.format(element.f1));
  11.             return element.f1;
  12.         }
  13.     });
复制代码
完整代码如下所示,代码实现了一个基于变乱时间的流处置惩罚系统,并通过水印(Watermark)机制来处置惩罚乱序变乱:
  1. package icu.wzk;
  2. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  3. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  4. import org.apache.flink.api.common.functions.MapFunction;
  5. import org.apache.flink.api.java.functions.KeySelector;
  6. import org.apache.flink.api.java.tuple.Tuple2;
  7. import org.apache.flink.streaming.api.TimeCharacteristic;
  8. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
  12. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  13. import org.apache.flink.streaming.api.windowing.time.Time;
  14. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  15. import org.apache.flink.util.Collector;
  16. import java.text.SimpleDateFormat;
  17. import java.time.Duration;
  18. import java.util.ArrayList;
  19. import java.util.Collections;
  20. import java.util.List;
  21. public class WatermarkTest01 {
  22.     public static void main(String[] args) throws Exception {
  23.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  24.         env.setParallelism(1);
  25.         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  26.         DataStreamSource<String> data = env.socketTextStream("localhost", 9999);
  27.         SingleOutputStreamOperator<Tuple2<String, Long>> mapped = data.map(
  28.                 new MapFunction<String, Tuple2<String, Long>>() {
  29.                     @Override
  30.                     public Tuple2<String, Long> map(String value) throws Exception {
  31.                         String[] split = value.split(",");
  32.                         return new Tuple2<>(split[0], Long.valueOf(split[1]));
  33.                     }
  34.                 }
  35.         );
  36.         WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
  37.                 .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
  38.                 .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
  39.                     Long currentMaxTimestamp = 0L;
  40.                     final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  41.                     @Override
  42.                     public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
  43.                         long timestamp = element.f1;
  44.                         currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
  45.                         System.out.println("Key:" + element.f0 + ", EventTime: " + element.f1 + ", " + format.format(element.f1));
  46.                         return element.f1;
  47.                     }
  48.                 });
  49.         SingleOutputStreamOperator<Tuple2<String, Long>> waterMark = mapped
  50.                 .assignTimestampsAndWatermarks(watermarkStrategy);
  51.         SingleOutputStreamOperator<String> res = waterMark
  52.                 .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
  53.                     @Override
  54.                     public String getKey(Tuple2<String, Long> value) throws Exception {
  55.                         return value.f0;
  56.                     }
  57.                 })
  58.                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  59.                 .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
  60.                     @Override
  61.                     public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
  62.                         List<Long> list = new ArrayList<>();
  63.                         for (Tuple2<String, Long> next : input) {
  64.                             list.add(next.f1);
  65.                         }
  66.                         Collections.sort(list);
  67.                         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  68.                         String result = "key: " + s + ", list.size(): " + list.size() + ", list.get(0): " + sdf.format(list.get(0)) + ", list.get(last): " + sdf.format(list.get(list.size() - 1))
  69.                                 + ", start: " + sdf.format(window.getStart()) + ", end: " + sdf.format(window.getEnd());
  70.                         out.collect(result);
  71.                     }
  72.                 });
  73.         res.print();
  74.         env.execute();
  75.     }
  76. }
复制代码
运行代码


传入数据

在控制台中,输入如下的数据:

检察结果

控制台运行结果如下:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4