ToB企服应用市场:ToB评测及商务社交产业平台

标题: Flink中定时器的使用 [打印本页]

作者: 大连密封材料    时间: 2024-7-18 13:16
标题: Flink中定时器的使用
定时器

让 Flink 流处理步伐对处理时间和变乱时间的变化作出反应。通常在KeyProcessFunction中使用。
基于处理时间或者变乱时间处理过一个元素之后,注册一个定时器,然后在指定的时间运行。
最常见的显式使用Timer的地方就是KeyedProcessFunction。我们在其processElement()方法中注册Timer,然后覆写其onTimer()方法作为Timer触发时的回调逻辑。根据时间特征的差别:


Context和OnTimerContext所持有的TimerService对象拥有以下方法:

1.基于处理时间的定时器
  1. SingleOutputStreamOperator<WaterSensor> stream = env
  2.   .socketTextStream("hadoop102", 9999)  // 在socket终端只输入毫秒级别的时间戳
  3.   .map(value -> {
  4.       String[] datas = value.split(",");
  5.       return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
  6.   });
  7. stream
  8.   .keyBy(WaterSensor::getId)
  9.   .process(new KeyedProcessFunction<String, WaterSensor, String>() {
  10.       @Override
  11.       public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
  12.           // 处理时间过后5s后触发定时器
  13.           ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);
  14.           out.collect(value.toString());
  15.       }
  16.       // 定时器被触发之后, 回调这个方法
  17.       // 参数1: 触发器被触发的时间
  18.       @Override
  19.       public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
  20.           System.out.println(timestamp);
  21.           out.collect("我被触发了....");
  22.       }
  23.   })
  24.   .print();
复制代码
2.基于变乱时间的定时器
变乱进展依据的是watermark
  1. SingleOutputStreamOperator<WaterSensor> stream = env
  2.   .socketTextStream("hadoop102", 9999)  // 在socket终端只输入毫秒级别的时间戳
  3.   .map(value -> {
  4.       String[] datas = value.split(",");
  5.       return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
  6.   });
  7. WatermarkStrategy<WaterSensor> wms = WatermarkStrategy
  8.   .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  9.   .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000);
  10. stream
  11.   .assignTimestampsAndWatermarks(wms)
  12.   .keyBy(WaterSensor::getId)
  13.   .process(new KeyedProcessFunction<String, WaterSensor, String>() {
  14.       @Override
  15.       public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
  16.           System.out.println(ctx.timestamp());
  17.           ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 5000);
  18.           out.collect(value.toString());
  19.       }
  20.       @Override
  21.       public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
  22.           System.out.println("定时器被触发.....");
  23.       }
  24.   })
  25.   .print();
复制代码
3.定时器练习
监控水位传感器的水位值,假如水位值在(处理时间)5秒内一连上上,则报警。
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
  2.         env.setParallelism(1);
  3.         env.socketTextStream("hadoop162",9999)
  4.                 .map(value->{
  5.                     String[] split = value.split(",");
  6.                     return new WaterSensor(split[0],Long.valueOf(split[1])*1000,Integer.valueOf(split[2]));
  7.                 })
  8.                 .assignTimestampsAndWatermarks(WatermarkStrategy.
  9.                         <WaterSensor>forMonotonousTimestamps()
  10.                     .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>(){
  11.                         @Override
  12.                         public long extractTimestamp(WaterSensor element, long recordTimestamp) {
  13.                             return element.getTs();
  14.                         }
  15.                     })
  16.                 )
  17.                 .keyBy(WaterSensor::getId)
  18.                 .process(new KeyedProcessFunction<String, WaterSensor, String>() {
  19.                     private  long  time;
  20.                     private  int  lastVc = 0;
  21.                     private  boolean  isFirst = true;
  22.                     @Override
  23.                     public void processElement(WaterSensor value,
  24.                                                Context ctx,
  25.                                                Collector<String> out) throws Exception {
  26.                         if (value.getVc() > lastVc){
  27.                             if (isFirst){
  28.                                 //水位上升
  29.                                 time =ctx.timestamp() +5000L;
  30.                                 ctx.timerService().registerEventTimeTimer(time);
  31.                                 isFirst=false;
  32.                             }
  33.                         }else {
  34.                             //水位下降或者不变
  35.                             ctx.timerService().deleteEventTimeTimer(time);
  36.                             time = ctx.timestamp() + 5000L;
  37.                             ctx.timerService().registerEventTimeTimer(time);
  38.                         }
  39.                         lastVc = value.getVc();
  40.                     }
  41.                     @Override
  42.                     public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
  43.                         out.collect(ctx.getCurrentKey()+"水位已经连续5上升。。。。");
  44.                         isFirst=true;
  45.                     }
  46.                 })
  47.                 .print();
  48.                
  49.         try {
  50.             env.execute();
  51.         } catch (Exception e) {
  52.             e.printStackTrace();
  53.         }
  54.     }
复制代码



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4