Flink中定时器的使用

打印 上一主题 下一主题

主题 455|帖子 455|积分 1365

定时器

让 Flink 流处理步伐对处理时间和变乱时间的变化作出反应。通常在KeyProcessFunction中使用。
基于处理时间或者变乱时间处理过一个元素之后,注册一个定时器,然后在指定的时间运行。
最常见的显式使用Timer的地方就是KeyedProcessFunction。我们在其processElement()方法中注册Timer,然后覆写其onTimer()方法作为Timer触发时的回调逻辑。根据时间特征的差别:


  • 处理时间——调用Context.timerService().registerProcessingTimeTimer()注册;onTimer()在体系时间戳到达Timer设定的时间戳时触发。
  • 变乱时间——调用Context.timerService().registerEventTimeTimer()注册;onTimer()在Flink内部水印到达或凌驾Timer设定的时间戳时触发。

Context和OnTimerContext所持有的TimerService对象拥有以下方法:


  • currentProcessingTime(): Long 返回当前处理时间
  • currentWatermark(): Long 返回当前watermark的时间戳
  • registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达指定时间时,触发timer
  • registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
  • deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。假如没有这个时间戳的定时器,则不执行。
  • deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的变乱时间定时器,假如没有此时间戳的定时器,则不执行。
1.基于处理时间的定时器
  1. SingleOutputStreamOperator<WaterSensor> stream = env
  2.   .socketTextStream("hadoop102", 9999)  // 在socket终端只输入毫秒级别的时间戳
  3.   .map(value -> {
  4.       String[] datas = value.split(",");
  5.       return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
  6.   });
  7. stream
  8.   .keyBy(WaterSensor::getId)
  9.   .process(new KeyedProcessFunction<String, WaterSensor, String>() {
  10.       @Override
  11.       public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
  12.           // 处理时间过后5s后触发定时器
  13.           ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);
  14.           out.collect(value.toString());
  15.       }
  16.       // 定时器被触发之后, 回调这个方法
  17.       // 参数1: 触发器被触发的时间
  18.       @Override
  19.       public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
  20.           System.out.println(timestamp);
  21.           out.collect("我被触发了....");
  22.       }
  23.   })
  24.   .print();
复制代码
2.基于变乱时间的定时器
变乱进展依据的是watermark
  1. SingleOutputStreamOperator<WaterSensor> stream = env
  2.   .socketTextStream("hadoop102", 9999)  // 在socket终端只输入毫秒级别的时间戳
  3.   .map(value -> {
  4.       String[] datas = value.split(",");
  5.       return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
  6.   });
  7. WatermarkStrategy<WaterSensor> wms = WatermarkStrategy
  8.   .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  9.   .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000);
  10. stream
  11.   .assignTimestampsAndWatermarks(wms)
  12.   .keyBy(WaterSensor::getId)
  13.   .process(new KeyedProcessFunction<String, WaterSensor, String>() {
  14.       @Override
  15.       public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
  16.           System.out.println(ctx.timestamp());
  17.           ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 5000);
  18.           out.collect(value.toString());
  19.       }
  20.       @Override
  21.       public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
  22.           System.out.println("定时器被触发.....");
  23.       }
  24.   })
  25.   .print();
复制代码
3.定时器练习
监控水位传感器的水位值,假如水位值在(处理时间)5秒内一连上上,则报警。
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
  2.         env.setParallelism(1);
  3.         env.socketTextStream("hadoop162",9999)
  4.                 .map(value->{
  5.                     String[] split = value.split(",");
  6.                     return new WaterSensor(split[0],Long.valueOf(split[1])*1000,Integer.valueOf(split[2]));
  7.                 })
  8.                 .assignTimestampsAndWatermarks(WatermarkStrategy.
  9.                         <WaterSensor>forMonotonousTimestamps()
  10.                     .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>(){
  11.                         @Override
  12.                         public long extractTimestamp(WaterSensor element, long recordTimestamp) {
  13.                             return element.getTs();
  14.                         }
  15.                     })
  16.                 )
  17.                 .keyBy(WaterSensor::getId)
  18.                 .process(new KeyedProcessFunction<String, WaterSensor, String>() {
  19.                     private  long  time;
  20.                     private  int  lastVc = 0;
  21.                     private  boolean  isFirst = true;
  22.                     @Override
  23.                     public void processElement(WaterSensor value,
  24.                                                Context ctx,
  25.                                                Collector<String> out) throws Exception {
  26.                         if (value.getVc() > lastVc){
  27.                             if (isFirst){
  28.                                 //水位上升
  29.                                 time =ctx.timestamp() +5000L;
  30.                                 ctx.timerService().registerEventTimeTimer(time);
  31.                                 isFirst=false;
  32.                             }
  33.                         }else {
  34.                             //水位下降或者不变
  35.                             ctx.timerService().deleteEventTimeTimer(time);
  36.                             time = ctx.timestamp() + 5000L;
  37.                             ctx.timerService().registerEventTimeTimer(time);
  38.                         }
  39.                         lastVc = value.getVc();
  40.                     }
  41.                     @Override
  42.                     public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
  43.                         out.collect(ctx.getCurrentKey()+"水位已经连续5上升。。。。");
  44.                         isFirst=true;
  45.                     }
  46.                 })
  47.                 .print();
  48.                
  49.         try {
  50.             env.execute();
  51.         } catch (Exception e) {
  52.             e.printStackTrace();
  53.         }
  54.     }
复制代码



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

大连密封材料

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表