8.Flink变乱驱动应用、处理函数(Process Functions),KeyedProcessFuncti ...

打印 上一主题 下一主题

主题 843|帖子 843|积分 2531

目录


Flink专栏目录(点击进入…)


  
变乱驱动应用

处理函数(Process Functions)

ProcessFunction 将变乱处理与 Timer,State 结合在一起,使其成为流处理应用的强大构建模块。 这是使用 Flink 创建变乱驱动应用步调的基础。它和 RichFlatMapFunction 非常相似, 但是增长了 Timer。
处理函数(Process Functions) 是一种非常灵活的操作,答应开发者对流中的变乱进行低层次的访问与操作,适用于复杂的状态管理、时间处理(变乱时间和处理时间)、窗口操作等场景。
Flink 提供了两种常见的 Process Functions:
(1)KeyedProcessFunction
适用于已经通过 keyBy 分组的数据流。每个键(Key)都有本身的独立状态和定时器。
(2)ProcessFunction
适用于未分组(即未通过 keyBy 操作)的数据流。
(1)KeyedProcessFunction

KeyedProcessFunction<K, I, O> 是用于处理键控流(KeyedStream)的函数。它的输入数据流通过 keyBy() 进行了分组,流中的数据根据键值分别,每个键对应一条逻辑上独立的流。KeyedProcessFunction主要用于在这些键控流上实行状态管理和变乱时间操作
主要功能
(1)状态管理
可以使用 ValueState、ListState、MapState 等在函数内部存储状态。
(2)定时器(Timers)
可以在流处理过程中注册定时器,根据变乱时间或处理时间触发。
(3)自定义处理逻辑
可以对每个流中的变乱进行详细的处理,乃至可以发出新的变乱或修改原始变乱。
KeyedProcessFunction结构
  1. KeyedProcessFunction<KEY, IN, OUT>
复制代码
参数描述KEY数据流通过 keyBy() 操作天生的键的范例IN输入流中的元素范例OUT处理函数输出流的元素范例 方法描述processElement()对每个流元素进行处理的核心方法,答应访问上下文(上下文提供键、定时器等信息)onTimer()当注册的定时器触发时会调用该方法,可用于处理变乱时间窗口、延迟变乱等场景 典型应用场景
(1)复杂的状态操作
例如:基于键的变乱累加。
(2)变乱时间驱动的操作
如延迟处理、时间窗口结束时的聚合操作。
示例:
  1. public class MyKeyedProcessFunction extends KeyedProcessFunction<String, Event, Result> {
  2.    
  3.     // 定义状态
  4.     private transient ValueState<Integer> countState;
  5.     @Override
  6.     public void open(Configuration parameters) throws Exception {
  7.         // 初始化状态
  8.         ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>(
  9.             "countState",
  10.             Integer.class
  11.         );
  12.         countState = getRuntimeContext().getState(descriptor);
  13.     }
  14.     @Override
  15.     public void processElement(Event event, Context context, Collector<Result> collector) throws Exception {
  16.         // 获取当前状态值
  17.         Integer currentCount = countState.value();
  18.         if (currentCount == null) {
  19.             currentCount = 0;
  20.         }
  21.         // 状态更新逻辑
  22.         currentCount += 1;
  23.         countState.update(currentCount);
  24.         // 注册一个基于事件时间的定时器
  25.         context.timerService().registerEventTimeTimer(event.getEventTime() + 5000L);
  26.     }
  27.     @Override
  28.     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
  29.         // 定时器触发后的操作
  30.         Integer count = countState.value();
  31.         out.collect(new Result("Key: " + ctx.getCurrentKey(), "Count: " + count));
  32.     }
  33.    
  34. }
复制代码
(2)ProcessFunction 示例

ProcessFunction<I, O> 是用于处理非键控流(未经过 keyBy() 操作的 DataStream)的函数。它提供了对流中的每个元素的灵活处理本领,但没有键的概念,因此不能进行基于键的状态管理。
主要功能
(1)处理流中的每个元素
可以对流中的每个变乱进行处理、过滤、修改等操作。
(2)注册定时器
可以根据处理时间或变乱时间注册定时器。
(3)灵活控制输出
可以对流中的变乱进行操作后,输出到差别的流或收集器。
ProcessFunction结构
  1. ProcessFunction<IN, OUT>
复制代码
参数描述IN输入流中的元素范例OUT输出流中的元素范例 ProcessFunction方法
方法描述processElement()这是 ProcessFunction 的核心方法,用于处理流中的每个元素。它提供了对上下文信息的访问,但上下文中不包含键的概念onTimer()当注册的定时器触发时调用,用于实行定时操作,通常用于处理时间相关的逻辑 典型应用场景
(1)未分组流的处理
如在流中对每个元素进行转换、过滤等操作。
(2)基于时间的触发器
基于处理时间进行的定时操作或变乱驱动操作。
示例:
  1. public class MyProcessFunction extends ProcessFunction<Event, Result> {
  2.     @Override
  3.     public void processElement(Event event, Context context, Collector<Result> collector) throws Exception {
  4.         // 输出事件信息
  5.         collector.collect(new Result("Received event with ID: " + event.getId()));
  6.         
  7.         // 注册一个处理时间定时器
  8.         context.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 10000L);
  9.     }
  10.     @Override
  11.     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
  12.         // 处理定时器触发后的逻辑
  13.         out.collect(new Result("Timer triggered at: " + timestamp));
  14.     }
  15. }
复制代码

旁路输出(Side Outputs)

旁路输出(Side Outputs) 是一种答应将处理流中的变乱分流到多个输出的机制。使用侧输出可以将处理过程中符合特定条件的变乱输出到差别的流中,从而避免在主输出流中产生不相关或无关的数据。
主要特点
(1)灵活性
可以根据业务逻辑创建差别的侧输出流,处理多样化的需求。
(2)轻松管理多输出
无需复杂的分支或条件判断,轻松将数据拆分为多个流。
典型应用场景
(1)过滤或分流数据
将不符合主处理逻辑的变乱分到侧输出。
(2)异常处理(exceptions)
在数据流处理中,错误或异常变乱被输出到专门的错误流。
(3)多维度盘算
可以从同一数据流中提取差别维度的数据并输出到多个差别的流。
(4)延迟的变乱(late events)
(5)operator 告警(operational alerts),如与外部服务的连接超时
(6)格式错误的变乱(malformed events)
使用步骤
(1)定义侧输出标签
首先必要定义一个 OutputTag,它标识某种范例的侧输出流。
(2)天生侧输出
在 ProcessFunction 或 KeyedProcessFunction 中,通过 context.output() 方法将数据发送到侧输出。
(3)获取侧输出流
在主处理流中,通过 getSideOutput() 方法来提取侧输出流。
代码示例

1、定义 OutputTag

OutputTag 用于标识侧输出流。它通常是通过泛型指定范例的。
旁路输出标签,用于标识侧输出流的范例。在定义时,必须明确输出流的数据范例。Flink 使用它来区分差别的侧输出流。
  1. // 定义一个字符串类型的侧输出流标签
  2. final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
复制代码
2、天生侧输出

在 ProcessFunction 或 KeyedProcessFunction 中,可以根据业务逻辑将数据发送到侧输出。
在 processElement() 方法中,通过 context.output() 方法将变乱发送到侧输出流中。context.output(OutputTag<T>, T) 必要两个参数,一个是 OutputTag,另一个是输出的值。
  1. public class MyProcessFunction extends ProcessFunction<Event, Event> {
  2.     @Override
  3.     public void processElement(Event event, Context context, Collector<Event> collector) throws Exception {
  4.         // 将满足条件的事件输出到主流
  5.         if (event.isValid()) {
  6.             collector.collect(event);
  7.         } else {
  8.             // 将不符合条件的事件输出到侧输出流
  9.             context.output(outputTag, "Invalid event with ID: " + event.getId());
  10.         }
  11.     }
  12.    
  13. }
复制代码
3、获取侧输出流

在主流处理完之后,可以通过 getSideOutput() 方法获取侧输出流。
主数据流处理完成后,可以通过此方法从主流中提取侧输出流。提取出来的流可以像平凡的 DataStream 一样继续进行处理。
  1. DataStream<Event> mainStream = ...;  // 主流
  2. // 获取侧输出流
  3. DataStream<String> sideOutputStream = mainStream.getSideOutput(outputTag);
  4. // 处理侧输出流
  5. sideOutputStream.print("Side Output:");
复制代码
注意事项

(1)每个侧输出流的数据范例必须唯一
每个 OutputTag 只能指定一种输出范例,不能同时输出多种范例的数据。
(2)主流与侧输出流是独立的
主流和侧输出流是完全独立的,彼此之间不会相互影响。
(3)侧输出的性能
侧输出通常不消于大规模数据分发,更多用于辅助的分支处理。对于大规模数据分发,推荐使用 Flink 的流分区机制或专门的流分发策略。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

tsx81428

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表