大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window ...

打印 上一主题 下一主题

主题 931|帖子 931|积分 2793

点一下关注吧!!!非常感谢!!持续更新!!!

现在已经更新到了:



  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)
章节内容

上节我们完成了如下的内容:


  • Flink Time 详解
  • 示例内容分析
  • Watermark

Watermark

Watermark 在窗口计算中的作用

在利用基于变乱时间的窗口时,Flink 依赖 Watermark 来决定何时触发窗口计算。比方,如果你有一个每 10 秒的滚动窗口,当 Watermark 达到某个窗口的竣事时间后,Flink 才会触发该窗口的计算。
假设有一个 10 秒的窗口,并且 Watermark 达到 12:00:10,此时 Flink 会触发 12:00:00 - 12:00:10 的窗口计算。
怎样处置惩罚迟到变乱

尽管 Watermark 能有用解决乱序题目,但总有可能会出现变乱在天生 Watermark 之后才到达的环境(即“迟到变乱”)。为此,Flink 提供了处置惩罚迟到变乱的机制:


  • 答应肯定的延迟处置惩罚:可以设置窗口答应迟到的时间。
  • 迟到变乱的侧输出流(Side Output):可以将迟到的变乱发送到一个侧输出流中,以便后续处置惩罚。
  1. DataStream<Tuple2<String, Integer>> mainStream =
  2.   stream.keyBy(t -> t.f0)
  3.         .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  4.         .allowedLateness(Time.seconds(5))
  5.         .sideOutputLateData(lateOutputTag);
复制代码
代码实现

数据格式

  1. 01,1586489566000
  2. 01,1586489567000
  3. 01,1586489568000
  4. 01,1586489569000
  5. 01,1586489570000
  6. 01,1586489571000
  7. 01,1586489572000
  8. 01,1586489573000
  9. 01,1586489574000
  10. 01,1586489575000
  11. 01,1586489576000
  12. 01,1586489577000
  13. 01,1586489578000
  14. 01,1586489579000
复制代码
编写代码

这段代码实现了:


  • 通过 socket 获取实时流数据。
  • 将流数据映射成带有时间戳的二元组形式。
  • 应用了一个答应 5 秒乱序的水印计谋,确保 Flink 可以处置惩罚乱序的变乱流。
  • 按照变乱的 key 举行分组,并在变乱时间的底子上举行 5 秒的滚动窗口计算。
  • 末了输出每个窗口内变乱的时间范围、窗口开始和竣事时间等信息。
此中,这里对流数据举行了按 key(变乱的第一个字段)分组,并且利用了 滚动窗口(Tumbling Window),窗口长度为 5 秒。
在 apply 方法中,你收集窗口中的全部变乱,并根据变乱时间戳举行排序,然后输出每个窗口的开始和竣事时间,以及窗口中最早和最晚变乱的时间戳。
  1. SingleOutputStreamOperator<String> res = waterMark
  2.     .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
  3.         @Override
  4.         public String getKey(Tuple2<String, Long> value) throws Exception {
  5.             return value.f0;
  6.         }
  7.     })
  8.     .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  9.     .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
  10.         @Override
  11.         public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
  12.             List<Long> list = new ArrayList<>();
  13.             for (Tuple2<String, Long> next : input) {
  14.                 list.add(next.f1);
  15.             }
  16.             Collections.sort(list);
  17.             SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  18.             String result = "key: " + s + ", list.size(): " + list.size() + ", list.get(0): " + sdf.format(list.get(0)) + ", list.get(last): " + sdf.format(list.get(list.size() - 1))
  19.                     + ", start: " + sdf.format(window.getStart()) + ", end: " + sdf.format(window.getEnd());
  20.             out.collect(result);
  21.         }
  22.     });
复制代码
水印的计谋,定义了一个Bounded Out-of-Orderness 的水印计谋,答应最多 5 秒的变乱乱序,在 extractTimestamp 中,提取了变乱的时间戳,并打印出每个变乱的 key 和对应的变乱时间。还维护了一个 currentMaxTimestamp 来记录当前最大的变乱时间戳:
  1. WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
  2.     .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
  3.     .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
  4.         Long currentMaxTimestamp = 0L;
  5.         final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  6.         @Override
  7.         public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
  8.             long timestamp = element.f1;
  9.             currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
  10.             System.out.println("Key:" + element.f0 + ", EventTime: " + element.f1 + ", " + format.format(element.f1));
  11.             return element.f1;
  12.         }
  13.     });
复制代码
完整代码如下所示,代码实现了一个基于变乱时间的流处置惩罚系统,并通过水印(Watermark)机制来处置惩罚乱序变乱:
  1. package icu.wzk;
  2. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  3. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  4. import org.apache.flink.api.common.functions.MapFunction;
  5. import org.apache.flink.api.java.functions.KeySelector;
  6. import org.apache.flink.api.java.tuple.Tuple2;
  7. import org.apache.flink.streaming.api.TimeCharacteristic;
  8. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
  12. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  13. import org.apache.flink.streaming.api.windowing.time.Time;
  14. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  15. import org.apache.flink.util.Collector;
  16. import java.text.SimpleDateFormat;
  17. import java.time.Duration;
  18. import java.util.ArrayList;
  19. import java.util.Collections;
  20. import java.util.List;
  21. public class WatermarkTest01 {
  22.     public static void main(String[] args) throws Exception {
  23.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  24.         env.setParallelism(1);
  25.         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  26.         DataStreamSource<String> data = env.socketTextStream("localhost", 9999);
  27.         SingleOutputStreamOperator<Tuple2<String, Long>> mapped = data.map(
  28.                 new MapFunction<String, Tuple2<String, Long>>() {
  29.                     @Override
  30.                     public Tuple2<String, Long> map(String value) throws Exception {
  31.                         String[] split = value.split(",");
  32.                         return new Tuple2<>(split[0], Long.valueOf(split[1]));
  33.                     }
  34.                 }
  35.         );
  36.         WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
  37.                 .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
  38.                 .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
  39.                     Long currentMaxTimestamp = 0L;
  40.                     final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  41.                     @Override
  42.                     public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
  43.                         long timestamp = element.f1;
  44.                         currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
  45.                         System.out.println("Key:" + element.f0 + ", EventTime: " + element.f1 + ", " + format.format(element.f1));
  46.                         return element.f1;
  47.                     }
  48.                 });
  49.         SingleOutputStreamOperator<Tuple2<String, Long>> waterMark = mapped
  50.                 .assignTimestampsAndWatermarks(watermarkStrategy);
  51.         SingleOutputStreamOperator<String> res = waterMark
  52.                 .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
  53.                     @Override
  54.                     public String getKey(Tuple2<String, Long> value) throws Exception {
  55.                         return value.f0;
  56.                     }
  57.                 })
  58.                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  59.                 .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
  60.                     @Override
  61.                     public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
  62.                         List<Long> list = new ArrayList<>();
  63.                         for (Tuple2<String, Long> next : input) {
  64.                             list.add(next.f1);
  65.                         }
  66.                         Collections.sort(list);
  67.                         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  68.                         String result = "key: " + s + ", list.size(): " + list.size() + ", list.get(0): " + sdf.format(list.get(0)) + ", list.get(last): " + sdf.format(list.get(list.size() - 1))
  69.                                 + ", start: " + sdf.format(window.getStart()) + ", end: " + sdf.format(window.getEnd());
  70.                         out.collect(result);
  71.                     }
  72.                 });
  73.         res.print();
  74.         env.execute();
  75.     }
  76. }
复制代码
运行代码


传入数据

在控制台中,输入如下的数据:

检察结果

控制台运行结果如下:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

慢吞云雾缓吐愁

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表