ToB企服应用市场:ToB评测及商务社交产业平台

标题: Flink学习之Flink SQL [打印本页]

作者: 张国伟    时间: 2024-9-9 22:55
标题: Flink学习之Flink SQL
Flink SQL

1、SQL客户端

1.1 基本使用


1.2 三种体现模式


1.3 差别的实行模式


2、常用的connector

2.1 Kafka


2.2 JDBC

   用于连接数据库,例如:MySQL、Oracle、PG、Derby
  
2.3 HDFS


2.4 HBase

  1. hbase启动顺序:
  2. zk(三台虚拟机都启动)-->hadoop(主从复制:在master端启动即可)-->hbase(在master端启动即可)
  3. hbase关闭顺序:
  4. hbase-->hadoop-->zk
  5. # 启动
  6. start-hbase.sh
  7. #关闭
  8. stop-hbase.sh
  9. # 进入HBase的客户端
  10. hbase shell
复制代码

2.5 DataGen

   用于按照指定的规则生成数据,一样寻常用于性能测试
  1. drop table if exists datagen;
  2. CREATE TABLE if not exists datagen (
  3.     id BIGINT
  4.     ,random_id BIGINT
  5.     ,name STRING
  6. ) WITH (
  7. 'connector' = 'datagen',
  8. -- optional options --
  9. 'rows-per-second'='20', -- 设置每秒钟生成的数据量
  10.    
  11.     'fields.id.kind' = 'random',
  12.     'fields.id.min'='10000000',
  13.     'fields.id.max'='99999999',
  14.     'fields.random_id.kind' = 'random',
  15.     'fields.random_id.min'='10000000',
  16.     'fields.random_id.max'='99999999',
  17.    
  18.     'fields.name.length'='5'
  19. );
复制代码
2.6 Blackhole

   用于性能测试,可以作为Sink端
  1. drop table if exists blackhole_table;
  2. CREATE TABLE if not exists  blackhole_table
  3. WITH ('connector' = 'blackhole')
  4. LIKE datagen (EXCLUDING ALL);
  5. insert into blackhole_table
  6. select * from datagen group by name;
  7. drop table if exists blackhole_table;
  8. CREATE TABLE if not exists  blackhole_table(
  9.         name String,
  10.     cnt BIGINT
  11. )
  12. WITH ('connector' = 'blackhole')
  13. ;
  14. insert into blackhole_table
  15. select name,count(*) as cnt from datagen group by name;
复制代码
2.7 Print

   将结果数据在TaskManager中输出
  1. drop table if exists print_table;
  2. CREATE TABLE if not exists print_table (
  3. name STRING,
  4. cnt BIGINT
  5. ) WITH (
  6. 'connector' = 'print'
  7. );
  8. insert into print_table
  9. select name,count(*) as cnt from datagen group by name;
复制代码
3、常用的格式

3.1 CSV

   逗号分隔符文件,并非肯定是.csv文件
  在作为Sink时的format,仅支持写入不带更新的结果
  剖析每条数据是通过次序匹配
  常用参数:
  csv.ignore-parse-errors 默认false,忽略剖析错误,不会导致步伐直接停止
  csv.field-delimiter 默认 逗号,指定数据的列分隔符
  3.2 JSON

3.2.1 json

   平凡的json格式,剖析数据是通过列名进行匹配
  同csv雷同,只支持写入不带更新的结果
  1. drop table if exists cars_json_source;
  2. CREATE TABLE if not exists cars_json_source (
  3.     car String
  4.     ,county_code INT
  5.     ,city_code INT
  6.     ,card BIGINT
  7.     ,camera_id String
  8.     ,orientation String
  9.     ,road_id BIGINT
  10.     ,`time` BIGINT
  11.     ,speed Double
  12. ) WITH (
  13.   'connector' = 'kafka',
  14.   'topic' = 'cars_json',
  15.   'properties.bootstrap.servers' = 'master:9092',
  16.   'properties.group.id' = 'grp1',
  17.   'scan.startup.mode' = 'earliest-offset',
  18.   'format' = 'json'
  19. );
复制代码
3.2.2 canal-json

   一种特殊的JSON格式
  支持写入更新的结果
  {“data”:[{“clazz”:“文科六班”,“cnt”:104}],“type”:“DELETE”}
  3.2.3 debezium-json

   同canal-json,只是数据格式有些许差别
  {“before”:null,“after”:{“clazz”:“理科四班”,“cnt”:94},“op”:“c”}
  3.3 ORC

   一样寻常不消
  3.4 PARQUET

   一样寻常不消
  4、时间属性

4.1 处置惩罚时间

   基于系统的时间
  1. drop table if exists students_kafka_source;
  2. CREATE TABLE if not exists students_kafka_source (
  3.   `id` BIGINT,
  4.   `name` STRING,
  5.   `age` INT,
  6.   `gender` STRING,
  7.   `clazz` STRING,
  8.    -- 通过系统时间给表增加一列,即:处理时间
  9.    proc_time as PROCTIME()
  10. ) WITH (
  11.   'connector' = 'kafka',
  12.   'topic' = 'students1000',
  13.   'properties.bootstrap.servers' = 'master:9092',
  14.   'properties.group.id' = 'grp1',
  15.   'scan.startup.mode' = 'earliest-offset',
  16.   'format' = 'csv',
  17.   -- 是否忽略脏数据
  18.   'csv.ignore-parse-errors' = 'true'
  19. );
  20. select  clazz
  21.         ,count(*) as cnt
  22.         ,tumble_start(proc_time,INTERVAL '5' SECONDS) as window_start
  23.         ,tumble_end(proc_time,INTERVAL '5' SECONDS) as window_end
  24. from students_kafka_source
  25. group by clazz,tumble(proc_time,INTERVAL '5' SECONDS)
  26. ;
  27. -- 向Topic中生产数据
  28. kafka-console-producer.sh --broker-list master:9092 --topic students1000
复制代码
4.2 事件时间

   基于数据自带的时间
  java,2024-08-03 10:41:50
  java,2024-08-03 10:41:51
  java,2024-08-03 10:41:52
  1. drop table if exists words_kafka_source;
  2. CREATE TABLE if not exists words_kafka_source (
  3.   `word` STRING,
  4.    -- 从数据中过来的一列,作为事件时间
  5.    event_time TIMESTAMP(3),
  6.    -- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间
  7.    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
  8. ) WITH (
  9.   'connector' = 'kafka',
  10.   'topic' = 'words_event_time',
  11.   'properties.bootstrap.servers' = 'master:9092',
  12.   'properties.group.id' = 'grp1',
  13.   'scan.startup.mode' = 'latest-offset',
  14.   'format' = 'csv',
  15.   -- 是否忽略脏数据
  16.   'csv.ignore-parse-errors' = 'true'
  17. );
  18. -- 创建topic
  19. kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic words_event_time
  20. -- 执行查询,使用滚动的事件时间窗口进行word count,每5s统计一次
  21. select  word
  22.         ,count(*) as cnt
  23.         ,tumble_start(event_time,INTERVAL '5' SECONDS) as window_start
  24.         ,tumble_end(event_time,INTERVAL '5' SECONDS) as window_end
  25. from words_kafka_source
  26. group by word,tumble(event_time,INTERVAL '5' SECONDS)
  27. ;
  28. -- 向Topic中生产数据
  29. kafka-console-producer.sh --broker-list master:9092 --topic words_event_time
复制代码
5、SQL语法

5.1 Hints

   在SQL查询时动态修改表的参数配置
  1. -- words_kafka_source 默认从最后开始消费
  2. select * from words_kafka_source; // 只能查询到最新的数据,不会从头开始消费
  3. -- 假设现在需要从头开始消费
  4. -- 第一种方案,将words_kafka_source删除重建
  5. -- 第二种方案,通过alter table 对表进行修改
  6. -- 第三种方案,通过hints动态调整表的配置
  7. select * from words_kafka_source /*+OPTIONS('scan.startup.mode' = 'earliest-offset') */ ;
复制代码
5.2 With

   用于将多次实行的同一查询通过with先定义,后面可以进行多次使用,避免重复的SQL
  应用场景:1、多次使用的SQL查询可以缓存提高性能 2、将多级嵌套解开来,降低主SQL的复杂度
  1. drop table if exists students_mysql_source;
  2. CREATE TABLE if not exists students_mysql_source (
  3.   `id` BIGINT,
  4.   `name` STRING,
  5.   `age` INT,
  6.   `gender` STRING,
  7.   `clazz` STRING,
  8.   PRIMARY KEY (id) NOT ENFORCED
  9. ) WITH (
  10.    'connector' = 'jdbc',
  11.    'url' = 'jdbc:mysql://master:3306/bigdata30?useSSL=false',
  12.    'table-name' = 'students',
  13.    'username' = 'root',
  14.    'password' = '123456'
  15. );
  16. select id,name from students_mysql_source where clazz = '理科一班'
  17. union all
  18. select id,name from students_mysql_source where clazz = '理科一班'
  19. ;
  20. -- 通过with可以将多次使用的SQL进行定义
  21. with stu_lkyb as (
  22.   select id,name from students_mysql_source where clazz = '理科一班'
  23. )
  24. select * from stu_lkyb
  25. union all
  26. select * from stu_lkyb
  27. union all
  28. select * from stu_lkyb
  29. ;
复制代码
5.3 Where

   可以进行过滤
  1. select id,name,clazz,age from students_mysql_source where clazz = '理科一班' and age > 20;
  2. -- 找到重复数据并进行过滤
  3. select        id,name,age,gender,clazz
  4. from (
  5. select id,name,age,gender,clazz,count(*) as cnt from students_mysql_source group by id,name,age,gender,clazz
  6. ) t1 where t1.cnt = 1;
  7. -- 聚合后的过滤可以使用Having
  8. select id,name,age,gender,clazz,count(*) as cnt from students_mysql_source group by id,name,age,gender,clazz
  9. having count(*) = 1;
复制代码
5.4 Distinct

   用于去重
  必要对每条差别的数据维护一个状态,状态会无限制的增大,最终使命可能会失败
  无界流是正常可以去重的
  有界流必须在分组之后带上聚合操作才能去重,假如直接distinct大概是groupby不聚合,最终使命里不会产生shuffle,即不会分组,也就无法去重
  1. -- 去重
  2. select id,name,age,gender,clazz from students_mysql_source group by id,name,age,gender,clazz;
  3. -- 等价于distinct
  4. select distinct id,name,age,gender,clazz from students_mysql_source;
  5. select distinct id from students_mysql_source;
复制代码
5.5 Windowing TVFs

   目条件供了三类TVFs窗口操作:TUMBLE、HOP、CUMULATE
  会话SESSION窗口只能通过GROUP WINDOW FUNCTION实现
  计数窗口在FLINK SQL中暂未支持
  5.5.1 Tumble

   必要设置一个滚动时间
  每隔一段时间会触发一次窗口的统计
  1. -- 创建Bid订单表
  2. drop table if exists bid_kafka_source;
  3. CREATE TABLE if not exists bid_kafka_source (
  4.   `item` STRING,
  5.   `price` DOUBLE,
  6.   `bidtime` TIMESTAMP(3),
  7.   `proc_time` as PROCTIME(),
  8.    -- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间
  9.    WATERMARK FOR bidtime AS bidtime
  10. ) WITH (
  11.   'connector' = 'kafka',
  12.   'topic' = 'bid',
  13.   'properties.bootstrap.servers' = 'master:9092',
  14.   'properties.group.id' = 'grp1',
  15.   'scan.startup.mode' = 'earliest-offset',
  16.   'format' = 'csv',
  17.   -- 是否忽略脏数据
  18.   'csv.ignore-parse-errors' = 'true'
  19. );
  20. -- 准备数据
  21. C,4.00,2020-04-15 08:05:00
  22. A,2.00,2020-04-15 08:07:00
  23. D,5.00,2020-04-15 08:09:00
  24. B,3.00,2020-04-15 08:11:00
  25. E,1.00,2020-04-15 08:13:00
  26. F,6.00,2020-04-15 08:17:00
  27. -- 创建Kafka Topic
  28. kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic bid
  29. -- 生产数据
  30. kafka-console-producer.sh --broker-list master:9092 --topic bid
  31. -- 基于事件时间的滚动窗口
  32. SELECT window_start,window_end,sum(price) as sum_price
  33. FROM TABLE(
  34.     -- tumble函数 会给bid表增加三个窗口列:window_start、window_end、window_time
  35.     -- 如果需要基于窗口的统计则按照窗口列分组即可
  36.     TUMBLE(TABLE bid_kafka_source, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
  37. ) group by window_start,window_end
  38. ;
  39. -- 基于处理时间的滚动窗口
  40. SELECT window_start,window_end,sum(price) as sum_price
  41. FROM TABLE(
  42.     -- tumble函数 会给bid表增加三个窗口列:window_start、window_end、window_time
  43.     -- 如果需要基于窗口的统计则按照窗口列分组即可
  44.     TUMBLE(TABLE bid_kafka_source, DESCRIPTOR(proc_time), INTERVAL '10' SECONDS)
  45. ) group by window_start,window_end
  46. ;
复制代码
5.5.2 HOP

   滑动窗口
  必要指定两个时间:滑动的时间、窗口的巨细
  1. -- 基于事件时间的滑动窗口
  2. SELECT window_start,window_end,sum(price) as sum_price
  3. FROM TABLE(
  4.     -- HOP函数 会给bid表增加三个窗口列:window_start、window_end、window_time
  5.     -- 如果需要基于窗口的统计则按照窗口列分组即可
  6.     HOP(TABLE bid_kafka_source, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
  7. ) group by window_start,window_end
  8. ;
  9. -- 基于处理时间的滑动窗口
  10. SELECT window_start,window_end,sum(price) as sum_price
  11. FROM TABLE(
  12.     -- HOP函数 会给bid表增加三个窗口列:window_start、window_end、window_time
  13.     -- 如果需要基于窗口的统计则按照窗口列分组即可
  14.     HOP(TABLE bid_kafka_source, DESCRIPTOR(proc_time), INTERVAL '5' SECONDS, INTERVAL '10' SECONDS)
  15. ) group by window_start,window_end
  16. ;
复制代码
5.5.3 CUMULATE

   累积窗口:起首会按照步长初始化一个窗口巨细,然后按照步长的间隔时间触发窗口的统计,接下来窗口巨细会不断增大,直到到达设置的最大size,然后重复这个过程
  必要指定两个时间间隔:步长、最大的size
  例如:步长为2分钟,size为10分钟
  每隔2分钟会触发一次统计,第一次统计的最近两分钟的数据,第二次统计是最近4分钟的…第5次统计是最近10分钟的数据,第6次统计是最近2分钟的数据…
  1. -- 基于事件时间的累计窗口
  2. SELECT window_start,window_end,sum(price) as sum_price
  3. FROM TABLE(
  4.     -- CUMULATE函数 会给bid表增加三个窗口列:window_start、window_end、window_time
  5.     -- 如果需要基于窗口的统计则按照窗口列分组即可
  6.     CUMULATE(TABLE bid_kafka_source, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)
  7. ) group by window_start,window_end
  8. ;
  9. -- 基于处理时间的累计窗口
  10. SELECT window_start,window_end,sum(price) as sum_price
  11. FROM TABLE(
  12.     -- CUMULATE函数 会给bid表增加三个窗口列:window_start、window_end、window_time
  13.     -- 如果需要基于窗口的统计则按照窗口列分组即可
  14.     CUMULATE(TABLE bid_kafka_source, DESCRIPTOR(proc_time), INTERVAL '2' SECONDS, INTERVAL '10' SECONDS)
  15. ) group by window_start,window_end
  16. ;
复制代码
5.5.4 SESSION

   会话窗口,现在版本不支持TVFs写法
  必要使用老版本的写法:GROUP WINDOW FUNCTION
  间隔一段时间没有数据就会触发窗口的统计
  1. -- 基于事件时间的会话窗口
  2. select session_start(bidtime, INTERVAL '2' MINUTES) as session_start
  3.        ,session_end(bidtime, INTERVAL '2' MINUTES) as session_end
  4.        ,sum(price) as sum_price
  5. from bid_kafka_source
  6. group by session(bidtime, INTERVAL '2' MINUTES)
  7. ;
  8. -- 基于处理时间的会话窗口
  9. select session_start(proc_time, INTERVAL '2' SECONDS) as session_start
  10.        ,session_end(proc_time, INTERVAL '2' SECONDS) as session_end
  11.        ,sum(price) as sum_price
  12. from bid_kafka_source
  13. group by session(proc_time, INTERVAL '2' SECONDS)
  14. ;
复制代码
6、Over聚合

6.1 聚合类

   sum、max、min、count、avg
  sum 比力特殊:假如指定了order By,则表示累加求和,不指定则表示整个窗口求和
  max、min、count、avg 不必要指定order By
  1. -- 准备数据
  2. item,supply_id,price,bidtime
  3. A,001,4.00,2020-04-15 08:05:00
  4. A,002,2.00,2020-04-15 08:06:00
  5. A,001,5.00,2020-04-15 08:07:00
  6. B,002,3.00,2020-04-15 08:08:00
  7. A,001,1.00,2020-04-15 08:09:00
  8. A,002,6.00,2020-04-15 08:10:00
  9. B,001,6.00,2020-04-15 08:11:00
  10. A,001,6.00,2020-04-15 08:12:00
  11. B,002,6.00,2020-04-15 08:13:00
  12. B,002,6.00,2020-04-15 08:14:00
  13. A,001,66.00,2020-04-15 08:15:00
  14. B,001,6.00,2020-04-15 08:16:00
  15. -- 创建order订单表
  16. drop table if exists order_kafka_source;
  17. CREATE TABLE if not exists order_kafka_source (
  18.   `item` STRING,
  19.   `supply_id` STRING,
  20.   `price` DOUBLE,
  21.   `bidtime` TIMESTAMP(3),
  22.    -- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间
  23.    WATERMARK FOR bidtime AS bidtime
  24. ) WITH (
  25.   'connector' = 'kafka',
  26.   'topic' = 'order',
  27.   'properties.bootstrap.servers' = 'master:9092',
  28.   'properties.group.id' = 'grp1',
  29.   'scan.startup.mode' = 'latest-offset',
  30.   'format' = 'csv',
  31.   -- 是否忽略脏数据
  32.   'csv.ignore-parse-errors' = 'true'
  33. );
  34. -- 创建Kafka Topic
  35. kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic order
  36. -- 生产数据
  37. kafka-console-producer.sh --broker-list master:9092 --topic order
  38. -- 聚合类函数在实时的Over窗口上只会产生追加的数据,没有更新
  39. -- 最终需要维护的状态大小同partition by指定的字段有关
  40. -- 1、统计每种商品的累计成交金额
  41. select item
  42.        -- 必须指定order by ,而且必须使用时间列升序,
  43.        -- 如果本条数据的时间小于上一条数据的时间,则本条数据会被忽略,不参与计算
  44.        -- 相当于sum只能做累加求和,来一条数据累加一次,无法做全局聚合
  45.        ,sum(price) over (partition by item order by bidtime) as sum_price
  46. from order_kafka_source
  47. ;
  48. -- 2、统计每种商品的最大成交金额
  49. select item
  50.        -- 必须指定order by ,而且必须使用时间列升序,
  51.        -- 如果本条数据的时间小于上一条数据的时间,则本条数据会被忽略,不参与统计
  52.        -- 来一条数据就会输出一条数据,max会将截止到当前时间戳最大的数据中取最大的一个值
  53.        ,max(price) over (partition by item order by bidtime) as max_price
  54. from order_kafka_source
  55. ;
  56. -- 3、统计每种商品的最小、平均成交金额/交易次数 同上
  57. -- 4、统计最近10分钟内每种商品的累计成交金额
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4