pyflink的窗口

张裕  金牌会员 | 2024-9-19 17:17:41 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 344|帖子 344|积分 1032

PyFlink 中的窗口使用教程

在流处置惩罚应用中,窗口(Window)是一个非常紧张的概念,它用于对无界的数据流举行切分,使得我们可以对流中的数据实行聚合、计数、排序等使用。PyFlink 提供了丰富的窗口类型和使用,可以对流数据举行时间和计数等维度的切片,举行实时的数据处置惩罚。
在本教程中,我们将介绍 PyFlink 中的几种常见窗口类型,并展示如何使用窗口举行数据处置惩罚。
1. 安装 PyFlink

在开始之前,确保你已经安装了 PyFlink:
  1. pip install apache-flink
复制代码
2. 什么是窗口?

窗口(Window)是 Flink 处置惩罚无界数据流的核心技术,它将无限的数据流分别为有限的块,这样可以对这些块举行聚合、计数等使用。常见的窗口类型包括:


  • 滚动窗口(Tumbling Window):将数据流分别为不重叠的固定长度时间段。
  • 滑动窗口(Sliding Window):将数据流分别为固定长度的时间段,这些时间段可以相互重叠。
  • 会话窗口(Session Window):基于数据的活动时间来分别数据流,窗口之间有间隔(即活动的间歇)。
  • 计数窗口(Count Window):基于事件的数量而非时间分别窗口。
3. PyFlink 中的窗口使用

在 PyFlink 中,窗口通常和时间、事件一起使用,通过对数据流应用窗口函数来实行聚合使用。以下是几种常见的窗口使用。
4. 流环境设置

在 PyFlink 中,窗口使用通常在流模式下举行。首先,我们必要设置流环境并定义一些底子数据流。
  1. from pyflink.datastream import StreamExecutionEnvironment
  2. from pyflink.table import StreamTableEnvironment, EnvironmentSettings
  3. # 创建流执行环境
  4. env = StreamExecutionEnvironment.get_execution_environment()
  5. # 创建 Table 环境
  6. settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
  7. t_env = StreamTableEnvironment.create(env, environment_settings=settings)
复制代码
5. 时间特性设置

时间特性分为两种类型:事件时间(Event Time)和 处置惩罚时间(Processing Time)。事件时间基于事件生成时的时间,而处置惩罚时间基于 Flink 系统处置惩罚事件的时间。
设置事件时间(Event Time)

事件时间必要通过在数据流中添加时间戳和水印(Watermark)来支持。
  1. # 设置事件时间属性
  2. t_env.get_config().set_local_timezone('UTC')  # 使用 UTC 时区
复制代码
6. 创建窗口

6.1 滚动窗口(Tumbling Window)

滚动窗口会将数据流分别为固定长度的时间段,并且这些时间段互不重叠。
  1. from pyflink.table.window import Tumble
  2. from pyflink.table import expressions as expr
  3. # 创建示例表
  4. t_env.execute_sql("""
  5.     CREATE TEMPORARY TABLE source_table (
  6.         user_id STRING,
  7.         item STRING,
  8.         amount DOUBLE,
  9.         event_time TIMESTAMP(3),
  10.         WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
  11.     ) WITH (
  12.         'connector' = 'datagen'
  13.     )
  14. """)
  15. # 定义滚动窗口,窗口大小为10分钟
  16. result_table = t_env.from_path("source_table") \
  17.     .window(Tumble.over(expr.lit(10).minutes).on(expr.col("event_time")).alias("w")) \
  18.     .group_by(expr.col("w"), expr.col("user_id")) \
  19.     .select(expr.col("user_id"), expr.col("amount").sum.alias("total_spent"), expr.col("w").end.alias("window_end"))
  20. # 输出查询结果
  21. result_table.execute().print()
复制代码
6.2 滑动窗口(Sliding Window)

滑动窗口将数据分别为固定长度的时间段,这些时间段可以相互重叠。窗口的滑动步长定义了相邻窗口的开始时间。
  1. from pyflink.table.window import Slide
  2. # 定义滑动窗口,窗口大小为10分钟,滑动步长为5分钟
  3. result_table = t_env.from_path("source_table") \
  4.     .window(Slide.over(expr.lit(10).minutes).every(expr.lit(5).minutes).on(expr.col("event_time")).alias("w")) \
  5.     .group_by(expr.col("w"), expr.col("user_id")) \
  6.     .select(expr.col("user_id"), expr.col("amount").sum.alias("total_spent"), expr.col("w").end.alias("window_end"))
  7. # 输出查询结果
  8. result_table.execute().print()
复制代码
6.3 会话窗口(Session Window)

会话窗口基于数据的活动时间和不活动时间来分别数据流。假如一段时间内没有新的事件到达,窗口会结束。
  1. from pyflink.table.window import Session
  2. # 定义会话窗口,不活动间隔为30分钟
  3. result_table = t_env.from_path("source_table") \
  4.     .window(Session.with_gap(expr.lit(30).minutes).on(expr.col("event_time")).alias("w")) \
  5.     .group_by(expr.col("w"), expr.col("user_id")) \
  6.     .select(expr.col("user_id"), expr.col("amount").sum.alias("total_spent"), expr.col("w").end.alias("window_end"))
  7. # 输出查询结果
  8. result_table.execute().print()
复制代码
6.4 计数窗口(Count Window)

计数窗口是基于记录的数量来分别窗口,而不是基于时间。比方,每 1000 条记录形成一个窗口。
  1. from pyflink.table.window import Tumble
  2. # 定义计数窗口,每 1000 条记录形成一个窗口
  3. result_table = t_env.from_path("source_table") \
  4.     .window(Tumble.over(expr.lit(1000).rows).on(expr.col("event_time")).alias("w")) \
  5.     .group_by(expr.col("w"), expr.col("user_id")) \
  6.     .select(expr.col("user_id"), expr.col("amount").sum.alias("total_spent"), expr.col("w").end.alias("window_end"))
  7. # 输出查询结果
  8. result_table.execute().print()
复制代码
7. 自定义窗口聚合函数

除了使用内置的窗口聚合函数(如 SUM, COUNT 等),你还可以自定义窗口聚合逻辑。
自定义聚合函数

  1. from pyflink.table.udf import AggregateFunction, udaf
  2. class AvgAggregateFunction(AggregateFunction):
  3.     def get_value(self, accumulator):
  4.         return accumulator[0] / accumulator[1] if accumulator[1] > 0 else 0
  5.     def create_accumulator(self):
  6.         return [0, 0]  # sum, count
  7.     def accumulate(self, accumulator, value):
  8.         accumulator[0] += value
  9.         accumulator[1] += 1
  10. # 注册自定义聚合函数
  11. avg_udaf = udaf(AvgAggregateFunction(), result_type='DOUBLE')
  12. # 使用自定义聚合函数计算窗口内平均值
  13. result_table = t_env.from_path("source_table") \
  14.     .window(Tumble.over(expr.lit(10).minutes).on(expr.col("event_time")).alias("w")) \
  15.     .group_by(expr.col("w"), expr.col("user_id")) \
  16.     .select(expr.col("user_id"), avg_udaf(expr.col("amount")).alias("average_spent"), expr.col("w").end.alias("window_end"))
  17. # 输出查询结果
  18. result_table.execute().print()
复制代码
8. 完备示例

以下是一个包含窗口使用的完备 PyFlink 示例:
  1. from pyflink.table.window import Tumble
  2. from pyflink.table import expressions as expr
  3. from pyflink.table.udf import AggregateFunction, udaf
  4. from pyflink.datastream import StreamExecutionEnvironment
  5. from pyflink.table import StreamTableEnvironment, EnvironmentSettings
  6. # 设置流执行环境
  7. env = StreamExecutionEnvironment.get_execution_environment()
  8. settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
  9. t_env = StreamTableEnvironment.create(env, environment_settings=settings)
  10. # 创建示例表
  11. t_env.execute_sql("""
  12.     CREATE TEMPORARY TABLE source_table (
  13.         user_id STRING,
  14.         item STRING,
  15.         amount DOUBLE,
  16.         event_time TIMESTAMP(3),
  17.         WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
  18.     ) WITH (
  19.         'connector' = 'datagen'
  20.     )
  21. """)
  22. # 定义滚动窗口和自定义聚合函数
  23. class AvgAggregateFunction(AggregateFunction):
  24.     def get_value(self, accumulator):
  25.         return accumulator[0] / accumulator[1] if accumulator[1] > 0 else 0
  26.     def create_accumulator(self):
  27.         return [0, 0]
  28.     def accumulate(self, accumulator, value):
  29.         accumulator[0] += value
  30.         accumulator[1] += 1
  31. avg_udaf = udaf(AvgAggregateFunction(), result_type='DOUBLE')
  32. # 使用滚动窗口和自定义聚合函数
  33. result_table = t_env.from_path("source_table") \
  34.     .window(Tumble.over(expr.lit(10).minutes).on(expr.col("event_time")).alias("w")) \
  35.     .group
  36. _by(expr.col("w"), expr.col("user_id")) \
  37.     .select(expr.col("user_id"), avg_udaf(expr.col("amount")).alias("average_spent"), expr.col("w").end.alias("window_end"))
  38. # 输出结果
  39. result_table.execute().print()
复制代码
9. 总结

在 PyFlink 中,窗口是流处置惩罚的核心概念之一,允许你对无界数据流举行聚合、盘算和使用。Flink 提供了丰富的窗口类型,包括滚动窗口、滑动窗口、会话窗口和计数窗口,以满足不同场景下的需求。通过本教程,你可以学习如安在 PyFlink 中使用窗口对流数据举行处置惩罚,并通过自定义函数来实现更复杂的盘算逻辑。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

张裕

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表