38、Flink 的 WindowAssigner 之 GlobalWindows 示例

打印 上一主题 下一主题

主题 814|帖子 814|积分 2442

1、注意
使用 GlobalWindows 需要自定义 Trigger,否则窗口中的数据不会被计算。
2、代码示例
  1. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
  4. import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
  5. import org.apache.flink.streaming.api.windowing.triggers.Trigger;
  6. import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
  7. import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
  8. import org.apache.flink.util.Collector;
  9. public class _05_WindowAssignerGlobal {
  10.     public static void main(String[] args) throws Exception {
  11.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.         DataStreamSource<String> input = env.socketTextStream("localhost", 8888);
  13.         // 此窗口模式仅在指定了自定义的 trigger 时有用,否则计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据
  14.         input
  15.                 .keyBy(e -> e)
  16.                 // 多并行 Task
  17.                 .window(GlobalWindows.create())
  18.                 .trigger(new Trigger<String, GlobalWindow>() {
  19.                     @Override
  20.                     public TriggerResult onElement(String s, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
  21.                         return null;
  22.                     }
  23.                     @Override
  24.                     public TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
  25.                         return null;
  26.                     }
  27.                     @Override
  28.                     public TriggerResult onEventTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
  29.                         return null;
  30.                     }
  31.                     @Override
  32.                     public void clear(GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
  33.                     }
  34.                 })
  35.                 .apply(new WindowFunction<String, String, String, GlobalWindow>() {
  36.                     @Override
  37.                     public void apply(String s, GlobalWindow globalWindow, Iterable<String> iterable, Collector<String> collector) throws Exception {
  38.                         for (String res : iterable) {
  39.                             collector.collect(res);
  40.                         }
  41.                     }
  42.                 })
  43.                 .print();
  44.         env.execute();
  45.     }
  46. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

慢吞云雾缓吐愁

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表