大连密封材料 发表于 2024-7-18 13:16:12

Flink中定时器的使用

定时器

让 Flink 流处理步伐对处理时间和变乱时间的变化作出反应。通常在KeyProcessFunction中使用。
基于处理时间或者变乱时间处理过一个元素之后,注册一个定时器,然后在指定的时间运行。
最常见的显式使用Timer的地方就是KeyedProcessFunction。我们在其processElement()方法中注册Timer,然后覆写其onTimer()方法作为Timer触发时的回调逻辑。根据时间特征的差别:


[*]处理时间——调用Context.timerService().registerProcessingTimeTimer()注册;onTimer()在体系时间戳到达Timer设定的时间戳时触发。
[*]变乱时间——调用Context.timerService().registerEventTimeTimer()注册;onTimer()在Flink内部水印到达或凌驾Timer设定的时间戳时触发。

Context和OnTimerContext所持有的TimerService对象拥有以下方法:


[*]currentProcessingTime(): Long 返回当前处理时间
[*]currentWatermark(): Long 返回当前watermark的时间戳
[*]registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达指定时间时,触发timer。
[*]registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
[*]deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。假如没有这个时间戳的定时器,则不执行。
[*]deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的变乱时间定时器,假如没有此时间戳的定时器,则不执行。
1.基于处理时间的定时器
SingleOutputStreamOperator<WaterSensor> stream = env
.socketTextStream("hadoop102", 9999)// 在socket终端只输入毫秒级别的时间戳
.map(value -> {
      String[] datas = value.split(",");
      return new WaterSensor(datas, Long.valueOf(datas), Integer.valueOf(datas));

});

stream
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
      @Override
      public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
          // 处理时间过后5s后触发定时器
          ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);
          out.collect(value.toString());
      }


      // 定时器被触发之后, 回调这个方法
      // 参数1: 触发器被触发的时间
      @Override
      public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
          System.out.println(timestamp);
          out.collect("我被触发了....");
      }
})
.print();
2.基于变乱时间的定时器
变乱进展依据的是watermark
SingleOutputStreamOperator<WaterSensor> stream = env
.socketTextStream("hadoop102", 9999)// 在socket终端只输入毫秒级别的时间戳
.map(value -> {
      String[] datas = value.split(",");
      return new WaterSensor(datas, Long.valueOf(datas), Integer.valueOf(datas));

});

WatermarkStrategy<WaterSensor> wms = WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000);
stream
.assignTimestampsAndWatermarks(wms)
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
      @Override
      public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
          System.out.println(ctx.timestamp());
          ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 5000);
          out.collect(value.toString());
      }

      @Override
      public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
          System.out.println("定时器被触发.....");
      }
})
.print();
3.定时器练习
监控水位传感器的水位值,假如水位值在(处理时间)5秒内一连上上,则报警。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
      env.setParallelism(1);

      env.socketTextStream("hadoop162",9999)
                .map(value->{
                  String[] split = value.split(",");
                  return new WaterSensor(split,Long.valueOf(split)*1000,Integer.valueOf(split));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.
                        <WaterSensor>forMonotonousTimestamps()
                  .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>(){
                        @Override
                        public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                            return element.getTs();
                        }
                  })
                )
                .keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {

                  privatelongtime;
                  privateintlastVc = 0;
                  privatebooleanisFirst = true;

                  @Override
                  public void processElement(WaterSensor value,
                                             Context ctx,
                                             Collector<String> out) throws Exception {
                        if (value.getVc() > lastVc){
                            if (isFirst){
                              //水位上升
                              time =ctx.timestamp() +5000L;
                              ctx.timerService().registerEventTimeTimer(time);
                              isFirst=false;
                            }
                        }else {
                            //水位下降或者不变
                            ctx.timerService().deleteEventTimeTimer(time);

                            time = ctx.timestamp() + 5000L;
                            ctx.timerService().registerEventTimeTimer(time);
                        }
                        lastVc = value.getVc();
                  }

                  @Override
                  public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect(ctx.getCurrentKey()+"水位已经连续5上升。。。。");
                        isFirst=true;
                  }
                })
                .print();
               
      try {
            env.execute();
      } catch (Exception e) {
            e.printStackTrace();
      }
    }



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Flink中定时器的使用