ToB Enterprise Service App Market: ToB review and business social networking platform
Title:
Understanding the Flink Watermark Source Code in One Article
Author:
九天猎人
Time:
2024-10-12 09:38
1. Watermark source code analysis
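Flink's watermark is used for EventTime processing. It signals when a window computation should be triggered. As data keeps flowing into a Flink program, the events' EventTime values gradually increase and the watermark rises with them. Once the watermark passes a window's end time, the computation for that window is triggered.
This article analyzes how watermarks work inside Flink: how they are generated, how they are propagated, and how they trigger computation.
First, look at the structure of Watermark. It is very simple and holds only a single timestamp: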
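For the built-in event-time windows, the firing check compares the watermark against the window's last timestamp. A window covering [start, end) reports end - 1 as its maximum timestamp (TimeWindow.maxTimestamp()), and Flink's EventTimeTrigger fires once the current watermark is at least that value. The plain-Java sketch below mirrors only that comparison. The class and method names are hypothetical and are not part of Flink's API.

```java
/** Hypothetical helper mirroring the event-time firing check; not a Flink class. */
final class EventTimeFiringCheck {

    private EventTimeFiringCheck() {}

    /** Last timestamp that still belongs to the window [start, end), like TimeWindow.maxTimestamp(). */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** The window fires once the watermark has reached its max timestamp. */
    static boolean shouldFire(long windowEnd, long currentWatermark) {
        return maxTimestamp(windowEnd) <= currentWatermark;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L; // window [0, 10000) in milliseconds
        System.out.println(shouldFire(windowEnd, 9_998L)); // false
        System.out.println(shouldFire(windowEnd, 9_999L)); // true
    }
}
```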
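import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
// org.apache.flink.api.common.eventtime.Watermark (abridged: constants, equals/hashCode/toString omitted)
public final class Watermark implements Serializable {
private static final long serialVersionUID = 1L;
// thread-local formatter used by getFormattedTimestamp()
private static final ThreadLocal<SimpleDateFormat> TS_FORMATTER =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"));
// the only piece of state: the watermark's event-time timestamp in milliseconds
private final long timestamp;
public Watermark(long timestamp) {
this.timestamp = timestamp;
}
public long getTimestamp() {
return timestamp;
}
public String getFormattedTimestamp() {
return TS_FORMATTER.get().format(new Date(timestamp));
}
}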
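1.1.1 Setting the watermark
// MyEvent and getEventTime() are placeholders for your own event type and its event-time field
source.assignTimestampsAndWatermarks(WatermarkStrategy
.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, recordTimestamp) -> event.getEventTime()));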
First, let's look at the assignTimestampsAndWatermarks() method:
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
WatermarkStrategy<T> watermarkStrategy) {
final WatermarkStrategy<T> cleanedStrategy = clean(watermarkStrategy);
final TimestampsAndWatermarksOperator<T> operator =
new TimestampsAndWatermarksOperator<>(cleanedStrategy);
// match parallelism to input, to have a 1:1 source -> timestamps/watermarks relationship and chain
final int inputParallelism = getTransformation().getParallelism();
return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
.setParallelism(inputParallelism);
}
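As you can see, assignTimestampsAndWatermarks() produces a new DataStream and creates a TimestampsAndWatermarksOperator whose parallelism matches its input. In this respect the method works like ordinary operators such as map and filter.
The execution plan graph for this code shows TimestampsAndWatermarksOperator as an ordinary operator that can be chained with filter/map operators (its constructor sets ChainingStrategy.ALWAYS).
1.1.2 Opening TimestampsAndWatermarksOperator
When a Task starts, it calls operator.open(). Let's see what TimestampsAndWatermarksOperator's open() method does: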
public class TimestampsAndWatermarksOperator<T>
extends AbstractStreamOperator<T>
implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
private static final long serialVersionUID = 1L;
private final WatermarkStrategy<T> watermarkStrategy;
/** The timestamp assigner. */
private transient TimestampAssigner<T> timestampAssigner;
/** The watermark generator, initialized during runtime. */
private transient WatermarkGenerator<T> watermarkGenerator;
/** The watermark output gateway, initialized during runtime. */
private transient WatermarkOutput wmOutput;
/** The interval (in milliseconds) for periodic watermark probes. Initialized during runtime. */
private transient long watermarkInterval;
public TimestampsAndWatermarksOperator(
WatermarkStrategy<T> watermarkStrategy) {
this.watermarkStrategy = checkNotNull(watermarkStrategy);
this.chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void open() throws Exception {
super.open();
timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
watermarkGenerator = watermarkStrategy.createWatermarkGenerator(this::getMetricGroup);
// Create the WatermarkEmitter that sends watermarks downstream
wmOutput = new WatermarkEmitter(output, getContainingTask().getStreamStatusMaintainer());
// Read the watermark generation interval from the config
watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
if (watermarkInterval > 0) {
// Register a processing-time timer that fires after one watermark interval
final long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
}
// Called for every incoming record
@Override
public void processElement(final StreamRecord<T> element) throws Exception {
final T event = element.getValue();
final long previousTimestamp = element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);
element.setTimestamp(newTimestamp);
output.collect(element);
// Let the generator observe the event (it may update its state or emit a watermark)
watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
}
@Override
public void onProcessingTime(long timestamp) throws Exception {
watermarkGenerator.onPeriodicEmit(wmOutput);
// Register the next timer
final long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
/**
* Override the base implementation to completely ignore watermarks propagated from
* upstream, except for the "end of time" watermark.
*/
@Override
public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) throws Exception {
// if we receive a Long.MAX_VALUE watermark we forward it since it is used
// to signal the end of input and to not block watermark progress downstream
if (mark.getTimestamp() == Long.MAX_VALUE) {
wmOutput.emitWatermark(Watermark.MAX_WATERMARK);
}
}
@Override
public void close() throws Exception {
super.close();
watermarkGenerator.onPeriodicEmit(wmOutput);
}
// ------------------------------------------------------------------------
/**
* Implementation of the {@code WatermarkEmitter}, based on the components
* that are available inside a stream operator.
*/
// Watermark emitter
private static final class WatermarkEmitter implements WatermarkOutput {
private final Output<?> output;
private final StreamStatusMaintainer statusMaintainer;
// The last watermark emitted
private long currentWatermark;
private boolean idle;
WatermarkEmitter(Output<?> output, StreamStatusMaintainer statusMaintainer) {
this.output = output;
this.statusMaintainer = statusMaintainer;
this.currentWatermark = Long.MIN_VALUE;
}
@Override
public void emitWatermark(Watermark watermark) {
final long ts = watermark.getTimestamp();
if (ts <= currentWatermark) {
return;
}
currentWatermark = ts;
if (idle) {
idle = false;
statusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
}
// Send the watermark downstream
output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(ts));
}
@Override
public void markIdle() {
idle = true;
statusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
}
}
}
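Each time a record arrives, the generator keeps track of the largest timestamp seen so far. The watermark it emits is that maximum minus the allowed out-of-orderness, minus 1 ms. BoundedOutOfOrdernessWatermarks implements this: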
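WatermarkEmitter has two properties worth remembering. It forwards only strictly increasing watermarks, and a new watermark switches an idle output back to active. The sketch below reproduces only that logic in plain Java. `SimpleWatermarkEmitter` and its list-based output are hypothetical stand-ins for Flink's Output and StreamStatusMaintainer, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for WatermarkEmitter that records what would be sent downstream. */
final class SimpleWatermarkEmitter {
    final List<Long> emitted = new ArrayList<>();         // watermarks forwarded downstream
    final List<String> statusChanges = new ArrayList<>(); // IDLE/ACTIVE toggles
    private long currentWatermark = Long.MIN_VALUE;
    private boolean idle;

    void emitWatermark(long ts) {
        // Drop watermarks that would not advance event time
        if (ts <= currentWatermark) {
            return;
        }
        currentWatermark = ts;
        // A new watermark means the stream is active again
        if (idle) {
            idle = false;
            statusChanges.add("ACTIVE");
        }
        emitted.add(ts);
    }

    void markIdle() {
        idle = true;
        statusChanges.add("IDLE");
    }

    public static void main(String[] args) {
        SimpleWatermarkEmitter e = new SimpleWatermarkEmitter();
        e.emitWatermark(100);
        e.emitWatermark(100); // duplicate, dropped
        e.emitWatermark(50);  // regression, dropped
        e.markIdle();
        e.emitWatermark(200); // re-activates the stream
        System.out.println(e.emitted);       // [100, 200]
        System.out.println(e.statusChanges); // [IDLE, ACTIVE]
    }
}
```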
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
/** The maximum timestamp encountered so far. */
private long maxTimestamp;
/** The maximum out-of-orderness that this watermark generator assumes. */
private final long outOfOrdernessMillis;
/**
* Creates a new watermark generator with the given out-of-orderness bound.
*
* @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
*/
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
// start so that our lowest watermark would be Long.MIN_VALUE.
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}
// ------------------------------------------------------------------------
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
// Track the largest timestamp seen so far; the watermark is derived from it in onPeriodicEmit()
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// Periodically emit: max timestamp minus the out-of-orderness bound, minus 1 ms
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}
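The arithmetic is easy to check by hand. Take a 5 s bound and events at 1000, 7000 and 3000 ms. The maximum timestamp is 7000, so the next periodic emission is 7000 - 5000 - 1 = 1999, and the late event at 3000 does not pull the watermark back. The plain-Java class below copies this logic without Flink's interfaces; the name `BoundedLatenessTracker` is made up for the example. As in the Flink class, the initial value is chosen so that the first watermark is exactly Long.MIN_VALUE instead of underflowing.

```java
import java.time.Duration;

/** Plain-Java copy of the BoundedOutOfOrdernessWatermarks arithmetic (hypothetical class name). */
final class BoundedLatenessTracker {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedLatenessTracker(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Start so that the first watermark is exactly Long.MIN_VALUE (no underflow)
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Equivalent of onEvent(): remember the largest timestamp seen so far. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Equivalent of the value emitted by onPeriodicEmit(). */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedLatenessTracker t = new BoundedLatenessTracker(Duration.ofSeconds(5));
        System.out.println(t.currentWatermark() == Long.MIN_VALUE); // true
        t.onEvent(1_000);
        t.onEvent(7_000);
        t.onEvent(3_000); // out-of-order event, does not lower the max
        System.out.println(t.currentWatermark()); // 1999
    }
}
```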
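As shown above, open() essentially registers a timer. The timer interval is the value set in the program via setAutoWatermarkInterval(interval), which defaults to 200 ms; if the interval is 0, no timer is registered. When the timer fires, target.onProcessingTime(timestamp) is called:
// Called when the timer fires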
public void onProcessingTime(long timestamp) throws Exception {
// Ask the generator to emit its current watermark
watermarkGenerator.onPeriodicEmit(wmOutput);
final long now = getProcessingTimeService().getCurrentProcessingTime();
// Register the next timer
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
The periodic call eventually reaches WatermarkEmitter.emitWatermark(). If the new watermark is larger than the last one emitted, this method sends it downstream through output.emitWatermark():
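The self-re-registering timer is what turns onPeriodicEmit() into a periodic call. The plain-Java simulation below uses no Flink classes; `PeriodicEmitSimulation` and its manual clock are hypothetical. It advances a fake processing-time clock and records when the callback would run. The next timer is registered relative to the current time (now + interval), as in the code above. A timer that fires late therefore shifts all later firings instead of catching up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical simulation of a processing-time timer that re-registers itself. */
final class PeriodicEmitSimulation {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final long interval;
    private long now;                                // fake processing-time clock (ms)
    final List<Long> emitTimes = new ArrayList<>();  // when onPeriodicEmit would run

    PeriodicEmitSimulation(long interval) {
        this.interval = interval;
        // Like open(): register the first timer only if the interval is positive
        if (interval > 0) {
            timers.add(now + interval);
        }
    }

    /** Move the clock forward and fire every timer that is due. */
    void advanceTo(long time) {
        now = time;
        while (!timers.isEmpty() && timers.peek() <= now) {
            timers.poll();
            onProcessingTime();
        }
    }

    private void onProcessingTime() {
        emitTimes.add(now);          // stand-in for watermarkGenerator.onPeriodicEmit(wmOutput)
        timers.add(now + interval);  // re-register relative to the current time
    }

    public static void main(String[] args) {
        PeriodicEmitSimulation sim = new PeriodicEmitSimulation(200);
        for (long t = 100; t <= 600; t += 100) {
            sim.advanceTo(t);
        }
        System.out.println(sim.emitTimes); // [200, 400, 600]
    }
}
```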
@Override
public void emitWatermark(Watermark watermark) {
final long ts = watermark.getTimestamp();
if (ts <= currentWatermark) {
return;
}
currentWatermark = ts;
if (idle) {
idle = false;
statusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
}
output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(ts));
}
1.1.3 Sending the watermark
// CountingOutput class
public void emitWatermark(Watermark mark) {
output.emitWatermark(mark);
}
// ChainingOutput class
public void emitWatermark(Watermark mark) {
try {
// Record the current watermark in the metric gauge
watermarkGauge.setCurrentWatermark(mark.getTimestamp());
if (streamStatusProvider.getStreamStatus().isActive()) {
// Let the chained operator process the watermark
operator.processWatermark(mark);
}
}
catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
}
}
// AbstractStreamOperator class
public void processWatermark(Watermark mark) throws Exception {
// In the map operator, timeServiceManager == null
if (timeServiceManager != null) {
timeServiceManager.advanceWatermark(mark);
}
output.emitWatermark(mark);
}
// CountingOutput class
public void emitWatermark(Watermark mark) {
output.emitWatermark(mark);
}
// RecordWriterOutput class
public void emitWatermark(Watermark mark) {
watermarkGauge.setCurrentWatermark(mark.getTimestamp());
serializationDelegate.setInstance(mark);
if (streamStatusProvider.getStreamStatus().isActive()) {
try {
// Broadcast the watermark
recordWriter.broadcastEmit(serializationDelegate);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
// RecordWriter class
public void broadcastEmit(T record) throws IOException, InterruptedException {
checkErroneous();
serializer.serializeRecord(record);
boolean pruneAfterCopying = false;
// Broadcast the watermark: it is written to every downstream channel, i.e. to every downstream task
for (int channel : broadcastChannels) {
if (copyFromSerializerToTargetChannel(channel)) {
pruneAfterCopying = true;
}
}
// Make sure we don't hold onto the large intermediate serialization buffer for too long
if (pruneAfterCopying) {
serializer.prune();
}
}
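The code above shows the call path a watermark takes in this example.
In this example, the watermark operator and map are chained together, so the watermark operator passes the watermark directly to map. The map operator handles it in processWatermark(). Because timeServiceManager == null in the map operator, it does no processing of its own and simply forwards the watermark.
The Output held by the map operator is a RecordWriterOutput. RecordWriterOutput uses a RecordWriter to broadcast the watermark to all downstream channels, that is, to all downstream tasks. In other words, whenever an upstream task advances its watermark, it broadcasts the new watermark to every downstream task.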
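At this point records and watermarks differ in fan-out. A record is routed to one downstream channel, while broadcastEmit() copies the watermark into every channel. The sketch below models that with plain lists. The list-per-channel layout, the round-robin routing and the class name are simplifying assumptions, not Flink's RecordWriter API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of one upstream task writing to N downstream channels. */
final class FanOutModel {
    final List<List<String>> channels = new ArrayList<>();
    private int nextChannel;

    FanOutModel(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    /** Records go to exactly one channel (round-robin here, like a rebalance partitioner). */
    void emitRecord(String record) {
        channels.get(nextChannel).add(record);
        nextChannel = (nextChannel + 1) % channels.size();
    }

    /** Watermarks are broadcast: every downstream channel receives a copy. */
    void broadcastWatermark(long watermark) {
        for (List<String> channel : channels) {
            channel.add("WM(" + watermark + ")");
        }
    }

    public static void main(String[] args) {
        FanOutModel writer = new FanOutModel(3);
        writer.emitRecord("a");
        writer.emitRecord("b");
        writer.broadcastWatermark(1999);
        System.out.println(writer.channels); // [[a, WM(1999)], [b, WM(1999)], [WM(1999)]]
    }
}
```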
1.1.4 Downstream tasks receive and process the watermark
The downstream task is a OneInputStreamTask. It handles incoming data through its input processor, StreamInputProcessor.processInput():
@Override
public InputStatus processInput() throws Exception {
// Process the next element from the input
InputStatus status = input.emitNext(output);
if (status == InputStatus.END_OF_INPUT) {
operatorChain.endHeadOperatorInput(1);
}
return status;
}
// StreamTaskNetworkInput class
@Override
public InputStatus emitNext(DataOutput<T> output) throws Exception {
while (true) {
// get the stream element from the deserializer
if (currentRecordDeserializer != null) {
DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
if (result.isBufferConsumed()) {
currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
currentRecordDeserializer = null;
}
if (result.isFullRecord()) {
// Handle the deserialized element (record, watermark, latency marker or stream status)
processElement(deserializationDelegate.getInstance(), output);
return InputStatus.MORE_AVAILABLE;
}
}
Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
if (bufferOrEvent.isPresent()) {
// return to the mailbox after receiving a checkpoint barrier to avoid processing of
// data after the barrier before checkpoint is performed for unaligned checkpoint mode
if (bufferOrEvent.get().isEvent() && bufferOrEvent.get().getEvent() instanceof CheckpointBarrier) {
return InputStatus.MORE_AVAILABLE;
}
processBufferOrEvent(bufferOrEvent.get());
} else {
if (checkpointedInputGate.isFinished()) {
checkState(checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available");
return InputStatus.END_OF_INPUT;
}
return InputStatus.NOTHING_AVAILABLE;
}
}
}
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
// A record: send it downstream through the output
if (recordOrMark.isRecord()){
output.emitRecord(recordOrMark.asRecord());
}
// A watermark: hand it to the valve, which decides whether a new watermark should be forwarded
else if (recordOrMark.isWatermark()) {
statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
} else if (recordOrMark.isLatencyMarker()) {
output.emitLatencyMarker(recordOrMark.asLatencyMarker());
} else if (recordOrMark.isStreamStatus()) {
statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);
} else {
throw new UnsupportedOperationException("Unknown type of StreamElement");
}
}
As processElement() above shows, each received watermark is passed to statusWatermarkValve.inputWatermark(). Here is that code:
public void inputWatermark(Watermark watermark, int channelIndex) {
// ignore the input watermark if its input channel, or all input channels are idle (i.e. overall the valve is idle).
if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
long watermarkMillis = watermark.getTimestamp();
// if the input watermark's value is less than the last received watermark for its input channel, ignore it also.
// channelStatuses records this task's state for each input channel (i.e. each upstream task);
// if the new watermark is larger than the channel's recorded watermark, the record is updated
if (watermarkMillis > channelStatuses[channelIndex].watermark) {
channelStatuses[channelIndex].watermark = watermarkMillis;
// previously unaligned input channels are now aligned if its watermark has caught up
if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
channelStatuses[channelIndex].isWatermarkAligned = true;
}
// now, attempt to find a new min watermark across all aligned channels
// Find the minimum watermark across the input channels and process it
findAndOutputNewMinWatermarkAcrossAlignedChannels();
}
}
}
private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
long newMinWatermark = Long.MAX_VALUE;
boolean hasAlignedChannels = false;
// determine new overall watermark by considering only watermark-aligned channels across all channels
// Find the smallest watermark across all (aligned) input channels
for (InputChannelStatus channelStatus : channelStatuses) {
if (channelStatus.isWatermarkAligned) {
hasAlignedChannels = true;
newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
}
}
// we acknowledge and output the new overall watermark if it really is aggregated
// from some remaining aligned channel, and is also larger than the last output watermark
// If the minimum is larger than the last watermark emitted, pass it to the outputHandler
if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
lastOutputWatermark = newMinWatermark;
outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
}
}
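In outline, the process works as follows. An upstream task broadcasts its watermark to all downstream channels, which you can think of as all downstream tasks. Each downstream task then updates the watermark it has recorded for that upstream input channel; every downstream task keeps such a record for each of its upstream tasks. The downstream task next takes the minimum watermark across its upstream input channels (those that are active and aligned). If that minimum is larger than the last watermark it emitted, the task calls outputHandler to process the new watermark. Typically this outputHandler is ForwardingValveOutputHandler.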
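The valve logic comes down to three steps: keep a per-channel maximum, take the minimum across channels, and emit only when that minimum grows. The plain-Java sketch below keeps just that core. The class and method names are hypothetical, and the sketch leaves out the idle-channel handling and the alignment flag that the real StatusWatermarkValve also tracks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified StatusWatermarkValve: all channels treated as active and aligned. */
final class MinWatermarkValve {
    private final long[] channelWatermarks;
    private long lastOutputWatermark = Long.MIN_VALUE;
    final List<Long> output = new ArrayList<>(); // watermarks passed to the output handler

    MinWatermarkValve(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    void inputWatermark(long watermark, int channelIndex) {
        // Ignore watermarks that do not advance this channel
        if (watermark <= channelWatermarks[channelIndex]) {
            return;
        }
        channelWatermarks[channelIndex] = watermark;
        // The task's event time is bounded by its slowest input channel
        long newMin = Arrays.stream(channelWatermarks).min().getAsLong();
        if (newMin > lastOutputWatermark) {
            lastOutputWatermark = newMin;
            output.add(newMin);
        }
    }

    public static void main(String[] args) {
        MinWatermarkValve valve = new MinWatermarkValve(2);
        valve.inputWatermark(5_000, 0); // channel 1 still at Long.MIN_VALUE, nothing emitted
        valve.inputWatermark(3_000, 1); // min(5000, 3000) = 3000 is emitted
        valve.inputWatermark(8_000, 1); // min(5000, 8000) = 5000 is emitted
        System.out.println(valve.output); // [3000, 5000]
    }
}
```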