Flink SQL
1、SQL客户端
1.1 基本使用
- 启动yarn-session
- 启动Flink SQL客户端
- sql-client.sh
- --退出客户端
- exit;
复制代码 - 测试
重启SQL客户端之后,必要重新建表
- -- 构建Kafka Source
- -- 无界流
- drop table if exists students_kafka_source;
- CREATE TABLE if not exists students_kafka_source (
- `id` BIGINT,
- `name` STRING,
- `age` INT,
- `gender` STRING,
- `clazz` STRING
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'students1000',
- 'properties.bootstrap.servers' = 'master:9092',
- 'properties.group.id' = 'grp1',
- 'scan.startup.mode' = 'earliest-offset',
- 'format' = 'csv', -- 以 ,分隔的数据
- -- 是否忽略脏数据
- 'csv.ignore-parse-errors' = 'true'
- );
- -- 执行查询,基于KafkaSource是无界流,所以查询时连续变化的
- select * from students_kafka_source;
- select clazz,count(*) as cnt from students_kafka_source group by clazz;
- -- 向Kafka生产数据
- kafka-console-producer.sh --broker-list master:9092 --topic students1000
复制代码 1.2 三种体现模式
- 表格模式
SQL客户端默认的结果体现模式
在内存中实体化结果,并将结果用规则的分页表格可视化展示出来
- SET 'sql-client.execution.result-mode' = 'table';
复制代码 - 变更日志模式
不会实体化和可视化结果,而是由插入(+)和取消(-)构成的持续查询产生结果流
- SET 'sql-client.execution.result-mode' = 'changelog';
复制代码 - Tableau模式
更靠近传统的数据库,会将实行的结果(雷同变更日志模式,由插入(+)和取消(-)构成的持续查询产生结果流)以制表的情势直接打在屏幕之上
- SET 'sql-client.execution.result-mode' = 'tableau';
复制代码 1.3 差别的实行模式
- 批处置惩罚
只能处置惩罚有界流
结果是固定的
底层是基于MR模子
不会出现由插入(+)和取消(-)构成的持续查询产生结果流这种结果,只会出现最闭幕果
- SET 'execution.runtime-mode' = 'batch';
复制代码 - 流处置惩罚
默认的方式
既可以处置惩罚无界流,也可以处置惩罚有界流
结果是连续变化的
底层是基于持续流模子
- SET 'execution.runtime-mode' = 'streaming';
复制代码 2、常用的connector
2.1 Kafka
- 预备工作
- # 下载依赖https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.15.4/flink-sql-connector-kafka-1.15.4.jar# 长传至${FLINK_HOME}/lib# 重启yarn-session.sh# 先找到yarn-session的application idyarn application -list# kill掉yarn-session在Yarn上的进程yarn application -kill application_1722331927004_0007# 再启动yarn-sessionyarn-session.sh -d
- # 再启动sql-clientsql-client.sh
复制代码 - Source
- -- 构建Kafka Source
- -- 无界流
- drop table if exists students_kafka_source;
- CREATE TABLE if not exists students_kafka_source (
- `id` BIGINT,
- `name` STRING,
- `age` INT,
- `gender` STRING,
- `clazz` STRING,
- -- Kafka Source提供的数据之外的数据(元数据)
- `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
- `pt` BIGINT METADATA FROM 'partition',
- `offset` BIGINT METADATA FROM 'offset',
- `topic` STRING METADATA FROM 'topic'
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'students1000',
- 'properties.bootstrap.servers' = 'master:9092',
- 'properties.group.id' = 'grp1',
- 'scan.startup.mode' = 'earliest-offset',
- 'format' = 'csv',
- -- 是否忽略脏数据
- 'csv.ignore-parse-errors' = 'true'
- );
- -- 执行查询
- select id,name,event_time,pt,`offset`,`topic` from students_kafka_source limit 10;
复制代码 - Sink
- 结果不带更新的Sink
- drop table if exists students_lksb_sink;
- CREATE TABLE if not exists students_lksb_sink (
- `id` BIGINT,
- `name` STRING,
- `age` INT,
- `gender` STRING,
- `clazz` STRING
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'students_lksb_sink',
- 'properties.bootstrap.servers' = 'master:9092',
- 'properties.group.id' = 'grp1',
- 'scan.startup.mode' = 'earliest-offset',
- 'format' = 'csv',
- -- 是否忽略脏数据
- 'csv.ignore-parse-errors' = 'true'
- );
- -- 执行不带更新的查询
- insert into students_lksb_sink
- select id,name,age,gender,clazz from students_kafka_source where clazz='理科四班';
复制代码 - 结果带更新的Sink
Kafka只支持追加的写入,不支持更新数据
故有更新的查询结果无法直接编码,写入Kafka
固然Kafka支支持append,但是可以将更新流编码成“ +、-”不断追加到Kafka中
假如有更新,那么往Kafka写两条记录即可表示更新,即:先“-”再“+”
但是csv这种格式无法表达“-”或“+”操作,故无法在有更新的结果写Kafka时使用
必要使用:canal-json大概是debezium-json
canal-json:{“data”:[{“clazz”:“文科六班”,“cnt”:104}],“type”:“DELETE”}
debezium-json:{“before”:null,“after”:{“clazz”:“理科四班”,“cnt”:94},“op”:“c”}
- -- 基于Kafka Source 统计班级人数 最终结果写入Kafka
- drop table if exists clazz_cnt_sink;
- CREATE TABLE if not exists clazz_cnt_sink (
- `clazz` String,
- `cnt` BIGINT
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'clazz_cnt_sink_02',
- 'properties.bootstrap.servers' = 'master:9092',
- 'properties.group.id' = 'grp1',
- 'scan.startup.mode' = 'earliest-offset',
- 'format' = 'canal-json' -- 或者是指定为debezium-json
- );
- -- 执行查询并且将查询结果插入到Sink表中
- insert into clazz_cnt_sink
- select clazz,count(*) as cnt from students_kafka_source group by clazz;
复制代码
2.2 JDBC
用于连接数据库,例如:MySQL、Oracle、PG、Derby
- 预备工作
- # 下载依赖
- https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.15.4/flink-connector-jdbc-1.15.4.jar
- # 上传依赖至FLINK的lib目录下,还需要将Linu中MySQL的驱动拷贝一份到lib目录下,可以从Hadoop中进行拷贝
- # 重启yarn-session以及sql客户端
复制代码 - Source
有界流,只会查询一次,查询完后直接结束(从jdbc中读取数据是有界流)
- drop table if exists students_mysql_source;CREATE TABLE if not exists students_mysql_source ( `id` BIGINT, `name` STRING, `age` INT, `gender` STRING, `clazz` STRING, PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://master:3306/bigdata30?useSSL=false', 'table-name' = 'students', 'username' = 'root', 'password' = '123456');-- 实行查询select * from students_mysql_source;-- 将模式换成tableau 看结果变化的全过程SET 'sql-client.execution.result-mode' = 'tableau';
- -- 默认会以 流处置惩罚的方式 实行,所以可以看到结果连续变化的过程select gender,count(*) as cnt from students_mysql_source group by gender;-- 将运行模式切换成批处置惩罚SET 'execution.runtime-mode' = 'batch';
- -- 再试实行,只会看到最终的一个结果,没有变化的过程(这是与流处置惩罚的区别之处)select gender,count(*) as cnt from students_mysql_source group by gender;
复制代码 - Sink
从Kafka接收无界流的学生数据,统计班级人数,将最终的结果写入MySQL
- -- 创建MySQL的结果表-- 查询库中已有表的建表语句show create table xxx;-- 无主键的MySQL建表语句-- 最终发现写入的结果是有连续变更的过程,并不是直接写入最终的结果drop table if exists `clazz_cnt`;CREATE TABLE if not exists `clazz_cnt`( `clazz` varchar(255) DEFAULT NULL ,`cnt` bigint DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- 将班级设置为主键-- 最终写入的结果是可以通过主键进行更新,所以可以展示最终的结果,而且可以实时更新drop table if exists `clazz_cnt`;CREATE TABLE if not exists `clazz_cnt`( `clazz` varchar(255) NOT NULL, `cnt` bigint(20) DEFAULT NULL, PRIMARY KEY (`clazz`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- 创建MySQL的Sink表drop table if exists clazz_cnt_mysql_sink;CREATE TABLE if not exists clazz_cnt_mysql_sink ( `clazz` STRING, `cnt` BIGINT, -- 假如查询的结果有更新,则必要设置主键 PRIMARY KEY (clazz) NOT ENFORCED) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://master:3306/bigdata30?useSSL=false', 'table-name' = 'clazz_cnt', 'username' = 'root', 'password' = '123456');-- 记得将实行模式切换成流处置惩罚,因为Kafka是无界流SET 'execution.runtime-mode' = 'streaming';
- -- 实行查询:实时统计班级人数,将结果写入MySQLinsert into clazz_cnt_mysql_sinkselect clazz,count(*) as cnt from students_kafka_source where clazz is not null group by clazz;
复制代码 2.3 HDFS
- Source
- 有界流
默认的方式
- drop table if exists students_hdfs_source;
- CREATE TABLE if not exists students_hdfs_source (
- `id` BIGINT,
- `name` STRING,
- `age` INT,
- `gender` STRING,
- `clazz` STRING,
- `file.path` STRING NOT NULL METADATA
- ) WITH (
- 'connector' = 'filesystem',
- 'path' = 'hdfs://master:9000/bigdata30/students.txt',
- 'format' = 'csv',
- -- 是否忽略脏数据
- 'csv.ignore-parse-errors' = 'true'
- );
- -- 查询表中是否有数据
- select * from students_hdfs_source limit 100;
复制代码 - 无界流
同DataStream的FileSource一致
可以通过设置source.monitor-interval参数,来指定一个监控的间隔时间,例如:5s
FLink就会定时监控目录的一个变更,有新的文件就可以实时进行读取
最终得到一个无界流
- -- 创建HDFS目录
- hdfs dfs -mkdir /bigdata30/flink
- -- 创建Source表
- drop table if exists students_hdfs_unbounded_source;
- CREATE TABLE if not exists students_hdfs_unbounded_source (
- `id` BIGINT,
- `name` STRING,
- `age` INT,
- `gender` STRING,
- `clazz` STRING,
- `file.path` STRING NOT NULL METADATA
- ) WITH (
- 'connector' = 'filesystem',
- 'path' = 'hdfs://master:9000/bigdata30/flink',
- 'source.monitor-interval' = '5s',
- 'format' = 'csv',
- -- 是否忽略脏数据
- 'csv.ignore-parse-errors' = 'true'
- );
- -- 执行查询
- select * from students_hdfs_unbounded_source;
- -- 向目录上传文件
- hdfs dfs -cp /bigdata30/students.txt /bigdata30/flink/students.txt1
- hdfs dfs -cp /bigdata30/students.txt /bigdata30/flink/students.txt2
复制代码
- Sink
- 查询结果没有更新,写入数据
- drop table if exists students_hdfs_sink;
- CREATE TABLE if not exists students_hdfs_sink (
- `id` BIGINT,
- `name` STRING,
- `age` INT,
- `gender` STRING,
- `clazz` STRING,
- `file_path` STRING
- ) WITH (
- 'connector' = 'filesystem',
- 'path' = 'hdfs://master:9000/bigdata30/sink/',
- 'format' = 'csv',
- -- 是否忽略脏数据
- 'csv.ignore-parse-errors' = 'true'
- );
- insert into students_hdfs_sink
- select * from students_hdfs_source;
复制代码 - 查询结果有更新,写入数据
同Kafka雷同,HDFS不支持更新数据,故必要将变更的结果编码成canal-json大概是debezium-json的格式才能进行insert
- drop table if exists clazz_cnt_hdfs_sink;
- CREATE TABLE if not exists clazz_cnt_hdfs_sink (
- `clazz` STRING,
- `cnt` BIGINT
- ) WITH (
- 'connector' = 'filesystem',
- 'path' = 'hdfs://master:9000/bigdata30/clazz_cnt/',
- 'format' = 'canal-json'
- );
- -- 使用有界的数据源来写入待更新的计算结果
- insert into clazz_cnt_hdfs_sink
- select clazz,count(*) as cnt from students_hdfs_source group by clazz;
复制代码
2.4 HBase
- hbase启动顺序:
- zk(三台虚拟机都启动)-->hadoop(主从复制:在master端启动即可)-->hbase(在master端启动即可)
- hbase关闭顺序:
- hbase-->hadoop-->zk
- # 启动
- start-hbase.sh
- #关闭
- stop-hbase.sh
- # 进入HBase的客户端
- hbase shell
复制代码
- 预备工作
- # 下载依赖
- https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hbase-2.2/1.15.4/flink-sql-connector-hbase-2.2-1.15.4.jar
- # 上传依赖并重启yarn-session及sql客户端
复制代码 - Source
同MySQL雷同,得到是一个有界流
- drop table if exists students_hbase_source;
- CREATE TABLE if not exists students_hbase_source (
- rowkey STRING,
- info ROW<name STRING, age STRING,gender STRING,clazz STRING>,
- PRIMARY KEY (rowkey) NOT ENFORCED
- ) WITH (
- 'connector' = 'hbase-2.2',
- 'table-name' = 'students',
- 'zookeeper.quorum' = 'master:2181'
- );
- select rowkey,info.name,info.age,info.gender,info.clazz from students_hbase_source;
复制代码 - Sink
同MySQL雷同
- -- 在HBase中建表
- create 'stu01','info'
- -- 构建HBase Sink表
- drop table if exists stu_hbase_sink;
- CREATE TABLE if not exists stu_hbase_sink (
- id STRING,
- info ROW<name STRING,clazz STRING>,
- PRIMARY KEY (id) NOT ENFORCED
- ) WITH (
- 'connector' = 'hbase-2.2',
- 'table-name' = 'stu01',
- 'zookeeper.quorum' = 'master:2181'
- );
- -- 丢弃null的数据
- set 'table.exec.sink.not-null-enforcer'='DROP';
- -- 仅追加的结果写入,由于HBase有rk存在,相同的RK会进行覆盖
- insert into stu_hbase_sink
- select cast(id as STRING) as id,ROW(name,clazz) as info
- from students_kafka_source
- ;
- -- hbase中遍历数据
- scan "stu01",LIMIT => 50
- -- 在HBase中建表
- create 'clazz_cnt_01','info'
- -- 构建HBase Sink表
- drop table if exists clazz_cnt_hbase_sink;
- CREATE TABLE if not exists clazz_cnt_hbase_sink (
- clazz STRING,
- info ROW<cnt BIGINT>,
- PRIMARY KEY (clazz) NOT ENFORCED
- ) WITH (
- 'connector' = 'hbase-2.2',
- 'table-name' = 'clazz_cnt_01',
- 'zookeeper.quorum' = 'master:2181'
- );
- -- 带更新的查询结果可以实时在HBase中通过RK进行更新
- insert into clazz_cnt_hbase_sink
- select clazz,ROW(count(*)) as info
- from students_kafka_source
- group by clazz
- ;
- -- hbase中遍历数据
- scan "clazz_cnt_01",LIMIT => 50
复制代码 2.5 DataGen
用于按照指定的规则生成数据,一样寻常用于性能测试
- drop table if exists datagen;
- CREATE TABLE if not exists datagen (
- id BIGINT
- ,random_id BIGINT
- ,name STRING
- ) WITH (
- 'connector' = 'datagen',
- -- optional options --
- 'rows-per-second'='20', -- 设置每秒钟生成的数据量
-
- 'fields.id.kind' = 'random',
- 'fields.id.min'='10000000',
- 'fields.id.max'='99999999',
- 'fields.random_id.kind' = 'random',
- 'fields.random_id.min'='10000000',
- 'fields.random_id.max'='99999999',
-
- 'fields.name.length'='5'
- );
复制代码 2.6 Blackhole
用于性能测试,可以作为Sink端
- drop table if exists blackhole_table;
- CREATE TABLE if not exists blackhole_table
- WITH ('connector' = 'blackhole')
- LIKE datagen (EXCLUDING ALL);
- insert into blackhole_table
- select * from datagen group by name;
- drop table if exists blackhole_table;
- CREATE TABLE if not exists blackhole_table(
- name String,
- cnt BIGINT
- )
- WITH ('connector' = 'blackhole')
- ;
- insert into blackhole_table
- select name,count(*) as cnt from datagen group by name;
复制代码 2.7 Print
将结果数据在TaskManager中输出
- drop table if exists print_table;
- CREATE TABLE if not exists print_table (
- name STRING,
- cnt BIGINT
- ) WITH (
- 'connector' = 'print'
- );
- insert into print_table
- select name,count(*) as cnt from datagen group by name;
复制代码 3、常用的格式
3.1 CSV
逗号分隔符文件,并非肯定是.csv文件
在作为Sink时的format,仅支持写入不带更新的结果
剖析每条数据是通过次序匹配
常用参数:
csv.ignore-parse-errors 默认false,忽略剖析错误,不会导致步伐直接停止
csv.field-delimiter 默认 逗号,指定数据的列分隔符
3.2 JSON
3.2.1 json
平凡的json格式,剖析数据是通过列名进行匹配
同csv雷同,只支持写入不带更新的结果
- drop table if exists cars_json_source;
- CREATE TABLE if not exists cars_json_source (
- car String
- ,county_code INT
- ,city_code INT
- ,card BIGINT
- ,camera_id String
- ,orientation String
- ,road_id BIGINT
- ,`time` BIGINT
- ,speed Double
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'cars_json',
- 'properties.bootstrap.servers' = 'master:9092',
- 'properties.group.id' = 'grp1',
- 'scan.startup.mode' = 'earliest-offset',
- 'format' = 'json'
- );
复制代码 3.2.2 canal-json
一种特殊的JSON格式
支持写入更新的结果
{“data”:[{“clazz”:“文科六班”,“cnt”:104}],“type”:“DELETE”}
3.2.3 debezium-json
同canal-json,只是数据格式有些许差别
{“before”:null,“after”:{“clazz”:“理科四班”,“cnt”:94},“op”:“c”}
3.3 ORC
一样寻常不消
3.4 PARQUET
一样寻常不消
4、时间属性
4.1 处置惩罚时间
基于系统的时间
- drop table if exists students_kafka_source;
- CREATE TABLE if not exists students_kafka_source (
- `id` BIGINT,
- `name` STRING,
- `age` INT,
- `gender` STRING,
- `clazz` STRING,
- -- 通过系统时间给表增加一列,即:处理时间
- proc_time as PROCTIME()
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'students1000',
- 'properties.bootstrap.servers' = 'master:9092',
- 'properties.group.id' = 'grp1',
- 'scan.startup.mode' = 'earliest-offset',
- 'format' = 'csv',
- -- 是否忽略脏数据
- 'csv.ignore-parse-errors' = 'true'
- );
- select clazz
- ,count(*) as cnt
- ,tumble_start(proc_time,INTERVAL '5' SECONDS) as window_start
- ,tumble_end(proc_time,INTERVAL '5' SECONDS) as window_end
- from students_kafka_source
- group by clazz,tumble(proc_time,INTERVAL '5' SECONDS)
- ;
- -- 向Topic中生产数据
- kafka-console-producer.sh --broker-list master:9092 --topic students1000
复制代码 4.2 事件时间
基于数据自带的时间
java,2024-08-03 10:41:50
java,2024-08-03 10:41:51
java,2024-08-03 10:41:52
- drop table if exists words_kafka_source;
- CREATE TABLE if not exists words_kafka_source (
- `word` STRING,
- -- 从数据中过来的一列,作为事件时间
- event_time TIMESTAMP(3),
- -- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间
- WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'words_event_time',
- 'properties.bootstrap.servers' = 'master:9092',
- 'properties.group.id' = 'grp1',
- 'scan.startup.mode' = 'latest-offset',
- 'format' = 'csv',
- -- 是否忽略脏数据
- 'csv.ignore-parse-errors' = 'true'
- );
- -- 创建topic
- kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic words_event_time
- -- 执行查询,使用滚动的事件时间窗口进行word count,每5s统计一次
- select word
- ,count(*) as cnt
- ,tumble_start(event_time,INTERVAL '5' SECONDS) as window_start
- ,tumble_end(event_time,INTERVAL '5' SECONDS) as window_end
- from words_kafka_source
- group by word,tumble(event_time,INTERVAL '5' SECONDS)
- ;
- -- 向Topic中生产数据
- kafka-console-producer.sh --broker-list master:9092 --topic words_event_time
复制代码 5、SQL语法
5.1 Hints
在SQL查询时动态修改表的参数配置
- -- words_kafka_source 默认从最后开始消费
- select * from words_kafka_source; // 只能查询到最新的数据,不会从头开始消费
- -- 假设现在需要从头开始消费
- -- 第一种方案,将words_kafka_source删除重建
- -- 第二种方案,通过alter table 对表进行修改
- -- 第三种方案,通过hints动态调整表的配置
- select * from words_kafka_source /*+OPTIONS('scan.startup.mode' = 'earliest-offset') */ ;
复制代码 5.2 With
用于将多次实行的同一查询通过with先定义,后面可以进行多次使用,避免重复的SQL
应用场景:1、多次使用的SQL查询可以缓存提高性能 2、将多级嵌套解开来,降低主SQL的复杂度
- drop table if exists students_mysql_source;
- CREATE TABLE if not exists students_mysql_source (
- `id` BIGINT,
- `name` STRING,
- `age` INT,
- `gender` STRING,
- `clazz` STRING,
- PRIMARY KEY (id) NOT ENFORCED
- ) WITH (
- 'connector' = 'jdbc',
- 'url' = 'jdbc:mysql://master:3306/bigdata30?useSSL=false',
- 'table-name' = 'students',
- 'username' = 'root',
- 'password' = '123456'
- );
- select id,name from students_mysql_source where clazz = '理科一班'
- union all
- select id,name from students_mysql_source where clazz = '理科一班'
- ;
- -- 通过with可以将多次使用的SQL进行定义
- with stu_lkyb as (
- select id,name from students_mysql_source where clazz = '理科一班'
- )
- select * from stu_lkyb
- union all
- select * from stu_lkyb
- union all
- select * from stu_lkyb
- ;
复制代码 5.3 Where
可以进行过滤
- select id,name,clazz,age from students_mysql_source where clazz = '理科一班' and age > 20;
- -- 找到重复数据并进行过滤
- select id,name,age,gender,clazz
- from (
- select id,name,age,gender,clazz,count(*) as cnt from students_mysql_source group by id,name,age,gender,clazz
- ) t1 where t1.cnt = 1;
- -- 聚合后的过滤可以使用Having
- select id,name,age,gender,clazz,count(*) as cnt from students_mysql_source group by id,name,age,gender,clazz
- having count(*) = 1;
复制代码 5.4 Distinct
用于去重
必要对每条差别的数据维护一个状态,状态会无限制的增大,最终使命可能会失败
无界流是正常可以去重的
有界流必须在分组之后带上聚合操作才能去重,假如直接distinct大概是groupby不聚合,最终使命里不会产生shuffle,即不会分组,也就无法去重
- -- 去重
- select id,name,age,gender,clazz from students_mysql_source group by id,name,age,gender,clazz;
- -- 等价于distinct
- select distinct id,name,age,gender,clazz from students_mysql_source;
- select distinct id from students_mysql_source;
复制代码 5.5 Windowing TVFs
目条件供了三类TVFs窗口操作:TUMBLE、HOP、CUMULATE
会话SESSION窗口只能通过GROUP WINDOW FUNCTION实现
计数窗口在FLINK SQL中暂未支持
5.5.1 Tumble
必要设置一个滚动时间
每隔一段时间会触发一次窗口的统计
- -- 创建Bid订单表
- drop table if exists bid_kafka_source;
- CREATE TABLE if not exists bid_kafka_source (
- `item` STRING,
- `price` DOUBLE,
- `bidtime` TIMESTAMP(3),
- `proc_time` as PROCTIME(),
- -- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间
- WATERMARK FOR bidtime AS bidtime
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'bid',
- 'properties.bootstrap.servers' = 'master:9092',
- 'properties.group.id' = 'grp1',
- 'scan.startup.mode' = 'earliest-offset',
- 'format' = 'csv',
- -- 是否忽略脏数据
- 'csv.ignore-parse-errors' = 'true'
- );
- -- 准备数据
- C,4.00,2020-04-15 08:05:00
- A,2.00,2020-04-15 08:07:00
- D,5.00,2020-04-15 08:09:00
- B,3.00,2020-04-15 08:11:00
- E,1.00,2020-04-15 08:13:00
- F,6.00,2020-04-15 08:17:00
- -- 创建Kafka Topic
- kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic bid
- -- 生产数据
- kafka-console-producer.sh --broker-list master:9092 --topic bid
- -- 基于事件时间的滚动窗口
- SELECT window_start,window_end,sum(price) as sum_price
- FROM TABLE(
- -- tumble函数 会给bid表增加三个窗口列:window_start、window_end、window_time
- -- 如果需要基于窗口的统计则按照窗口列分组即可
- TUMBLE(TABLE bid_kafka_source, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
- ) group by window_start,window_end
- ;
- -- 基于处理时间的滚动窗口
- SELECT window_start,window_end,sum(price) as sum_price
- FROM TABLE(
- -- tumble函数 会给bid表增加三个窗口列:window_start、window_end、window_time
- -- 如果需要基于窗口的统计则按照窗口列分组即可
- TUMBLE(TABLE bid_kafka_source, DESCRIPTOR(proc_time), INTERVAL '10' SECONDS)
- ) group by window_start,window_end
- ;
复制代码 5.5.2 HOP
滑动窗口
必要指定两个时间:滑动的时间、窗口的巨细
- -- 基于事件时间的滑动窗口
- SELECT window_start,window_end,sum(price) as sum_price
- FROM TABLE(
- -- HOP函数 会给bid表增加三个窗口列:window_start、window_end、window_time
- -- 如果需要基于窗口的统计则按照窗口列分组即可
- HOP(TABLE bid_kafka_source, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
- ) group by window_start,window_end
- ;
- -- 基于处理时间的滑动窗口
- SELECT window_start,window_end,sum(price) as sum_price
- FROM TABLE(
- -- HOP函数 会给bid表增加三个窗口列:window_start、window_end、window_time
- -- 如果需要基于窗口的统计则按照窗口列分组即可
- HOP(TABLE bid_kafka_source, DESCRIPTOR(proc_time), INTERVAL '5' SECONDS, INTERVAL '10' SECONDS)
- ) group by window_start,window_end
- ;
复制代码 5.5.3 CUMULATE
累积窗口:起首会按照步长初始化一个窗口巨细,然后按照步长的间隔时间触发窗口的统计,接下来窗口巨细会不断增大,直到到达设置的最大size,然后重复这个过程
必要指定两个时间间隔:步长、最大的size
例如:步长为2分钟,size为10分钟
每隔2分钟会触发一次统计,第一次统计的最近两分钟的数据,第二次统计是最近4分钟的…第5次统计是最近10分钟的数据,第6次统计是最近2分钟的数据…
- -- 基于事件时间的累计窗口
- SELECT window_start,window_end,sum(price) as sum_price
- FROM TABLE(
- -- CUMULATE函数 会给bid表增加三个窗口列:window_start、window_end、window_time
- -- 如果需要基于窗口的统计则按照窗口列分组即可
- CUMULATE(TABLE bid_kafka_source, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)
- ) group by window_start,window_end
- ;
- -- 基于处理时间的累计窗口
- SELECT window_start,window_end,sum(price) as sum_price
- FROM TABLE(
- -- CUMULATE函数 会给bid表增加三个窗口列:window_start、window_end、window_time
- -- 如果需要基于窗口的统计则按照窗口列分组即可
- CUMULATE(TABLE bid_kafka_source, DESCRIPTOR(proc_time), INTERVAL '2' SECONDS, INTERVAL '10' SECONDS)
- ) group by window_start,window_end
- ;
复制代码 5.5.4 SESSION
会话窗口,现在版本不支持TVFs写法
必要使用老版本的写法:GROUP WINDOW FUNCTION
间隔一段时间没有数据就会触发窗口的统计
- -- 基于事件时间的会话窗口
- select session_start(bidtime, INTERVAL '2' MINUTES) as session_start
- ,session_end(bidtime, INTERVAL '2' MINUTES) as session_end
- ,sum(price) as sum_price
- from bid_kafka_source
- group by session(bidtime, INTERVAL '2' MINUTES)
- ;
- -- 基于处理时间的会话窗口
- select session_start(proc_time, INTERVAL '2' SECONDS) as session_start
- ,session_end(proc_time, INTERVAL '2' SECONDS) as session_end
- ,sum(price) as sum_price
- from bid_kafka_source
- group by session(proc_time, INTERVAL '2' SECONDS)
- ;
复制代码 6、Over聚合
6.1 聚合类
sum、max、min、count、avg
sum 比力特殊:假如指定了order By,则表示累加求和,不指定则表示整个窗口求和
max、min、count、avg 不必要指定order By
- -- 准备数据
- item,supply_id,price,bidtime
- A,001,4.00,2020-04-15 08:05:00
- A,002,2.00,2020-04-15 08:06:00
- A,001,5.00,2020-04-15 08:07:00
- B,002,3.00,2020-04-15 08:08:00
- A,001,1.00,2020-04-15 08:09:00
- A,002,6.00,2020-04-15 08:10:00
- B,001,6.00,2020-04-15 08:11:00
- A,001,6.00,2020-04-15 08:12:00
- B,002,6.00,2020-04-15 08:13:00
- B,002,6.00,2020-04-15 08:14:00
- A,001,66.00,2020-04-15 08:15:00
- B,001,6.00,2020-04-15 08:16:00
- -- 创建order订单表
- drop table if exists order_kafka_source;
- CREATE TABLE if not exists order_kafka_source (
- `item` STRING,
- `supply_id` STRING,
- `price` DOUBLE,
- `bidtime` TIMESTAMP(3),
- -- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间
- WATERMARK FOR bidtime AS bidtime
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'order',
- 'properties.bootstrap.servers' = 'master:9092',
- 'properties.group.id' = 'grp1',
- 'scan.startup.mode' = 'latest-offset',
- 'format' = 'csv',
- -- 是否忽略脏数据
- 'csv.ignore-parse-errors' = 'true'
- );
- -- 创建Kafka Topic
- kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic order
- -- 生产数据
- kafka-console-producer.sh --broker-list master:9092 --topic order
- -- 聚合类函数在实时的Over窗口上只会产生追加的数据,没有更新
- -- 最终需要维护的状态大小同partition by指定的字段有关
- -- 1、统计每种商品的累计成交金额
- select item
- -- 必须指定order by ,而且必须使用时间列升序,
- -- 如果本条数据的时间小于上一条数据的时间,则本条数据会被忽略,不参与计算
- -- 相当于sum只能做累加求和,来一条数据累加一次,无法做全局聚合
- ,sum(price) over (partition by item order by bidtime) as sum_price
- from order_kafka_source
- ;
- -- 2、统计每种商品的最大成交金额
- select item
- -- 必须指定order by ,而且必须使用时间列升序,
- -- 如果本条数据的时间小于上一条数据的时间,则本条数据会被忽略,不参与统计
- -- 来一条数据就会输出一条数据,max会将截止到当前时间戳最大的数据中取最大的一个值
- ,max(price) over (partition by item order by bidtime) as max_price
- from order_kafka_source
- ;
- -- 3、统计每种商品的最小、平均成交金额/交易次数 同上
- -- 4、统计最近10分钟内每种商品的累计成交金额
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |