Flink 窗口触发器Triggers
Triggers❝界说:触发器决定了窗口何时被触发。在Flink中,窗口的触发是通过设置定时器来实现的。
作用:控制窗口数据的聚合机会,确保数据在适当的时间点被处置惩罚和输出。
Trigger关键方法
onElement: 当元素被添加到窗口时调用,用于注册定时器或更新窗口状态。
onElement(T element, long timestamp, W window, TriggerContext ctx);
onEventTime:当事件时间计时器触发时调用,用于处置惩罚事件时间相关的触发逻辑。
onEventTime(long time, W window, TriggerContext ctx);
onProcessingTime :当处置惩罚时间计时器触发时调用,这里时间指机器处置惩罚时间,而不考虑时间自己的时间。见后文ProcessingTimeTrigger实现
onProcessingTime(long time, W window, TriggerContext ctx);
clear 当窗口被删除时调用,用于清理窗口的状态和定时器。
clear(W window, TriggerContext ctx);
内置Trigger
Flink提供了多种内置的触发器,以下为几种常用类型:
[*] EventTimeTrigger 工作原理:基于事件时间和水印(Watermark)机制来触发窗口计算。当窗口的最大时间戳小于等于当前的水印时,立即触发窗口计算。
[*] ProcessingTimeTrigger 工作原理:基于处置惩罚时长(即机器的系统时间)来触发窗口计算。当处置惩罚时间到达窗口的结束时间时,触发窗口计算。
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE;
}
[*] CountTrigger 工作原理:根据窗口内元素的数目来触发计算。当窗口内的元素数目到达预设的阈值时,触发窗口计算。
ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
count.add(1L);
if (count.get() >= maxCount) {
count.clear();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
[*] ContinuousEventTimeTrigger和ContinuousProcessingTimeTrigger 工作原理:根据间隔时间周期性触发窗口计算,或者当窗口的结束时间小于当前的时间(事件时间或处置惩罚时间)时触发计算。实用场景:实用于需要周期性处置惩罚数据的场景,如实时监控、周期性报表等。
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
}
[*] DeltaTrigger 工作原理:根据接入数据计算出的Delta指标是否超过指定的阈值来触发窗口计算。实用场景:实用于需要基于数据变革量举行处置惩罚的场景,如异常检测、趋势分析等。
if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
lastElementState.update(element);
return TriggerResult.FIRE;
}
自界说一个Trigger
实现一个CountTrigger 窗口元素数目到达阈值时,触发计算
package com.codetonight.datastream.trigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
public class CountTrigger<T> extends Trigger<T, GlobalWindow> {
private final long countThreshold;
private long count = 0L;
public CountTrigger(long countThreshold) {
this.countThreshold = countThreshold;
}
@Override
public TriggerResult onElement(T element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
count++;
if (count >= countThreshold) {
// 触发窗口并清除计数器
count = 0;
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
// 其他方法(onEventTime, onProcessingTime, onMerge, clear)可以留空或实现特定的逻辑
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
count = 0L;
}
}
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.windows.Window;
public class FlinkGlobalWindowExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> source = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
// 应用全局窗口和自定义触发器
DataStream<Long> result = source
.keyBy(value -> 1)
.windowAll(GlobalWindows.create())
.trigger(new CountTrigger<>(5)) // 当接收到5个元素时触发
.reduce(new ReduceFunction<Long>() {
@Override
public Long reduce(Long value1, Long value2) {
return value1 + value2;
}
});
// 打印结果
result.print();
// 执行作业
env.execute("Flink Global Window Example");
}
}
Evictor
Flink 的窗口模型允许在指定 WindowAssigner 和 Trigger 之外,还可以选择性地指定一个 Evictor。
Evictor 的功能是在触发器触发后,且在窗口函数应用之前和/或之后,从窗口中移除元素。为了实现这一功能,Evictor 接口界说了两个方法:
public interface Evictor<T, W extends Window> extends Serializable {
void evictBefore(
Iterable<TimestampedValue<T>> elements,
int size,
W window,
EvictorContext evictorContext);
void evictAfter(
Iterable<TimestampedValue<T>> elements,
int size,
W window,
EvictorContext evictorContext);
}
通过这两个方法,Evictor 提供了在窗口生命周期中灵活控制元素保留与移除的能力。
内置Evictor
这些 Evictor 可以单独使用,也可以与 Flink 的 WindowAssigner 和 Trigger 一起使用, 以创建复杂而强大的窗口处置惩罚逻辑。通过灵活组合这些组件, Flink 用户可以处置惩罚各种实时数据流场景,包罗滑动窗口、滚动窗口、会话窗口等。
CountEvictor: 功能:保留窗口中用户指定的元素数目,并从窗口缓冲区的开头丢弃剩余的元素。应用场景:当你只需要保留窗口中最新的 N 个元素时,这个 Evictor 非常有效。
DeltaEvictor: 移除逻辑代码比力清晰:
[*] 取窗口最后一个元素lastElement
[*] 所有元素与lastElement 比力计算出差值( Delta )
[*] 差值( Delta ) 超过阈值则移除
DeltaFunction用于计算两个元素之间的差值
private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
TimestampedValue<T> lastElement = Iterables.getLast(elements);
for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext(); ) {
TimestampedValue<T> element = iterator.next();
if (deltaFunction.getDelta(element.getValue(), lastElement.getValue())
>= this.threshold) {
iterator.remove();
}
}
}
TimeEvictor: 功能:基于时间戳来移除窗口中的元素。它接受一个时间间隔(以毫秒为单位),对于给定的窗口,它会找到元素中的最大时间戳 max_ts,并移除所偶然间戳小于 max_ts 减去指定时间间隔的元素。应用场景:当你盼望基于时间戳来过滤窗口中的旧元素时,这个 Evictor 非常有效。
TimeEvictor evcit 方法代码逻辑,方法定名很清晰。
[*] 取窗口元素最大的时间戳 currentTime,
[*] 保留的时间戳阈值evictCutoff = currentTime -windowSize
[*] 循环遍历移除不在evictCutoff 之前的元素
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
if (!hasTimestamp(elements)) {
return;
}
long currentTime = getMaxTimestamp(elements);
long evictCutoff = currentTime - windowSize;
for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
iterator.hasNext(); ) {
TimestampedValue<Object> record = iterator.next();
if (record.getTimestamp() <= evictCutoff) {
iterator.remove();
}
}
}
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]