Understanding the Flink Watermark Source Code in One Article

1. Watermark source code analysis

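Flink's watermark is a mechanism for event-time processing: it signals when a window computation should be triggered. As data keeps flowing into a Flink program, the event time of the incoming records gradually increases and the watermark rises with it. Once the watermark passes the end time of a window, that window's computation is triggered.
This article analyzes how watermarks work inside Flink, covering how they are generated, how they propagate, and how they trigger computation.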
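To make the triggering rule concrete, here is a minimal plain-Java sketch. It is a simplified model and not Flink code: the class name, the method `firedWindowEnds`, and the `lag` parameter are hypothetical, windows are assumed to be tumbling and aligned at 0, the watermark is assumed to be recomputed on every event, and a window [start, end) is assumed to fire once the watermark reaches end - 1.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model, not Flink code: tumbling windows aligned at 0, non-negative timestamps.
final class WindowFiringSketch {

    /** Returns the (exclusive) end timestamps of the windows fired while events are consumed in order. */
    static List<Long> firedWindowEnds(long[] eventTimestamps, long windowSize, long lag) {
        List<Long> fired = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        long nextWindowEnd = windowSize;
        for (long ts : eventTimestamps) {
            // The watermark trails the largest event time seen so far by a fixed lag.
            maxTimestamp = Math.max(maxTimestamp, ts);
            long watermark = maxTimestamp - lag;
            // Assumption: a window [start, end) fires once the watermark reaches end - 1.
            while (nextWindowEnd - 1 <= watermark) {
                fired.add(nextWindowEnd);
                nextWindowEnd += windowSize;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] events = {1000, 4000, 7000, 3000, 12000};
        // Window size 5000 ms, watermark lag 2000 ms.
        System.out.println(firedWindowEnds(events, 5000, 2000)); // [5000, 10000]
    }
}
```

In this run the out-of-order event at 3000 neither lowers the watermark nor fires anything; the windows fire only when the events at 7000 and 12000 push the watermark forward.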
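First, let's look at the structure of Watermark. It is very simple: it holds only a single timestamp.
    public final class Watermark implements Serializable {
       /** The event-time timestamp carried by this watermark, in milliseconds. */
       private final long timestamp;
       // TS_FORMATTER, the date formatter used below, is declared in the full class and omitted from this excerpt.
       public Watermark(long timestamp) {
          this.timestamp = timestamp;
       }
       public long getTimestamp() {
          return timestamp;
       }
       public String getFormattedTimestamp() {
          return TS_FORMATTER.get().format(new Date(timestamp));
       }
    }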
1.1.1 Setting the watermark

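    source.assignTimestampsAndWatermarks(WatermarkStrategy
            .forBoundedOutOfOrderness(Duration.ofSeconds(5))
            // The timestamp assigner argument is omitted in the original post;
            // pass a function that extracts the event-time timestamp from each record.
            .withTimestampAssigner(/* timestamp assigner */));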
First, let's look at the assignTimestampsAndWatermarks() method:
    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
          WatermarkStrategy<T> watermarkStrategy) {
       final WatermarkStrategy<T> cleanedStrategy = clean(watermarkStrategy);
       final TimestampsAndWatermarksOperator<T> operator =
          new TimestampsAndWatermarksOperator<>(cleanedStrategy);
       // match parallelism to input, to have a 1:1 source -> timestamps/watermarks relationship and chain
       final int inputParallelism = getTransformation().getParallelism();
       return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
          .setParallelism(inputParallelism);
    }
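As shown, assignTimestampsAndWatermarks() produces a new DataStream and creates a TimestampsAndWatermarksOperator, so this method behaves much like ordinary operators such as map and filter.
In the execution plan for this code, TimestampsAndWatermarksOperator is just an ordinary operator and can be chained with filter/map operators.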
1.1.2 Opening the WatermarksOperator

When a Task starts, it calls operator.open(). Let's see what the open() method of TimestampsAndWatermarksOperator does.
    public class TimestampsAndWatermarksOperator<T>
          extends AbstractStreamOperator<T>
          implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
       private static final long serialVersionUID = 1L;
       private final WatermarkStrategy<T> watermarkStrategy;
       /** The timestamp assigner. */
       private transient TimestampAssigner<T> timestampAssigner;
       /** The watermark generator, initialized during runtime. */
       private transient WatermarkGenerator<T> watermarkGenerator;
       /** The watermark output gateway, initialized during runtime. */
       private transient WatermarkOutput wmOutput;
       /** The interval (in milliseconds) for periodic watermark probes. Initialized during runtime. */
       private transient long watermarkInterval;
       public TimestampsAndWatermarksOperator(
             WatermarkStrategy<T> watermarkStrategy) {
          this.watermarkStrategy = checkNotNull(watermarkStrategy);
          this.chainingStrategy = ChainingStrategy.ALWAYS;
       }
       @Override
       public void open() throws Exception {
          super.open();
          timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
          watermarkGenerator = watermarkStrategy.createWatermarkGenerator(this::getMetricGroup);
          // Instantiate the WatermarkEmitter
          wmOutput = new WatermarkEmitter(output, getContainingTask().getStreamStatusMaintainer());
          // Read the watermark generation interval from the configuration
          watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
          if (watermarkInterval > 0) {
             // Register a processing-time timer that fires after one watermark interval
             final long now = getProcessingTimeService().getCurrentProcessingTime();
             getProcessingTimeService().registerTimer(now + watermarkInterval, this);
          }
       }
       // Called for every incoming record
       @Override
       public void processElement(final StreamRecord<T> element) throws Exception {
          final T event = element.getValue();
          final long previousTimestamp = element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
          final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);
          element.setTimestamp(newTimestamp);
          output.collect(element);
          // Hand the event to the watermark generator
          watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
       }
       @Override
       public void onProcessingTime(long timestamp) throws Exception {
          watermarkGenerator.onPeriodicEmit(wmOutput);
          // Re-register the timer for the next interval
          final long now = getProcessingTimeService().getCurrentProcessingTime();
          getProcessingTimeService().registerTimer(now + watermarkInterval, this);
       }
       /**
        * Override the base implementation to completely ignore watermarks propagated from
        * upstream, except for the "end of time" watermark.
        */
       @Override
       public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) throws Exception {
          // if we receive a Long.MAX_VALUE watermark we forward it since it is used
          // to signal the end of input and to not block watermark progress downstream
          if (mark.getTimestamp() == Long.MAX_VALUE) {
             wmOutput.emitWatermark(Watermark.MAX_WATERMARK);
          }
       }
       @Override
       public void close() throws Exception {
          super.close();
          watermarkGenerator.onPeriodicEmit(wmOutput);
       }
       // ------------------------------------------------------------------------
       /**
        * Implementation of the {@code WatermarkEmitter}, based on the components
        * that are available inside a stream operator.
        */
       // Watermark emitter
       private static final class WatermarkEmitter implements WatermarkOutput {
          private final Output<?> output;
          private final StreamStatusMaintainer statusMaintainer;
          // The current watermark
          private long currentWatermark;
          private boolean idle;
          WatermarkEmitter(Output<?> output, StreamStatusMaintainer statusMaintainer) {
             this.output = output;
             this.statusMaintainer = statusMaintainer;
             this.currentWatermark = Long.MIN_VALUE;
          }
          @Override
          public void emitWatermark(Watermark watermark) {
             final long ts = watermark.getTimestamp();
             if (ts <= currentWatermark) {
                return;
             }
             currentWatermark = ts;
             if (idle) {
                idle = false;
                statusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
             }
             // Send the watermark downstream
             output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(ts));
          }
          @Override
          public void markIdle() {
             idle = true;
             statusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
          }
       }
    }
For each incoming record, the generator tracks the largest timestamp seen so far, and the watermark is derived from that maximum.
    public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
       /** The maximum timestamp encountered so far. */
       private long maxTimestamp;
       /** The maximum out-of-orderness that this watermark generator assumes. */
       private final long outOfOrdernessMillis;
       /**
        * Creates a new watermark generator with the given out-of-orderness bound.
        *
        * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
        */
       public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
          checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
          checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
          this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
          // start so that our lowest watermark would be Long.MIN_VALUE.
          this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
       }
       // ------------------------------------------------------------------------
       @Override
       public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
          // Track the largest timestamp seen so far; the watermark is derived from it
          maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
       }
       @Override
       public void onPeriodicEmit(WatermarkOutput output) {
          // Periodically emit the watermark: max timestamp minus the out-of-orderness bound, minus 1 ms
          output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
       }
    }
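The arithmetic in this generator is easy to check by hand. The following plain-Java model mirrors the two fields and the two formulas from the listing, without any Flink dependency; the class name and the `currentWatermark()` helper are hypothetical and exist only for illustration.

```java
// Plain-Java model of the bounded-out-of-orderness arithmetic; no Flink dependency.
final class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        if (outOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("outOfOrdernessMillis cannot be negative");
        }
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start so that the first computed watermark is exactly Long.MIN_VALUE (no underflow).
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Mirrors onEvent(): only the running maximum is updated. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Mirrors the value passed to emitWatermark() in onPeriodicEmit(). */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(5000);
        System.out.println(generator.currentWatermark() == Long.MIN_VALUE); // true
        generator.onEvent(10_000);
        generator.onEvent(8_000); // an older event does not lower the maximum
        System.out.println(generator.currentWatermark()); // 4999
    }
}
```

With a 5-second bound, an event at 10000 ms yields a watermark of 4999 ms, so records with timestamps up to 5 seconds behind the newest one are still ahead of the watermark.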
As shown, open() simply registers a timer. Its interval is the value set in the program via setAutoWatermarkInterval(interval), which is 200 ms by default. When the timer fires, target.onProcessingTime(timestamp) is invoked.
    // Called when the timer fires
    public void onProcessingTime(long timestamp) throws Exception {
       // Ask the generator to emit a periodic watermark
       watermarkGenerator.onPeriodicEmit(wmOutput);
       final long now = getProcessingTimeService().getCurrentProcessingTime();
       // Re-register the timer for the next interval
       getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }
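The pattern here is a one-shot timer that re-registers itself each time it fires, which yields periodic emission. The sketch below models that pattern in plain Java. `ManualTimeService` is a hypothetical stand-in for Flink's processing-time service with a manually advanced clock, and `PeriodicEmitter` records the firing times where the real operator would call watermarkGenerator.onPeriodicEmit(wmOutput).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java model of a self-rescheduling timer; all names here are hypothetical.
final class PeriodicTimerSketch {

    interface Callback {
        void onProcessingTime(long timestamp);
    }

    /** Stand-in for a processing-time service, driven by a manually advanced clock. */
    static final class ManualTimeService {
        private static final class Timer {
            final long time;
            final Callback callback;
            Timer(long time, Callback callback) {
                this.time = time;
                this.callback = callback;
            }
        }

        private final PriorityQueue<Timer> timers =
                new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));
        private long now;

        long getCurrentProcessingTime() {
            return now;
        }

        void registerTimer(long time, Callback callback) {
            timers.add(new Timer(time, callback));
        }

        /** Moves the clock forward, firing every timer that becomes due on the way. */
        void advanceTo(long newTime) {
            while (!timers.isEmpty() && timers.peek().time <= newTime) {
                Timer due = timers.poll();
                now = due.time;
                due.callback.onProcessingTime(due.time);
            }
            now = newTime;
        }
    }

    static final class PeriodicEmitter implements Callback {
        final List<Long> firedAt = new ArrayList<>();
        private final ManualTimeService service;
        private final long interval;

        PeriodicEmitter(ManualTimeService service, long interval) {
            this.service = service;
            this.interval = interval;
        }

        /** Like open(): register the first timer only if the interval is positive. */
        void open() {
            if (interval > 0) {
                service.registerTimer(service.getCurrentProcessingTime() + interval, this);
            }
        }

        @Override
        public void onProcessingTime(long timestamp) {
            firedAt.add(timestamp); // stands in for the periodic watermark emission
            service.registerTimer(service.getCurrentProcessingTime() + interval, this);
        }
    }

    public static void main(String[] args) {
        ManualTimeService service = new ManualTimeService();
        PeriodicEmitter emitter = new PeriodicEmitter(service, 200);
        emitter.open();
        service.advanceTo(1000);
        System.out.println(emitter.firedAt); // [200, 400, 600, 800, 1000]
    }
}
```

The `interval > 0` guard matters in this model: with a zero interval the callback would re-register itself for the current instant and never let the clock advance.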
This eventually reaches the emitWatermark method of the WatermarkEmitter, which sends the current watermark downstream through output.emitWatermark.
    @Override
    public void emitWatermark(Watermark watermark) {
       final long ts = watermark.getTimestamp();
       // Drop watermarks that do not advance the current one
       if (ts <= currentWatermark) {
          return;
       }
       currentWatermark = ts;
       if (idle) {
          idle = false;
          statusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
       }
       output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(ts));
    }
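Two details of this method are worth checking on a small example: watermarks that do not advance are dropped, and because the early return comes before the idle check, a stale watermark does not switch an idle emitter back to active. The sketch below is a plain-Java model with hypothetical names; a list stands in for the downstream output and a boolean for the stream status.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the emitter's monotonic filter; names are hypothetical.
final class MonotonicEmitterSketch {
    private final List<Long> downstream = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;
    private boolean idle;

    void emitWatermark(long ts) {
        if (ts <= currentWatermark) {
            return; // not an advance: dropped, and the idle flag is left untouched
        }
        currentWatermark = ts;
        idle = false; // an advancing watermark makes the emitter active again
        downstream.add(ts);
    }

    void markIdle() {
        idle = true;
    }

    boolean isIdle() {
        return idle;
    }

    List<Long> emitted() {
        return downstream;
    }

    public static void main(String[] args) {
        MonotonicEmitterSketch emitter = new MonotonicEmitterSketch();
        emitter.emitWatermark(100);
        emitter.emitWatermark(100); // duplicate, dropped
        emitter.markIdle();
        emitter.emitWatermark(90);  // stale, dropped; emitter stays idle
        System.out.println(emitter.isIdle());  // true
        emitter.emitWatermark(150); // advance, forwarded; emitter becomes active
        System.out.println(emitter.isIdle());  // false
        System.out.println(emitter.emitted()); // [100, 150]
    }
}
```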
1.1.3 Emitting the watermark

    // CountingOutput class
    public void emitWatermark(Watermark mark) {
       output.emitWatermark(mark);
    }
    // ChainingOutput class
    public void emitWatermark(Watermark mark) {
       try {
          // Record the current watermark in the gauge
          watermarkGauge.setCurrentWatermark(mark.getTimestamp());
          if (streamStatusProvider.getStreamStatus().isActive()) {
             // Process the watermark in the chained operator
             operator.processWatermark(mark);
          }
       }
       catch (Exception e) {
          throw new ExceptionInChainedOperatorException(e);
       }
    }
    // AbstractStreamOperator class
    public void processWatermark(Watermark mark) throws Exception {
       // In the map operator, timeServiceManager == null
       if (timeServiceManager != null) {
          timeServiceManager.advanceWatermark(mark);
       }
       output.emitWatermark(mark);
    }
    // CountingOutput class
    public void emitWatermark(Watermark mark) {
       output.emitWatermark(mark);
    }
    // RecordWriterOutput class
    public void emitWatermark(Watermark mark) {
       watermarkGauge.setCurrentWatermark(mark.getTimestamp());
       serializationDelegate.setInstance(mark);
       if (streamStatusProvider.getStreamStatus().isActive()) {
          try {
             // Broadcast the watermark
             recordWriter.broadcastEmit(serializationDelegate);
          } catch (Exception e) {
             throw new RuntimeException(e.getMessage(), e);
          }
       }
    }
    // RecordWriter class
    public void broadcastEmit(T record) throws IOException, InterruptedException {
       checkErroneous();
       serializer.serializeRecord(record);
       boolean pruneAfterCopying = false;
       // Broadcast the watermark: it is sent on every downstream channel, i.e. to every downstream task
       for (int channel : broadcastChannels) {
          if (copyFromSerializerToTargetChannel(channel)) {
             pruneAfterCopying = true;
          }
       }
       // Make sure we don't hold onto the large intermediate serialization buffer for too long
       if (pruneAfterCopying) {
          serializer.prune();
       }
    }
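The code above shows the call path of a watermark in this example.
Here the watermark operator and the map operator are chained together, so the watermark operator passes the watermark directly to the map operator, which handles it in processWatermark(). Because timeServiceManager == null in the map operator, it does nothing with the watermark and simply forwards it.
The Output held by the map operator is a RecordWriterOutput, which uses a RecordWriter to broadcast the watermark to all downstream channels, i.e. to all downstream tasks. In other words, whenever an upstream task updates its watermark, it broadcasts the new watermark to every downstream task.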
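The difference between sending a record and sending a watermark can be sketched in a few lines of plain Java. This is an illustrative model only: the class and method names are hypothetical, each channel is a simple list, and routing a record to a single channel by key hash is an assumption made here for contrast, not something taken from the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model: records go to one channel, watermarks go to all of them.
final class BroadcastSketch {
    private final List<List<String>> channels = new ArrayList<>();

    BroadcastSketch(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    /** Assumption for illustration: a record is routed to exactly one channel by key hash. */
    void emitRecord(String key) {
        int target = Math.floorMod(key.hashCode(), channels.size());
        channels.get(target).add("record:" + key);
    }

    /** A watermark is copied to every downstream channel. */
    void broadcastWatermark(long timestamp) {
        for (List<String> channel : channels) {
            channel.add("watermark:" + timestamp);
        }
    }

    List<String> channel(int index) {
        return channels.get(index);
    }

    public static void main(String[] args) {
        BroadcastSketch writer = new BroadcastSketch(3);
        writer.emitRecord("user-1");
        writer.broadcastWatermark(42);
        for (int i = 0; i < 3; i++) {
            System.out.println("channel " + i + ": " + writer.channel(i));
        }
    }
}
```

Every channel ends up with the watermark, while the record appears in only one of them. This is why a downstream task can track a separate watermark per upstream channel.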
1.1.4 The downstream task receives and processes the watermark

The downstream task is a OneInputStreamTask, which handles incoming data through the input processor's StreamInputProcessor.processInput():
    @Override
    public InputStatus processInput() throws Exception {
       // Emit the next element
       InputStatus status = input.emitNext(output);
       if (status == InputStatus.END_OF_INPUT) {
          operatorChain.endHeadOperatorInput(1);
       }
       return status;
    }
    // StreamTaskNetworkInput class
    @Override
    public InputStatus emitNext(DataOutput<T> output) throws Exception {
       while (true) {
          // get the stream element from the deserializer
          if (currentRecordDeserializer != null) {
             DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
             if (result.isBufferConsumed()) {
                currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                currentRecordDeserializer = null;
             }
             if (result.isFullRecord()) {
                // Process the element
                processElement(deserializationDelegate.getInstance(), output);
                return InputStatus.MORE_AVAILABLE;
             }
          }
          Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
          if (bufferOrEvent.isPresent()) {
             // return to the mailbox after receiving a checkpoint barrier to avoid processing of
             // data after the barrier before checkpoint is performed for unaligned checkpoint mode
             if (bufferOrEvent.get().isEvent() && bufferOrEvent.get().getEvent() instanceof CheckpointBarrier) {
                return InputStatus.MORE_AVAILABLE;
             }
             processBufferOrEvent(bufferOrEvent.get());
          } else {
             if (checkpointedInputGate.isFinished()) {
                checkState(checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available");
                return InputStatus.END_OF_INPUT;
             }
             return InputStatus.NOTHING_AVAILABLE;
          }
       }
    }
    private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
       // A record is emitted downstream through output
       if (recordOrMark.isRecord()){
          output.emitRecord(recordOrMark.asRecord());
       }
       // A watermark is handed to the statusWatermarkValve, which decides whether to forward it
       else if (recordOrMark.isWatermark()) {
          statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
       } else if (recordOrMark.isLatencyMarker()) {
          output.emitLatencyMarker(recordOrMark.asLatencyMarker());
       } else if (recordOrMark.isStreamStatus()) {
          statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);
       } else {
          throw new UnsupportedOperationException("Unknown type of StreamElement");
       }
    }
For a received watermark, the input processing path calls statusWatermarkValve.inputWatermark(). Here is the code:
    public void inputWatermark(Watermark watermark, int channelIndex) {
       // ignore the input watermark if its input channel, or all input channels are idle (i.e. overall the valve is idle).
       if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
          long watermarkMillis = watermark.getTimestamp();
          // if the input watermark's value is less than the last received watermark for its input channel, ignore it also.
          // channelStatuses holds this task's status record for each input channel (i.e. each upstream task);
          // a channel's watermark is updated only when the new value is larger
          if (watermarkMillis > channelStatuses[channelIndex].watermark) {
             channelStatuses[channelIndex].watermark = watermarkMillis;
             // previously unaligned input channels are now aligned if its watermark has caught up
             if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
                channelStatuses[channelIndex].isWatermarkAligned = true;
             }
             // now, attempt to find a new min watermark across all aligned channels
             // i.e. pick the smallest watermark among the input channels and process it
             findAndOutputNewMinWatermarkAcrossAlignedChannels();
          }
       }
    }
    private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
       long newMinWatermark = Long.MAX_VALUE;
       boolean hasAlignedChannels = false;
       // determine new overall watermark by considering only watermark-aligned channels across all channels
       // i.e. find the smallest watermark among the aligned input channels
       for (InputChannelStatus channelStatus : channelStatuses) {
          if (channelStatus.isWatermarkAligned) {
             hasAlignedChannels = true;
             newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
          }
       }
       // we acknowledge and output the new overall watermark if it really is aggregated
       // from some remaining aligned channel, and is also larger than the last output watermark
       // i.e. if the minimum is larger than the last emitted watermark, pass it to outputHandler
       if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
          lastOutputWatermark = newMinWatermark;
          outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
       }
    }
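In outline: after an upstream task broadcasts a watermark to all downstream channels (think of them as all downstream tasks), each downstream task updates the watermark stored in its status record for that upstream input channel. Every downstream task keeps such a record for each of its upstream tasks. The downstream task then takes the minimum watermark across all upstream input channels (i.e. all upstream tasks); if that minimum is larger than the watermark it most recently emitted, it calls outputHandler to process the new watermark. In most cases this outputHandler is a ForwardingValveOutputHandler.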
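The minimum-across-channels rule can be traced with a small plain-Java model. It is a simplification of the valve logic above, with hypothetical names: every channel is assumed to be active and aligned, so the idle and alignment checks are left out, and a list stands in for outputHandler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the valve: forward the minimum watermark over all input channels.
// Simplification: every channel is treated as active and aligned.
final class MinWatermarkValveSketch {
    private final long[] channelWatermarks;
    private final List<Long> forwarded = new ArrayList<>();
    private long lastOutputWatermark = Long.MIN_VALUE;

    MinWatermarkValveSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    void inputWatermark(long watermark, int channelIndex) {
        // Ignore watermarks that do not advance their own channel.
        if (watermark <= channelWatermarks[channelIndex]) {
            return;
        }
        channelWatermarks[channelIndex] = watermark;

        long newMin = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            newMin = Math.min(newMin, w);
        }
        // Forward only if the overall minimum has advanced.
        if (newMin > lastOutputWatermark) {
            lastOutputWatermark = newMin;
            forwarded.add(newMin);
        }
    }

    List<Long> forwarded() {
        return forwarded;
    }

    public static void main(String[] args) {
        MinWatermarkValveSketch valve = new MinWatermarkValveSketch(2);
        valve.inputWatermark(10, 0); // channel 1 has reported nothing yet: nothing forwarded
        valve.inputWatermark(5, 1);  // minimum becomes 5: forwarded
        valve.inputWatermark(20, 0); // minimum is still 5: nothing forwarded
        valve.inputWatermark(15, 1); // minimum becomes 15: forwarded
        valve.inputWatermark(12, 1); // behind channel 1's own watermark: ignored
        System.out.println(valve.forwarded()); // [5, 15]
    }
}
```

The slowest channel sets the pace: channel 0 reaching 20 changes nothing until channel 1 catches up.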
