ToB企服应用市场 (ToB Enterprise Service App Market): ToB reviews and business social networking platform
Title:
Flink Window Triggers (Trigger), Part 1
Author:
王國慶
Time:
2024-8-28 02:09
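Flink's window trigger (Trigger) is a key concept in stream processing: it defines when a window fires and what happens once it does (for example, evaluating the window's data or cleaning it up).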
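1. Basic concepts
Definition
: A trigger decides when a window fires and what happens after it fires. In Flink, time-based firing is implemented by registering timers; a trigger can also fire directly when an element arrives.
Purpose
: Controls when window data is aggregated, so that data is processed and emitted at the right point in time.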
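2. Types
Flink provides several built-in triggers. The most commonly used ones are:
EventTimeTrigger
How it works
: Fires window evaluation based on event time and the watermark (Watermark) mechanism. The window fires as soon as its maximum timestamp is less than or equal to the current watermark.
Use cases
: Scenarios that must be processed in event time, such as financial transactions and log analysis.
ProcessingTimeTrigger
How it works
: Fires window evaluation based on processing time (the system clock of the machine). The window fires when processing time reaches the end of the window.
Use cases
: Scenarios that do not need precise timing, or where event time cannot be obtained reliably.
CountTrigger
How it works
: Fires based on the number of elements in the window. The window fires when the element count reaches the configured threshold.
Use cases
: Scenarios driven by data volume, such as batched processing and traffic analysis.
ContinuousEventTimeTrigger
and
ContinuousProcessingTimeTrigger
How it works
: Fire window evaluation periodically at a fixed interval, and also once the current time (event time or processing time) has passed the end of the window.
Use cases
: Scenarios that need periodic output, such as real-time monitoring and periodic reports.
DeltaTrigger
How it works
: Computes a delta between the incoming element and a stored reference element, and fires when that delta exceeds the configured threshold.
Use cases
: Scenarios driven by how much the data changes, such as anomaly detection and trend analysis.
PurgingTrigger
How it works
: Takes another trigger as an argument and turns it into a purging trigger: whenever the wrapped trigger fires, the window's data is cleared after evaluation.
Use cases
: Scenarios where window data should be cleared right after evaluation to save storage.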
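3. Key methods
A trigger typically implements the following methods:
onElement
(T element, long timestamp, W window, TriggerContext ctx)
Called for every element added to a window; used to register timers or update trigger state.
onEventTime
(long time, W window, TriggerContext ctx)
Called when a registered event-time timer fires; handles the event-time firing logic.
onProcessingTime
(long time, W window, TriggerContext ctx)
Called when a registered processing-time timer fires; handles the processing-time firing logic.
onMerge
(W window, OnMergeContext ctx) (optional)
Called when windows are merged; used to merge the trigger state and timers of the merged windows.
clear
(W window, TriggerContext ctx)
Called when a window is removed; used to clean up the window's trigger state and timers.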
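4. Behavior
Each trigger callback returns a TriggerResult enum value that determines what the window does next. The TriggerResult values are:
CONTINUE
: Do nothing and wait for the next trigger condition.
FIRE
: Evaluate the window and emit the result; the window's contents are kept unchanged.
PURGE
: Do not evaluate the window; only clear the elements held in the window.
FIRE_AND_PURGE
: Evaluate the window and emit the result, then clear the elements held in the window.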
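The four outcomes above boil down to two independent flags: "evaluate the window" and "clear its contents". The stand-alone sketch below models that idea. The enum name TriggerResultSketch is hypothetical and this is not the Flink class; it only mirrors the isFire() and isPurge() checks that the decompiled triggers further down call on TriggerResult.

```java
// Simplified, stand-alone model of the four trigger outcomes.
// TriggerResultSketch is a hypothetical name used for illustration only.
enum TriggerResultSketch {
    CONTINUE(false, false),
    FIRE(true, false),
    PURGE(false, true),
    FIRE_AND_PURGE(true, true);

    private final boolean fire;   // should the window function be evaluated?
    private final boolean purge;  // should the window contents be cleared?

    TriggerResultSketch(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }

    boolean isFire() {
        return fire;
    }

    boolean isPurge() {
        return purge;
    }
}
```

Seen this way, FIRE_AND_PURGE is simply the value for which both flags are set, and CONTINUE the one for which neither is.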
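Flink's window triggers are a flexible and powerful tool in stream processing: they let developers define, according to actual requirements, when a window fires and what happens afterwards. Choosing a suitable trigger and configuring its parameters makes efficient and correct stream processing possible.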
5. Trigger source code
EventTimeTrigger
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, Trigger.OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}
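The decision logic of EventTimeTrigger is small enough to model without Flink. The sketch below is a simplified stand-in, not the library class: EventTimeDecisionSketch and its fields are hypothetical names, the results are plain strings, and the timer service is reduced to a single remembered timestamp.

```java
// Stand-alone model of the EventTimeTrigger decisions (hypothetical class, no Flink dependency).
final class EventTimeDecisionSketch {
    /** Timestamp of the most recently registered timer, or Long.MIN_VALUE if none was registered. */
    long registeredTimer = Long.MIN_VALUE;

    /** Mirrors onElement: fire at once if the watermark already passed the window, otherwise arm a timer. */
    String onElement(long windowMaxTimestamp, long currentWatermark) {
        if (windowMaxTimestamp <= currentWatermark) {
            return "FIRE"; // late element for a window whose end the watermark has already reached
        }
        registeredTimer = windowMaxTimestamp;
        return "CONTINUE";
    }

    /** Mirrors onEventTime: only the timer set for the window's max timestamp fires the window. */
    String onEventTime(long timerTime, long windowMaxTimestamp) {
        return timerTime == windowMaxTimestamp ? "FIRE" : "CONTINUE";
    }
}
```

For a window whose maximum timestamp is 9999 and a watermark of 5000, onElement returns "CONTINUE" and arms a timer at 9999; once that timer fires, onEventTime(9999, 9999) returns "FIRE".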
ProcessingTimeTrigger
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, Trigger.OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }
    }

    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }
}
ProcessingTimeoutTrigger
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import java.time.Duration;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;
    private final Trigger<T, W> nestedTrigger;
    private final long interval;
    private final boolean resetTimerOnNewRecord;
    private final boolean shouldClearOnTimeout;
    private final ValueStateDescriptor<Long> timeoutStateDesc;

    private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long interval, boolean resetTimerOnNewRecord, boolean shouldClearOnTimeout) {
        this.nestedTrigger = nestedTrigger;
        this.interval = interval;
        this.resetTimerOnNewRecord = resetTimerOnNewRecord;
        this.shouldClearOnTimeout = shouldClearOnTimeout;
        this.timeoutStateDesc = new ValueStateDescriptor("timeout", LongSerializer.INSTANCE);
    }

    public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onElement(element, timestamp, window, ctx);
        if (triggerResult.isFire()) {
            this.clear(window, ctx);
            return triggerResult;
        } else {
            ValueState<Long> timeoutState = (ValueState)ctx.getPartitionedState(this.timeoutStateDesc);
            long nextFireTimestamp = ctx.getCurrentProcessingTime() + this.interval;
            Long timeoutTimestamp = (Long)timeoutState.value();
            if (timeoutTimestamp != null && this.resetTimerOnNewRecord) {
                ctx.deleteProcessingTimeTimer(timeoutTimestamp);
                timeoutState.clear();
                timeoutTimestamp = null;
            }

            if (timeoutTimestamp == null) {
                timeoutState.update(nextFireTimestamp);
                ctx.registerProcessingTimeTimer(nextFireTimestamp);
            }

            return triggerResult;
        }
    }

    public TriggerResult onProcessingTime(long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onProcessingTime(timestamp, window, ctx);
        if (this.shouldClearOnTimeout) {
            this.clear(window, ctx);
        }

        return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
    }

    public TriggerResult onEventTime(long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onEventTime(timestamp, window, ctx);
        if (this.shouldClearOnTimeout) {
            this.clear(window, ctx);
        }

        return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ValueState<Long> timeoutTimestampState = (ValueState)ctx.getPartitionedState(this.timeoutStateDesc);
        Long timeoutTimestamp = (Long)timeoutTimestampState.value();
        if (timeoutTimestamp != null) {
            ctx.deleteProcessingTimeTimer(timeoutTimestamp);
            timeoutTimestampState.clear();
        }

        this.nestedTrigger.clear(window, ctx);
    }

    public String toString() {
        return "TimeoutTrigger(" + this.nestedTrigger.toString() + ")";
    }

    public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(Trigger<T, W> nestedTrigger, Duration timeout) {
        return new ProcessingTimeoutTrigger(nestedTrigger, timeout.toMillis(), false, true);
    }

    public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(Trigger<T, W> nestedTrigger, Duration timeout, boolean resetTimerOnNewRecord, boolean shouldClearOnTimeout) {
        return new ProcessingTimeoutTrigger(nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, shouldClearOnTimeout);
    }
}
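The effect of the resetTimerOnNewRecord flag in ProcessingTimeoutTrigger is easiest to see in isolation. The sketch below is a rough model under simplifying assumptions: it ignores the nested trigger entirely, keeps the pending deadline in a plain field instead of ValueState, and takes the current processing time as a parameter. TimeoutDeadlineSketch is a hypothetical name.

```java
// Stand-alone model of how the timeout deadline moves (hypothetical class, no Flink dependency).
final class TimeoutDeadlineSketch {
    private final long intervalMillis;
    private final boolean resetTimerOnNewRecord;
    private Long deadline; // null means no timeout timer is pending

    TimeoutDeadlineSketch(long intervalMillis, boolean resetTimerOnNewRecord) {
        this.intervalMillis = intervalMillis;
        this.resetTimerOnNewRecord = resetTimerOnNewRecord;
    }

    /** Called per element with the current processing time; returns the pending timeout deadline. */
    long onElement(long nowMillis) {
        if (deadline != null && resetTimerOnNewRecord) {
            deadline = null; // stands in for deleting the old timer and clearing the state
        }
        if (deadline == null) {
            deadline = nowMillis + intervalMillis; // stands in for registering a new timer
        }
        return deadline;
    }
}
```

With an interval of 1000 ms and elements at times 0 and 400, the deadline stays at 1000 when the flag is false and moves to 1400 when it is true. In other words, the flag decides whether the timeout is measured from the first element or from the most recent one.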
CountTrigger
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class CountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long maxCount;
    private final ReducingStateDescriptor<Long> stateDesc;

    private CountTrigger(long maxCount) {
        this.stateDesc = new ReducingStateDescriptor("count", new Sum(), LongSerializer.INSTANCE);
        this.maxCount = maxCount;
    }

    public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> count = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        count.add(1L);
        if ((Long)count.get() >= this.maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ((ReducingState)ctx.getPartitionedState(this.stateDesc)).clear();
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(this.stateDesc);
    }

    public String toString() {
        return "CountTrigger(" + this.maxCount + ")";
    }

    public static <W extends Window> CountTrigger<W> of(long maxCount) {
        return new CountTrigger(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Sum() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}
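One detail in CountTrigger is worth spelling out: the counter is cleared every time the trigger fires, so the window fires once per batch of maxCount elements rather than on every element after the threshold. A minimal stand-alone sketch of that behavior, with the hypothetical class CountTriggerSketch keeping the count in a plain field instead of ReducingState:

```java
// Stand-alone model of the CountTrigger counting logic (hypothetical class, no Flink dependency).
final class CountTriggerSketch {
    private final long maxCount;
    private long count;

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Returns true (fire) when this element completes a batch of maxCount elements. */
    boolean onElement() {
        count++;
        if (count >= maxCount) {
            count = 0; // the real trigger clears its ReducingState at this point
            return true;
        }
        return false;
    }
}
```

With maxCount = 3, six consecutive calls return false, false, true, false, false, true. Because the real trigger returns FIRE rather than FIRE_AND_PURGE, the elements that were already counted stay in the window unless something else purges them.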
ContinuousEventTimeTrigger and ContinuousProcessingTimeTrigger
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long interval;
    private final ReducingStateDescriptor<Long> stateDesc;

    private ContinuousEventTimeTrigger(long interval) {
        this.stateDesc = new ReducingStateDescriptor("fire-time", new Min(), LongSerializer.INSTANCE);
        this.interval = interval;
    }

    public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
            if (fireTimestampState.get() == null) {
                this.registerNextFireTimestamp(timestamp - timestamp % this.interval, window, ctx, fireTimestampState);
            }

            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        if (time == window.maxTimestamp()) {
            return TriggerResult.FIRE;
        } else {
            ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
            Long fireTimestamp = (Long)fireTimestampState.get();
            if (fireTimestamp != null && fireTimestamp == time) {
                fireTimestampState.clear();
                this.registerNextFireTimestamp(time, window, ctx, fireTimestampState);
                return TriggerResult.FIRE;
            } else {
                return TriggerResult.CONTINUE;
            }
        }
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        Long timestamp = (Long)fireTimestamp.get();
        if (timestamp != null) {
            ctx.deleteEventTimeTimer(timestamp);
            fireTimestamp.clear();
        }
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(this.stateDesc);
        Long nextFireTimestamp = (Long)((ReducingState)ctx.getPartitionedState(this.stateDesc)).get();
        if (nextFireTimestamp != null) {
            ctx.registerEventTimeTimer(nextFireTimestamp);
        }
    }

    public String toString() {
        return "ContinuousEventTimeTrigger(" + this.interval + ")";
    }

    @VisibleForTesting
    public long getInterval() {
        return this.interval;
    }

    public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval) {
        return new ContinuousEventTimeTrigger(interval.toMilliseconds());
    }

    private void registerNextFireTimestamp(long time, W window, Trigger.TriggerContext ctx, ReducingState<Long> fireTimestampState) throws Exception {
        long nextFireTimestamp = Math.min(time + this.interval, window.maxTimestamp());
        fireTimestampState.add(nextFireTimestamp);
        ctx.registerEventTimeTimer(nextFireTimestamp);
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Min() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long interval;
    private final ReducingStateDescriptor<Long> stateDesc;

    private ContinuousProcessingTimeTrigger(long interval) {
        this.stateDesc = new ReducingStateDescriptor("fire-time", new Min(), LongSerializer.INSTANCE);
        this.interval = interval;
    }

    public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        timestamp = ctx.getCurrentProcessingTime();
        if (fireTimestampState.get() == null) {
            this.registerNextFireTimestamp(timestamp - timestamp % this.interval, window, ctx, fireTimestampState);
        }

        return TriggerResult.CONTINUE;
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        if (((Long)fireTimestampState.get()).equals(time)) {
            fireTimestampState.clear();
            this.registerNextFireTimestamp(time, window, ctx, fireTimestampState);
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        Long timestamp = (Long)fireTimestamp.get();
        if (timestamp != null) {
            ctx.deleteProcessingTimeTimer(timestamp);
            fireTimestamp.clear();
        }
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(this.stateDesc);
        Long nextFireTimestamp = (Long)((ReducingState)ctx.getPartitionedState(this.stateDesc)).get();
        if (nextFireTimestamp != null) {
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
        }
    }

    @VisibleForTesting
    public long getInterval() {
        return this.interval;
    }

    public String toString() {
        return "ContinuousProcessingTimeTrigger(" + this.interval + ")";
    }

    public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) {
        return new ContinuousProcessingTimeTrigger(interval.toMilliseconds());
    }

    private void registerNextFireTimestamp(long time, W window, Trigger.TriggerContext ctx, ReducingState<Long> fireTimestampState) throws Exception {
        long nextFireTimestamp = Math.min(time + this.interval, window.maxTimestamp());
        fireTimestampState.add(nextFireTimestamp);
        ctx.registerProcessingTimeTimer(nextFireTimestamp);
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Min() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}
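Both continuous triggers compute their fire times with the same arithmetic: the first element's timestamp is aligned down to a multiple of the interval, one interval is added, and the result is capped at the window's maximum timestamp. The stand-alone helper below reproduces only that arithmetic so it can be checked by hand. ContinuousFireTimeSketch and its method names are hypothetical and are not part of Flink.

```java
// Stand-alone version of the fire-time arithmetic used by the continuous triggers
// (hypothetical helper class, no Flink dependency).
final class ContinuousFireTimeSketch {
    private ContinuousFireTimeSketch() {
    }

    /** First fire time after an element arrives: align down to the interval, then step one interval. */
    static long firstFireTime(long timestamp, long interval, long windowMaxTimestamp) {
        long alignedStart = timestamp - (timestamp % interval);
        return nextFireTime(alignedStart, interval, windowMaxTimestamp);
    }

    /** Fire time following 'time', never later than the window's maximum timestamp. */
    static long nextFireTime(long time, long interval, long windowMaxTimestamp) {
        return Math.min(time + interval, windowMaxTimestamp);
    }
}
```

For example, with an interval of 5000 ms and a window whose maximum timestamp is 59999, an element stamped 12345 is aligned down to 10000, so the first fire time is 15000 and the next is 20000. A fire at 58000 schedules the following one at 59999 rather than 63000, because of the cap.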
DeltaTrigger
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;
    private final DeltaFunction<T> deltaFunction;
    private final double threshold;
    private final ValueStateDescriptor<T> stateDesc;

    private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
        this.deltaFunction = deltaFunction;
        this.threshold = threshold;
        this.stateDesc = new ValueStateDescriptor("last-element", stateSerializer);
    }

    public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        ValueState<T> lastElementState = (ValueState)ctx.getPartitionedState(this.stateDesc);
        if (lastElementState.value() == null) {
            lastElementState.update(element);
            return TriggerResult.CONTINUE;
        } else if (this.deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
            lastElementState.update(element);
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ((ValueState)ctx.getPartitionedState(this.stateDesc)).clear();
    }

    public String toString() {
        return "DeltaTrigger(" + this.deltaFunction + ", " + this.threshold + ")";
    }

    public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
        return new DeltaTrigger(threshold, deltaFunction, stateSerializer);
    }
}
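A subtle point in DeltaTrigger is that the stored reference element is only replaced when the trigger fires (or when the very first element arrives), so the delta is measured against the last firing point, not against the previous element. The sketch below models this with plain doubles. DeltaTriggerSketch is a hypothetical class, and using the absolute difference as the delta function is an assumption made for illustration; the real trigger accepts any DeltaFunction.

```java
// Stand-alone model of the DeltaTrigger logic (hypothetical class, no Flink dependency).
// Assumption: the delta function is the absolute difference between two doubles.
final class DeltaTriggerSketch {
    private final double threshold;
    private Double lastElement; // reference element; null until the first element arrives

    DeltaTriggerSketch(double threshold) {
        this.threshold = threshold;
    }

    /** Returns true (fire) when the element differs from the reference by more than the threshold. */
    boolean onElement(double element) {
        if (lastElement == null) {
            lastElement = element; // first element only becomes the reference
            return false;
        }
        if (Math.abs(element - lastElement) > threshold) {
            lastElement = element; // the firing element becomes the new reference
            return true;
        }
        return false;
    }
}
```

With a threshold of 5 and the inputs 10, 12, 14, 16, 18, only 16 fires: 12 and 14 are within 5 of the reference 10, 16 exceeds it and becomes the new reference, and 18 is then within 5 of 16.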
PurgingTrigger
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;
    private Trigger<T, W> nestedTrigger;

    private PurgingTrigger(Trigger<T, W> nestedTrigger) {
        this.nestedTrigger = nestedTrigger;
    }

    public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onElement(element, timestamp, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onEventTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onProcessingTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        this.nestedTrigger.clear(window, ctx);
    }

    public boolean canMerge() {
        return this.nestedTrigger.canMerge();
    }

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        this.nestedTrigger.onMerge(window, ctx);
    }

    public String toString() {
        return "PurgingTrigger(" + this.nestedTrigger.toString() + ")";
    }

    public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
        return new PurgingTrigger(nestedTrigger);
    }

    @VisibleForTesting
    public Trigger<T, W> getNestedTrigger() {
        return this.nestedTrigger;
    }
}
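All three callbacks in PurgingTrigger apply the same one-line mapping to the nested trigger's result. The sketch below isolates that mapping. PurgingSketch and its nested Result enum are hypothetical stand-ins for the Flink types, kept separate so the example runs without Flink.

```java
// Stand-alone model of the result mapping applied by PurgingTrigger
// (hypothetical class and enum, no Flink dependency).
final class PurgingSketch {
    enum Result { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

    private PurgingSketch() {
    }

    /** Upgrades any firing result to FIRE_AND_PURGE and passes everything else through unchanged. */
    static Result purging(Result nested) {
        boolean fires = nested == Result.FIRE || nested == Result.FIRE_AND_PURGE;
        return fires ? Result.FIRE_AND_PURGE : nested;
    }
}
```

So a nested FIRE becomes FIRE_AND_PURGE, while CONTINUE and PURGE are returned as they are. The wrapper never causes a window to fire on its own; it only changes what happens to the contents when the nested trigger fires.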