Flink 窗口触发器(Trigger)(一)

打印 上一主题 下一主题

主题 572|帖子 572|积分 1720

Flink 窗口触发器(Trigger)(一)
Flink 窗口触发器(Trigger)(二)
Flink的窗口触发器(Trigger)是流处置惩罚中一个非常关键的概念,它界说了窗口何时被触发并决定触发后的行为(如进行窗口数据的计算或清算)。
一、基本概念



  • 界说:触发器决定了窗口何时被触发以及触发后的行为。在Flink中,窗口的触发是通过设置定时器来实现的。
  • 作用:控制窗口数据的聚合机会,确保数据在得当的时间点被处置惩罚和输出。

二、类型

Flink提供了多种内置的触发器,以下几种为常用类型:

  • EventTimeTrigger


  • 工作原理:基于事件时间和水印(Watermark)机制来触发窗口计算。当窗口的最大时间戳小于便是当前的水印时,立即触发窗口计算。
  • 适用场景:适用于须要基于事件时间进行处置惩罚的场景,如金融生意业务、日记分析等。

  • ProcessingTimeTrigger


  • 工作原理:基于处置惩罚时间(即机器的系统时间)来触发窗口计算。当处置惩罚时间达到窗口的竣事时间时,触发窗口计算。
  • 适用场景:适用于对时间精度要求不高的场景,或者当事件时间无法正确获取时。

  • CountTrigger


  • 工作原理:根据窗口内元素的数量来触发计算。当窗口内的元素数量达到预设的阈值时,触发窗口计算。
  • 适用场景:适用于须要基于数据量进行处置惩罚的场景,如批量数据处置惩罚、流量分析等。

  • ContinuousEventTimeTriggerContinuousProcessingTimeTrigger


  • 工作原理:根据间隔时间周期性触发窗口计算,或者当窗口的竣事时间小于当前的时间(事件时间或处置惩罚时间)时触发计算。
  • 适用场景:适用于须要周期性处置惩罚数据的场景,如实时监控、周期性报表等。

  • DeltaTrigger


  • 工作原理:根据接入数据计算出的Delta指标是否凌驾指定的阈值来触发窗口计算。
  • 适用场景:适用于须要基于数据变化量进行处置惩罚的场景,如异常检测、趋势分析等。

  • PurgingTrigger


  • 工作原理:将其他触发器作为参数转换为Purge类型的触发器,在触发计算后清除窗口内的数据。
  • 适用场景:适用于须要在计算完成后立即清除窗口数据的场景,以节流存储空间。
三、关键方法

触发器通常包含以下几个关键方法:

  • onElement(T element, long timestamp, W window, TriggerContext ctx)
    当元素被添加到窗口时调用,用于注册定时器或更新窗口状态。
  • onEventTime(long time, W window, TriggerContext ctx)
    当事件时间计时器触发时调用,用于处置惩罚事件时间相关的触发逻辑。
  • onProcessingTime(long time, W window, TriggerContext ctx)
    当处置惩罚时间计时器触发时调用,用于处置惩罚处置惩罚时间相关的触发逻辑。
  • onMerge(W window, OnMergeContext ctx)(可选)
    当两个窗口合并时调用,用于合并窗口的状态和定时器。
  • clear(W window, TriggerContext ctx)
    当窗口被删除时调用,用于清算窗口的状态和定时器。
四、行为

触发器在触发时会返回一个TriggerResult枚举值,以决定窗口的后续行为。常见的TriggerResult值包括:


  • CONTINUE:表现不进行任何操纵,等候下一个触发条件。
  • FIRE:表现触发窗口计算并输出结果,但窗口状态保持稳固。
  • PURGE:表现不触发窗口计算,只清除窗口内的数据和状态。
  • FIRE_AND_PURGE:表现触发窗口计算并输出结果,然后清除窗口内的数据和状态。
   Flink的窗口触发器是流处置惩罚中非常机动且强盛的工具,它答应开发者根据现实需求界说窗口的触发条件和触发后的行为。通过选择合适的触发器和配置相应的参数,可以实现高效、正确的流数据处置惩罚。
  五、Trigger

EventTimeTrigger

  1. //
  2. // Source code recreated from a .class file by IntelliJ IDEA
  3. // (powered by FernFlower decompiler)
  4. //
  5. package org.apache.flink.streaming.api.windowing.triggers;
  6. import org.apache.flink.annotation.PublicEvolving;
  7. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  8. @PublicEvolving
  9. public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
  10.     private static final long serialVersionUID = 1L;
  11.     private EventTimeTrigger() {
  12.     }
  13.     public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
  14.         if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
  15.             return TriggerResult.FIRE;
  16.         } else {
  17.             ctx.registerEventTimeTimer(window.maxTimestamp());
  18.             return TriggerResult.CONTINUE;
  19.         }
  20.     }
  21.     public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {
  22.         return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
  23.     }
  24.     public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
  25.         return TriggerResult.CONTINUE;
  26.     }
  27.     public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
  28.         ctx.deleteEventTimeTimer(window.maxTimestamp());
  29.     }
  30.     public boolean canMerge() {
  31.         return true;
  32.     }
  33.     public void onMerge(TimeWindow window, Trigger.OnMergeContext ctx) {
  34.         long windowMaxTimestamp = window.maxTimestamp();
  35.         if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
  36.             ctx.registerEventTimeTimer(windowMaxTimestamp);
  37.         }
  38.     }
  39.     public String toString() {
  40.         return "EventTimeTrigger()";
  41.     }
  42.     public static EventTimeTrigger create() {
  43.         return new EventTimeTrigger();
  44.     }
  45. }
复制代码
ProcessingTimeTrigger

  1. //
  2. // Source code recreated from a .class file by IntelliJ IDEA
  3. // (powered by FernFlower decompiler)
  4. //
  5. package org.apache.flink.streaming.api.windowing.triggers;
  6. import org.apache.flink.annotation.PublicEvolving;
  7. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  8. @PublicEvolving
  9. public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
  10.     private static final long serialVersionUID = 1L;
  11.     private ProcessingTimeTrigger() {
  12.     }
  13.     public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) {
  14.         ctx.registerProcessingTimeTimer(window.maxTimestamp());
  15.         return TriggerResult.CONTINUE;
  16.     }
  17.     public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
  18.         return TriggerResult.CONTINUE;
  19.     }
  20.     public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {
  21.         return TriggerResult.FIRE;
  22.     }
  23.     public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
  24.         ctx.deleteProcessingTimeTimer(window.maxTimestamp());
  25.     }
  26.     public boolean canMerge() {
  27.         return true;
  28.     }
  29.     public void onMerge(TimeWindow window, Trigger.OnMergeContext ctx) {
  30.         long windowMaxTimestamp = window.maxTimestamp();
  31.         if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
  32.             ctx.registerProcessingTimeTimer(windowMaxTimestamp);
  33.         }
  34.     }
  35.     public String toString() {
  36.         return "ProcessingTimeTrigger()";
  37.     }
  38.     public static ProcessingTimeTrigger create() {
  39.         return new ProcessingTimeTrigger();
  40.     }
  41. }
复制代码
ProcessingTimeoutTrigger

  1. //
  2. // Source code recreated from a .class file by IntelliJ IDEA
  3. // (powered by FernFlower decompiler)
  4. //
  5. package org.apache.flink.streaming.api.windowing.triggers;
  6. import java.time.Duration;
  7. import org.apache.flink.annotation.PublicEvolving;
  8. import org.apache.flink.api.common.state.ValueState;
  9. import org.apache.flink.api.common.state.ValueStateDescriptor;
  10. import org.apache.flink.api.common.typeutils.base.LongSerializer;
  11. import org.apache.flink.streaming.api.windowing.windows.Window;
  12. @PublicEvolving
  13. public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T, W> {
  14.     private static final long serialVersionUID = 1L;
  15.     private final Trigger<T, W> nestedTrigger;
  16.     private final long interval;
  17.     private final boolean resetTimerOnNewRecord;
  18.     private final boolean shouldClearOnTimeout;
  19.     private final ValueStateDescriptor<Long> timeoutStateDesc;
  20.     private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long interval, boolean resetTimerOnNewRecord, boolean shouldClearOnTimeout) {
  21.         this.nestedTrigger = nestedTrigger;
  22.         this.interval = interval;
  23.         this.resetTimerOnNewRecord = resetTimerOnNewRecord;
  24.         this.shouldClearOnTimeout = shouldClearOnTimeout;
  25.         this.timeoutStateDesc = new ValueStateDescriptor("timeout", LongSerializer.INSTANCE);
  26.     }
  27.     public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
  28.         TriggerResult triggerResult = this.nestedTrigger.onElement(element, timestamp, window, ctx);
  29.         if (triggerResult.isFire()) {
  30.             this.clear(window, ctx);
  31.             return triggerResult;
  32.         } else {
  33.             ValueState<Long> timeoutState = (ValueState)ctx.getPartitionedState(this.timeoutStateDesc);
  34.             long nextFireTimestamp = ctx.getCurrentProcessingTime() + this.interval;
  35.             Long timeoutTimestamp = (Long)timeoutState.value();
  36.             if (timeoutTimestamp != null && this.resetTimerOnNewRecord) {
  37.                 ctx.deleteProcessingTimeTimer(timeoutTimestamp);
  38.                 timeoutState.clear();
  39.                 timeoutTimestamp = null;
  40.             }
  41.             if (timeoutTimestamp == null) {
  42.                 timeoutState.update(nextFireTimestamp);
  43.                 ctx.registerProcessingTimeTimer(nextFireTimestamp);
  44.             }
  45.             return triggerResult;
  46.         }
  47.     }
  48.     public TriggerResult onProcessingTime(long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
  49.         TriggerResult triggerResult = this.nestedTrigger.onProcessingTime(timestamp, window, ctx);
  50.         if (this.shouldClearOnTimeout) {
  51.             this.clear(window, ctx);
  52.         }
  53.         return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
  54.     }
  55.     public TriggerResult onEventTime(long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
  56.         TriggerResult triggerResult = this.nestedTrigger.onEventTime(timestamp, window, ctx);
  57.         if (this.shouldClearOnTimeout) {
  58.             this.clear(window, ctx);
  59.         }
  60.         return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
  61.     }
  62.     public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
  63.         ValueState<Long> timeoutTimestampState = (ValueState)ctx.getPartitionedState(this.timeoutStateDesc);
  64.         Long timeoutTimestamp = (Long)timeoutTimestampState.value();
  65.         if (timeoutTimestamp != null) {
  66.             ctx.deleteProcessingTimeTimer(timeoutTimestamp);
  67.             timeoutTimestampState.clear();
  68.         }
  69.         this.nestedTrigger.clear(window, ctx);
  70.     }
  71.     public String toString() {
  72.         return "TimeoutTrigger(" + this.nestedTrigger.toString() + ")";
  73.     }
  74.     public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(Trigger<T, W> nestedTrigger, Duration timeout) {
  75.         return new ProcessingTimeoutTrigger(nestedTrigger, timeout.toMillis(), false, true);
  76.     }
  77.     public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(Trigger<T, W> nestedTrigger, Duration timeout, boolean resetTimerOnNewRecord, boolean shouldClearOnTimeout) {
  78.         return new ProcessingTimeoutTrigger(nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, shouldClearOnTimeout);
  79.     }
  80. }
复制代码
CountTrigger

  1. //
  2. // Source code recreated from a .class file by IntelliJ IDEA
  3. // (powered by FernFlower decompiler)
  4. //
  5. package org.apache.flink.streaming.api.windowing.triggers;
  6. import org.apache.flink.annotation.PublicEvolving;
  7. import org.apache.flink.api.common.functions.ReduceFunction;
  8. import org.apache.flink.api.common.state.ReducingState;
  9. import org.apache.flink.api.common.state.ReducingStateDescriptor;
  10. import org.apache.flink.api.common.typeutils.base.LongSerializer;
  11. import org.apache.flink.streaming.api.windowing.windows.Window;
  12. @PublicEvolving
  13. public class CountTrigger<W extends Window> extends Trigger<Object, W> {
  14.     private static final long serialVersionUID = 1L;
  15.     private final long maxCount;
  16.     private final ReducingStateDescriptor<Long> stateDesc;
  17.     private CountTrigger(long maxCount) {
  18.         this.stateDesc = new ReducingStateDescriptor("count", new Sum(), LongSerializer.INSTANCE);
  19.         this.maxCount = maxCount;
  20.     }
  21.     public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
  22.         ReducingState<Long> count = (ReducingState)ctx.getPartitionedState(this.stateDesc);
  23.         count.add(1L);
  24.         if ((Long)count.get() >= this.maxCount) {
  25.             count.clear();
  26.             return TriggerResult.FIRE;
  27.         } else {
  28.             return TriggerResult.CONTINUE;
  29.         }
  30.     }
  31.     public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) {
  32.         return TriggerResult.CONTINUE;
  33.     }
  34.     public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
  35.         return TriggerResult.CONTINUE;
  36.     }
  37.     public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
  38.         ((ReducingState)ctx.getPartitionedState(this.stateDesc)).clear();
  39.     }
  40.     public boolean canMerge() {
  41.         return true;
  42.     }
  43.     public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
  44.         ctx.mergePartitionedState(this.stateDesc);
  45.     }
  46.     public String toString() {
  47.         return "CountTrigger(" + this.maxCount + ")";
  48.     }
  49.     public static <W extends Window> CountTrigger<W> of(long maxCount) {
  50.         return new CountTrigger(maxCount);
  51.     }
  52.     private static class Sum implements ReduceFunction<Long> {
  53.         private static final long serialVersionUID = 1L;
  54.         private Sum() {
  55.         }
  56.         public Long reduce(Long value1, Long value2) throws Exception {
  57.             return value1 + value2;
  58.         }
  59.     }
  60. }
复制代码
ContinuousEventTimeTrigger和ContinuousProcessingTimeTrigger

  1. //
  2. // Source code recreated from a .class file by IntelliJ IDEA
  3. // (powered by FernFlower decompiler)
  4. //
  5. package org.apache.flink.streaming.api.windowing.triggers;
  6. import org.apache.flink.annotation.PublicEvolving;
  7. import org.apache.flink.annotation.VisibleForTesting;
  8. import org.apache.flink.api.common.functions.ReduceFunction;
  9. import org.apache.flink.api.common.state.ReducingState;
  10. import org.apache.flink.api.common.state.ReducingStateDescriptor;
  11. import org.apache.flink.api.common.typeutils.base.LongSerializer;
  12. import org.apache.flink.streaming.api.windowing.time.Time;
  13. import org.apache.flink.streaming.api.windowing.windows.Window;
  14. @PublicEvolving
  15. public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> {
  16.     private static final long serialVersionUID = 1L;
  17.     private final long interval;
  18.     private final ReducingStateDescriptor<Long> stateDesc;
  19.     private ContinuousEventTimeTrigger(long interval) {
  20.         this.stateDesc = new ReducingStateDescriptor("fire-time", new Min(), LongSerializer.INSTANCE);
  21.         this.interval = interval;
  22.     }
  23.     public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
  24.         if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
  25.             return TriggerResult.FIRE;
  26.         } else {
  27.             ctx.registerEventTimeTimer(window.maxTimestamp());
  28.             ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
  29.             if (fireTimestampState.get() == null) {
  30.                 this.registerNextFireTimestamp(timestamp - timestamp % this.interval, window, ctx, fireTimestampState);
  31.             }
  32.             return TriggerResult.CONTINUE;
  33.         }
  34.     }
  35.     public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
  36.         if (time == window.maxTimestamp()) {
  37.             return TriggerResult.FIRE;
  38.         } else {
  39.             ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
  40.             Long fireTimestamp = (Long)fireTimestampState.get();
  41.             if (fireTimestamp != null && fireTimestamp == time) {
  42.                 fireTimestampState.clear();
  43.                 this.registerNextFireTimestamp(time, window, ctx, fireTimestampState);
  44.                 return TriggerResult.FIRE;
  45.             } else {
  46.                 return TriggerResult.CONTINUE;
  47.             }
  48.         }
  49.     }
  50.     public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
  51.         return TriggerResult.CONTINUE;
  52.     }
  53.     public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
  54.         ReducingState<Long> fireTimestamp = (ReducingState)ctx.getPartitionedState(this.stateDesc);
  55.         Long timestamp = (Long)fireTimestamp.get();
  56.         if (timestamp != null) {
  57.             ctx.deleteEventTimeTimer(timestamp);
  58.             fireTimestamp.clear();
  59.         }
  60.     }
  61.     public boolean canMerge() {
  62.         return true;
  63.     }
  64.     public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
  65.         ctx.mergePartitionedState(this.stateDesc);
  66.         Long nextFireTimestamp = (Long)((ReducingState)ctx.getPartitionedState(this.stateDesc)).get();
  67.         if (nextFireTimestamp != null) {
  68.             ctx.registerEventTimeTimer(nextFireTimestamp);
  69.         }
  70.     }
  71.     public String toString() {
  72.         return "ContinuousEventTimeTrigger(" + this.interval + ")";
  73.     }
  74.     @VisibleForTesting
  75.     public long getInterval() {
  76.         return this.interval;
  77.     }
  78.     public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval) {
  79.         return new ContinuousEventTimeTrigger(interval.toMilliseconds());
  80.     }
  81.     private void registerNextFireTimestamp(long time, W window, Trigger.TriggerContext ctx, ReducingState<Long> fireTimestampState) throws Exception {
  82.         long nextFireTimestamp = Math.min(time + this.interval, window.maxTimestamp());
  83.         fireTimestampState.add(nextFireTimestamp);
  84.         ctx.registerEventTimeTimer(nextFireTimestamp);
  85.     }
  86.     private static class Min implements ReduceFunction<Long> {
  87.         private static final long serialVersionUID = 1L;
  88.         private Min() {
  89.         }
  90.         public Long reduce(Long value1, Long value2) throws Exception {
  91.             return Math.min(value1, value2);
  92.         }
  93.     }
  94. }
复制代码
  1. //
  2. // Source code recreated from a .class file by IntelliJ IDEA
  3. // (powered by FernFlower decompiler)
  4. //
  5. package org.apache.flink.streaming.api.windowing.triggers;
  6. import org.apache.flink.annotation.PublicEvolving;
  7. import org.apache.flink.annotation.VisibleForTesting;
  8. import org.apache.flink.api.common.functions.ReduceFunction;
  9. import org.apache.flink.api.common.state.ReducingState;
  10. import org.apache.flink.api.common.state.ReducingStateDescriptor;
  11. import org.apache.flink.api.common.typeutils.base.LongSerializer;
  12. import org.apache.flink.streaming.api.windowing.time.Time;
  13. import org.apache.flink.streaming.api.windowing.windows.Window;
  14. @PublicEvolving
  15. public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {
  16.     private static final long serialVersionUID = 1L;
  17.     private final long interval;
  18.     private final ReducingStateDescriptor<Long> stateDesc;
  19.     private ContinuousProcessingTimeTrigger(long interval) {
  20.         this.stateDesc = new ReducingStateDescriptor("fire-time", new Min(), LongSerializer.INSTANCE);
  21.         this.interval = interval;
  22.     }
  23.     public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
  24.         ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
  25.         timestamp = ctx.getCurrentProcessingTime();
  26.         if (fireTimestampState.get() == null) {
  27.             this.registerNextFireTimestamp(timestamp - timestamp % this.interval, window, ctx, fireTimestampState);
  28.         }
  29.         return TriggerResult.CONTINUE;
  30.     }
  31.     public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
  32.         return TriggerResult.CONTINUE;
  33.     }
  34.     public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
  35.         ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
  36.         if (((Long)fireTimestampState.get()).equals(time)) {
  37.             fireTimestampState.clear();
  38.             this.registerNextFireTimestamp(time, window, ctx, fireTimestampState);
  39.             return TriggerResult.FIRE;
  40.         } else {
  41.             return TriggerResult.CONTINUE;
  42.         }
  43.     }
  44.     public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
  45.         ReducingState<Long> fireTimestamp = (ReducingState)ctx.getPartitionedState(this.stateDesc);
  46.         Long timestamp = (Long)fireTimestamp.get();
  47.         if (timestamp != null) {
  48.             ctx.deleteProcessingTimeTimer(timestamp);
  49.             fireTimestamp.clear();
  50.         }
  51.     }
  52.     public boolean canMerge() {
  53.         return true;
  54.     }
  55.     public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
  56.         ctx.mergePartitionedState(this.stateDesc);
  57.         Long nextFireTimestamp = (Long)((ReducingState)ctx.getPartitionedState(this.stateDesc)).get();
  58.         if (nextFireTimestamp != null) {
  59.             ctx.registerProcessingTimeTimer(nextFireTimestamp);
  60.         }
  61.     }
  62.     @VisibleForTesting
  63.     public long getInterval() {
  64.         return this.interval;
  65.     }
  66.     public String toString() {
  67.         return "ContinuousProcessingTimeTrigger(" + this.interval + ")";
  68.     }
  69.     public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) {
  70.         return new ContinuousProcessingTimeTrigger(interval.toMilliseconds());
  71.     }
  72.     private void registerNextFireTimestamp(long time, W window, Trigger.TriggerContext ctx, ReducingState<Long> fireTimestampState) throws Exception {
  73.         long nextFireTimestamp = Math.min(time + this.interval, window.maxTimestamp());
  74.         fireTimestampState.add(nextFireTimestamp);
  75.         ctx.registerProcessingTimeTimer(nextFireTimestamp);
  76.     }
  77.     private static class Min implements ReduceFunction<Long> {
  78.         private static final long serialVersionUID = 1L;
  79.         private Min() {
  80.         }
  81.         public Long reduce(Long value1, Long value2) throws Exception {
  82.             return Math.min(value1, value2);
  83.         }
  84.     }
  85. }
复制代码
DeltaTrigger

  1. //
  2. // Source code recreated from a .class file by IntelliJ IDEA
  3. // (powered by FernFlower decompiler)
  4. //
  5. package org.apache.flink.streaming.api.windowing.triggers;
  6. import org.apache.flink.annotation.PublicEvolving;
  7. import org.apache.flink.api.common.state.ValueState;
  8. import org.apache.flink.api.common.state.ValueStateDescriptor;
  9. import org.apache.flink.api.common.typeutils.TypeSerializer;
  10. import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
  11. import org.apache.flink.streaming.api.windowing.windows.Window;
  12. @PublicEvolving
  13. public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {
  14.     private static final long serialVersionUID = 1L;
  15.     private final DeltaFunction<T> deltaFunction;
  16.     private final double threshold;
  17.     private final ValueStateDescriptor<T> stateDesc;
  18.     private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
  19.         this.deltaFunction = deltaFunction;
  20.         this.threshold = threshold;
  21.         this.stateDesc = new ValueStateDescriptor("last-element", stateSerializer);
  22.     }
  23.     public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
  24.         ValueState<T> lastElementState = (ValueState)ctx.getPartitionedState(this.stateDesc);
  25.         if (lastElementState.value() == null) {
  26.             lastElementState.update(element);
  27.             return TriggerResult.CONTINUE;
  28.         } else if (this.deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
  29.             lastElementState.update(element);
  30.             return TriggerResult.FIRE;
  31.         } else {
  32.             return TriggerResult.CONTINUE;
  33.         }
  34.     }
  35.     public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) {
  36.         return TriggerResult.CONTINUE;
  37.     }
  38.     public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
  39.         return TriggerResult.CONTINUE;
  40.     }
  41.     public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
  42.         ((ValueState)ctx.getPartitionedState(this.stateDesc)).clear();
  43.     }
  44.     public String toString() {
  45.         return "DeltaTrigger(" + this.deltaFunction + ", " + this.threshold + ")";
  46.     }
  47.     public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
  48.         return new DeltaTrigger(threshold, deltaFunction, stateSerializer);
  49.     }
  50. }
复制代码
PurgingTrigger

  1. //
  2. // Source code recreated from a .class file by IntelliJ IDEA
  3. // (powered by FernFlower decompiler)
  4. //
  5. package org.apache.flink.streaming.api.windowing.triggers;
  6. import org.apache.flink.annotation.PublicEvolving;
  7. import org.apache.flink.annotation.VisibleForTesting;
  8. import org.apache.flink.streaming.api.windowing.windows.Window;
  9. @PublicEvolving
  10. public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
  11.     private static final long serialVersionUID = 1L;
  12.     private Trigger<T, W> nestedTrigger;
  13.     private PurgingTrigger(Trigger<T, W> nestedTrigger) {
  14.         this.nestedTrigger = nestedTrigger;
  15.     }
  16.     public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
  17.         TriggerResult triggerResult = this.nestedTrigger.onElement(element, timestamp, window, ctx);
  18.         return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
  19.     }
  20.     public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
  21.         TriggerResult triggerResult = this.nestedTrigger.onEventTime(time, window, ctx);
  22.         return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
  23.     }
  24.     public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
  25.         TriggerResult triggerResult = this.nestedTrigger.onProcessingTime(time, window, ctx);
  26.         return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
  27.     }
  28.     public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
  29.         this.nestedTrigger.clear(window, ctx);
  30.     }
  31.     public boolean canMerge() {
  32.         return this.nestedTrigger.canMerge();
  33.     }
  34.     public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
  35.         this.nestedTrigger.onMerge(window, ctx);
  36.     }
  37.     public String toString() {
  38.         return "PurgingTrigger(" + this.nestedTrigger.toString() + ")";
  39.     }
  40.     public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
  41.         return new PurgingTrigger(nestedTrigger);
  42.     }
  43.     @VisibleForTesting
  44.     public Trigger<T, W> getNestedTrigger() {
  45.         return this.nestedTrigger;
  46.     }
  47. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王國慶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表