Flink学习连载文章13--FlinkSQL高级部门

打印 上一主题 下一主题

主题 829|帖子 829|积分 2487

eventTime

测试数据如下:
  1. {"username":"zs","price":20,"event_time":"2023-07-17 10:10:10"}
  2. {"username":"zs","price":15,"event_time":"2023-07-17 10:10:30"}
  3. {"username":"zs","price":20,"event_time":"2023-07-17 10:10:40"}
  4. {"username":"zs","price":20,"event_time":"2023-07-17 10:11:03"}
  5. {"username":"zs","price":20,"event_time":"2023-07-17 10:11:04"}
  6. {"username":"zs","price":20,"event_time":"2023-07-17 10:12:04"}
  7. {"username":"zs","price":20,"event_time":"2023-07-17 11:12:04"}
  8. {"username":"zs","price":20,"event_time":"2023-07-17 11:12:04"}
  9. {"username":"zs","price":20,"event_time":"2023-07-17 12:12:04"}
  10. {"username":"zs","price":20,"event_time":"2023-07-18 12:12:04"}
复制代码
需求:每隔1分钟统计这1分钟的每个用户的总消费金额和消费次数
需要用到滚动窗口

编写好sql:
  1. CREATE TABLE table1 (
  2.   `username` string,
  3.   `price` int,
  4.   `event_time` TIMESTAMP(3),
  5.   watermark for event_time as event_time - interval '3' second
  6. ) WITH (
  7.   'connector' = 'kafka',
  8.   'topic' = 'topic1',
  9.   'properties.bootstrap.servers' = 'bigdata01:9092',
  10.   'properties.group.id' = 'g1',
  11.   'scan.startup.mode' = 'latest-offset',
  12.   'format' = 'json'
  13. );
  14. 编写sql:
  15. select
  16.    window_start,
  17.    window_end,
  18.    username,
  19.    count(1) zongNum,
  20.    sum(price) totalMoney
  21.    from table(TUMBLE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '60' second))
  22. group by window_start,window_end,username;
复制代码
分享一个错误:
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. The window function TUMBLE(TABLE table_name, DESCRIPTOR(timecol), datetime interval) requires the timecol is a time attribute type, but is VARCHAR(2147483647).
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)

说明创建窗口的时候,使用的字段不是时间字段,需要写成时间字段TIMESTAMP(3),使用了eventtime需要添加水印,否则报错。
需求:按照滚动窗口和EventTime进行统计,每隔1分钟统计每个人的消费总额是多少
  1. package com.bigdata.day08;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  5. /**
  6. * @基本功能:
  7. * @program:FlinkDemo
  8. * @author: 闫哥
  9. * @create:2023-11-28 14:12:28
  10. **/
  11. public class _03EventTimeGunDongWindowDemo {
  12.     public static void main(String[] args) throws Exception {
  13.         //1. env-准备环境
  14.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  15.         env.setParallelism(1);
  16.         StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
  17.         //2. 创建表
  18.         tenv.executeSql("CREATE TABLE table1 (\n" +
  19.                         "  `username` String,\n" +
  20.                         "  `price` int,\n" +
  21.                         "  `event_time` TIMESTAMP(3),\n" +
  22.                         "   watermark for event_time as event_time - interval '3' second\n" +
  23.                         ") WITH (\n" +
  24.                         "  'connector' = 'kafka',\n" +
  25.                         "  'topic' = 'topic1',\n" +
  26.                         "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
  27.                         "  'properties.group.id' = 'testGroup1',\n" +
  28.                         "  'scan.startup.mode' = 'group-offsets',\n" +
  29.                         "  'format' = 'json'\n" +
  30.                         ")");
  31.         //3. 通过sql语句统计结果
  32.         tenv.executeSql("select \n" +
  33.                         "   window_start,\n" +
  34.                         "   window_end,\n" +
  35.                         "   username,\n" +
  36.                         "   count(1) zongNum,\n" +
  37.                         "   sum(price) totalMoney \n" +
  38.                         "   from table(TUMBLE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '60' second))\n" +
  39.                         "group by window_start,window_end,username").print();
  40.         //4. sink-数据输出
  41.         //5. execute-执行
  42.         env.execute();
  43.     }
  44. }
复制代码
统计效果如下:


测试一下滑动窗口,每隔10秒钟,盘算前1分钟的数据:
  1. package com.bigdata.day08;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  5. /**
  6. * @基本功能:
  7. * @program:FlinkDemo
  8. * @author: 闫哥
  9. * @create:2023-11-28 14:12:28
  10. **/
  11. public class _03EventTimeGunDongWindowDemo {
  12.     public static void main(String[] args) throws Exception {
  13.         //1. env-准备环境
  14.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  15.         env.setParallelism(1);
  16.         StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
  17.         //2. 创建表
  18.         tenv.executeSql("CREATE TABLE table1 (\n" +
  19.                 "  `username` String,\n" +
  20.                 "  `price` int,\n" +
  21.                 "  `event_time` TIMESTAMP(3),\n" +
  22.                 "   watermark for event_time as event_time - interval '3' second\n" +
  23.                 ") WITH (\n" +
  24.                 "  'connector' = 'kafka',\n" +
  25.                 "  'topic' = 'topic1',\n" +
  26.                 "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
  27.                 "  'properties.group.id' = 'testGroup1',\n" +
  28.                 "  'scan.startup.mode' = 'group-offsets',\n" +
  29.                 "  'format' = 'json'\n" +
  30.                 ")");
  31.         //3. 通过sql语句统计结果
  32.         tenv.executeSql("select \n" +
  33.                 "   window_start,\n" +
  34.                 "   window_end,\n" +
  35.                 "   username,\n" +
  36.                 "   count(1) zongNum,\n" +
  37.                 "   sum(price) totalMoney \n" +
  38.                 "   from table(HOP(TABLE table1, DESCRIPTOR(event_time), INTERVAL '10' second,INTERVAL '60' second))\n" +
  39.                 "group by window_start,window_end,username").print();
  40.         //4. sink-数据输出
  41.         //5. execute-执行
  42.         env.execute();
  43.     }
  44. }
复制代码
效果如图所示:




  1. package com.bigdata.day08;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  5. /**
  6. * @基本功能:
  7. * @program:FlinkDemo
  8. * @author: 闫哥
  9. * @create:2023-11-28 14:12:28
  10. **/
  11. public class _03EventTimeGunDongWindowDemo {
  12.     public static void main(String[] args) throws Exception {
  13.         //1. env-准备环境
  14.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  15.         env.setParallelism(1);
  16.         StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
  17.         //2. 创建表
  18.         tenv.executeSql("CREATE TABLE table1 (\n" +
  19.                 "  `username` String,\n" +
  20.                 "  `price` int,\n" +
  21.                 "  `event_time` TIMESTAMP(3),\n" +
  22.                 "   watermark for event_time as event_time - interval '3' second\n" +
  23.                 ") WITH (\n" +
  24.                 "  'connector' = 'kafka',\n" +
  25.                 "  'topic' = 'topic1',\n" +
  26.                 "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
  27.                 "  'properties.group.id' = 'testGroup1',\n" +
  28.                 "  'scan.startup.mode' = 'group-offsets',\n" +
  29.                 "  'format' = 'json'\n" +
  30.                 ")");
  31.         //3. 通过sql语句统计结果
  32.         tenv.executeSql("select \n" +
  33.                 "   window_start,\n" +
  34.                 "   window_end,\n" +
  35.                 "   username,\n" +
  36.                 "   count(1) zongNum,\n" +
  37.                 "   sum(price) totalMoney \n" +
  38.                 "   from table(CUMULATE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '1' hours,INTERVAL '1' days))\n" +
  39.                 "group by window_start,window_end,username").print();
  40.         //4. sink-数据输出
  41.         //5. execute-执行
  42.         env.execute();
  43.     }
  44. }
复制代码
累积窗口演示效果:


processTime

测试数据:
  1. {"username":"zs","price":20}
  2. {"username":"lisi","price":15}
  3. {"username":"lisi","price":20}
  4. {"username":"zs","price":20}
  5. {"username":"zs","price":20}
  6. {"username":"zs","price":20}
  7. {"username":"zs","price":20}
复制代码
  1. /**
  2. * 滚动窗口大小1分钟 延迟时间3秒
  3. *
  4. * {"username":"zs","price":20}
  5. * {"username":"lisi","price":15}
  6. * {"username":"lisi","price":20}
  7. * {"username":"zs","price":20}
  8. * {"username":"zs","price":20}
  9. * {"username":"zs","price":20}
  10. * {"username":"zs","price":20}
  11. *
  12. */
  13. package com.bigdata.day08;
  14. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  15. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  16. /**
  17. * @基本功能:
  18. * @program:FlinkDemo
  19. * @author: 闫哥
  20. * @create:2023-11-28 14:12:28
  21. **/
  22. public class _04ProcessingTimeGunDongWindowDemo {
  23.     public static void main(String[] args) throws Exception {
  24.         //1. env-准备环境
  25.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  26.         env.setParallelism(1);
  27.         StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
  28.         //2. 创建表
  29.         tenv.executeSql("CREATE TABLE table1 (\n" +
  30.                 "  `username` String,\n" +
  31.                 "  `price` int,\n" +
  32.                 "  `event_time` as proctime()\n" +
  33.                 ") WITH (\n" +
  34.                 "  'connector' = 'kafka',\n" +
  35.                 "  'topic' = 'topic1',\n" +
  36.                 "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
  37.                 "  'properties.group.id' = 'testGroup1',\n" +
  38.                 "  'scan.startup.mode' = 'group-offsets',\n" +
  39.                 "  'format' = 'json'\n" +
  40.                 ")");
  41.         //3. 通过sql语句统计结果
  42.         tenv.executeSql("select \n" +
  43.                 "   window_start,\n" +
  44.                 "   window_end,\n" +
  45.                 "   username,\n" +
  46.                 "   count(1) zongNum,\n" +
  47.                 "   sum(price) totalMoney \n" +
  48.                 "   from table(TUMBLE(TABLE table1, DESCRIPTOR(event_time), INTERVAL '60' second ))\n" +
  49.                 "group by window_start,window_end,username").print();
  50.         //4. sink-数据输出
  51.         //5. execute-执行
  52.         env.execute();
  53.     }
  54. }
复制代码
盘算效果:


效果需要等1分钟,才能表现出来,不要着急!
窗口分为滚动和滑动,时间分为事件时间和处置惩罚时间,两两组合,4个案例。
以下是滑动窗口+处置惩罚时间:
  1. package com.bigdata.sql;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  5. /**
  6. * @基本功能:
  7. * @program:FlinkDemo
  8. * @author: 闫哥
  9. * @create:2024-11-29 14:28:19
  10. **/
  11. public class _04_FlinkSQLProcessTime_HOP {
  12.     public static void main(String[] args) throws Exception {
  13.         //1. env-准备环境
  14.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  15.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  16.         // 获取tableEnv对象
  17.         // 通过env 获取一个table 环境
  18.         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  19.         tEnv.executeSql("CREATE TABLE table1 (\n" +
  20.                         "  `username` string,\n" +
  21.                         "  `price` int,\n" +
  22.                         "  `event_time` as proctime() \n"+
  23.                         ") WITH (\n" +
  24.                         "  'connector' = 'kafka',\n" +
  25.                         "  'topic' = 'topic1',\n" +
  26.                         "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
  27.                         "  'properties.group.id' = 'g1',\n" +
  28.                         "  'scan.startup.mode' = 'latest-offset',\n" +
  29.                         "  'format' = 'json'\n" +
  30.                         ")");
  31.         // 语句中的 ; 不能添加
  32.         tEnv.executeSql("select \n" +
  33.                         "   window_start,\n" +
  34.                         "   window_end,\n" +
  35.                         "   username,\n" +
  36.                         "   count(1) zongNum,\n" +
  37.                         "   sum(price) totalMoney \n" +
  38.                         "   from table(HOP(TABLE table1, DESCRIPTOR(event_time),INTERVAL '10' second, INTERVAL '60' second))\n" +
  39.                         "group by window_start,window_end,username").print();
  40.         //5. execute-执行
  41.         env.execute();
  42.     }
  43. }
复制代码


测试时假如你的控制台不出数据,触发不了,请进入如下操作:
1、重新创建一个新的 topic,分区数为 1
2、kafka 对接的 server,写全 bigdata01:9092,bigdata02:9092,bigdata03:9092
二、窗口TopN(不是新的技术)

需求:在每个小时内找出点击量最多的Top 3网页。
  1. 测试数据
  2. {"ts": "2023-09-05 12:00:00", "page_id": 1, "clicks": 100}
  3. {"ts": "2023-09-05 12:01:00", "page_id": 2, "clicks": 90}
  4. {"ts": "2023-09-05 12:10:00", "page_id": 3, "clicks": 110}
  5. {"ts": "2023-09-05 12:20:00", "page_id": 4, "clicks": 23}
  6. {"ts": "2023-09-05 12:30:00", "page_id": 5, "clicks": 456}
  7. {"ts": "2023-09-05 13:10:00", "page_id": 5, "clicks": 456}
复制代码
  1. 假如没有每隔1小时的需求,仅仅是统计点击量最多的Top 3网页,结果如下
  2. select * from (
  3. select
  4.     page_id,
  5.     totalSum,
  6.     row_number() over (order by totalSum desc) px
  7.   from (
  8.      select page_id,
  9.       sum(clicks)  totalSum
  10.       from kafka_page_clicks group by page_id )  ) where px <=3;
复制代码
根据以上代码,添加滚动窗口的写法:
  1. select
  2.     window_start,
  3.     window_end,
  4.     page_id,
  5.     sum(clicks) totalSum  
  6.     from
  7.    table (
  8.      tumble( table kafka_page_clicks, descriptor(ts), INTERVAL '1' HOUR )
  9.          )
  10.     group by window_start,window_end,page_id;
  11. 在这个基础之上添加排名的写法:
  12. select
  13.    window_start,
  14.    window_end,
  15.    page_id,
  16.    pm
  17.   from   (
  18. select
  19.     window_start,
  20.     window_end,
  21.     page_id,
  22.     row_number() over(partition by window_start,window_end order by totalSum desc ) pm
  23.   from (
  24. select
  25.     window_start,
  26.     window_end,
  27.     page_id,
  28.     sum(clicks) totalSum  
  29.     from
  30.    table (
  31.      tumble( table kafka_page_clicks, descriptor(ts), INTERVAL '1' HOUR )
  32.          )
  33.     group by window_start,window_end,page_id ) t2 ) t1  where pm <= 3;
复制代码
编写建表语句:
  1. {"ts": "2023-09-05 12:00:00", "page_id": 1, "clicks": 100}
  2. CREATE TABLE kafka_page_clicks (
  3.   `ts` TIMESTAMP(3),
  4.   `page_id` int,
  5.   `clicks` int,
  6.   watermark for ts as ts - interval '3' second
  7. ) WITH (
  8.   'connector' = 'kafka',
  9.   'topic' = 'topic1',
  10.   'properties.bootstrap.servers' = 'bigdata01:9092',
  11.   'properties.group.id' = 'g1',
  12.   'scan.startup.mode' = 'latest-offset',
  13.   'format' = 'json'
  14. )
复制代码
  1. package com.bigdata.day08;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  5. /**
  6. * @基本功能:
  7. * @program:FlinkDemo
  8. * @author: 闫哥
  9. * @create:2023-11-28 15:23:46
  10. **/
  11. public class _05TopNDemo {
  12.     public static void main(String[] args) throws Exception {
  13.         //1. env-准备环境
  14.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  15.         // ctrl + y 删除光标所在的那一行数据  ctrl + d 复制当前行
  16.         StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
  17.         //2. source-加载数据
  18.         // 一定要注意:ts 是一个年月日时分秒的数据,所以在建表时一定要是TIMESTAMP,否则进行WATERMARK 报错
  19.         // 因为使用的是event_time 所以,需要指定WATERMARK
  20.         tenv.executeSql("CREATE TABLE kafka_page_clicks (" +
  21.                 "    `ts` TIMESTAMP(3),\n" +
  22.                 "    page_id INT,\n" +
  23.                 "    clicks INT,\n" +
  24.                 "  WATERMARK FOR ts AS ts - INTERVAL '10' SECOND \n" +
  25.                 ") WITH (\n" +
  26.                 "    'connector' = 'kafka',\n" +
  27.                 "    'topic' = 'topic1',\n" +
  28.                 "    'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
  29.                 "   'scan.startup.mode' = 'group-offsets',\n" +
  30.                 "    'format' = 'json'\n" +
  31.                 ")");
  32.         tenv.executeSql("select \n" +
  33.                 "   window_start,\n" +
  34.                 "   window_end,\n" +
  35.                 "   page_id,\n" +
  36.                 "   pm\n" +
  37.                 "  from   (\n" +
  38.                 "select \n" +
  39.                 "    window_start,\n" +
  40.                 "    window_end,\n" +
  41.                 "    page_id,\n" +
  42.                 "    row_number() over(partition by window_start,window_end order by totalSum desc ) pm\n" +
  43.                 "  from (\n" +
  44.                 "select \n" +
  45.                 "    window_start,\n" +
  46.                 "    window_end,\n" +
  47.                 "    page_id,\n" +
  48.                 "    sum(clicks) totalSum  \n" +
  49.                 "    from \n" +
  50.                 "   table ( \n" +
  51.                 "     tumble( table kafka_page_clicks, descriptor(ts), INTERVAL '1' HOUR ) \n" +
  52.                 "         ) \n" +
  53.                 "    group by window_start,window_end,page_id ) t2 ) t1  where pm <= 3").print();
  54.         //4. sink-数据输出
  55.         //5. execute-执行
  56.         env.execute();
  57.     }
  58. }
复制代码
最后的运行效果如下:



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

欢乐狗

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表