去皮卡多 发表于 2024-10-23 21:06:40

Flink 窗口触发器Triggers

Triggers

   ❝界说:触发器决定了窗口何时被触发。在Flink中,窗口的触发是通过设置定时器来实现的。
作用:控制窗口数据的聚合机会,确保数据在适当的时间点被处置惩罚和输出。
Trigger关键方法

onElement: 当元素被添加到窗口时调用,用于注册定时器或更新窗口状态。
onElement(T element, long timestamp, W window, TriggerContext ctx);
onEventTime:当事件时间计时器触发时调用,用于处置惩罚事件时间相关的触发逻辑。
onEventTime(long time, W window, TriggerContext ctx);
onProcessingTime :当处置惩罚时间计时器触发时调用,这里时间指机器处置惩罚时间,而不考虑时间自己的时间。见后文ProcessingTimeTrigger实现
onProcessingTime(long time, W window, TriggerContext ctx);
clear 当窗口被删除时调用,用于清理窗口的状态和定时器。
clear(W window, TriggerContext ctx);
内置Trigger

Flink提供了多种内置的触发器,以下为几种常用类型:


[*] EventTimeTrigger 工作原理:基于事件时间和水印(Watermark)机制来触发窗口计算。当窗口的最大时间戳小于等于当前的水印时,立即触发窗口计算。
[*] ProcessingTimeTrigger 工作原理:基于处置惩罚时长(即机器的系统时间)来触发窗口计算。当处置惩罚时间到达窗口的结束时间时,触发窗口计算。
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }


[*] CountTrigger 工作原理:根据窗口内元素的数目来触发计算。当窗口内的元素数目到达预设的阈值时,触发窗口计算。
  ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
  count.add(1L);
  if (count.get() >= maxCount) {
      count.clear();
      return TriggerResult.FIRE;
  }
  return TriggerResult.CONTINUE;


[*] ContinuousEventTimeTrigger和ContinuousProcessingTimeTrigger 工作原理:根据间隔时间周期性触发窗口计算,或者当窗口的结束时间小于当前的时间(事件时间或处置惩罚时间)时触发计算。实用场景:实用于需要周期性处置惩罚数据的场景,如实时监控、周期性报表等。
       if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }



[*] DeltaTrigger 工作原理:根据接入数据计算出的Delta指标是否超过指定的阈值来触发窗口计算。实用场景:实用于需要基于数据变革量举行处置惩罚的场景,如异常检测、趋势分析等。

    if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
        lastElementState.update(element);
        return TriggerResult.FIRE;
    }
        
自界说一个Trigger

实现一个CountTrigger 窗口元素数目到达阈值时,触发计算
package com.codetonight.datastream.trigger;

import org.apache.flink.streaming.api.windowing.triggers.Trigger;

import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;  
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;  
  
public class CountTrigger<T> extends Trigger<T, GlobalWindow> {  
  
    private final long countThreshold;  
    private long count = 0L;  
  
    public CountTrigger(long countThreshold) {  
        this.countThreshold = countThreshold;  
    }  
  
    @Override  
    public TriggerResult onElement(T element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {  
        count++;  
        if (count >= countThreshold) {  
            // 触发窗口并清除计数器
            count = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }  
        return TriggerResult.CONTINUE;  
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    // 其他方法(onEventTime, onProcessingTime, onMerge, clear)可以留空或实现特定的逻辑  
  
    @Override  
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {

        count = 0L;  
    }  

}
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;  
import org.apache.flink.streaming.api.windowing.windows.Window;  
  
public class FlinkGlobalWindowExample {  
  
    public static void main(String[] args) throws Exception {  
  
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
        DataStream<Long> source = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);  
  
        // 应用全局窗口和自定义触发器  
        DataStream<Long> result = source
            .keyBy(value -> 1)
          
            .windowAll(GlobalWindows.create())
            .trigger(new CountTrigger<>(5)) // 当接收到5个元素时触发
            .reduce(new ReduceFunction<Long>() {  
                @Override  
                public Long reduce(Long value1, Long value2) {  
                    return value1 + value2;  
                }  
            });  
  
        // 打印结果  
        result.print();  
  
        // 执行作业  
        env.execute("Flink Global Window Example");  
    }  
}
Evictor

Flink 的窗口模型允许在指定 WindowAssigner 和 Trigger 之外,还可以选择性地指定一个 Evictor。
Evictor 的功能是在触发器触发后,且在窗口函数应用之前和/或之后,从窗口中移除元素。为了实现这一功能,Evictor 接口界说了两个方法:
public interface Evictor<T, W extends Window> extends Serializable {


    void evictBefore(
            Iterable<TimestampedValue<T>> elements,
            int size,
            W window,
            EvictorContext evictorContext);


    void evictAfter(
            Iterable<TimestampedValue<T>> elements,
            int size,
            W window,
            EvictorContext evictorContext);


}

通过这两个方法,Evictor 提供了在窗口生命周期中灵活控制元素保留与移除的能力。
内置Evictor

这些 Evictor 可以单独使用,也可以与 Flink 的 WindowAssigner 和 Trigger 一起使用, 以创建复杂而强大的窗口处置惩罚逻辑。通过灵活组合这些组件, Flink 用户可以处置惩罚各种实时数据流场景,包罗滑动窗口、滚动窗口、会话窗口等。
CountEvictor: 功能:保留窗口中用户指定的元素数目,并从窗口缓冲区的开头丢弃剩余的元素。应用场景:当你只需要保留窗口中最新的 N 个元素时,这个 Evictor 非常有效。
DeltaEvictor: 移除逻辑代码比力清晰:

[*] 取窗口最后一个元素lastElement
[*] 所有元素与lastElement 比力计算出差值( Delta )
[*] 差值( Delta ) 超过阈值则移除
DeltaFunction用于计算两个元素之间的差值
    private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
        TimestampedValue<T> lastElement = Iterables.getLast(elements);
        for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext(); ) {
            TimestampedValue<T> element = iterator.next();
            if (deltaFunction.getDelta(element.getValue(), lastElement.getValue())
                    >= this.threshold) {
                iterator.remove();
            }
        }
    }
TimeEvictor: 功能:基于时间戳来移除窗口中的元素。它接受一个时间间隔(以毫秒为单位),对于给定的窗口,它会找到元素中的最大时间戳 max_ts,并移除所偶然间戳小于 max_ts 减去指定时间间隔的元素。应用场景:当你盼望基于时间戳来过滤窗口中的旧元素时,这个 Evictor 非常有效。
TimeEvictor evcit 方法代码逻辑,方法定名很清晰。

[*] 取窗口元素最大的时间戳 currentTime,
[*] 保留的时间戳阈值evictCutoff = currentTime -windowSize
[*] 循环遍历移除不在evictCutoff 之前的元素
    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        if (!hasTimestamp(elements)) {
            return;
        }

        long currentTime = getMaxTimestamp(elements);
        long evictCutoff = currentTime - windowSize;

        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                iterator.hasNext(); ) {
            TimestampedValue<Object> record = iterator.next();
            if (record.getTimestamp() <= evictCutoff) {
                iterator.remove();
            }
        }
    }
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Flink 窗口触发器Triggers