Flink处理函数解析(ProcessFunction和KeyedProcessFunction)

打印 上一主题 下一主题

主题 634|帖子 634|积分 1902

Flink中的处理函数(ProcessFunction和KeyedProcessFunction)在对于数据进行颗粒化的精确计算时使用较多,处理函数提供了一个定时服务(TimerService),可以向未来注册一个定时服务,我们可以把它理解为一个闹钟,当闹钟响起时,就调用ProcessFunction中的onTimer()方法,会对数据进行一些计算。我们来解析一下这两个函数。
本文基于Flink1.14版本
ProcessFunction

ProcessFunction是Flink中的较为底层的API,当我们对于DataStream调用process函数的时候,需要在里面传入一个对象,即new ProcessFunction[] {},ProcessFunction是一个抽象类,我们看一下这个抽象类的源码:
源码解析
  1. @PublicEvolving
  2. public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
  3.     private static final long serialVersionUID = 1L;
  4.     public ProcessFunction() {
  5.     }
  6.     public abstract void processElement(I var1, ProcessFunction<I, O>.Context var2, Collector<O> var3) throws Exception;
  7.     public void onTimer(long timestamp, ProcessFunction<I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
  8.     }
  9.     public abstract class OnTimerContext extends ProcessFunction<I, O>.Context {
  10.         public OnTimerContext() {
  11.             super();
  12.         }
  13.         public abstract TimeDomain timeDomain();
  14.     }
  15.     public abstract class Context {
  16.         public Context() {
  17.         }
  18.         public abstract Long timestamp();
  19.         public abstract TimerService timerService();
  20.         public abstract <X> void output(OutputTag<X> var1, X var2);
  21.     }
  22. }
复制代码
我们可以看到ProcessFunction这个抽象类有两个泛型参数分别是输入(Input)和输出(Output)以及它继承了AbstractRichFunction。
ProcessFunction中processElement这个方法是一个抽象方法,代表实现这个抽象类时必须实现这个方法。这个方法的参数有三个I var1, ProcessFunction.Context var2, Collector var3,我们来解析一下:var1的类型是上面抽象类的泛型I,代表输入的数据,var2的类型是ProcessFunction.Context,它是 Flink 中 ProcessFunction 的一个内部抽象类,用于为 processElement 方法提供上下文信息。
ProcessFunction.Context 的主要功能:

1.获取时间戳:
timestamp(): 这个方法返回当前正在处理的事件的时间戳。如果您使用的是事件时间(EventTime),这个时间戳就是事件本身的时间戳;如果是处理时间(ProcessingTime),这个值可能是 null。
2.定时服务访问:
timerService(): 提供对 TimerService 的访问,您可以用它来注册定时器和查询当前时间。这对于需要基于时间做决策的应用来说非常重要。
3.侧输出:
output(OutputTag tag, X value): 允许您将数据发送到非主输出(即侧输出)。侧输出可以用于将数据发送到多个不同的流,或者用于处理特殊情况的数据,例如错误记录或监控事件。
最后Collector 是一个泛型接口,其中 O 表示输出数据类型,可以使用 Collector 来发射任何类型的数据。Collector 允许从一个输入产生多个输出,这在处理复杂的逻辑或进行多阶段处理时非常有用。
我们来看一个例子:

现在我们有订单数据,需要处理每个订单字符串,如果订单金额超过100,则将其标记为"大订单",否则标记为"小订单"。
假设我们的订单数据是一个简单的字符串格式,包含订单ID和订单金额,格式为 "orderId,amount":
  1.     // 示例订单数据流
  2.     val orderDataStream: DataStream[String] = env.fromElements(
  3.       "order1,100.5",
  4.       "order2,50.5",
  5.       "order3,200.0"
  6.     )
复制代码
1.定义样例类

目的是为了将数据转为对象,便于操作
  1. case class Order(orderId: String, amount: Double)
复制代码
2.使用 ProcessFunction 处理订单数据
  1.     val processedStream = orderDataStream
  2.       .process(new OrderProcessFunction)  //OrderProcessFunction为自己实现的ProcessFunction
复制代码
3.实现OrderProcessFunction
  1. class OrderProcessFunction extends ProcessFunction[String, String] {
  2.   override def processElement(
  3.       value: String,
  4.       ctx: ProcessFunction[String, String]#Context,
  5.       out: Collector[String]): Unit = {
  6.     // 解析订单数据
  7.     val parts = value.split(",")        //使用逗号分割数据
  8.     val order = Order(parts(0), parts(1).toDouble)        //将数据转为对象
  9.     // 根据金额分类订单
  10.     val orderType = if (order.amount > 100) "大订单" else "小订单"
  11.     out.collect(s"订单ID: ${order.orderId}, 类型: $orderType")        //将数据发送输出
  12.   }
  13. }
复制代码
3.打印处理后的结果
  1.     processedStream.print()
复制代码
结果为:
  1. 订单ID: order1, 类型: 大订单
  2. 订单ID: order2, 类型: 小订单
  3. 订单ID: order3, 类型: 大订单
复制代码
完整代码:
  1. import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.functions.ProcessFunctionimport org.apache.flink.util.Collector// 定义订单数据的样例类case class Order(orderId: String, amount: Double)object OrderProcessing {  def main(args: Array[String]): Unit = {    val env = StreamExecutionEnvironment.getExecutionEnvironment    // 示例订单数据流
  2.     val orderDataStream: DataStream[String] = env.fromElements(
  3.       "order1,100.5",
  4.       "order2,50.5",
  5.       "order3,200.0"
  6.     )    // 使用 ProcessFunction 处理订单数据    val processedStream = orderDataStream      .process(new OrderProcessFunction)    // 打印处理后的结果    processedStream.print()    env.execute("Order Processing")  }}class OrderProcessFunction extends ProcessFunction[String, String] {  override def processElement(      value: String,      ctx: ProcessFunction[String, String]#Context,      out: Collector[String]): Unit = {    // 解析订单数据    val parts = value.split(",")    val order = Order(parts(0), parts(1).toDouble)    // 根据金额分类订单    val orderType = if (order.amount > 100) "大订单" else "小订单"    out.collect(s"订单ID: ${order.orderId}, 类型: $orderType")  }}
复制代码
KeyedProcessFunction

KeyedProcessFunction允许对一个键控(keyed)流的每个元素进行操作,并提供对状态和定时器的访问。这使得它非常适合处理需要细粒度控制的场景。
键控流:在 Flink 中,一个键控流是通过 keyBy 方法从一个普通流创建的。在键控流中,所有的元素都根据指定的键分组。
KeyedProcessFunction 有两个主要方法:

processElement:

输入:每个流元素,以及上下文信息(Context)。
功能:处理每个元素,可以访问当前元素的键、状态和定时器。
使用场景:适用于每个事件的独立处理,如更新状态、发射输出、注册定时器等。
onTimer:

输入:定时器触发的时间戳,以及定时器上下文信息(OnTimerContext)。
功能:在特定时间点触发的逻辑处理。
使用场景:适用于需要基于时间的操作,如超时检测、定时输出等。


KeyedProcessFunction 还有附加特性:

状态管理:KeyedProcessFunction 支持 Flink 的状态管理,可以为每个键保存和更新状态。这在需要跟踪每个键的历史信息或执行聚合操作时非常有用。
定时器:可以注册处理时间(processing time)或事件时间(event time)定时器。定时器在指定的时间点触发,执行 onTimer 方法中的逻辑。
侧输出(Side Outputs):除了主输出流外,还可以发射数据到侧输出标签。这允许您处理如异常数据、监控事件等特殊用途的输出。
源码解析

我们来看一下KeyedProcessFunction的源码:
  1. @PublicEvolving
  2. public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
  3.     private static final long serialVersionUID = 1L;
  4.     public KeyedProcessFunction() {
  5.     }
  6.     public abstract void processElement(I var1, KeyedProcessFunction<K, I, O>.Context var2, Collector<O> var3) throws Exception;
  7.     public void onTimer(long timestamp, KeyedProcessFunction<K, I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
  8.     }
  9.     public abstract class OnTimerContext extends KeyedProcessFunction<K, I, O>.Context {
  10.         public OnTimerContext() {
  11.             super();
  12.         }
  13.         public abstract TimeDomain timeDomain();
  14.         public abstract K getCurrentKey();
  15.     }
  16.     public abstract class Context {
  17.         public Context() {
  18.         }
  19.         public abstract Long timestamp();
  20.         public abstract TimerService timerService();
  21.         public abstract <X> void output(OutputTag<X> var1, X var2);
  22.         public abstract K getCurrentKey();
  23.     }
  24. }
复制代码
KeyedProcessFunction 是一个抽象类,用于处理键控流中的元素。它允许对每个元素进行复杂的处理,并提供了对定时器和状态的控制。其中 K (即"Key)表示键的类型,I 表示输入元素的类型,O 表示输出元素的类型。
KeyedProcessFunction继承自 AbstractRichFunction,表明 KeyedProcessFunction 可以访问 Flink 的富函数(rich function)特性,如生命周期方法和运行时上下文。
processElement方法和onTimer方法与ProcessFunction中的类似
OnTimerContext类 是 Context 的子类,为定时器提供额外的上下文信息。其中的timeDomain() 能返回当前定时器的时间域(处理时间或事件时间)。getCurrentKey() 可以返回当前处理的键。
Context 类中的 timestamp() 用于返回当前处理元素的时间戳。timerService()提供对定时器服务的访问,允许注册和取消定时器。output(OutputTag var1, X var2)允许将数据发送到侧输出。getCurrentKey()返回当前处理的键。

下面我们来看一个使用KeyedProcessFunction处理订单数据的例子:例子二:

假设我们有订单数据流,每个订单包含订单ID、用户ID、订单金额和时间戳。现在我们需要计算每个用户的订单总金额,同时对于每个用户,如果在指定时间内没有新的订单,发出一个警告消息,以及对于超过特定金额的订单,将其发送到一个侧输出。
数据源:
  1.     // 示例订单数据流
  2.     val orderDataStream: DataStream[Order] = env.fromElements(
  3.       Order("order1", "user1", 50.0, 1000),
  4.       Order("order2", "user2", 300.0, 2000),
  5.       Order("order3", "user1", 200.0, 3000),
  6.     )
复制代码
1.定义订单样例类
  1. case class Order(orderId: String, userId: String, amount: Double, timestamp: Long)
复制代码
2.定义侧输出标签
  1. val largeOrderTag = new OutputTag[String]("large-order")
复制代码
3.使用 ProcessFunction 处理订单数据
  1.     val processedStream = orderDataStream
  2.       .keyBy(_.userId)
  3.       .process(new ComplexOrderProcessFunction)
复制代码
4.实现 ComplexOrderProcessFunction
  1. class ComplexOrderProcessFunction extends KeyedProcessFunction[String, Order, String] {
  2.   // 每个用户的订单总额
  3.   val userAmounts = mutable.HashMap[String, Double]()
  4.   // 定时器时间
  5.   val timerInterval: Long = 60000 // 1分钟
  6.   override def processElement(
  7.       order: Order,
  8.       ctx: KeyedProcessFunction[String, Order, String]#Context,
  9.       out: Collector[String]): Unit = {
  10.     // 更新用户的订单总额
  11.     val totalAmount = userAmounts.getOrElse(order.userId, 0.0) + order.amount
  12.     userAmounts(order.userId) = totalAmount
  13.     // 检查订单是否超过特定金额,如果是,则输出到侧输出
  14.     if (order.amount > 250.0) {
  15.       ctx.output(largeOrderTag, s"大额订单: ${order.orderId} - ${order.amount}")
  16.     }
  17.     // 注册定时器,当前时间加上间隔
  18.     ctx.timerService().registerEventTimeTimer(order.timestamp + timerInterval)
  19.     // 输出用户的订单总额
  20.     out.collect(s"用户 ${order.userId} 的订单总额: $totalAmount")
  21.   }
  22.   override def onTimer(
  23.       timestamp: Long,
  24.       ctx: KeyedProcessFunction[String, Order, String]#OnTimerContext,
  25.       out: Collector[String]): Unit = {
  26.     // 定时器触发,发出警告
  27.     out.collect(s"用户 ${ctx.getCurrentKey} 在过去的一分钟内没有新的订单。")
  28.   }
  29. }
复制代码
5.打印输出
  1.     // 打印主输出
  2.     processedStream.print()
  3.     // 打印侧输出
  4.     processedStream.getSideOutput(largeOrderTag).print("large-orders")
复制代码
在这个例子中:

  • 当一个订单到达时,我们更新基于用户ID的订单总额。
  • 如果订单金额超过 250,则将其作为大额订单输出到侧输出。
  • 为每个用户设置一个定时器,如果在一分钟内没有新订单,则发出警告。
  • 在主输出中,我们发射每个用户的订单总额。
结果:
  1. large-orders:1> 大额订单: order2 - 300.0
  2. 1> 用户 user2 的订单总额: 300.0
  3. 1> 用户 user2 在过去的一分钟内没有新的订单。
  4. 12> 用户 user1 的订单总额: 50.0
  5. 12> 用户 user1 的订单总额: 250.0
  6. 12> 用户 user1 在过去的一分钟内没有新的订单。
  7. 12> 用户 user1 在过去的一分钟内没有新的订单。
复制代码
完整代码:
  1. // 定义订单样例类case class Order(orderId: String, userId: String, amount: Double, timestamp: Long)object ComplexOrderProcessing {  // 侧输出标签  val largeOrderTag = new OutputTag[String]("large-order")  def main(args: Array[String]): Unit = {    val env = StreamExecutionEnvironment.getExecutionEnvironment    // 示例订单数据流
  2.     val orderDataStream: DataStream[Order] = env.fromElements(
  3.       Order("order1", "user1", 50.0, 1000),
  4.       Order("order2", "user2", 300.0, 2000),
  5.       Order("order3", "user1", 200.0, 3000),
  6.     )    // 使用 ProcessFunction 处理订单数据    val processedStream = orderDataStream
  7.       .keyBy(_.userId)
  8.       .process(new ComplexOrderProcessFunction)    // 打印主输出
  9.     processedStream.print()
  10.     // 打印侧输出
  11.     processedStream.getSideOutput(largeOrderTag).print("large-orders")    env.execute("Complex Order Processing")  }}class ComplexOrderProcessFunction extends KeyedProcessFunction[String, Order, String] {  // 每个用户的订单总额  val userAmounts = mutable.HashMap[String, Double]()  // 定时器时间  val timerInterval: Long = 60000 // 1分钟  override def processElement(                               order: Order,                               ctx: KeyedProcessFunction[String, Order, String]#Context,                               out: Collector[String]): Unit = {    // 更新用户的订单总额    val totalAmount = userAmounts.getOrElse(order.userId, 0.0) + order.amount    userAmounts(order.userId) = totalAmount    // 检查订单是否超过特定金额,如果是,则输出到侧输出    if (order.amount > 250.0) {      ctx.output(largeOrderTag, s"大额订单: ${order.orderId} - ${order.amount}")    }    // 注册定时器,当前时间加上间隔    ctx.timerService().registerEventTimeTimer(order.timestamp + timerInterval)    // 输出用户的订单总额    out.collect(s"用户 ${order.userId} 的订单总额: $totalAmount")  }  override def onTimer(                        timestamp: Long,                        ctx: KeyedProcessFunction[String, Order, String]#OnTimerContext,                        out: Collector[String]): Unit = {    // 定时器触发,发出警告    out.collect(s"用户 ${ctx.getCurrentKey} 在过去的一分钟内没有新的订单。")  }}
复制代码
以上就是本文的全部内容啦(͏ ˉ ꈊ ˉ)✧˖°

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

前进之路

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表