Flink SQL Notes - Window Functions

一给 | 2025-3-23 02:51:56

1 Windowing Table-Valued Functions (Windowing TVFs)

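1. Tumbling windows
2. Hopping (sliding) windows
3. Cumulating windows
4. Session windows (currently streaming mode only)
**Summary:** Logically, each element can be assigned to one or more windows, depending on the windowing TVF used. For example, a hopping window can assign a single element to multiple windows. Windowing TVFs replace the Group Window Functions, which are deprecated.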
1 Tumbling Windows (TUMBLE)

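TUMBLE returns all columns of the original table plus three additional columns that identify the assigned window: "window_start", "window_end" and "window_time" (window_time is window_end minus 1 ms, as the output below shows).
  TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
  TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)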


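  • data: a table with a time attribute column.
  • timecol: a column descriptor that specifies which time attribute column of the data is mapped to the windows.
  • size: the duration (width) of the windows.
  • offset: the window offset (optional).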
  -- tables must have time attribute, e.g. `bidtime` in this table
  Flink SQL> desc Bid;
  +-------------+------------------------+------+-----+--------+---------------------------------+
  |        name |                   type | null | key | extras |                       watermark |
  +-------------+------------------------+------+-----+--------+---------------------------------+
  |     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
  |       price |         DECIMAL(10, 2) | true |     |        |                                 |
  |        item |                 STRING | true |     |        |                                 |
  +-------------+------------------------+------+-----+--------+---------------------------------+
  Flink SQL> SELECT * FROM Bid;
  +------------------+-------+------+
  |          bidtime | price | item |
  +------------------+-------+------+
  | 2020-04-15 08:05 |  4.00 | C    |
  | 2020-04-15 08:07 |  2.00 | A    |
  | 2020-04-15 08:09 |  5.00 | D    |
  | 2020-04-15 08:11 |  3.00 | B    |
  | 2020-04-15 08:13 |  1.00 | E    |
  | 2020-04-15 08:17 |  6.00 | F    |
  +------------------+-------+------+
  Flink SQL> SELECT * FROM TABLE(
     TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
  -- or with the named params
  -- note: the DATA param must be the first
  Flink SQL> SELECT * FROM TABLE(
     TUMBLE(
       DATA => TABLE Bid,
       TIMECOL => DESCRIPTOR(bidtime),
       SIZE => INTERVAL '10' MINUTES));
  +------------------+-------+------+------------------+------------------+-------------------------+
  |          bidtime | price | item |     window_start |       window_end |             window_time |
  +------------------+-------+------+------------------+------------------+-------------------------+
  | 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  | 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  | 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  | 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  | 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  | 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  +------------------+-------+------+------------------+------------------+-------------------------+
  -- apply aggregation on the tumbling windowed table
  Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
    FROM TABLE(
      TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end;
  +------------------+------------------+-------------+
  |     window_start |       window_end | total_price |
  +------------------+------------------+-------------+
  | 2020-04-15 08:00 | 2020-04-15 08:10 |       11.00 |
  | 2020-04-15 08:10 | 2020-04-15 08:20 |       10.00 |
  +------------------+------------------+-------------+
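To make the assignment rule concrete, here is a small Python sketch (an illustration, not Flink code) that reproduces the tumbling-window output above. It assumes a window start computed as `ts - (ts - offset + size) % size` on epoch milliseconds, modeled on the window-start computation in Flink's `TimeWindow.getWindowStartWithOffset`; the helpers `tumble_window` and `tumble_sum` are hypothetical names.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal

EPOCH = datetime(1970, 1, 1)
MS = timedelta(milliseconds=1)


def tumble_window(ts, size, offset=timedelta(0)):
    """Return (window_start, window_end, window_time) for one row timestamp."""
    ts_ms, size_ms, offset_ms = (ts - EPOCH) // MS, size // MS, offset // MS
    # Align the start to the window size, shifted by the optional offset.
    start = EPOCH + (ts_ms - (ts_ms - offset_ms + size_ms) % size_ms) * MS
    end = start + size
    # window_time is the last millisecond inside [window_start, window_end).
    return start, end, end - MS


# The Bid rows from the example: (bidtime, price, item)
BID = [
    ("2020-04-15 08:05", "4.00", "C"), ("2020-04-15 08:07", "2.00", "A"),
    ("2020-04-15 08:09", "5.00", "D"), ("2020-04-15 08:11", "3.00", "B"),
    ("2020-04-15 08:13", "1.00", "E"), ("2020-04-15 08:17", "6.00", "F"),
]


def tumble_sum(rows, size):
    """SUM(price) ... GROUP BY window_start, window_end over a TUMBLE TVF."""
    totals = defaultdict(Decimal)
    for bidtime, price, _item in rows:
        ts = datetime.strptime(bidtime, "%Y-%m-%d %H:%M")
        start, end, _window_time = tumble_window(ts, size)
        totals[(start, end)] += Decimal(price)
    return dict(sorted(totals.items()))


for (start, end), total in tumble_sum(BID, timedelta(minutes=10)).items():
    print(start, end, total)
```

Each row lands in exactly one window, which is why the aggregated result has one row per 10-minute window.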
2 Hopping Windows (HOP)

  HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
  HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)


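  • data: a table with a time attribute column.
  • timecol: a column descriptor that specifies which time attribute column of the data is mapped to the windows.
  • slide: the slide, i.e. the interval between the starts of consecutive hopping windows.
  • size: the duration (width) of the windows.
  • offset: the window offset (optional).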
  > SELECT * FROM TABLE(
      HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
  -- or with the named params
  -- note: the DATA param must be the first
  > SELECT * FROM TABLE(
      HOP(
        DATA => TABLE Bid,
        TIMECOL => DESCRIPTOR(bidtime),
        SLIDE => INTERVAL '5' MINUTES,
        SIZE => INTERVAL '10' MINUTES));
  +------------------+-------+------+------------------+------------------+-------------------------+
  |          bidtime | price | item |     window_start |       window_end |             window_time |
  +------------------+-------+------+------------------+------------------+-------------------------+
  | 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  | 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
  | 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  | 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
  | 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  | 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
  | 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
  | 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  | 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
  | 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  | 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  | 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:15 | 2020-04-15 08:25 | 2020-04-15 08:24:59.999 |
  +------------------+-------+------+------------------+------------------+-------------------------+
  -- apply aggregation on the hopping windowed table
  > SELECT window_start, window_end, SUM(price) AS total_price
    FROM TABLE(
      HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end;
  +------------------+------------------+-------------+
  |     window_start |       window_end | total_price |
  +------------------+------------------+-------------+
  | 2020-04-15 08:00 | 2020-04-15 08:10 |       11.00 |
  | 2020-04-15 08:05 | 2020-04-15 08:15 |       15.00 |
  | 2020-04-15 08:10 | 2020-04-15 08:20 |       10.00 |
  | 2020-04-15 08:15 | 2020-04-15 08:25 |        6.00 |
  +------------------+------------------+-------------+
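A similar Python sketch (an illustration of the semantics, not Flink's implementation) shows why HOP emits each row several times: a row belongs to every window of width size whose start is aligned to slide and lies in (ts - size, ts]. The helper names are hypothetical and offset is assumed to be 0.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal

EPOCH = datetime(1970, 1, 1)
MS = timedelta(milliseconds=1)


def hop_windows(ts, slide, size):
    """All [start, end) hopping windows that contain ts (offset assumed to be 0)."""
    ts_ms, slide_ms, size_ms = (ts - EPOCH) // MS, slide // MS, size // MS
    start = ts_ms - ts_ms % slide_ms  # latest window start at or before ts
    windows = []
    while start > ts_ms - size_ms:    # the window still covers ts
        begin = EPOCH + start * MS
        windows.append((begin, begin + size))
        start -= slide_ms
    return sorted(windows)


def hop_sum(rows, slide, size):
    """SUM(price) GROUP BY window_start, window_end over a HOP TVF."""
    totals = defaultdict(Decimal)
    for bidtime, price in rows:
        ts = datetime.strptime(bidtime, "%Y-%m-%d %H:%M")
        for window in hop_windows(ts, slide, size):
            totals[window] += Decimal(price)  # a row counts in every window it falls into
    return dict(sorted(totals.items()))


BID = [("2020-04-15 08:05", "4.00"), ("2020-04-15 08:07", "2.00"),
       ("2020-04-15 08:09", "5.00"), ("2020-04-15 08:11", "3.00"),
       ("2020-04-15 08:13", "1.00"), ("2020-04-15 08:17", "6.00")]

for (start, end), total in hop_sum(BID, timedelta(minutes=5), timedelta(minutes=10)).items():
    print(start, end, total)
```

With size = 2 × slide, every row is counted in exactly two windows, which is why the hopping totals add up to twice the sum of all prices.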
3 Cumulating Windows (CUMULATE)

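A cumulating window starts at a fixed boundary and its end grows by step until the window reaches its maximum size; after that, a new cycle of windows begins at the next boundary. A row belongs to every window of its cycle whose end is later than the row's time. CUMULATE takes four required parameters and one optional parameter:
  CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size [, offset ])
  CUMULATE(TABLE EventTable, DESCRIPTOR(event_time), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)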


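  • data: a table with a time attribute column.
  • timecol: a column descriptor that specifies which time attribute column of the data is mapped to the windows.
  • step: the amount by which the window end grows from one cumulating window to the next.
  • size: the maximum window size (duration); it must be an integral multiple of step.
  • offset: the window offset (optional).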
  > SELECT * FROM TABLE(
      CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
  -- or with the named params
  -- note: the DATA param must be the first
  > SELECT * FROM TABLE(
      CUMULATE(
        DATA => TABLE Bid,
        TIMECOL => DESCRIPTOR(bidtime),
        STEP => INTERVAL '2' MINUTES,
        SIZE => INTERVAL '10' MINUTES));
  +------------------+-------+------+------------------+------------------+-------------------------+
  |          bidtime | price | item |     window_start |       window_end |             window_time |
  +------------------+-------+------+------------------+------------------+-------------------------+
  | 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:06 | 2020-04-15 08:05:59.999 |
  | 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
  | 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  | 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
  | 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  | 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  | 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:12 | 2020-04-15 08:11:59.999 |
  | 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
  | 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
  | 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
  | 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  | 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
  | 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
  | 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
  | 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  | 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
  | 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  +------------------+-------+------+------------------+------------------+-------------------------+
  -- apply aggregation on the cumulating windowed table
  > SELECT window_start, window_end, SUM(price) AS total_price
    FROM TABLE(
      CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end;
  +------------------+------------------+-------------+
  |     window_start |       window_end | total_price |
  +------------------+------------------+-------------+
  | 2020-04-15 08:00 | 2020-04-15 08:06 |        4.00 |
  | 2020-04-15 08:00 | 2020-04-15 08:08 |        6.00 |
  | 2020-04-15 08:00 | 2020-04-15 08:10 |       11.00 |
  | 2020-04-15 08:10 | 2020-04-15 08:12 |        3.00 |
  | 2020-04-15 08:10 | 2020-04-15 08:14 |        4.00 |
  | 2020-04-15 08:10 | 2020-04-15 08:16 |        4.00 |
  | 2020-04-15 08:10 | 2020-04-15 08:18 |       10.00 |
  | 2020-04-15 08:10 | 2020-04-15 08:20 |       10.00 |
  +------------------+------------------+-------------+
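The cumulating output above can be reproduced with the following Python sketch (an illustration, not Flink code; helper names are hypothetical and offset is assumed to be 0). All windows of one cycle share the start aligned to size, and a row joins every window of that cycle whose end is after the row's time.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal

EPOCH = datetime(1970, 1, 1)
MS = timedelta(milliseconds=1)


def cumulate_windows(ts, step, size):
    """All cumulating windows [start, end) that contain ts (offset assumed to be 0)."""
    ts_ms, step_ms, size_ms = (ts - EPOCH) // MS, step // MS, size // MS
    start = ts_ms - ts_ms % size_ms  # all windows of this cycle share this start
    windows = []
    for end in range(start + step_ms, start + size_ms + 1, step_ms):
        if end > ts_ms:  # [start, end) contains ts only if it ends after ts
            windows.append((EPOCH + start * MS, EPOCH + end * MS))
    return windows


def cumulate_sum(rows, step, size):
    """SUM(price) GROUP BY window_start, window_end over a CUMULATE TVF."""
    totals = defaultdict(Decimal)
    for bidtime, price in rows:
        ts = datetime.strptime(bidtime, "%Y-%m-%d %H:%M")
        for window in cumulate_windows(ts, step, size):
            totals[window] += Decimal(price)
    return dict(sorted(totals.items()))


BID = [("2020-04-15 08:05", "4.00"), ("2020-04-15 08:07", "2.00"),
       ("2020-04-15 08:09", "5.00"), ("2020-04-15 08:11", "3.00"),
       ("2020-04-15 08:13", "1.00"), ("2020-04-15 08:17", "6.00")]

for (start, end), total in cumulate_sum(BID, timedelta(minutes=2), timedelta(minutes=10)).items():
    print(start, end, total)
```

This is why CUMULATE suits "running total since the start of the period" metrics: each window in a cycle is a prefix of the next one.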
4 Session Windows (SESSION)

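**Summary:** For example, define a session window with an inactivity gap of 10 minutes. If two events of the same user are less than 10 minutes apart, they are assigned to the same session window. If no data arrives within 10 minutes after the latest event, the session window closes and is emitted downstream; subsequent events are assigned to a new session window.
SESSION takes three required parameters (data, timecol, gap) and one optional parameter (the PARTITION BY keys):
  SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)
  SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)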


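  • data: a table with a time attribute column.
  • keycols: the column(s) used to partition the data; session windows are built separately for each partition.
  • timecol: a column descriptor that specifies which time attribute column of the data is mapped to the windows.
  • gap: the maximum time gap between two events for them to be considered part of the same session window.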
  -- tables must have time attribute, e.g. `bidtime` in this table
  Flink SQL> desc Bid;
  +-------------+------------------------+------+-----+--------+---------------------------------+
  |        name |                   type | null | key | extras |                       watermark |
  +-------------+------------------------+------+-----+--------+---------------------------------+
  |     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
  |       price |         DECIMAL(10, 2) | true |     |        |                                 |
  |        item |                 STRING | true |     |        |                                 |
  +-------------+------------------------+------+-----+--------+---------------------------------+
  Flink SQL> SELECT * FROM Bid;
  +------------------+-------+------+
  |          bidtime | price | item |
  +------------------+-------+------+
  | 2020-04-15 08:07 |  4.00 | A    |
  | 2020-04-15 08:06 |  2.00 | A    |
  | 2020-04-15 08:09 |  5.00 | B    |
  | 2020-04-15 08:08 |  3.00 | A    |
  | 2020-04-15 08:17 |  1.00 | B    |
  +------------------+-------+------+
  -- session window with partition keys
  > SELECT * FROM TABLE(
      SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));
  -- or with the named params
  -- note: the DATA param must be the first
  > SELECT * FROM TABLE(
      SESSION(
        DATA => TABLE Bid PARTITION BY item,
        TIMECOL => DESCRIPTOR(bidtime),
        GAP => INTERVAL '5' MINUTES));
  +------------------+-------+------+------------------+------------------+-------------------------+
  |          bidtime | price | item |     window_start |       window_end |             window_time |
  +------------------+-------+------+------------------+------------------+-------------------------+
  | 2020-04-15 08:07 |  4.00 | A    | 2020-04-15 08:06 | 2020-04-15 08:13 | 2020-04-15 08:12:59.999 |
  | 2020-04-15 08:06 |  2.00 | A    | 2020-04-15 08:06 | 2020-04-15 08:13 | 2020-04-15 08:12:59.999 |
  | 2020-04-15 08:08 |  3.00 | A    | 2020-04-15 08:06 | 2020-04-15 08:13 | 2020-04-15 08:12:59.999 |
  | 2020-04-15 08:09 |  5.00 | B    | 2020-04-15 08:09 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
  | 2020-04-15 08:17 |  1.00 | B    | 2020-04-15 08:17 | 2020-04-15 08:22 | 2020-04-15 08:21:59.999 |
  +------------------+-------+------+------------------+------------------+-------------------------+
  -- apply aggregation on the session windowed table with partition keys
  > SELECT window_start, window_end, item, SUM(price) AS total_price
    FROM TABLE(
        SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
    GROUP BY item, window_start, window_end;
  +------------------+------------------+------+-------------+
  |     window_start |       window_end | item | total_price |
  +------------------+------------------+------+-------------+
  | 2020-04-15 08:06 | 2020-04-15 08:13 | A    |        9.00 |
  | 2020-04-15 08:09 | 2020-04-15 08:14 | B    |        5.00 |
  | 2020-04-15 08:17 | 2020-04-15 08:22 | B    |        1.00 |
  +------------------+------------------+------+-------------+
  -- session window without partition keys
  > SELECT * FROM TABLE(
      SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));
  -- or with the named params
  -- note: the DATA param must be the first
  > SELECT * FROM TABLE(
      SESSION(
        DATA => TABLE Bid,
        TIMECOL => DESCRIPTOR(bidtime),
        GAP => INTERVAL '5' MINUTES));
  +------------------+-------+------+------------------+------------------+-------------------------+
  |          bidtime | price | item |     window_start |       window_end |             window_time |
  +------------------+-------+------+------------------+------------------+-------------------------+
  | 2020-04-15 08:07 |  4.00 | A    | 2020-04-15 08:06 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
  | 2020-04-15 08:06 |  2.00 | A    | 2020-04-15 08:06 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
  | 2020-04-15 08:08 |  3.00 | A    | 2020-04-15 08:06 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
  | 2020-04-15 08:09 |  5.00 | B    | 2020-04-15 08:06 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
  | 2020-04-15 08:17 |  1.00 | B    | 2020-04-15 08:17 | 2020-04-15 08:22 | 2020-04-15 08:21:59.999 |
  +------------------+-------+------+------------------+------------------+-------------------------+
  -- apply aggregation on the session windowed table without partition keys
  > SELECT window_start, window_end, SUM(price) AS total_price
    FROM TABLE(
        SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
    GROUP BY window_start, window_end;
  +------------------+------------------+-------------+
  |     window_start |       window_end | total_price |
  +------------------+------------------+-------------+
  | 2020-04-15 08:06 | 2020-04-15 08:14 |       14.00 |
  | 2020-04-15 08:17 | 2020-04-15 08:22 |        1.00 |
  +------------------+------------------+-------------+
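A simplified Python model of how session windows form (an assumption-based sketch, not Flink's actual merging code): per partition key, each row opens a candidate window [ts, ts + gap), and candidates that overlap or touch are merged. Treating touching windows as mergeable is an assumption chosen because it reproduces the results in this post, including the supplier example in the aggregation section, where rows exactly 2 minutes apart share a session with a 2-minute gap. The helpers `at` and `session_sum` are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal


def at(hhmm):
    """Turn 'HH:MM' into a timestamp on the example date."""
    return datetime.strptime("2020-04-15 " + hhmm, "%Y-%m-%d %H:%M")


def session_sum(rows, gap, partitioned=True):
    """rows: (bidtime, price, key). Returns {(key, window_start, window_end): SUM(price)}."""
    by_key = defaultdict(list)
    for bidtime, price, key in rows:
        by_key[key if partitioned else None].append((at(bidtime), Decimal(price)))
    result = {}
    for key, events in by_key.items():
        events.sort()  # sessions are built in event-time order
        start = end = None
        total = Decimal(0)
        for ts, price in events:
            if end is not None and ts <= end:
                # [ts, ts + gap) touches or overlaps the open session: extend it
                end = max(end, ts + gap)
                total += price
            else:
                if end is not None:  # close the previous session
                    result[(key, start, end)] = total
                start, end, total = ts, ts + gap, price
        result[(key, start, end)] = total
    return result


BID = [("08:07", "4.00", "A"), ("08:06", "2.00", "A"), ("08:09", "5.00", "B"),
       ("08:08", "3.00", "A"), ("08:17", "1.00", "B")]

for (item, start, end), total in sorted(session_sum(BID, timedelta(minutes=5)).items()):
    print(item, start, end, total)
```

Passing `partitioned=False` reproduces the "without partition keys" results, where the B row at 08:09 joins A's session.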
Window Offset

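An offset can be understood as shifting all windows produced by the window function forward or backward as a whole (the window a record is assigned to still always contains the record's timestamp).
Offset is an optional parameter that changes how windows are assigned. It can be a positive or a negative interval, and the default is 0. Different offsets can assign the same record to different windows. For example, with a 10-minute tumbling window, which window does a record with timestamp 2021-06-30 00:00:04 fall into?
  - With an offset of -16 MINUTE, the record is assigned to window [2021-06-29 23:54:00, 2021-06-30 00:04:00).
  - With an offset of -6 MINUTE, the record is assigned to window [2021-06-29 23:54:00, 2021-06-30 00:04:00).
  - With an offset of -4 MINUTE, the record is assigned to window [2021-06-29 23:56:00, 2021-06-30 00:06:00).
  - With an offset of 0, the record is assigned to window [2021-06-30 00:00:00, 2021-06-30 00:10:00).
  - With an offset of 4 MINUTE, the record is assigned to window [2021-06-29 23:54:00, 2021-06-30 00:04:00).
  - With an offset of 6 MINUTE, the record is assigned to window [2021-06-29 23:56:00, 2021-06-30 00:06:00).
  - With an offset of 16 MINUTE, the record is assigned to window [2021-06-29 23:56:00, 2021-06-30 00:06:00).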
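  As you can see, different offsets can have the same effect on window assignment. In the example above, -16 MINUTE, -6 MINUTE and 4 MINUTE give the same result for a 10-minute tumbling window (and so do -4 MINUTE, 6 MINUTE and 16 MINUTE), because offsets that differ by a multiple of the window size are equivalent.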
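The offset list above can be checked with a short Python sketch. It assumes the window start is `ts - (ts - offset + size) % size` on epoch milliseconds, modeled on Flink's `TimeWindow.getWindowStartWithOffset`; `tumble_start` is a hypothetical helper, not a Flink API.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
MS = timedelta(milliseconds=1)


def tumble_start(ts, size, offset):
    """Window start for ts, modeled on start = ts - (ts - offset + size) % size."""
    ts_ms, size_ms, offset_ms = (ts - EPOCH) // MS, size // MS, offset // MS
    return EPOCH + (ts_ms - (ts_ms - offset_ms + size_ms) % size_ms) * MS


ts = datetime(2021, 6, 30, 0, 0, 4)
size = timedelta(minutes=10)
for minutes in (-16, -6, -4, 0, 4, 6, 16):
    start = tumble_start(ts, size, timedelta(minutes=minutes))
    print(f"offset {minutes:>3} MINUTE -> [{start}, {start + size})")
```

Because the offset only enters through `% size`, any two offsets that differ by a multiple of 10 minutes yield the same window.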
  **Note: the window offset only affects window assignment; it does not affect the watermark.**
  -- NOTE: Currently Flink doesn't support evaluating individual window table-valued function,
  --  window table-valued function should be used with aggregate operation,
  --  this example is just used for explaining the syntax and the data produced by table-valued function.
  Flink SQL> SELECT * FROM TABLE(
     TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES));
  -- or with the named params
  -- note: the DATA param must be the first and `OFFSET` should be wrapped with backticks
  Flink SQL> SELECT * FROM TABLE(
     TUMBLE(
       DATA => TABLE Bid,
       TIMECOL => DESCRIPTOR(bidtime),
       SIZE => INTERVAL '10' MINUTES,
       `OFFSET` => INTERVAL '1' MINUTES));
  +------------------+-------+------+------------------+------------------+-------------------------+
  |          bidtime | price | item |     window_start |       window_end |             window_time |
  +------------------+-------+------+------------------+------------------+-------------------------+
  | 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
  | 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
  | 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
  | 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
  | 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
  | 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
  +------------------+-------+------+------------------+------------------+-------------------------+
  -- apply aggregation on the tumbling windowed table
  Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
    FROM TABLE(
      TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES))
    GROUP BY window_start, window_end;
  +------------------+------------------+-------------+
  |     window_start |       window_end | total_price |
  +------------------+------------------+-------------+
  | 2020-04-15 08:01 | 2020-04-15 08:11 |       11.00 |
  | 2020-04-15 08:11 | 2020-04-15 08:21 |       10.00 |
  +------------------+------------------+-------------+
Window Aggregation (GROUP BY + TVF)

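Overview:
   A window aggregation is defined in the GROUP BY clause and is characterized by including the "window_start" and "window_end" columns of the relation produced by a windowing TVF. Just like a regular GROUP BY clause, a window aggregation computes a single result row for each group.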
Aggregation Examples
  -- tables must have time attribute, e.g. `bidtime` in this table
  Flink SQL> desc Bid;
  +-------------+------------------------+------+-----+--------+---------------------------------+
  |        name |                   type | null | key | extras |                       watermark |
  +-------------+------------------------+------+-----+--------+---------------------------------+
  |     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
  |       price |         DECIMAL(10, 2) | true |     |        |                                 |
  |        item |                 STRING | true |     |        |                                 |
  | supplier_id |                 STRING | true |     |        |                                 |
  +-------------+------------------------+------+-----+--------+---------------------------------+
  Flink SQL> SELECT * FROM Bid;
  +------------------+-------+------+-------------+
  |          bidtime | price | item | supplier_id |
  +------------------+-------+------+-------------+
  | 2020-04-15 08:05 | 4.00  | C    | supplier1   |
  | 2020-04-15 08:07 | 2.00  | A    | supplier1   |
  | 2020-04-15 08:09 | 5.00  | D    | supplier2   |
  | 2020-04-15 08:11 | 3.00  | B    | supplier2   |
  | 2020-04-15 08:13 | 1.00  | E    | supplier1   |
  | 2020-04-15 08:17 | 6.00  | F    | supplier2   |
  +------------------+-------+------+-------------+
  -- tumbling window aggregation
  Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
    FROM TABLE(
      TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end;
  +------------------+------------------+-------------+
  |     window_start |       window_end | total_price |
  +------------------+------------------+-------------+
  | 2020-04-15 08:00 | 2020-04-15 08:10 |       11.00 |
  | 2020-04-15 08:10 | 2020-04-15 08:20 |       10.00 |
  +------------------+------------------+-------------+
  -- hopping window aggregation
  Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
    FROM TABLE(
      HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end;
  +------------------+------------------+-------------+
  |     window_start |       window_end | total_price |
  +------------------+------------------+-------------+
  | 2020-04-15 08:00 | 2020-04-15 08:10 |       11.00 |
  | 2020-04-15 08:05 | 2020-04-15 08:15 |       15.00 |
  | 2020-04-15 08:10 | 2020-04-15 08:20 |       10.00 |
  | 2020-04-15 08:15 | 2020-04-15 08:25 |        6.00 |
  +------------------+------------------+-------------+
  -- cumulative window aggregation
  Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
    FROM TABLE(
      CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end;
  +------------------+------------------+-------------+
  |     window_start |       window_end | total_price |
  +------------------+------------------+-------------+
  | 2020-04-15 08:00 | 2020-04-15 08:06 |        4.00 |
  | 2020-04-15 08:00 | 2020-04-15 08:08 |        6.00 |
  | 2020-04-15 08:00 | 2020-04-15 08:10 |       11.00 |
  | 2020-04-15 08:10 | 2020-04-15 08:12 |        3.00 |
  | 2020-04-15 08:10 | 2020-04-15 08:14 |        4.00 |
  | 2020-04-15 08:10 | 2020-04-15 08:16 |        4.00 |
  | 2020-04-15 08:10 | 2020-04-15 08:18 |       10.00 |
  | 2020-04-15 08:10 | 2020-04-15 08:20 |       10.00 |
  +------------------+------------------+-------------+
  -- session window aggregation with partition keys
  Flink SQL> SELECT window_start, window_end, supplier_id, SUM(price) AS total_price
             FROM TABLE(
                 SESSION(TABLE Bid PARTITION BY supplier_id, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES))
             GROUP BY window_start, window_end, supplier_id;
  +------------------+------------------+-------------+-------------+
  |     window_start |       window_end | supplier_id | total_price |
  +------------------+------------------+-------------+-------------+
  | 2020-04-15 08:05 | 2020-04-15 08:09 | supplier1   |        6.00 |
  | 2020-04-15 08:09 | 2020-04-15 08:13 | supplier2   |        8.00 |
  | 2020-04-15 08:13 | 2020-04-15 08:15 | supplier1   |        1.00 |
  | 2020-04-15 08:17 | 2020-04-15 08:19 | supplier2   |        6.00 |
  +------------------+------------------+-------------+-------------+
  -- session window aggregation without partition keys
  Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
             FROM TABLE(
                 SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES))
             GROUP BY window_start, window_end;
  +------------------+------------------+-------------+
  |     window_start |       window_end | total_price |
  +------------------+------------------+-------------+
  | 2020-04-15 08:05 | 2020-04-15 08:15 |       15.00 |
  | 2020-04-15 08:17 | 2020-04-15 08:19 |        6.00 |
  +------------------+------------------+-------------+
GROUPING SETS
  Flink SQL> SELECT window_start, window_end, supplier_id, SUM(price) AS total_price
    FROM TABLE(
      TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ());
  +------------------+------------------+-------------+-------------+
  |     window_start |       window_end | supplier_id | total_price |
  +------------------+------------------+-------------+-------------+
  | 2020-04-15 08:00 | 2020-04-15 08:10 |      (NULL) |       11.00 |
  | 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier2 |        5.00 |
  | 2020-04-15 08:00 | 2020-04-15 08:10 |   supplier1 |        6.00 |
  | 2020-04-15 08:10 | 2020-04-15 08:20 |      (NULL) |       10.00 |
  | 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier2 |        9.00 |
  | 2020-04-15 08:10 | 2020-04-15 08:20 |   supplier1 |        1.00 |
  +------------------+------------------+-------------+-------------+
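GROUPING SETS can be read as running the aggregation once per grouping set and taking the union of the results, with columns that are not in a set reported as NULL. The Python sketch below models that reading on the rows of the example (window_end omitted for brevity, `None` standing for NULL); it is an illustration of the semantics, not how Flink executes the query, and `grouping_sets_sum` is a hypothetical helper.

```python
from collections import defaultdict
from decimal import Decimal

# Rows after the 10-minute TUMBLE: (window_start, supplier_id, price); window_end omitted.
ROWS = [
    ("08:00", "supplier1", Decimal("4.00")), ("08:00", "supplier1", Decimal("2.00")),
    ("08:00", "supplier2", Decimal("5.00")), ("08:10", "supplier2", Decimal("3.00")),
    ("08:10", "supplier1", Decimal("1.00")), ("08:10", "supplier2", Decimal("6.00")),
]


def grouping_sets_sum(rows, grouping_sets):
    """Aggregate once per grouping set; a column outside the set is reported as None (NULL)."""
    out = defaultdict(Decimal)
    for grouping in grouping_sets:
        for window_start, supplier_id, price in rows:
            key = (window_start, supplier_id if "supplier_id" in grouping else None)
            out[key] += price
    return dict(out)


# GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ())
result = grouping_sets_sum(ROWS, [("supplier_id",), ()])
for key, total in sorted(result.items(), key=lambda kv: (kv[0][0], kv[0][1] or "")):
    print(key, total)
```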
ROLLUP
The following query is equivalent to the GROUPING SETS example above and returns the same result.
  SELECT window_start, window_end, supplier_id, SUM(price) AS total_price
  FROM TABLE(
      TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, ROLLUP (supplier_id);
CUBE
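CUBE is shorthand for a common type of grouping sets: it represents the given list together with all of its possible subsets (the power set).
The following two queries are equivalent.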
  SELECT window_start, window_end, item, supplier_id, SUM(price) AS total_price
    FROM TABLE(
      TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end, CUBE (supplier_id, item);

  SELECT window_start, window_end, item, supplier_id, SUM(price) AS total_price
    FROM TABLE(
      TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end, GROUPING SETS (
        (supplier_id, item),
        (supplier_id      ),
        (             item),
        (                 )
  );
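The expansion rules for ROLLUP and CUBE fit in a short Python sketch (hypothetical helpers, for illustration only): ROLLUP yields every prefix of the column list, and CUBE yields every subset. Applied to the examples above, `rollup(["supplier_id"])` gives the grouping sets of the ROLLUP query, and `cube(["supplier_id", "item"])` gives the four grouping sets listed in the second CUBE query.

```python
from itertools import combinations


def rollup(cols):
    """ROLLUP (c1, ..., cn): every prefix of the list, longest first, down to ()."""
    return [tuple(cols[:i]) for i in range(len(cols), -1, -1)]


def cube(cols):
    """CUBE (c1, ..., cn): every subset of the list (the power set), largest first."""
    return [combo for n in range(len(cols), -1, -1) for combo in combinations(cols, n)]


print(rollup(["supplier_id"]))
print(cube(["supplier_id", "item"]))
```

For n columns, ROLLUP produces n + 1 grouping sets while CUBE produces 2^n, so CUBE grows quickly with the number of columns.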
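Cascading Window Aggregation
   The following shows a cascading window aggregation: the first window aggregation propagates the time attribute to the second one. The inner query must group by window_start, window_end and window_time, and must select window_time (aliased here as rowtime) so that it can serve as the time attribute of the outer window.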
  -- tumbling 5 minutes for each supplier_id
  CREATE VIEW window1 AS
  -- Note: The window start and window end fields of inner Window TVF are optional in the select clause. However, if they appear in the clause, they need to be aliased to prevent name conflicting with the window start and window end of the outer Window TVF.
  SELECT window_start AS window_5mintumble_start, window_end AS window_5mintumble_end, window_time AS rowtime, SUM(price) AS partial_price
    FROM TABLE(
      TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
    GROUP BY supplier_id, window_start, window_end, window_time;
  -- tumbling 10 minutes on the first window
  SELECT window_start, window_end, SUM(partial_price) AS total_price
    FROM TABLE(
        TUMBLE(TABLE window1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end;
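A Python sketch of the two levels (an illustration on the supplier data from the aggregation examples, not Flink code; `tumble` and `at` are hypothetical helpers) shows why window_time, rather than window_end, is passed on as rowtime: window_time is window_end minus 1 ms, so each partial result stays inside the 10-minute window that contains its 5-minute window. Using window_end (for example 08:10) would push the 08:05-08:10 partial sums into the next 10-minute window.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal

EPOCH = datetime(1970, 1, 1)
MS = timedelta(milliseconds=1)


def tumble(ts, size):
    """[start, end) of the tumbling window containing ts (offset 0)."""
    start = EPOCH + ((ts - EPOCH) // size) * size
    return start, start + size


def at(hhmm):
    return datetime.strptime("2020-04-15 " + hhmm, "%Y-%m-%d %H:%M")


# (bidtime, price, supplier_id) from the aggregation examples
BID = [("08:05", "4.00", "supplier1"), ("08:07", "2.00", "supplier1"),
       ("08:09", "5.00", "supplier2"), ("08:11", "3.00", "supplier2"),
       ("08:13", "1.00", "supplier1"), ("08:17", "6.00", "supplier2")]

# Level 1 (view window1): 5-minute tumble per supplier; window_time = end - 1 ms becomes rowtime.
window1 = defaultdict(Decimal)
for bidtime, price, supplier in BID:
    start, end = tumble(at(bidtime), timedelta(minutes=5))
    window1[(supplier, start, end, end - MS)] += Decimal(price)

# Level 2: 10-minute tumble over the level-1 rows, keyed on their rowtime.
total = defaultdict(Decimal)
for (_supplier, _start, _end, rowtime), partial_price in window1.items():
    total[tumble(rowtime, timedelta(minutes=10))] += partial_price

for (start, end), price in sorted(total.items()):
    print(start, end, price)
```

The totals match the single-level 10-minute tumbling aggregation (11.00 and 10.00), which is the point of cascading: the second level reuses the partial sums instead of the raw rows.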
2 Group Window Functions (legacy Flink window functions)

Window function SQL demo
  CREATE TABLE Orders (
    `user`     BIGINT,
    product    STRING,
    amount     INT,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '1' MINUTE
  ) WITH (...);
  SELECT
    `user`,
    TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart,
    SUM(amount) FROM Orders
  GROUP BY
    TUMBLE(order_time, INTERVAL '1' DAY),
    `user`;
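3 Summary of the differences

Windowing TVFs: depending on the chosen window type, they append three columns, "window_start", "window_end" and "window_time", to the original rows. They are used after FROM, much like a subquery, and are combined with GROUP BY on the window columns (window_start, window_end, optionally window_time) and aggregate functions, so that each window (per grouping key) produces one output row.
Group window functions: a deprecated style. The window function is placed in the GROUP BY clause and, combined directly with aggregate functions, produces one output row per window.
Both require a processing-time or event-time attribute as the time column of the window.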
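The difference can be sketched in plain Python (an illustration, not Flink code). The Orders table has no sample data in this post, so the rows below are hypothetical, and tumble_tvf, group_by_window and legacy_group_window are made-up helper names. The TVF-style function keeps every row and only appends window columns; the GROUP BY step then reduces them to one row per window and user, which matches what the one-step group window style produces.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
DAY = timedelta(days=1)

# Hypothetical (user, amount, order_time) rows.
ORDERS = [
    (1, 10, datetime(2024, 1, 1, 9, 0)),
    (1, 5, datetime(2024, 1, 1, 18, 30)),
    (2, 7, datetime(2024, 1, 1, 12, 0)),
    (1, 3, datetime(2024, 1, 2, 8, 15)),
]


def tumble_tvf(rows):
    """TVF style: keep every row, append (window_start, window_end, window_time)."""
    out = []
    for user, amount, ts in rows:
        start = ts - (ts - EPOCH) % DAY
        end = start + DAY
        out.append((user, amount, ts, start, end, end - timedelta(milliseconds=1)))
    return out


def group_by_window(tvf_rows):
    """GROUP BY window_start, window_end, user with SUM(amount)."""
    totals = Counter()
    for user, amount, _ts, start, end, _window_time in tvf_rows:
        totals[(start, end, user)] += amount
    return totals


def legacy_group_window(rows):
    """Group-window style: assign the window and aggregate in a single step."""
    totals = Counter()
    for user, amount, ts in rows:
        start = ts - (ts - EPOCH) % DAY
        totals[(start, start + DAY, user)] += amount
    return totals


tvf_rows = tumble_tvf(ORDERS)
print(len(tvf_rows), "rows after the TVF")  # still one row per order
for key, total in sorted(group_by_window(tvf_rows).items()):
    print(key, total)
print(group_by_window(tvf_rows) == legacy_group_window(ORDERS))
```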
4 OVER aggregation

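An OVER aggregation computes an aggregated value for every input row over a range of ordered rows. In contrast to a GROUP BY aggregation, an OVER aggregation does not reduce the result to one row per group; instead, it adds an aggregated value to every input row.
1 Syntax example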

The following query computes, for every order, the total amount of all orders for the same product received within the preceding hour (it assumes an Orders table that also has an order_id column):
  SELECT order_id, order_time, amount,
    SUM(amount) OVER (
      PARTITION BY product
      ORDER BY order_time
      RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) AS one_hour_prod_amount_sum
  FROM Orders;
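A minimal plain-Python sketch of what this OVER aggregation computes (not Flink's implementation; the order rows and the helper over_range_sum are hypothetical). Each input row yields exactly one output row, and the sum covers only rows of the same product whose order_time lies within one hour before the current row. The nested loop is quadratic and is only meant to make the semantics visible.

```python
from datetime import datetime, timedelta

# Hypothetical (order_id, product, order_time, amount) rows, ordered by order_time.
ORDERS = [
    (1, "apple", datetime(2024, 1, 1, 10, 0), 5),
    (2, "pear", datetime(2024, 1, 1, 10, 10), 2),
    (3, "apple", datetime(2024, 1, 1, 10, 40), 3),
    (4, "apple", datetime(2024, 1, 1, 11, 20), 4),
]


def over_range_sum(rows, preceding):
    """SUM(amount) OVER (PARTITION BY product ORDER BY order_time
    RANGE BETWEEN <preceding> PRECEDING AND CURRENT ROW), one result per input row."""
    result = []
    for order_id, product, ts, amount in rows:
        window_sum = sum(
            other_amount
            for _, other_product, other_ts, other_amount in rows
            if other_product == product and ts - preceding <= other_ts <= ts
        )
        result.append((order_id, ts, amount, window_sum))
    return result


for row in over_range_sum(ORDERS, timedelta(hours=1)):
    print(row)
```

Order 4 gets 7 rather than 12 because order 1 (10:00) is more than one hour before 11:20.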
The syntax of an OVER window is summarized below:
  SELECT
      agg_func(agg_col) OVER (
      [PARTITION BY col1[, col2, ...]]
      ORDER BY time_col
      range_definition),
    ...
  FROM ...
2 Using OVER aggregation together with windowing TVFs

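Key point: an example of an OVER aggregation combined with a windowing TVF (a tumbling window here). The columns are adapted to the Orders table defined above (product, amount).
  -- Windowing TVF (tumbling window) combined with an OVER aggregation
  SELECT
      window_start,
      window_end,
      order_time,
      product,
      amount,
      SUM(amount) OVER (
          -- window_start/window_end keep the aggregation inside one tumbling window
          PARTITION BY window_start, window_end, product
          ORDER BY order_time
          RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
      ) AS rolling_sum_amount
  FROM TABLE(
      TUMBLE(TABLE Orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR)
  );
The windowing TVF only appends window_start, window_end and window_time to every row; it neither filters nor regroups the rows. The one-hour OVER range stays within the tumbling window only because window_start and window_end are part of PARTITION BY; with PARTITION BY product alone, the range can reach rows of the previous window. In short: 1) assign rows to windows, 2) run the OVER aggregation partitioned by window. Note also that, according to the Flink documentation, the original time attribute becomes a regular timestamp column after a windowing TVF, while streaming OVER windows must be ordered by a time attribute; treat this query as a batch-mode example and verify it against your Flink version before using it in a streaming job.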
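The effect of PARTITION BY on the OVER range can be checked with a small sketch (plain Python, not Flink; the rows and the helpers with_tumble and over_sum are hypothetical). With PARTITION BY product only, the one-hour range of the 10:10 row still includes the 09:40 row from the previous tumbling window; once window_start is added to the partition key, it does not.

```python
from datetime import datetime, timedelta
from operator import itemgetter

EPOCH = datetime(1970, 1, 1)
HOUR = timedelta(hours=1)

# Hypothetical (order_time, product, amount) rows, ordered by order_time.
ORDERS = [
    (datetime(2024, 1, 1, 9, 40), "apple", 5),
    (datetime(2024, 1, 1, 10, 10), "apple", 3),
    (datetime(2024, 1, 1, 10, 50), "apple", 2),
]


def with_tumble(rows, size):
    """Append window_start like TUMBLE does (end/time omitted); no row is dropped."""
    return [(ts, product, amount, ts - (ts - EPOCH) % size) for ts, product, amount in rows]


def over_sum(rows, partition_key):
    """SUM(amount) OVER (PARTITION BY <key> ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)."""
    return [
        sum(
            other[2]
            for other in rows
            if partition_key(other) == partition_key(row)
            and row[0] - HOUR <= other[0] <= row[0]
        )
        for row in rows
    ]


tumbled = with_tumble(ORDERS, HOUR)
print("PARTITION BY product:              ", over_sum(tumbled, itemgetter(1)))
print("PARTITION BY window_start, product:", over_sum(tumbled, itemgetter(3, 1)))
```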
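ORDER BY

OVER windows require the data to be ordered. Because tables have no fixed order, the ORDER BY clause is mandatory. For streaming queries, Flink currently only supports OVER windows defined on an ascending time attribute; other orderings are not supported.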
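PARTITION BY

An OVER window can be defined on a partitioned table. With PARTITION BY, each row is aggregated only over the rows of the partition it belongs to.

Range definition

The range definition specifies how many rows are included in the aggregation. It is given by a BETWEEN clause that defines a lower and an upper bound; all rows between the bounds are aggregated. Flink only supports CURRENT ROW as the upper bound.

There are two ways to define the range: ROWS intervals and RANGE intervals.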
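RANGE intervals

A RANGE interval is defined on the values of the ORDER BY column, which in Flink is always a time attribute. The following RANGE interval aggregates all rows whose time attribute is at most 30 minutes earlier than that of the current row.
  RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW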
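ROWS intervals

A ROWS interval is count-based: it defines exactly how many rows are included in the aggregation. The following ROWS interval aggregates the current row plus the 10 rows before it (11 rows in total).
  ROWS BETWEEN 10 PRECEDING AND CURRENT ROW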
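The two interval types differ when timestamps are uneven or repeated. The plain-Python sketch below illustrates standard SQL semantics and is not Flink code; the rows and the helpers range_sum and rows_sum are hypothetical, and it uses ROWS BETWEEN 1 PRECEDING instead of 10 so that the effect is visible with four rows. RANGE includes every row whose timestamp falls within the interval, including peers with the same timestamp, while ROWS counts physical rows.

```python
from datetime import datetime, timedelta

# Hypothetical (order_time, amount) rows of one partition, ordered by time.
# The last two rows share the same timestamp.
ROWS = [
    (datetime(2024, 1, 1, 10, 0), 1),
    (datetime(2024, 1, 1, 10, 20), 2),
    (datetime(2024, 1, 1, 10, 40), 4),
    (datetime(2024, 1, 1, 10, 40), 8),
]


def range_sum(rows, i, preceding):
    """RANGE BETWEEN <preceding> PRECEDING AND CURRENT ROW: all rows within the time span."""
    current = rows[i][0]
    return sum(v for t, v in rows if current - preceding <= t <= current)


def rows_sum(rows, i, n):
    """ROWS BETWEEN n PRECEDING AND CURRENT ROW: at most n + 1 physical rows."""
    return sum(v for _, v in rows[max(0, i - n): i + 1])


half_hour = timedelta(minutes=30)
print("RANGE 30 min:", [range_sum(ROWS, i, half_hour) for i in range(len(ROWS))])
print("ROWS 1      :", [rows_sum(ROWS, i, 1) for i in range(len(ROWS))])
```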
3 Syntax: the WINDOW clause

The WINDOW clause can be used to define an OVER window outside the SELECT clause. It makes queries easier to read and lets several aggregates share one window definition.
  SELECT order_id, order_time, amount,
    SUM(amount) OVER w AS sum_amount,
    AVG(amount) OVER w AS avg_amount
  FROM Orders
  WINDOW w AS (
    PARTITION BY product
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW);
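5 Window joins

A window join adds the time dimension to the join condition: it joins the elements of two streams that fall into the same window and satisfy the join condition.
In streaming queries, unlike other joins on continuous tables, a window join does not emit intermediate results; it emits only a final result when the window ends. In addition, a window join purges all intermediate state once it is no longer needed.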
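Window joins are usually used together with windowing TVFs. A window join can also follow other operations based on windowing TVFs, such as window aggregation, window Top-N and another window join.
Currently, the ON condition of a window join must contain equality predicates on window_start and on window_end of both input tables.
Syntax: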
  SELECT ...
  FROM L [LEFT|RIGHT|FULL OUTER] JOIN R -- L and R are relations with a windowing TVF applied
  ON L.window_start = R.window_start
  AND L.window_end = R.window_end AND ...

1 INNER/LEFT/RIGHT/FULL OUTER
  Flink SQL> desc LeftTable;
  +----------+------------------------+------+-----+--------+----------------------------------+
  |     name |                   type | null | key | extras |                        watermark |
  +----------+------------------------+------+-----+--------+----------------------------------+
  | row_time | TIMESTAMP(3) *ROWTIME* | true |     |        | `row_time` - INTERVAL '1' SECOND |
  |      num |                    INT | true |     |        |                                  |
  |       id |                 STRING | true |     |        |                                  |
  +----------+------------------------+------+-----+--------+----------------------------------+

  Flink SQL> SELECT * FROM LeftTable;
  +------------------+-----+----+
  |         row_time | num | id |
  +------------------+-----+----+
  | 2020-04-15 12:02 |   1 | L1 |
  | 2020-04-15 12:06 |   2 | L2 |
  | 2020-04-15 12:03 |   3 | L3 |
  +------------------+-----+----+

  Flink SQL> desc RightTable;
  +----------+------------------------+------+-----+--------+----------------------------------+
  |     name |                   type | null | key | extras |                        watermark |
  +----------+------------------------+------+-----+--------+----------------------------------+
  | row_time | TIMESTAMP(3) *ROWTIME* | true |     |        | `row_time` - INTERVAL '1' SECOND |
  |      num |                    INT | true |     |        |                                  |
  |       id |                 STRING | true |     |        |                                  |
  +----------+------------------------+------+-----+--------+----------------------------------+

  Flink SQL> SELECT * FROM RightTable;
  +------------------+-----+----+
  |         row_time | num | id |
  +------------------+-----+----+
  | 2020-04-15 12:01 |   2 | R2 |
  | 2020-04-15 12:04 |   3 | R3 |
  | 2020-04-15 12:05 |   4 | R4 |
  +------------------+-----+----+

  Flink SQL> SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id,
             COALESCE(L.window_start, R.window_start) as window_start,
             COALESCE(L.window_end, R.window_end) as window_end
             FROM (
                 SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
             ) L
             FULL JOIN (
                 SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
             ) R
             ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;
  +-------+------+-------+------+------------------+------------------+
  | L_Num | L_Id | R_Num | R_Id |     window_start |       window_end |
  +-------+------+-------+------+------------------+------------------+
  |     1 |   L1 |  null | null | 2020-04-15 12:00 | 2020-04-15 12:05 |
  |  null | null |     2 |   R2 | 2020-04-15 12:00 | 2020-04-15 12:05 |
  |     3 |   L3 |     3 |   R3 | 2020-04-15 12:00 | 2020-04-15 12:05 |
  |     2 |   L2 |  null | null | 2020-04-15 12:05 | 2020-04-15 12:10 |
  |  null | null |     4 |   R4 | 2020-04-15 12:05 | 2020-04-15 12:10 |
  +-------+------+-------+------+------------------+------------------+
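The FULL OUTER window join above can be reproduced in plain Python as a sanity check (an illustration, not Flink's implementation; tumble and window_full_join are hypothetical helpers). The rows are copied from LeftTable and RightTable, windows are 5-minute tumbling windows aligned to the epoch, and the output is sorted only for readability, since the emission order in Flink is not something to rely on.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
SIZE = timedelta(minutes=5)

# (row_time, num, id) rows copied from LeftTable and RightTable.
LEFT = [
    (datetime(2020, 4, 15, 12, 2), 1, "L1"),
    (datetime(2020, 4, 15, 12, 6), 2, "L2"),
    (datetime(2020, 4, 15, 12, 3), 3, "L3"),
]
RIGHT = [
    (datetime(2020, 4, 15, 12, 1), 2, "R2"),
    (datetime(2020, 4, 15, 12, 4), 3, "R3"),
    (datetime(2020, 4, 15, 12, 5), 4, "R4"),
]


def tumble(rows):
    """Return (num, id, window_start, window_end) for 5-minute tumbling windows."""
    result = []
    for ts, num, row_id in rows:
        start = ts - (ts - EPOCH) % SIZE
        result.append((num, row_id, start, start + SIZE))
    return result


def window_full_join(left, right):
    """FULL OUTER JOIN ON num AND window_start AND window_end equality."""
    joined, matched_right = [], set()
    for l_num, l_id, start, end in left:
        matches = [r for r in right if (r[0], r[2], r[3]) == (l_num, start, end)]
        for r_num, r_id, _, _ in matches:
            joined.append((l_num, l_id, r_num, r_id, start, end))
        matched_right.update(matches)
        if not matches:
            joined.append((l_num, l_id, None, None, start, end))
    for r_num, r_id, start, end in right:
        if (r_num, r_id, start, end) not in matched_right:
            joined.append((None, None, r_num, r_id, start, end))
    # Sorted by window, then by whichever num is present, purely for readability.
    return sorted(joined, key=lambda row: (row[4], row[0] if row[0] is not None else row[2]))


for l_num, l_id, r_num, r_id, start, end in window_full_join(tumble(LEFT), tumble(RIGHT)):
    print(l_num, l_id, r_num, r_id, start.strftime("%H:%M"), end.strftime("%H:%M"))
```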
2 SEMI

A semi window join returns a left row if, within the same window, at least one matching row exists on the right side.
  Flink SQL> SELECT *
             FROM (
                 SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
             ) L WHERE L.num IN (
               SELECT num FROM (
                 SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
               ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end);
  +------------------+-----+----+------------------+------------------+-------------------------+
  |         row_time | num | id |     window_start |       window_end |            window_time  |
  +------------------+-----+----+------------------+------------------+-------------------------+
  | 2020-04-15 12:03 |   3 | L3 | 2020-04-15 12:00 | 2020-04-15 12:05 | 2020-04-15 12:04:59.999 |
  +------------------+-----+----+------------------+------------------+-------------------------+

  Flink SQL> SELECT *
             FROM (
                 SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
             ) L WHERE EXISTS (
               SELECT * FROM (
                 SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
               ) R WHERE L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end);
  +------------------+-----+----+------------------+------------------+-------------------------+
  |         row_time | num | id |     window_start |       window_end |            window_time  |
  +------------------+-----+----+------------------+------------------+-------------------------+
  | 2020-04-15 12:03 |   3 | L3 | 2020-04-15 12:00 | 2020-04-15 12:05 | 2020-04-15 12:04:59.999 |
  +------------------+-----+----+------------------+------------------+-------------------------+
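A plain-Python sketch of the semi window join on the same LeftTable/RightTable rows (an illustration, not Flink code; window_start and semi_join are hypothetical helpers). Because both sides use the same 5-minute TUMBLE, equal window_start implies equal window_end, so the sketch compares window_start only. Each left row is emitted at most once, however many right rows match.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
SIZE = timedelta(minutes=5)

# (row_time, num, id) rows copied from LeftTable and RightTable.
LEFT = [
    (datetime(2020, 4, 15, 12, 2), 1, "L1"),
    (datetime(2020, 4, 15, 12, 6), 2, "L2"),
    (datetime(2020, 4, 15, 12, 3), 3, "L3"),
]
RIGHT = [
    (datetime(2020, 4, 15, 12, 1), 2, "R2"),
    (datetime(2020, 4, 15, 12, 4), 3, "R3"),
    (datetime(2020, 4, 15, 12, 5), 4, "R4"),
]


def window_start(ts):
    """Start of the 5-minute tumbling window containing ts (offset 0)."""
    return ts - (ts - EPOCH) % SIZE


def semi_join(left, right):
    """Keep a left row once if the right side has the same num in the same window."""
    right_keys = {(num, window_start(ts)) for ts, num, _ in right}
    return [row_id for ts, num, row_id in left if (num, window_start(ts)) in right_keys]


print(semi_join(LEFT, RIGHT))
```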
3 ANTI

An anti window join is the converse of the inner window join: it contains all rows in each common window that have no match on the other side.
  Flink SQL> SELECT *
             FROM (
                 SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
             ) L WHERE L.num NOT IN (
               SELECT num FROM (
                 SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
               ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end);
  +------------------+-----+----+------------------+------------------+-------------------------+
  |         row_time | num | id |     window_start |       window_end |            window_time  |
  +------------------+-----+----+------------------+------------------+-------------------------+
  | 2020-04-15 12:02 |   1 | L1 | 2020-04-15 12:00 | 2020-04-15 12:05 | 2020-04-15 12:04:59.999 |
  | 2020-04-15 12:06 |   2 | L2 | 2020-04-15 12:05 | 2020-04-15 12:10 | 2020-04-15 12:09:59.999 |
  +------------------+-----+----+------------------+------------------+-------------------------+

  Flink SQL> SELECT *
             FROM (
                 SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
             ) L WHERE NOT EXISTS (
               SELECT * FROM (
                 SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
               ) R WHERE L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end);
  +------------------+-----+----+------------------+------------------+-------------------------+
  |         row_time | num | id |     window_start |       window_end |            window_time  |
  +------------------+-----+----+------------------+------------------+-------------------------+
  | 2020-04-15 12:02 |   1 | L1 | 2020-04-15 12:00 | 2020-04-15 12:05 | 2020-04-15 12:04:59.999 |
  | 2020-04-15 12:06 |   2 | L2 | 2020-04-15 12:05 | 2020-04-15 12:10 | 2020-04-15 12:09:59.999 |
  +------------------+-----+----+------------------+------------------+-------------------------+
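The anti window join can be sketched the same way in plain Python (illustration only; window_start and anti_join are hypothetical helpers, and windows are again compared by window_start because both sides use the same 5-minute TUMBLE). A left row is kept only if no right row with the same num exists in the same window, which yields L1 and L2 as in the results above.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
SIZE = timedelta(minutes=5)

# (row_time, num, id) rows copied from LeftTable and RightTable.
LEFT = [
    (datetime(2020, 4, 15, 12, 2), 1, "L1"),
    (datetime(2020, 4, 15, 12, 6), 2, "L2"),
    (datetime(2020, 4, 15, 12, 3), 3, "L3"),
]
RIGHT = [
    (datetime(2020, 4, 15, 12, 1), 2, "R2"),
    (datetime(2020, 4, 15, 12, 4), 3, "R3"),
    (datetime(2020, 4, 15, 12, 5), 4, "R4"),
]


def window_start(ts):
    """Start of the 5-minute tumbling window containing ts (offset 0)."""
    return ts - (ts - EPOCH) % SIZE


def anti_join(left, right):
    """Keep a left row if the right side has no row with the same num in the same window."""
    right_keys = {(num, window_start(ts)) for ts, num, _ in right}
    return [row_id for ts, num, row_id in left if (num, window_start(ts)) not in right_keys]


print(anti_join(LEFT, RIGHT))
```

L2 survives even though R2 has the same num, because R2 (12:01) falls into the 12:00 window while L2 (12:06) falls into the 12:05 window.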
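Limitations

Limitation on the join clause: currently, the ON condition of a window join must contain equality predicates on window_start and on window_end of both input tables. In the future, for tumbling or sliding windows, an equality predicate on the window start alone may be sufficient.
Limitation on the input windowing TVFs: currently, both sides of the join must use the same windowing TVF. This rule may be extended in the future, for example to join tumbling and sliding windows of the same window size.
Limitation on window joins that directly follow windowing TVFs: currently, window joins are supported on tumbling (TUMBLE), sliding (HOP) and cumulative (CUMULATE) windowing TVFs, but not yet on session windows (SESSION).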
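6 Use cases of windowing TVFs

I. Tumbling window (TUMBLE), typical needs:
  • Use a tumbling window when you need to split data into fixed-size, non-overlapping windows and aggregate each window independently, for example hourly, daily or 10-minute statistics such as the number of orders per hour or website visits per 10 minutes.
II. Sliding window (HOP), typical needs:
  • Use a sliding window when windows need to overlap so that aggregates are computed and updated more frequently, for example a real-time monitoring system that updates the metrics of the past 15 minutes every 5 minutes, or an analysis that compares data trends over different overlapping time ranges.
III. Session window (SESSION), typical needs:
  • Use a session window when data should be split into sessions by the gaps between events and each session processed on its own. For example, when analyzing a user's continuous activity, a pause between actions longer than a threshold (the session gap) is considered the start of a new session. This is common for analyzing user behavior sequences such as online shopping sessions or game sessions.
IV. Cumulative window (CUMULATE), typical needs:
  • Use a cumulative window when you want to observe how data accumulates over a period and look at the accumulated result at different intervals, for example staged running totals such as cumulative sales or the cumulative number of user actions. The step and the maximum window size can be set to obtain staged cumulative results.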
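The four window types differ mainly in how a timestamp is assigned to windows. The sketch below illustrates that assignment under stated assumptions (plain Python, not Flink's implementation): windows are aligned to the epoch with offset 0, intervals are half-open [start, end), and a session ends one gap after its last event. The function names and sample timestamps are hypothetical, and the handling of a pause of exactly `gap` is a simplification.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def align(ts, unit):
    """Largest multiple of `unit` (counted from the epoch) that is <= ts."""
    return ts - (ts - EPOCH) % unit


def tumble(ts, size):
    """TUMBLE: exactly one fixed-size window."""
    start = align(ts, size)
    return [(start, start + size)]


def hop(ts, slide, size):
    """HOP: every window of length `size`, starting every `slide`, that contains ts."""
    windows = []
    start = align(ts, slide)
    while start > ts - size:  # window [start, start + size) contains ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def cumulate(ts, step, max_size):
    """CUMULATE: windows share one start and grow by `step` up to `max_size`."""
    start = align(ts, max_size)
    windows = []
    end = start + step
    while end <= start + max_size:
        if end > ts:  # windows that ended before ts do not contain it
            windows.append((start, end))
        end += step
    return windows


def sessions(timestamps, gap):
    """SESSION: a pause longer than `gap` starts a new session (simplified boundary)."""
    groups = []
    for ts in sorted(timestamps):
        if groups and ts - groups[-1][-1] <= gap:
            groups[-1].append(ts)
        else:
            groups.append([ts])
    return [(group[0], group[-1] + gap) for group in groups]


def at(hour, minute):
    """Hypothetical helper: a timestamp on 2024-01-01."""
    return datetime(2024, 1, 1, hour, minute)


print(tumble(at(10, 7), timedelta(minutes=10)))
print(hop(at(10, 7), timedelta(minutes=5), timedelta(minutes=15)))
print(cumulate(at(10, 20), timedelta(hours=1), timedelta(hours=3)))
print(sessions([at(10, 0), at(10, 5), at(10, 40)], timedelta(minutes=10)))
```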

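Disclaimer: if this content infringes your rights, please contact the site administrator and it will be removed promptly. Thank you for your cooperation! For more information, visit the homepage: qidao123.com: ToB企服之家, China's first enterprise-service review and business social networking platform.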