定时器
让 Flink 流处理步伐对处理时间和变乱时间的变化作出反应。通常在KeyProcessFunction中使用。
基于处理时间或者变乱时间处理过一个元素之后,注册一个定时器,然后在指定的时间运行。
最常见的显式使用Timer的地方就是KeyedProcessFunction。我们在其processElement()方法中注册Timer,然后覆写其onTimer()方法作为Timer触发时的回调逻辑。根据时间特征的差别:
- 处理时间——调用Context.timerService().registerProcessingTimeTimer()注册;onTimer()在体系时间戳到达Timer设定的时间戳时触发。
- 变乱时间——调用Context.timerService().registerEventTimeTimer()注册;onTimer()在Flink内部水印到达或凌驾Timer设定的时间戳时触发。
Context和OnTimerContext所持有的TimerService对象拥有以下方法:
- currentProcessingTime(): Long 返回当前处理时间
- currentWatermark(): Long 返回当前watermark的时间戳
- registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达指定时间时,触发timer。
- registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
- deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。假如没有这个时间戳的定时器,则不执行。
- deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的变乱时间定时器,假如没有此时间戳的定时器,则不执行。
1.基于处理时间的定时器
- SingleOutputStreamOperator<WaterSensor> stream = env
- .socketTextStream("hadoop102", 9999) // 在socket终端只输入毫秒级别的时间戳
- .map(value -> {
- String[] datas = value.split(",");
- return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
- });
- stream
- .keyBy(WaterSensor::getId)
- .process(new KeyedProcessFunction<String, WaterSensor, String>() {
- @Override
- public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
- // 处理时间过后5s后触发定时器
- ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);
- out.collect(value.toString());
- }
- // 定时器被触发之后, 回调这个方法
- // 参数1: 触发器被触发的时间
- @Override
- public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
- System.out.println(timestamp);
- out.collect("我被触发了....");
- }
- })
- .print();
复制代码 2.基于变乱时间的定时器
变乱进展依据的是watermark
- SingleOutputStreamOperator<WaterSensor> stream = env
- .socketTextStream("hadoop102", 9999) // 在socket终端只输入毫秒级别的时间戳
- .map(value -> {
- String[] datas = value.split(",");
- return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
- });
- WatermarkStrategy<WaterSensor> wms = WatermarkStrategy
- .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
- .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000);
- stream
- .assignTimestampsAndWatermarks(wms)
- .keyBy(WaterSensor::getId)
- .process(new KeyedProcessFunction<String, WaterSensor, String>() {
- @Override
- public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
- System.out.println(ctx.timestamp());
- ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 5000);
- out.collect(value.toString());
- }
- @Override
- public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
- System.out.println("定时器被触发.....");
- }
- })
- .print();
复制代码 3.定时器练习
监控水位传感器的水位值,假如水位值在(处理时间)5秒内一连上上,则报警。
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
- env.setParallelism(1);
- env.socketTextStream("hadoop162",9999)
- .map(value->{
- String[] split = value.split(",");
- return new WaterSensor(split[0],Long.valueOf(split[1])*1000,Integer.valueOf(split[2]));
- })
- .assignTimestampsAndWatermarks(WatermarkStrategy.
- <WaterSensor>forMonotonousTimestamps()
- .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>(){
- @Override
- public long extractTimestamp(WaterSensor element, long recordTimestamp) {
- return element.getTs();
- }
- })
- )
- .keyBy(WaterSensor::getId)
- .process(new KeyedProcessFunction<String, WaterSensor, String>() {
- private long time;
- private int lastVc = 0;
- private boolean isFirst = true;
- @Override
- public void processElement(WaterSensor value,
- Context ctx,
- Collector<String> out) throws Exception {
- if (value.getVc() > lastVc){
- if (isFirst){
- //水位上升
- time =ctx.timestamp() +5000L;
- ctx.timerService().registerEventTimeTimer(time);
- isFirst=false;
- }
- }else {
- //水位下降或者不变
- ctx.timerService().deleteEventTimeTimer(time);
- time = ctx.timestamp() + 5000L;
- ctx.timerService().registerEventTimeTimer(time);
- }
- lastVc = value.getVc();
- }
- @Override
- public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
- out.collect(ctx.getCurrentKey()+"水位已经连续5上升。。。。");
- isFirst=true;
- }
- })
- .print();
-
- try {
- env.execute();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |