ToB企服应用市场:ToB评测及商务社交产业平台

标题: 大数据Flink(一百一十六):Flink SQL的时间属性 [打印本页]

作者: 圆咕噜咕噜    时间: 2024-9-13 00:34
标题: 大数据Flink(一百一十六):Flink SQL的时间属性

文章目录
Flink SQL的时间属性
一、Flink 三种时间属性简介
二、Flink 三种时间属性的应用场景
三、​​​​​​​SQL 指定时间属性的两种方式
四、​​​​​​​​​​​​​​SQL 处理时间DDL界说
五、​​​​​​​​​​​​​​SQL 事件时间DDL界说


Flink SQL的时间属性

先看一下本文团体的思路:

 
一、Flink 三种时间属性简介



要注意到:


二、Flink 三种时间属性的应用场景

以上三种时间属性到底对我们的使命有啥影响呢?三种时间属性的应用场景是啥?
先说结论,在 Flink 中时间的作用:

以 滚动窗口 的聚合使命为例来介绍一下事件时间和处理时间的对比区别。


上面这个案例的窗口大小是 1 小时,需求方需要按照用户点击时间戳 cTime 划分数据(划分滚动窗口),然后盘算出 count 聚合结果(如许盘算能反映出事件的真实发生时间),那么就需要把 cTime 设置为窗口的划分时间戳,即代码中 tumble(cTime, interval '1' hour)。
上面这种就叫做事件时间。即用数据中自带的时间戳进行窗口的划分(点击操作真实的发生时间)
后续 Flink SQL 使命在运行的过程中也会现实按照 cTime 的当前时间作为一小时窗口结束触发条件并盘算一个小时窗口内的数据。

还是上面谁人案例,但是这次需求方不需要按照数据上的时间戳划分数据(划分滚动窗口),只需要数据来了之后, 在 Flink 机器上的时间作为一小时窗口结束的触发条件并盘算。
那么这种触发机制就是处理时间。

 
三、​​​​​​​​​​​​​​SQL 指定时间属性的两种方式

假如要满足 Flink SQL 时间窗口类的聚合操作,SQL 或 Table API 中的 数据源表 就需要提供时间属性(相当于我们把这个时间属性在 数据源表 上面进行声明),以及支持时间相关的操作。
那么来看看 Flink SQL 为我们提供的两种指定时间戳的方式:

一旦时间属性界说好,它就可以像普通列一样利用,也可以在时间相关的操作中利用。
 
四、​​​​​​​​​​​​​​SQL 处理时间DDL界说

处理时间语义下,利用当前机器的体系时间作为处理时间。它是时间的最简朴概念。它既不需要提取时间戳,也不需要生成watermark
来看看 Flink SQL 中怎样指定处理时间。

  1. CREATE TABLE user_actions (
  2.   user_name STRING,
  3.   data STRING,
  4.   -- 使用下面这句来将 user_action_time 声明为处理时间
  5.   user_action_time AS PROCTIME()
  6. ) WITH (
  7.   ...
  8. );
复制代码

点击Flink开辟平台左侧资源管理,点击上传资源,将资料中的order.csv文件进行上传。
可以点击复制按钮复制其在oss的路径。
可以在oss对应路径看到此文件

读取order.csv'文件的数据,在原本的Schema上添加一个假造的时间戳列,时间戳列由PROCTIME()函数盘算产生。建表语句如下(path背面路径需要修改) 
  1. create table proctime_ddl_table (
  2. `userid` varchar,
  3. `timestamp` bigint,
  4. `money` double,
  5. `category` varchar,
  6. `pt` AS PROCTIME()
  7. ) with (
  8. 'connector' = 'filesystem',
  9. 'path' = 'oss://lanson-bucket/artifacts/namespaces/lanson-workspace-default/order.csv',
  10. 'format' = 'csv'
  11. );
复制代码
查询表数据(调试) 
  1. select * from proctime_ddl_table;
复制代码
查询结果如下

点击左侧导航栏元数据管理,查看表信息。 


五、​​​​​​​​​​​​​​SQL 事件时间DDL界说

来看看 Flink 中怎样指定事件时间。
Event Time时间语义利用一条数据现实发生的时间作为时间属性,在Table API & SQL中这个字段通常被称为rowtime。这种模式下多次重复盘算时,盘算结果是确定的。这意味着,Event Time时间语义可以保证流处理和批处理的同一。
Event Time时间语义下,我们需要设置每条数据发生时的时间戳,并提供一个Watermark。Watermark表示迟于该时间的数据都作为迟到数据对待。

  1. CREATE TABLE user_actions (
  2.   user_name STRING,
  3.   data STRING,
  4.   user_action_time TIMESTAMP(3),
  5.   -- 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒
  6.   -- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型
  7.   WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
  8. ) WITH (
  9.   ...
  10. );
复制代码
在上面的DDL中,WATERMARK起到了界说Event Time时间属性的作用,在这里临时不教学,watermark知识点后续会讲到

假如想利用事件时间,那么我们的时间戳类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型。
但是现实应用中时间戳一般都是秒或者是毫秒(BIGINT 类型),那这种环境怎么办?
解决方案如下
  1. CREATE TABLE user_actions (
  2.   user_name STRING,
  3.   data STRING,
  4.   -- 1. 这个 ts 就是常见的毫秒级别时间戳
  5.   ts BIGINT,
  6.   -- 2. 将毫秒时间戳转换成 TIMESTAMP_LTZ 类型
  7.   time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  8.   -- 3. 使用下面这句来将 time_ltz 声明为事件时间,并且声明 watermark 的生成规则,即 time_ltz 减 5 秒
  9.   -- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型
  10.   WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
  11. ) WITH (
  12.   ...
  13. );
复制代码

读取order.csv'文件的数据,界说现有事件时间字段上的 watermark 生成表达式,该表达式将事件时间字段标记为事件时间属性
建表语句如下
  1. create table eventime_ddl_table (
  2. `userid` varchar,
  3. `timestamp` bigint,
  4. `money` double,
  5. `category` varchar,
  6. rt AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
  7. watermark for rt as rt - interval '1' second
  8. ) with (
  9. 'connector' = 'filesystem',
  10. 'path' = 'oss://bucketnanfeng/artifacts/namespaces/lanson-flinkworkspace1-default/order.csv',
  11. 'format' = 'csv'
  12. );
复制代码
查询表数据(调试)
  1. select * from eventime_ddl_table;
复制代码
查询结果如下

点击左侧导航栏元数据管理,查看表信息。