eventTime
测试数据如下:
- {"username":"zs","price":20,"event_time":"2023-07-17 10:10:10"}
- {"username":"zs","price":15,"event_time":"2023-07-17 10:10:30"}
- {"username":"zs","price":20,"event_time":"2023-07-17 10:10:40"}
- {"username":"zs","price":20,"event_time":"2023-07-17 10:11:03"}
- {"username":"zs","price":20,"event_time":"2023-07-17 10:11:04"}
- {"username":"zs","price":20,"event_time":"2023-07-17 10:12:04"}
- {"username":"zs","price":20,"event_time":"2023-07-17 11:12:04"}
- {"username":"zs","price":20,"event_time":"2023-07-17 11:12:04"}
- {"username":"zs","price":20,"event_time":"2023-07-17 12:12:04"}
- {"username":"zs","price":20,"event_time":"2023-07-18 12:12:04"}
复制代码 需求:每隔1分钟统计这1分钟的每个用户的总消费金额和消费次数
需要用到滚动窗口
编写好sql:
- CREATE TABLE table1 (
- `username` string,
- `price` int,
- `event_time` TIMESTAMP(3),
- watermark for event_time as event_time - interval '3' second
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'topic1',
- 'properties.bootstrap.servers' = 'bigdata01:9092',
- 'properties.group.id' = 'g1',
- 'scan.startup.mode' = 'latest-offset',
- 'format' = 'json'
- );
- 编写sql:
- select
- window_start,
- window_end,
- username,
- count(1) zongNum,
- sum(price) totalMoney
- from table(TUMBLE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '60' second))
- group by window_start,window_end,username;
复制代码 分享一个错误:
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. The window function TUMBLE(TABLE table_name, DESCRIPTOR(timecol), datetime interval) requires the timecol is a time attribute type, but is VARCHAR(2147483647).
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
这个报错说明创建窗口时使用的字段不是时间属性字段:窗口 TVF 要求该列是时间属性,因此建表时必须把它定义为 TIMESTAMP(3) 类型;并且使用 event time 时还必须为该列声明 watermark,否则同样会报错。
需求:按照滚动窗口和EventTime进行统计,每隔1分钟统计每个人的消费总额是多少
- package com.bigdata.day08;
- import org.apache.flink.api.common.RuntimeExecutionMode;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
- /**
- * @基本功能:
- * @program:FlinkDemo
- * @author: 闫哥
- * @create:2023-11-28 14:12:28
- **/
- public class _03EventTimeGunDongWindowDemo {
- public static void main(String[] args) throws Exception {
- //1. env-准备环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
- //2. 创建表
- tenv.executeSql("CREATE TABLE table1 (\n" +
- " `username` String,\n" +
- " `price` int,\n" +
- " `event_time` TIMESTAMP(3),\n" +
- " watermark for event_time as event_time - interval '3' second\n" +
- ") WITH (\n" +
- " 'connector' = 'kafka',\n" +
- " 'topic' = 'topic1',\n" +
- " 'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
- " 'properties.group.id' = 'testGroup1',\n" +
- " 'scan.startup.mode' = 'group-offsets',\n" +
- " 'format' = 'json'\n" +
- ")");
- //3. 通过sql语句统计结果
- tenv.executeSql("select \n" +
- " window_start,\n" +
- " window_end,\n" +
- " username,\n" +
- " count(1) zongNum,\n" +
- " sum(price) totalMoney \n" +
- " from table(TUMBLE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '60' second))\n" +
- "group by window_start,window_end,username").print();
- //4. sink-数据输出
- //5. execute-执行
- env.execute();
- }
- }
复制代码 统计效果如下:
测试一下滑动窗口,每隔10秒钟,计算前1分钟的数据:
- package com.bigdata.day08;
- import org.apache.flink.api.common.RuntimeExecutionMode;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
- /**
- * @基本功能:
- * @program:FlinkDemo
- * @author: 闫哥
- * @create:2023-11-28 14:12:28
- **/
- public class _03EventTimeGunDongWindowDemo {
- public static void main(String[] args) throws Exception {
- //1. env-准备环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
- //2. 创建表
- tenv.executeSql("CREATE TABLE table1 (\n" +
- " `username` String,\n" +
- " `price` int,\n" +
- " `event_time` TIMESTAMP(3),\n" +
- " watermark for event_time as event_time - interval '3' second\n" +
- ") WITH (\n" +
- " 'connector' = 'kafka',\n" +
- " 'topic' = 'topic1',\n" +
- " 'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
- " 'properties.group.id' = 'testGroup1',\n" +
- " 'scan.startup.mode' = 'group-offsets',\n" +
- " 'format' = 'json'\n" +
- ")");
- //3. 通过sql语句统计结果
- tenv.executeSql("select \n" +
- " window_start,\n" +
- " window_end,\n" +
- " username,\n" +
- " count(1) zongNum,\n" +
- " sum(price) totalMoney \n" +
- " from table(HOP(TABLE table1, DESCRIPTOR(event_time), INTERVAL '10' second,INTERVAL '60' second))\n" +
- "group by window_start,window_end,username").print();
- //4. sink-数据输出
- //5. execute-执行
- env.execute();
- }
- }
复制代码 效果如图所示:
- package com.bigdata.day08;
- import org.apache.flink.api.common.RuntimeExecutionMode;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
- /**
- * @基本功能:
- * @program:FlinkDemo
- * @author: 闫哥
- * @create:2023-11-28 14:12:28
- **/
- public class _03EventTimeGunDongWindowDemo {
- public static void main(String[] args) throws Exception {
- //1. env-准备环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
- //2. 创建表
- tenv.executeSql("CREATE TABLE table1 (\n" +
- " `username` String,\n" +
- " `price` int,\n" +
- " `event_time` TIMESTAMP(3),\n" +
- " watermark for event_time as event_time - interval '3' second\n" +
- ") WITH (\n" +
- " 'connector' = 'kafka',\n" +
- " 'topic' = 'topic1',\n" +
- " 'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
- " 'properties.group.id' = 'testGroup1',\n" +
- " 'scan.startup.mode' = 'group-offsets',\n" +
- " 'format' = 'json'\n" +
- ")");
- //3. 通过sql语句统计结果
- tenv.executeSql("select \n" +
- " window_start,\n" +
- " window_end,\n" +
- " username,\n" +
- " count(1) zongNum,\n" +
- " sum(price) totalMoney \n" +
- " from table(CUMULATE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '1' hours,INTERVAL '1' days))\n" +
- "group by window_start,window_end,username").print();
- //4. sink-数据输出
- //5. execute-执行
- env.execute();
- }
- }
复制代码 累积窗口演示效果:
processTime
测试数据:
- {"username":"zs","price":20}
- {"username":"lisi","price":15}
- {"username":"lisi","price":20}
- {"username":"zs","price":20}
- {"username":"zs","price":20}
- {"username":"zs","price":20}
- {"username":"zs","price":20}
复制代码- /**
- * 滚动窗口大小1分钟 延迟时间3秒
- *
- * {"username":"zs","price":20}
- * {"username":"lisi","price":15}
- * {"username":"lisi","price":20}
- * {"username":"zs","price":20}
- * {"username":"zs","price":20}
- * {"username":"zs","price":20}
- * {"username":"zs","price":20}
- *
- */
- package com.bigdata.day08;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
- /**
- * @基本功能:
- * @program:FlinkDemo
- * @author: 闫哥
- * @create:2023-11-28 14:12:28
- **/
- public class _04ProcessingTimeGunDongWindowDemo {
- public static void main(String[] args) throws Exception {
- //1. env-准备环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
- //2. 创建表
- tenv.executeSql("CREATE TABLE table1 (\n" +
- " `username` String,\n" +
- " `price` int,\n" +
- " `event_time` as proctime()\n" +
- ") WITH (\n" +
- " 'connector' = 'kafka',\n" +
- " 'topic' = 'topic1',\n" +
- " 'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
- " 'properties.group.id' = 'testGroup1',\n" +
- " 'scan.startup.mode' = 'group-offsets',\n" +
- " 'format' = 'json'\n" +
- ")");
- //3. 通过sql语句统计结果
- tenv.executeSql("select \n" +
- " window_start,\n" +
- " window_end,\n" +
- " username,\n" +
- " count(1) zongNum,\n" +
- " sum(price) totalMoney \n" +
- " from table(TUMBLE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '60' second ))\n" +
- "group by window_start,window_end,username").print();
- //4. sink-数据输出
- //5. execute-执行
- env.execute();
- }
- }
复制代码 计算结果:
结果需要等1分钟,才能显示出来,不要着急!
窗口分为滚动和滑动,时间分为事件时间和处理时间,两两组合,共4个案例。
以下是滑动窗口+处理时间:
- package com.bigdata.sql;
- import org.apache.flink.api.common.RuntimeExecutionMode;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
- /**
- * @基本功能:
- * @program:FlinkDemo
- * @author: 闫哥
- * @create:2024-11-29 14:28:19
- **/
- public class _04_FlinkSQLProcessTime_HOP {
- public static void main(String[] args) throws Exception {
- //1. env-准备环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
- // 获取tableEnv对象
- // 通过env 获取一个table 环境
- StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
- tEnv.executeSql("CREATE TABLE table1 (\n" +
- " `username` string,\n" +
- " `price` int,\n" +
- " `event_time` as proctime() \n"+
- ") WITH (\n" +
- " 'connector' = 'kafka',\n" +
- " 'topic' = 'topic1',\n" +
- " 'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
- " 'properties.group.id' = 'g1',\n" +
- " 'scan.startup.mode' = 'latest-offset',\n" +
- " 'format' = 'json'\n" +
- ")");
- // 语句中的 ; 不能添加
- tEnv.executeSql("select \n" +
- " window_start,\n" +
- " window_end,\n" +
- " username,\n" +
- " count(1) zongNum,\n" +
- " sum(price) totalMoney \n" +
- " from table(HOP(TABLE table1, DESCRIPTOR(event_time),INTERVAL '10' second, INTERVAL '60' second))\n" +
- "group by window_start,window_end,username").print();
- //5. execute-执行
- env.execute();
- }
- }
复制代码
测试时假如你的控制台不出数据,触发不了,请进入如下操作:
1、重新创建一个新的 topic,分区数为 1
2、kafka 对接的 server,写全 bigdata01:9092,bigdata02:9092,bigdata03:9092
二、窗口TopN(不是新的技术)
需求:在每个小时内找出点击量最多的Top 3网页。
- 测试数据
- {"ts": "2023-09-05 12:00:00", "page_id": 1, "clicks": 100}
- {"ts": "2023-09-05 12:01:00", "page_id": 2, "clicks": 90}
- {"ts": "2023-09-05 12:10:00", "page_id": 3, "clicks": 110}
- {"ts": "2023-09-05 12:20:00", "page_id": 4, "clicks": 23}
- {"ts": "2023-09-05 12:30:00", "page_id": 5, "clicks": 456}
- {"ts": "2023-09-05 13:10:00", "page_id": 5, "clicks": 456}
复制代码- 假如没有每隔1小时的需求,仅仅是统计点击量最多的Top 3网页,结果如下
- select * from (
- select
- page_id,
- totalSum,
- row_number() over (order by totalSum desc) px
- from (
- select page_id,
- sum(clicks) totalSum
- from kafka_page_clicks group by page_id ) ) where px <=3;
复制代码 根据以上代码,添加滚动窗口的写法:
- select
- window_start,
- window_end,
- page_id,
- sum(clicks) totalSum
- from
- table (
- tumble( table kafka_page_clicks, descriptor(ts), INTERVAL '1' HOUR )
- )
- group by window_start,window_end,page_id;
- 在这个基础之上添加排名的写法:
- select
- window_start,
- window_end,
- page_id,
- pm
- from (
- select
- window_start,
- window_end,
- page_id,
- row_number() over(partition by window_start,window_end order by totalSum desc ) pm
- from (
- select
- window_start,
- window_end,
- page_id,
- sum(clicks) totalSum
- from
- table (
- tumble( table kafka_page_clicks, descriptor(ts), INTERVAL '1' HOUR )
- )
- group by window_start,window_end,page_id ) t2 ) t1 where pm <= 3;
复制代码 编写建表语句:
- {"ts": "2023-09-05 12:00:00", "page_id": 1, "clicks": 100}
- CREATE TABLE kafka_page_clicks (
- `ts` TIMESTAMP(3),
- `page_id` int,
- `clicks` int,
- watermark for ts as ts - interval '3' second
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'topic1',
- 'properties.bootstrap.servers' = 'bigdata01:9092',
- 'properties.group.id' = 'g1',
- 'scan.startup.mode' = 'latest-offset',
- 'format' = 'json'
- )
复制代码- package com.bigdata.day08;
- import org.apache.flink.api.common.RuntimeExecutionMode;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
- /**
- * @基本功能:
- * @program:FlinkDemo
- * @author: 闫哥
- * @create:2023-11-28 15:23:46
- **/
- public class _05TopNDemo {
- public static void main(String[] args) throws Exception {
- //1. env-准备环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // ctrl + y 删除光标所在的那一行数据 ctrl + d 复制当前行
- StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
- //2. source-加载数据
- // 一定要注意:ts 是一个年月日时分秒的数据,所以在建表时一定要是TIMESTAMP,否则进行WATERMARK 报错
- // 因为使用的是event_time 所以,需要指定WATERMARK
- tenv.executeSql("CREATE TABLE kafka_page_clicks (" +
- " `ts` TIMESTAMP(3),\n" +
- " page_id INT,\n" +
- " clicks INT,\n" +
- " WATERMARK FOR ts AS ts - INTERVAL '10' SECOND \n" +
- ") WITH (\n" +
- " 'connector' = 'kafka',\n" +
- " 'topic' = 'topic1',\n" +
- " 'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
- " 'scan.startup.mode' = 'group-offsets',\n" +
- " 'format' = 'json'\n" +
- ")");
- tenv.executeSql("select \n" +
- " window_start,\n" +
- " window_end,\n" +
- " page_id,\n" +
- " pm\n" +
- " from (\n" +
- "select \n" +
- " window_start,\n" +
- " window_end,\n" +
- " page_id,\n" +
- " row_number() over(partition by window_start,window_end order by totalSum desc ) pm\n" +
- " from (\n" +
- "select \n" +
- " window_start,\n" +
- " window_end,\n" +
- " page_id,\n" +
- " sum(clicks) totalSum \n" +
- " from \n" +
- " table ( \n" +
- " tumble( table kafka_page_clicks, descriptor(ts), INTERVAL '1' HOUR ) \n" +
- " ) \n" +
- " group by window_start,window_end,page_id ) t2 ) t1 where pm <= 3").print();
- //4. sink-数据输出
- //5. execute-执行
- env.execute();
- }
- }
复制代码 最后的运行效果如下:
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |