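The series has covered the following so far:
- Hadoop (complete)
- HDFS (complete)
- MapReduce (complete)
- Hive (complete)
- Flume (complete)
- Sqoop (complete)
- Zookeeper (complete)
- HBase (complete)
- Redis (complete)
- Kafka (complete)
- Spark (complete)
- Flink (in progress)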
In This Chapter
The previous section covered:
- Flink Time in detail
- Analysis of the example
- Watermark
Watermark
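The Role of Watermark in Window Computation
When using event-time windows, Flink relies on the Watermark to decide when to trigger a window's computation. For example, with a 10-second tumbling window, Flink triggers a window only once the Watermark reaches the end of that window (strictly, the window's end time minus 1 ms, which is the largest timestamp that still belongs to the window).
Suppose a 10-second window covers 12:00:00 to 12:00:10. When the Watermark reaches 12:00:10, Flink triggers the computation for that window.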
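The triggering rule can be illustrated without Flink at all. The following is a minimal sketch in plain Java, assuming tumbling windows with offset 0 and non-negative timestamps; the class and method names (WindowTriggerSketch, windowStart, shouldFire) are hypothetical and are not part of the Flink API.

```java
import java.time.Instant;

class WindowTriggerSketch {
    // Start of the tumbling window that contains the timestamp
    // (assumes offset 0 and a non-negative timestamp).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // An event-time window [start, end) fires once the watermark reaches end - 1 ms,
    // the largest timestamp that still belongs to the window.
    static boolean shouldFire(long watermarkMs, long windowEndMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10-second window
        long event = Instant.parse("2020-04-10T12:00:03Z").toEpochMilli();
        long start = windowStart(event, size);
        long end = start + size;
        System.out.println("window: [" + Instant.ofEpochMilli(start) + ", " + Instant.ofEpochMilli(end) + ")");

        long watermarkBefore = Instant.parse("2020-04-10T12:00:09Z").toEpochMilli();
        long watermarkAtEnd = Instant.parse("2020-04-10T12:00:10Z").toEpochMilli();
        System.out.println("fires at 12:00:09? " + shouldFire(watermarkBefore, end));
        System.out.println("fires at 12:00:10? " + shouldFire(watermarkAtEnd, end));
    }
}
```

The window does not fire while the watermark is at 12:00:09, and it does fire once the watermark reaches 12:00:10.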
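How to Handle Late Events
Although Watermarks handle out-of-order data effectively, some events may still arrive after the Watermark has already passed the end of their window (so-called "late events"). Flink provides mechanisms for handling them:
- Allowed lateness: a window can be configured to keep accepting late events for a given period.
- Side output for late events (Side Output): events that are too late can be sent to a side output stream for later processing.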
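    // Assumes `stream` is a DataStream<Tuple2<String, Integer>> with timestamps and watermarks assigned.
    // The tag name "late-data" is illustrative; the trailing {} lets Flink capture the element type.
    OutputTag<Tuple2<String, Integer>> lateOutputTag = new OutputTag<Tuple2<String, Integer>>("late-data") {};
    SingleOutputStreamOperator<Tuple2<String, Integer>> mainStream =
        stream.keyBy(t -> t.f0)
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .allowedLateness(Time.seconds(5))
            .sideOutputLateData(lateOutputTag)
            .sum(1); // illustrative aggregation; a window function is needed to get a stream back
    DataStream<Tuple2<String, Integer>> lateStream = mainStream.getSideOutput(lateOutputTag);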
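To make the interplay between the Watermark and allowed lateness concrete, here is a small plain-Java sketch of how an arriving event could be classified. It assumes the rule that a window is discarded once the watermark reaches the window's last timestamp plus the allowed lateness; the names LatenessSketch, Outcome, and classify are hypothetical and only illustrate the idea.

```java
class LatenessSketch {
    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, SIDE_OUTPUT }

    // Classifies an event against the current watermark for a tumbling window
    // (offset 0, non-negative timestamps assumed).
    static Outcome classify(long eventTs, long watermark, long windowSizeMs, long allowedLatenessMs) {
        long windowStart = eventTs - (eventTs % windowSizeMs);
        long maxTimestamp = windowStart + windowSizeMs - 1; // last timestamp inside the window
        if (maxTimestamp + allowedLatenessMs <= watermark) {
            return Outcome.SIDE_OUTPUT;       // window state already cleaned up
        }
        if (maxTimestamp <= watermark) {
            return Outcome.LATE_BUT_ACCEPTED; // window already fired, but still accepts the event
        }
        return Outcome.ON_TIME;
    }

    public static void main(String[] args) {
        long size = 10_000L;
        long lateness = 5_000L;
        long eventTs = 3_000L; // belongs to window [0, 10000)
        for (long wm : new long[] {8_000L, 12_000L, 15_000L}) {
            System.out.println("watermark=" + wm + " -> " + classify(eventTs, wm, size, lateness));
        }
    }
}
```

If no side output is configured, events in the last category are simply dropped.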
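Code Implementation
Data Format
Each line has the form key,timestamp, where the timestamp is the event time in epoch milliseconds: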
    01,1586489566000
    01,1586489567000
    01,1586489568000
    01,1586489569000
    01,1586489570000
    01,1586489571000
    01,1586489572000
    01,1586489573000
    01,1586489574000
    01,1586489575000
    01,1586489576000
    01,1586489577000
    01,1586489578000
    01,1586489579000
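Because the records arrive over a socket as free text, a malformed line would make a naive split-and-parse throw. The following plain-Java sketch shows one defensive way to parse a line of this format; EventLineParser and Event are hypothetical names introduced for illustration, not part of the job in this article.

```java
import java.util.Optional;

class EventLineParser {
    // Simple holder for a parsed record: a key and an event time in epoch milliseconds.
    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    // Returns the parsed event, or Optional.empty() if the line is not "key,timestamp".
    static Optional<Event> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] parts = line.trim().split(",");
        if (parts.length != 2 || parts[0].trim().isEmpty()) {
            return Optional.empty();
        }
        try {
            return Optional.of(new Event(parts[0].trim(), Long.parseLong(parts[1].trim())));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("01,1586489566000").isPresent()); // well-formed line
        System.out.println(parse("01,not-a-number").isPresent());  // malformed timestamp
    }
}
```

In a Flink job, the same check could sit in a flatMap that emits only the lines that parse successfully.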
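Writing the Code
The code does the following:
- Reads a real-time data stream from a socket.
- Maps each record to a two-element tuple that carries a timestamp.
- Applies a watermark strategy that tolerates up to 5 seconds of out-of-orderness, so that Flink can process an out-of-order event stream.
- Groups events by key and computes 5-second tumbling windows on event time.
- Finally outputs, for each window, the time range of its events and the window's start and end times.
The stream is keyed by the first field of each event and uses a tumbling window (Tumbling Window) with a length of 5 seconds.
In the apply method, all events in the window are collected and sorted by timestamp. The output contains the window's start and end times together with the timestamps of the earliest and latest events in the window.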
    SingleOutputStreamOperator<String> res = waterMark
        // Key the stream by the first tuple field
        .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
            @Override
            public String getKey(Tuple2<String, Long> value) throws Exception {
                return value.f0;
            }
        })
        // 5-second tumbling windows on event time
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                // Collect and sort the timestamps of all events in this window
                List<Long> list = new ArrayList<>();
                for (Tuple2<String, Long> next : input) {
                    list.add(next.f1);
                }
                Collections.sort(list);
                // Formats in the JVM's default time zone
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                String result = "key: " + s + ", list.size(): " + list.size()
                    + ", list.get(0): " + sdf.format(list.get(0))
                    + ", list.get(last): " + sdf.format(list.get(list.size() - 1))
                    + ", start: " + sdf.format(window.getStart()) + ", end: " + sdf.format(window.getEnd());
                out.collect(result);
            }
        });
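What this window function computes can be previewed without a cluster. The sketch below, in plain Java, groups the 14 sample timestamps into 5-second tumbling windows and prints the same kind of summary (count, earliest, latest, window bounds), using raw milliseconds to stay independent of the time zone. TumblingWindowSketch and assign are hypothetical names, and the sketch ignores keys and watermarks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

class TumblingWindowSketch {
    // Groups timestamps by the start of their tumbling window (offset 0 assumed)
    // and sorts the timestamps inside each window.
    static SortedMap<Long, List<Long>> assign(List<Long> timestamps, long sizeMs) {
        SortedMap<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            long start = ts - (ts % sizeMs);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
        }
        for (List<Long> events : windows.values()) {
            Collections.sort(events);
        }
        return windows;
    }

    public static void main(String[] args) {
        // The sample records: one event per second from 1586489566000 to 1586489579000
        List<Long> timestamps = new ArrayList<>();
        for (long ts = 1586489566000L; ts <= 1586489579000L; ts += 1000L) {
            timestamps.add(ts);
        }
        long size = 5_000L;
        for (Map.Entry<Long, List<Long>> e : assign(timestamps, size).entrySet()) {
            List<Long> events = e.getValue();
            System.out.println("start: " + e.getKey() + ", end: " + (e.getKey() + size)
                + ", size: " + events.size()
                + ", first: " + events.get(0)
                + ", last: " + events.get(events.size() - 1));
        }
    }
}
```

With the 14 sample records this yields three windows holding 4, 5, and 5 events, starting at 1586489565000, 1586489570000, and 1586489575000.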
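The watermark strategy is a Bounded Out-of-Orderness strategy that tolerates up to 5 seconds of out-of-order events. extractTimestamp extracts each event's timestamp and prints the event's key and event time. It also keeps a currentMaxTimestamp field that records the largest event timestamp seen so far; this field is informational only, because the built-in strategy tracks the maximum timestamp itself when it generates Watermarks: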
    WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            Long currentMaxTimestamp = 0L;
            final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                long timestamp = element.f1;
                currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
                System.out.println("Key:" + element.f0 + ", EventTime: " + element.f1 + ", " + format.format(element.f1));
                return timestamp;
            }
        });
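It can help to see how a bounded out-of-orderness Watermark moves as events arrive. The following plain-Java sketch assumes the watermark is computed as the largest timestamp seen so far, minus the out-of-orderness bound, minus 1 ms; BoundedOutOfOrdernessSketch and its methods are hypothetical names written for this illustration and are not Flink classes.

```java
class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMs;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long outOfOrdernessMs) {
        this.outOfOrdernessMs = outOfOrdernessMs;
        // Start low enough that the first watermark computation cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMs + 1;
    }

    // Called for every event: only the largest timestamp matters.
    void onEvent(long eventTimestampMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMs);
    }

    // Assumed rule: watermark = max timestamp - bound - 1 ms.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMs - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(5_000L);
        long firstWindowEnd = 1586489570000L; // window [1586489565000, 1586489570000)
        for (long ts = 1586489566000L; ts <= 1586489579000L; ts += 1000L) {
            wm.onEvent(ts);
            boolean fires = wm.currentWatermark() >= firstWindowEnd - 1;
            System.out.println("event=" + ts + ", watermark=" + wm.currentWatermark()
                + ", first window ready=" + fires);
        }
    }
}
```

Under this assumption, the first 5-second window of the sample data becomes ready only when the event with timestamp 1586489575000 arrives, since 1586489575000 - 5000 - 1 = 1586489569999 is the window's last timestamp. In a real job Watermarks are emitted periodically, so the window would fire shortly after that event rather than at the exact instant it is processed.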
The complete code is shown below. It implements an event-time stream processing job and uses the Watermark mechanism to handle out-of-order events:
    package icu.wzk;

    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    import java.text.SimpleDateFormat;
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class WatermarkTest01 {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // Deprecated since Flink 1.12, where event time is the default; kept for older versions
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // Read text lines from a socket server listening on localhost:9999
            DataStreamSource<String> data = env.socketTextStream("localhost", 9999);

            // Parse "key,timestamp" lines into (key, timestamp) tuples
            SingleOutputStreamOperator<Tuple2<String, Long>> mapped = data.map(
                new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new Tuple2<>(split[0], Long.valueOf(split[1]));
                    }
                }
            );

            // Tolerate up to 5 seconds of out-of-order events
            WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                    Long currentMaxTimestamp = 0L;
                    final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                        long timestamp = element.f1;
                        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
                        System.out.println("Key:" + element.f0 + ", EventTime: " + element.f1 + ", " + format.format(element.f1));
                        return timestamp;
                    }
                });

            SingleOutputStreamOperator<Tuple2<String, Long>> waterMark = mapped
                .assignTimestampsAndWatermarks(watermarkStrategy);

            SingleOutputStreamOperator<String> res = waterMark
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                // 5-second tumbling windows on event time
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                        // Collect and sort the timestamps of all events in this window
                        List<Long> list = new ArrayList<>();
                        for (Tuple2<String, Long> next : input) {
                            list.add(next.f1);
                        }
                        Collections.sort(list);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        String result = "key: " + s + ", list.size(): " + list.size()
                            + ", list.get(0): " + sdf.format(list.get(0))
                            + ", list.get(last): " + sdf.format(list.get(list.size() - 1))
                            + ", start: " + sdf.format(window.getStart()) + ", end: " + sdf.format(window.getEnd());
                        out.collect(result);
                    }
                });

            res.print();
            env.execute();
        }
    }
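Running the Code
Input Data
In the console attached to the socket on port 9999 that the job reads from, enter records in the format listed under Data Format.
Viewing the Results
The job's console prints each event's key and event time as it is processed, followed by one summary line per window once the Watermark passes the end of that window.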