1. waterMark 源码分析
- public final class Watermark implements Serializable {
- public Watermark(long timestamp) {
- this.timestamp = timestamp;
- }
- public long getTimestamp() {
- return timestamp;
- }
- public String getFormattedTimestamp() {
- return TS_FORMATTER.get().format(new Date(timestamp));
- }
- }
复制代码 1.1.1 设置 waterMark
- source.assignTimestampsAndWatermarks(WatermarkStrategy
- .forBoundedOutOfOrderness(Duration.ofSeconds(5))
- .withTimestampAssigner());
复制代码 起首来看assignTimestampsAndWatermarks()方法:
- public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
- WatermarkStrategy<T> watermarkStrategy) {
- final WatermarkStrategy<T> cleanedStrategy = clean(watermarkStrategy);
- final TimestampsAndWatermarksOperator<T> operator =
- new TimestampsAndWatermarksOperator<>(cleanedStrategy);
- // match parallelism to input, to have a 1:1 source -> timestamps/watermarks relationship and chain
- final int inputParallelism = getTransformation().getParallelism();
- return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
- .setParallelism(inputParallelism);
- }
复制代码 可以看到assignTimestampsAndWatermarks()会产生一个新的DataStream,同时会创建一个TimestampsAndWatermarksOperator,看来这个方法和普通的map、filter算子是类似的。
下面附上该代码的执行筹划图,可以看到TimestampsAndWatermarksOperator就是一个普通的operator,可以和filter/map operator进行chain。
1.1.2 打开WatermarksOperator
- public class TimestampsAndWatermarksOperator<T>
- extends AbstractStreamOperator<T>
- implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
- private static final long serialVersionUID = 1L;
- private final WatermarkStrategy<T> watermarkStrategy;
- /** The timestamp assigner. */
- private transient TimestampAssigner<T> timestampAssigner;
- /** The watermark generator, initialized during runtime. */
- private transient WatermarkGenerator<T> watermarkGenerator;
- /** The watermark output gateway, initialized during runtime. */
- private transient WatermarkOutput wmOutput;
- /** The interval (in milliseconds) for periodic watermark probes. Initialized during runtime. */
- private transient long watermarkInterval;
- public TimestampsAndWatermarksOperator(
- WatermarkStrategy<T> watermarkStrategy) {
- this.watermarkStrategy = checkNotNull(watermarkStrategy);
- this.chainingStrategy = ChainingStrategy.ALWAYS;
- }
- @Override
- public void open() throws Exception {
- super.open();
- timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
- watermarkGenerator = watermarkStrategy.createWatermarkGenerator(this::getMetricGroup);
- // 实例化 WatermarkEmitter 发射器
- wmOutput = new WatermarkEmitter(output, getContainingTask().getStreamStatusMaintainer());
- // 从配置中获取到 waterMark 生产间隔
- watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
- if (watermarkInterval > 0) {
- // 根据处理时间注册一个 waterMark 生产间隔闹钟
- final long now = getProcessingTimeService().getCurrentProcessingTime();
- getProcessingTimeService().registerTimer(now + watermarkInterval, this);
- }
- }
- // 每个数据过来都会调用这个方法
- @Override
- public void processElement(final StreamRecord<T> element) throws Exception {
- final T event = element.getValue();
- final long previousTimestamp = element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
- final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);
- element.setTimestamp(newTimestamp);
- output.collect(element);
- // 这里生成 waterMark
- watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
- }
- @Override
- public void onProcessingTime(long timestamp) throws Exception {
- watermarkGenerator.onPeriodicEmit(wmOutput);
- // 注册闹钟
- final long now = getProcessingTimeService().getCurrentProcessingTime();
- getProcessingTimeService().registerTimer(now + watermarkInterval, this);
- }
- /**
- * Override the base implementation to completely ignore watermarks propagated from
- * upstream, except for the "end of time" watermark.
- */
- @Override
- public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) throws Exception {
- // if we receive a Long.MAX_VALUE watermark we forward it since it is used
- // to signal the end of input and to not block watermark progress downstream
- if (mark.getTimestamp() == Long.MAX_VALUE) {
- wmOutput.emitWatermark(Watermark.MAX_WATERMARK);
- }
- }
- @Override
- public void close() throws Exception {
- super.close();
- watermarkGenerator.onPeriodicEmit(wmOutput);
- }
- // ------------------------------------------------------------------------
- /**
- * Implementation of the {@code WatermarkEmitter}, based on the components
- * that are available inside a stream operator.
- */
- // Watermark 发送器
- private static final class WatermarkEmitter implements WatermarkOutput {
- private final Output<?> output;
- private final StreamStatusMaintainer statusMaintainer;
- // 当前的 WaterMark
- private long currentWatermark;
- private boolean idle;
- WatermarkEmitter(Output<?> output, StreamStatusMaintainer statusMaintainer) {
- this.output = output;
- this.statusMaintainer = statusMaintainer;
- this.currentWatermark = Long.MIN_VALUE;
- }
- @Override
- public void emitWatermark(Watermark watermark) {
- final long ts = watermark.getTimestamp();
- if (ts <= currentWatermark) {
- return;
- }
- currentWatermark = ts;
- if (idle) {
- idle = false;
- statusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
- }
- // 将 waterMark 发送到下游
- output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(ts));
- }
- @Override
- public void markIdle() {
- idle = true;
- statusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
- }
- }
- }
复制代码 每条数据来的时间,应该获取到最大的 timeStamp 作为 waterMark
- public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
- /** The maximum timestamp encountered so far. */
- private long maxTimestamp;
- /** The maximum out-of-orderness that this watermark generator assumes. */
- private final long outOfOrdernessMillis;
- /**
- * Creates a new watermark generator with the given out-of-orderness bound.
- *
- * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
- */
- public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
- checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
- checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
- this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
- // start so that our lowest watermark would be Long.MIN_VALUE.
- this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
- }
- // ------------------------------------------------------------------------
- @Override
- public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
- // 获取最大的 timeStamp作为 WaterMark
- maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
- }
- @Override
- public void onPeriodicEmit(WatermarkOutput output) {
- // 周期性将 WaterMark 发送出去
- output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
- }
- }
复制代码 可以看到open()方法就是注册了一个定时任务,这个定时任务触发的间隔时间就是在步伐里设置的
- // 当闹钟时间到达之后调用这个方法
- public void onProcessingTime(long timestamp) throws Exception {
- // 会调用这个方法
- watermarkGenerator.onPeriodicEmit(wmOutput);
- final long now = getProcessingTimeService().getCurrentProcessingTime();
- // 注册闹钟
- getProcessingTimeService().registerTimer(now + watermarkInterval, this);
- }
复制代码 终极会调用到WatermarkEmitter发射器的 emitWatermark 方法,会通过 output.emitWatermark 讲当前的 WaterMark 发送出去
- @Override
- public void emitWatermark(Watermark watermark) {
- final long ts = watermark.getTimestamp();
- if (ts <= currentWatermark) {
- return;
- }
- currentWatermark = ts;
- if (idle) {
- idle = false;
- statusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
- }
- output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(ts));
- }
复制代码 1.1.3 发送 WaterMark
- //CountingOutput类
- public void emitWatermark(Watermark mark) {
- output.emitWatermark(mark);
- }
- //ChainingOutput类
- public void emitWatermark(Watermark mark) {
- try {
- // 设置当前 WaterMark
- watermarkGauge.setCurrentWatermark(mark.getTimestamp());
- if (streamStatusProvider.getStreamStatus().isActive()) {
- // 处理 WaterMark
- operator.processWatermark(mark);
- }
- }
- catch (Exception e) {
- throw new ExceptionInChainedOperatorException(e);
- }
- }
- //AbstractStreamOperator类
- public void processWatermark(Watermark mark) throws Exception {
- //mapOperator中timeServiceManager == null
- if (timeServiceManager != null) {
- timeServiceManager.advanceWatermark(mark);
- }
- output.emitWatermark(mark);
- }
- //CountingOutput类
- public void emitWatermark(Watermark mark) {
- output.emitWatermark(mark);
- }
- //RecordWriterOutput类
- public void emitWatermark(Watermark mark) {
- watermarkGauge.setCurrentWatermark(mark.getTimestamp());
- serializationDelegate.setInstance(mark);
- if (streamStatusProvider.getStreamStatus().isActive()) {
- try {
- // 将 waterMark 信息广播发送
- recordWriter.broadcastEmit(serializationDelegate);
- } catch (Exception e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
- }
- //RecordWriter类
- public void broadcastEmit(T record) throws IOException, InterruptedException {
- checkErroneous();
- serializer.serializeRecord(record);
- boolean pruneAfterCopying = false;
- //这里是将watermark广播出去,每一个下游通道都要发送,也就是说会发送到下游的每一个task
- for (int channel : broadcastChannels) {
- if (copyFromSerializerToTargetChannel(channel)) {
- pruneAfterCopying = true;
- }
- }
- // Make sure we don't hold onto the large intermediate serialization buffer for too long
- if (pruneAfterCopying) {
- serializer.prune();
- }
- }
复制代码 上述代码说明白watermark在本例中的代码调用过程。
在本例中,watermarkOperator和map是chain在一起的,watermarkOperator会将watermark直接通报给map,mapOperator会调用processWatermark()方法来处置惩罚watermark,在mapOperator中timeServiceManager == null,所以mapOperator对watermark不做任务处置惩罚,而是直接将其送达出去。
3.1.4 下游Task接收并处置惩罚watermark
- @Override
- public InputStatus processInput() throws Exception {
- // 发送下一个
- InputStatus status = input.emitNext(output);
- if (status == InputStatus.END_OF_INPUT) {
- operatorChain.endHeadOperatorInput(1);
- }
- return status;
- }
- // StreamTaskNetworkInput 类
- @Override
- public InputStatus emitNext(DataOutput<T> output) throws Exception {
- while (true) {
- // get the stream element from the deserializer
- if (currentRecordDeserializer != null) {
- DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
- if (result.isBufferConsumed()) {
- currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
- currentRecordDeserializer = null;
- }
- if (result.isFullRecord()) {
- // 处理数据
- processElement(deserializationDelegate.getInstance(), output);
- return InputStatus.MORE_AVAILABLE;
- }
- }
- Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
- if (bufferOrEvent.isPresent()) {
- // return to the mailbox after receiving a checkpoint barrier to avoid processing of
- // data after the barrier before checkpoint is performed for unaligned checkpoint mode
- if (bufferOrEvent.get().isEvent() && bufferOrEvent.get().getEvent() instanceof CheckpointBarrier) {
- return InputStatus.MORE_AVAILABLE;
- }
- processBufferOrEvent(bufferOrEvent.get());
- } else {
- if (checkpointedInputGate.isFinished()) {
- checkState(checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available");
- return InputStatus.END_OF_INPUT;
- }
- return InputStatus.NOTHING_AVAILABLE;
- }
- }
- }
- private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
- // 如果是一个数据,那么通过 output 往下游发送数据
- if (recordOrMark.isRecord()){
- output.emitRecord(recordOrMark.asRecord());
- }
- // 如果是一个 waterMark,那么通过 output 往下游发送 waterMark
- else if (recordOrMark.isWatermark()) {
- statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
- } else if (recordOrMark.isLatencyMarker()) {
- output.emitLatencyMarker(recordOrMark.asLatencyMarker());
- } else if (recordOrMark.isStreamStatus()) {
- statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);
- } else {
- throw new UnsupportedOperationException("Unknown type of StreamElement");
- }
- }
复制代码 StreamInputProcessor会调用statusWatermarkValve.inputWatermark()来处置惩罚接收到watermark信息。看下代码:
- public void inputWatermark(Watermark watermark, int channelIndex) {
- // ignore the input watermark if its input channel, or all input channels are idle (i.e. overall the valve is idle).
- if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
- long watermarkMillis = watermark.getTimestamp();
- // if the input watermark's value is less than the last received watermark for its input channel, ignore it also.
- //channelStatuses是当前task对每个inputChannel(也就是每个上游task)的状态信息记录,
- //当新的watermark值大于inputChannel的watermark,就会进行调整
- if (watermarkMillis > channelStatuses[channelIndex].watermark) {
- channelStatuses[channelIndex].watermark = watermarkMillis;
- // previously unaligned input channels are now aligned if its watermark has caught up
- if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
- channelStatuses[channelIndex].isWatermarkAligned = true;
- }
- // now, attempt to find a new min watermark across all aligned channels
- //从各个inputChannel的watermark里找到最小的的watermark进行处理
- findAndOutputNewMinWatermarkAcrossAlignedChannels();
- }
- }
- }
- private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
- long newMinWatermark = Long.MAX_VALUE;
- boolean hasAlignedChannels = false;
- // determine new overall watermark by considering only watermark-aligned channels across all channels
- //从所有的inputChannels的watermark里找到最小的的watermark
- for (InputChannelStatus channelStatus : channelStatuses) {
- if (channelStatus.isWatermarkAligned) {
- hasAlignedChannels = true;
- newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
- }
- }
- // we acknowledge and output the new overall watermark if it really is aggregated
- // from some remaining aligned channel, and is also larger than the last output watermark
- //如果最小的watermark大于之前发送过的watermark,则调用outputHandler进行处理
- if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
- lastOutputWatermark = newMinWatermark;
- outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
- }
- }
复制代码 上述代码的大抵实现是,当上游一个task将watermark广播到下游的所有channel(可以明白成下游所有task)之后,下游的task会更新对上游inputChannel记录状态信息中的watermark值,下游每个task都记录这上游所有task的状态值。然后下游task再从所有上游inputChannel(即上游所有task)中选出一个最小值的watermark,如果这个watermark大于最近已经发送的watermark,那么就调用outputHandler对新watermark进行处置惩罚。一般环境下,这个outputHandler就是ForwardingValveOutputHandler。
