37、Flink 的窗口函数(Window Functions)详解

打印 上一主题 下一主题

主题 525|帖子 525|积分 1575

窗口函数(Window Functions)

a)概述

界说了 window assigner 之后,必要指定当窗口触发之后,怎样计算每个窗口中的数据, 即 window function。
窗口函数有三种:ReduceFunction、AggregateFunction 或 ProcessWindowFunction;


  • 前两者执行更高效,由于 Flink 可以在每条数据到达窗口后进行增量聚合(incrementally aggregate);
  • 而 ProcessWindowFunction 会得到能够遍历当前窗口内所有数据的 Iterable,以及关于这个窗口的 meta-information。
使用 ProcessWindowFunction 的窗口转换操作没有别的两种函数高效,由于 Flink 在窗口触发前必须缓存里面的所有数据; ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 合并来提高效率,既可以增量聚合窗口内的数据,又可以从 ProcessWindowFunction 接收窗口的 metadata。
b)ReduceFunction

ReduceFunction 指定两条输入数据怎样合并起来产生一条输出数据,输入和输出数据的类型必须相同
Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。
示例:对窗口内元组的第二个属性求和。
  1. DataStream<Tuple2<String, Long>> input = ...;
  2. input
  3.     .keyBy(<key selector>)
  4.     .window(<window assigner>)
  5.     .reduce(new ReduceFunction<Tuple2<String, Long>>() {
  6.       public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
  7.         return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
  8.       }
  9.     });
复制代码
c)AggregateFunction

ReduceFunction 是 AggregateFunction 的特别情况; AggregateFunction 接收三个参数:输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)。
输入数据的类型是输入流的元素类型,AggregateFunction 接口有如下几个方法: 把每一条元素加进累加器、创建初始累加器、合并两个累加器、从累加器中提取输出(OUT 类型)。
与 ReduceFunction 相同,Flink 会在输入数据到达窗口时直接进行增量聚合。
示例:计算窗口内所有元素第二个属性的均匀值。
  1. private static class AverageAggregate
  2.     implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  3.   @Override
  4.   public Tuple2<Long, Long> createAccumulator() {
  5.     return new Tuple2<>(0L, 0L);
  6.   }
  7.   @Override
  8.   public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
  9.     return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  10.   }
  11.   @Override
  12.   public Double getResult(Tuple2<Long, Long> accumulator) {
  13.     return ((double) accumulator.f0) / accumulator.f1;
  14.   }
  15.   @Override
  16.   public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
  17.     return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  18.   }
  19. }
  20. DataStream<Tuple2<String, Long>> input = ...;
  21. input
  22.     .keyBy(<key selector>)
  23.     .window(<window assigner>)
  24.     .aggregate(new AverageAggregate());
复制代码
d)ProcessWindowFunction

ProcessWindowFunction 具备 Iterable 能获取窗口内所有的元素 ,以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活;ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的, 由于窗口中的数据无法被增量聚合,而必要在窗口触发前缓存所有数据。
ProcessWindowFunction 的函数签名如下:
  1. public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
  2.     /**
  3.      * Evaluates the window and outputs none or several elements.
  4.      *
  5.      * @param key The key for which this window is evaluated.
  6.      * @param context The context in which the window is being evaluated.
  7.      * @param elements The elements in the window being evaluated.
  8.      * @param out A collector for emitting elements.
  9.      *
  10.      * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
  11.      */
  12.     public abstract void process(
  13.             KEY key,
  14.             Context context,
  15.             Iterable<IN> elements,
  16.             Collector<OUT> out) throws Exception;
  17.     /**
  18.      * Deletes any state in the {@code Context} when the Window expires (the watermark passes its
  19.      * {@code maxTimestamp} + {@code allowedLateness}).
  20.      *
  21.      * @param context The context to which the window is being evaluated
  22.      * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
  23.      */
  24.     public void clear(Context context) throws Exception {}
  25.     /**
  26.      * The context holding window metadata.
  27.      */
  28.     public abstract class Context implements java.io.Serializable {
  29.         /**
  30.          * Returns the window that is being evaluated.
  31.          */
  32.         public abstract W window();
  33.         /** Returns the current processing time. */
  34.         public abstract long currentProcessingTime();
  35.         /** Returns the current event-time watermark. */
  36.         public abstract long currentWatermark();
  37.         /**
  38.          * State accessor for per-key and per-window state.
  39.          *
  40.          * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
  41.          * by implementing {@link ProcessWindowFunction#clear(Context)}.
  42.          */
  43.         public abstract KeyedStateStore windowState();
  44.         /**
  45.          * State accessor for per-key global state.
  46.          */
  47.         public abstract KeyedStateStore globalState();
  48.     }
  49. }
复制代码
key 参数由 keyBy() 中指定的 KeySelector 选出;假如是给出 key 在 tuple 中的 index 或用属性名的字符串形式指定 key,这个 key 的类型将总是 Tuple, 并且必要手动将它转换为正确巨细的 tuple 才能提取 key。
示例:使用 ProcessWindowFunction 对窗口中的元素计数,并且将窗口自己的信息一同输出。
  1. DataStream<Tuple2<String, Long>> input = ...;
  2. input
  3.   .keyBy(t -> t.f0)
  4.   .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  5.   .process(new MyProcessWindowFunction());
  6. public class MyProcessWindowFunction
  7.     extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
  8.   @Override
  9.   public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
  10.     long count = 0;
  11.     for (Tuple2<String, Long> in: input) {
  12.       count++;
  13.     }
  14.     out.collect("Window: " + context.window() + "count: " + count);
  15.   }
  16. }
复制代码
e)增量聚合的 ProcessWindowFunction

ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 搭配使用, 使其能够在数据到达窗口的时间进行增量聚合,当窗口关闭时,ProcessWindowFunction 将会得到聚合的结果;即实现了增量聚合窗口的元素并且从 ProcessWindowFunction 中获得窗口的元数据。
使用 ReduceFunction 增量聚合
示例:将 ReduceFunction 与 ProcessWindowFunction 组合,返回窗口中的最小元素和窗口的开始时间。
  1. DataStream<SensorReading> input = ...;
  2. input
  3.   .keyBy(<key selector>)
  4.   .window(<window assigner>)
  5.   .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
  6. // Function definitions
  7. private static class MyReduceFunction implements ReduceFunction<SensorReading> {
  8.   public SensorReading reduce(SensorReading r1, SensorReading r2) {
  9.       return r1.value() > r2.value() ? r2 : r1;
  10.   }
  11. }
  12. private static class MyProcessWindowFunction
  13.     extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
  14.   public void process(String key,
  15.                     Context context,
  16.                     Iterable<SensorReading> minReadings,
  17.                     Collector<Tuple2<Long, SensorReading>> out) {
  18.       SensorReading min = minReadings.iterator().next();
  19.       out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
  20.   }
  21. }
复制代码
使用 AggregateFunction 增量聚合
示例:将 AggregateFunction 与 ProcessWindowFunction 组合,计算均匀值并与窗口对应的 key 一同输出。
  1. DataStream<Tuple2<String, Long>> input = ...;
  2. input
  3.   .keyBy(<key selector>)
  4.   .window(<window assigner>)
  5.   .aggregate(new AverageAggregate(), new MyProcessWindowFunction());
  6. // Function definitions
  7. private static class AverageAggregate
  8.     implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  9.   @Override
  10.   public Tuple2<Long, Long> createAccumulator() {
  11.     return new Tuple2<>(0L, 0L);
  12.   }
  13.   @Override
  14.   public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
  15.     return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  16.   }
  17.   @Override
  18.   public Double getResult(Tuple2<Long, Long> accumulator) {
  19.     return ((double) accumulator.f0) / accumulator.f1;
  20.   }
  21.   @Override
  22.   public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
  23.     return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  24.   }
  25. }
  26. private static class MyProcessWindowFunction
  27.     extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
  28.   public void process(String key,
  29.                     Context context,
  30.                     Iterable<Double> averages,
  31.                     Collector<Tuple2<String, Double>> out) {
  32.       Double average = averages.iterator().next();
  33.       out.collect(new Tuple2<>(key, average));
  34.   }
  35. }
复制代码
f)在 ProcessWindowFunction 中使用 per-window state

除了访问 keyed state,ProcessWindowFunction 还可以使用作用域仅为“当前正在处理的窗口”的 keyed state
per-window 中的 window 对应某个 key 的窗口实例:比如 以 user-id xyz 为 key,从 12:00 到 13:00 的时间窗口,具体情况取决于窗口的界说,根据具体的 key 和时间段会产生诸多不同的窗口实例。
Per-window state 假如处理有 1000 种不同 key 的变乱,并且目前所有变乱都处于 [12:00, 13:00) 时间窗口内,那么将会得到 1000 个窗口实例, 且每个实例都有自己的 keyed per-window state。
process() 接收到的 Context 对象中有两个方法允许访问以下两种 state:


  • globalState(),访问全局的 keyed state
  • windowState(), 访问作用域仅限于当前窗口的 keyed state
假如大概将一个 window 触发多次(比如当迟到数据会再次触发窗口计算, 或自界说了根据推测提前触发窗口的 trigger),那么这个功能将非常有用,这时大概必要在 per-window state 中储存关于之前触发的信息或触发的总次数。
当使用窗口状态时,肯定记得在删除窗口时扫除这些状态,应该界说在 clear() 方法中
WindowFunction(已过时)
在某些可以使用 ProcessWindowFunction 的地方,也可以使用 WindowFunction;它是旧版的 ProcessWindowFunction,只能提供更少的情况信息且缺少一些高级的功能,比如 per-window state。
WindowFunction 的函数签名如下:
  1. public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
  2.   /**
  3.    * Evaluates the window and outputs none or several elements.
  4.    *
  5.    * @param key The key for which this window is evaluated.
  6.    * @param window The window that is being evaluated.
  7.    * @param input The elements in the window being evaluated.
  8.    * @param out A collector for emitting elements.
  9.    *
  10.    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
  11.    */
  12.   void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
  13. }
复制代码
可以像下比方许使用:
  1. DataStream<Tuple2<String, Long>> input = ...;
  2. input
  3.     .keyBy(<key selector>)
  4.     .window(<window assigner>)
  5.     .apply(new MyWindowFunction());
复制代码



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

玛卡巴卡的卡巴卡玛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表