ToB Enterprise Service App Market: a ToB review and business social networking platform
Title:
Big Data 122 - Flink Time Watermark: Implementing a Tumbling Window in Java (Test Code)
Author:
慢吞云雾缓吐愁
Date:
2024-9-14 01:39
Please hit follow!!! Thanks a lot!! More updates on the way!!!
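The series so far covers:
Hadoop (completed)
HDFS (completed)
MapReduce (completed)
Hive (completed)
Flume (completed)
Sqoop (completed)
Zookeeper (completed)
HBase (completed)
Redis (completed)
Kafka (completed)
Spark (completed)
Flink (in progress!)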
Chapter contents
In the previous section we covered:
Flink Time explained
Walkthrough of the example
Watermark
Watermark
The role of the Watermark in window computation
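When using event-time windows, Flink relies on the Watermark to decide when to trigger a window computation. For example, with a 10-second tumbling window, Flink triggers a window's computation only after the Watermark has reached that window's end time.
Suppose we have a 10-second window and the Watermark reaches 12:00:10: Flink then triggers the computation of the window 12:00:00 - 12:00:10. (Windows are end-exclusive, so strictly the trigger condition is Watermark >= 12:00:09.999, the last timestamp the window contains.)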
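The trigger rule can be sketched in plain Java without any Flink dependency. This is only an illustration that assumes Flink's EventTimeTrigger semantics (a window [start, end) fires once the watermark reaches end - 1); the class and method names are hypothetical.

```java
// Flink-free sketch: when does an event-time window fire?
// Assumption: mirrors Flink's EventTimeTrigger, which fires a window [start, end)
// once the watermark reaches window.maxTimestamp() = end - 1.
final class WindowFiring {

    /** Last timestamp that still belongs to the window [start, end). */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** True once the watermark has reached the last timestamp of the window. */
    static boolean shouldFire(long windowEnd, long watermark) {
        return watermark >= maxTimestamp(windowEnd);
    }

    public static void main(String[] args) {
        // Hypothetical 10 s window 12:00:00 - 12:00:10, expressed as ms since 12:00:00
        long start = 0L;
        long end = start + 10_000L;
        System.out.println(shouldFire(end, 9_998L));  // watermark at 12:00:09.998
        System.out.println(shouldFire(end, 10_000L)); // watermark at 12:00:10.000
    }
}
```

Running main prints false and then true: the window stays open while the watermark is below 12:00:09.999 and fires once it gets there.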
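How to handle late events
Although Watermarks deal effectively with out-of-order data, an event can still arrive after a Watermark covering its timestamp has already been emitted (a "late event"). For this, Flink provides two mechanisms:
Allowed lateness: you can configure how long a window keeps accepting late events after it has fired.
Side output for late events: events that arrive later than that can be sent to a side output stream for separate processing.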
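// Anonymous subclass ({}) so Flink can capture the side output's element type
OutputTag<Tuple2<String, Integer>> lateOutputTag = new OutputTag<Tuple2<String, Integer>>("late-data") {};
SingleOutputStreamOperator<Tuple2<String, Integer>> mainStream = stream
        .keyBy(t -> t.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .allowedLateness(Time.seconds(5))   // keep window state 5 s after it fires
        .sideOutputLateData(lateOutputTag)  // anything later goes to the side output
        .sum(1);                            // a window function is needed to get a stream back
DataStream<Tuple2<String, Integer>> lateStream = mainStream.getSideOutput(lateOutputTag);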
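How an arriving element is treated under allowedLateness can be illustrated with a small Flink-free sketch. It assumes Flink's rules for a window [start, end) with allowed lateness L: the window fires when the watermark reaches end - 1, and its state is purged once the watermark reaches (end - 1) + L, after which elements for it are dropped or routed to the side output. LatenessCheck and Outcome are hypothetical names.

```java
// Flink-free sketch of how an event-time window treats an arriving element.
// Assumption: follows Flink's rules for a window [start, end) with allowed lateness L:
// it fires when the watermark reaches end - 1 and its state is purged once the
// watermark reaches (end - 1) + L.
final class LatenessCheck {

    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, DROPPED_OR_SIDE_OUTPUT }

    static Outcome classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1;           // last timestamp inside the window
        if (watermark < maxTimestamp) {
            return Outcome.ON_TIME;                  // window has not fired yet
        }
        if (watermark < maxTimestamp + allowedLateness) {
            return Outcome.LATE_BUT_ACCEPTED;        // already fired; fires again with this element
        }
        return Outcome.DROPPED_OR_SIDE_OUTPUT;       // goes to sideOutputLateData if configured
    }

    public static void main(String[] args) {
        long end = 10_000L;       // window [0 s, 10 s)
        long lateness = 5_000L;   // same as allowedLateness(Time.seconds(5))
        for (long wm : new long[] {8_000L, 12_000L, 15_000L}) {
            System.out.println("watermark " + wm + " -> " + classify(end, lateness, wm));
        }
    }
}
```

For a 10 s window with 5 s allowed lateness this prints ON_TIME, LATE_BUT_ACCEPTED and DROPPED_OR_SIDE_OUTPUT for watermarks of 8 s, 12 s and 15 s respectively.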
Code implementation
Data format (each line is key,eventTimestamp, with the timestamp in epoch milliseconds):
01,1586489566000
01,1586489567000
01,1586489568000
01,1586489569000
01,1586489570000
01,1586489571000
01,1586489572000
01,1586489573000
01,1586489574000
01,1586489575000
01,1586489576000
01,1586489577000
01,1586489578000
01,1586489579000
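The following hypothetical helper, not part of the original job, parses one input line and computes the 5-second tumbling window the record falls into. It assumes a zero window offset, in which case start = timestamp - floorMod(timestamp, size) gives the same window start as Flink's tumbling event-time assigner.

```java
// Hypothetical helper: parse "key,epochMillis" and find its tumbling window.
// Assumption: zero window offset, so start = timestamp - floorMod(timestamp, size).
final class WindowAssignment {

    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    static Event parse(String line) {
        String[] parts = line.trim().split(",");
        return new Event(parts[0], Long.parseLong(parts[1]));
    }

    static long windowStart(long timestamp, long windowSize) {
        return timestamp - Math.floorMod(timestamp, windowSize);
    }

    public static void main(String[] args) {
        long size = 5_000L;
        for (String line : new String[] {"01,1586489566000", "01,1586489570000", "01,1586489574000"}) {
            Event e = parse(line);
            long start = windowStart(e.timestamp, size);
            System.out.println(e.key + " " + e.timestamp + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

For the sample data, 1586489566000 to 1586489569000 fall into [1586489565000, 1586489570000), while 1586489570000 to 1586489574000 fall into the next window, [1586489570000, 1586489575000).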
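Writing the code
This code:
Reads a real-time data stream from a socket.
Maps each record to a (key, timestamp) Tuple2.
Applies a watermark strategy that tolerates up to 5 seconds of out-of-orderness, so Flink can handle out-of-order event streams.
Groups events by key and computes 5-second tumbling windows based on event time.
Finally prints, for each window, the time range of its events together with the window's start and end times.
The stream is grouped by key (the first field of each event) and a tumbling window of 5 seconds is applied.
In the apply method, all events in the window are collected and sorted by event timestamp; the method then outputs the window's start and end times along with the earliest and latest event timestamps in the window.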
SingleOutputStreamOperator<String> res = waterMark
        .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
            @Override
            public String getKey(Tuple2<String, Long> value) throws Exception {
                return value.f0;
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                List<Long> list = new ArrayList<>();
                for (Tuple2<String, Long> next : input) {
                    list.add(next.f1);
                }
                Collections.sort(list);
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                String result = "key: " + s + ", list.size(): " + list.size() + ", list.get(0): " + sdf.format(list.get(0)) + ", list.get(last): " + sdf.format(list.get(list.size() - 1))
                        + ", start: " + sdf.format(window.getStart()) + ", end: " + sdf.format(window.getEnd());
                out.collect(result);
            }
        });
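To check the apply() logic without a running cluster, here is a Flink-free sketch of what keyBy, the 5-second tumbling window and apply() produce together once every window has fired (watermark timing is ignored). Assumptions: zero window offset, and timestamps rendered with java.time in an explicit zone (UTC here), whereas SimpleDateFormat in the original uses the JVM's default time zone. WindowSummary and its methods are hypothetical names.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Flink-free sketch of keyBy + 5 s tumbling windows + the summary built in apply().
// Assumptions: zero window offset; timestamps formatted in UTC.
final class WindowSummary {

    static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneOffset.UTC);

    /** Groups "key,epochMillis" lines by key, then by window start (both sorted). */
    static Map<String, TreeMap<Long, List<Long>>> group(List<String> lines, long windowSize) {
        Map<String, TreeMap<Long, List<Long>>> byKey = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(",");
            long ts = Long.parseLong(parts[1]);
            long start = ts - Math.floorMod(ts, windowSize);
            byKey.computeIfAbsent(parts[0], k -> new TreeMap<>())
                 .computeIfAbsent(start, s -> new ArrayList<>())
                 .add(ts);
        }
        return byKey;
    }

    /** Same fields as the apply() output: size, earliest, latest, window start and end. */
    static String summarize(String key, long start, long end, List<Long> timestamps) {
        List<Long> list = new ArrayList<>(timestamps);
        Collections.sort(list);
        return "key: " + key + ", list.size(): " + list.size()
                + ", list.get(0): " + FMT.format(Instant.ofEpochMilli(list.get(0)))
                + ", list.get(last): " + FMT.format(Instant.ofEpochMilli(list.get(list.size() - 1)))
                + ", start: " + FMT.format(Instant.ofEpochMilli(start))
                + ", end: " + FMT.format(Instant.ofEpochMilli(end));
    }

    public static void main(String[] args) {
        long size = 5_000L;
        List<String> lines = List.of("01,1586489568000", "01,1586489566000", "01,1586489571000");
        group(lines, size).forEach((key, windows) ->
                windows.forEach((start, ts) ->
                        System.out.println(summarize(key, start, start + size, ts))));
    }
}
```

An explicit zone makes the output independent of the machine's time-zone setting, and DateTimeFormatter is immutable, so one instance can be shared as a static constant instead of creating a new SimpleDateFormat per call.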
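Watermark strategy: a Bounded Out-of-Orderness watermark strategy is defined that tolerates at most 5 seconds of event disorder. extractTimestamp extracts each event's timestamp and prints the event's key and event time. It also maintains currentMaxTimestamp, the largest event timestamp seen so far (this field is only informational; forBoundedOutOfOrderness tracks its own maximum to compute the Watermark):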
WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            Long currentMaxTimestamp = 0L;
            final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                long timestamp = element.f1;
                currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
                System.out.println("Key:" + element.f0 + ", EventTime: " + element.f1 + ", " + format.format(element.f1));
                return element.f1;
            }
        });
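To our understanding, Flink's bounded-out-of-orderness generator emits "largest timestamp seen so far minus the allowed out-of-orderness minus 1 ms". The sketch below imitates that rule in plain Java (BoundedOutOfOrdernessSketch is a hypothetical name, not the Flink class) and shows that an out-of-order event never moves the watermark backwards.

```java
// Plain-Java imitation of a bounded-out-of-orderness watermark generator.
// Assumption: watermark = maxTimestamp - outOfOrderness - 1, as in Flink's built-in generator.
final class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low enough that the first watermark is Long.MIN_VALUE without underflow
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(5_000L);
        // 1586489568000 arrives after 1586489570000, i.e. out of order
        for (long ts : new long[] {1586489566000L, 1586489570000L, 1586489568000L}) {
            gen.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + gen.currentWatermark());
        }
    }
}
```

After 1586489570000 the watermark is 1586489564999, and it stays there when the older event 1586489568000 arrives.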
The complete code is shown below. It implements an event-time stream processing job that uses the Watermark mechanism to handle out-of-order events:
package icu.wzk;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class WatermarkTest01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Deprecated since Flink 1.12, where event time is already the default
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Read "key,epochMillis" lines from a socket on localhost:9999
        DataStreamSource<String> data = env.socketTextStream("localhost", 9999);
        // Parse each line into (key, event timestamp)
        SingleOutputStreamOperator<Tuple2<String, Long>> mapped = data.map(
                new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new Tuple2<>(split[0], Long.valueOf(split[1]));
                    }
                }
        );
        // Tolerate up to 5 s of out-of-order events; event time comes from field f1
        WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                    Long currentMaxTimestamp = 0L;
                    final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                        long timestamp = element.f1;
                        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
                        System.out.println("Key:" + element.f0 + ", EventTime: " + element.f1 + ", " + format.format(element.f1));
                        return element.f1;
                    }
                });
        SingleOutputStreamOperator<Tuple2<String, Long>> waterMark = mapped
                .assignTimestampsAndWatermarks(watermarkStrategy);
        // Group by key, collect each 5 s event-time window, and summarize its contents
        SingleOutputStreamOperator<String> res = waterMark
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                        List<Long> list = new ArrayList<>();
                        for (Tuple2<String, Long> next : input) {
                            list.add(next.f1);
                        }
                        Collections.sort(list);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        String result = "key: " + s + ", list.size(): " + list.size() + ", list.get(0): " + sdf.format(list.get(0)) + ", list.get(last): " + sdf.format(list.get(list.size() - 1))
                                + ", start: " + sdf.format(window.getStart()) + ", end: " + sdf.format(window.getEnd());
                        out.collect(result);
                    }
                });
        res.print();
        env.execute();
    }
}
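To reason about what the job might print for the sample data, here is a single-threaded, single-key simulation in plain Java. Simplifications (assumptions, not Flink behaviour): the watermark is recomputed after every record, whereas Flink emits it periodically (by default every 200 ms); keyBy is ignored because all sample records use key 01; late-data handling is not modeled. FiringSimulation is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified simulation: 5 s tumbling windows + bounded out-of-orderness watermark.
// Assumptions: per-record watermark updates, a single key, no late-data handling.
final class FiringSimulation {

    static List<String> run(long[] timestamps, long windowSize, long outOfOrderness) {
        TreeMap<Long, List<Long>> openWindows = new TreeMap<>();   // window start -> timestamps
        List<String> fired = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE + outOfOrderness + 1;
        for (long ts : timestamps) {
            long start = ts - Math.floorMod(ts, windowSize);
            openWindows.computeIfAbsent(start, s -> new ArrayList<>()).add(ts);
            maxTimestamp = Math.max(maxTimestamp, ts);
            long watermark = maxTimestamp - outOfOrderness - 1;
            // Fire (and remove) every window whose last timestamp, end - 1, is <= watermark
            while (!openWindows.isEmpty()
                    && openWindows.firstKey() + windowSize - 1 <= watermark) {
                Map.Entry<Long, List<Long>> w = openWindows.pollFirstEntry();
                fired.add("[" + w.getKey() + ", " + (w.getKey() + windowSize) + ") size="
                        + w.getValue().size() + " fired after " + ts);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] sample = new long[14];
        for (int i = 0; i < sample.length; i++) {
            sample[i] = 1586489566000L + i * 1000L;   // 1586489566000 ... 1586489579000
        }
        run(sample, 5_000L, 5_000L).forEach(System.out::println);
    }
}
```

Under these assumptions only the window [1586489565000, 1586489570000) completes, with 4 records, once 1586489575000 arrives (watermark 1586489569999); the last record only advances the watermark to 1586489573999, so the windows starting at 1586489570000 and 1586489575000 still wait for later data.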
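Running the code
Feeding in data
Enter the following data in the console:
[Screenshot: console input]
Checking the results
The console output is as follows:
[Screenshot: console output]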