Flink学习之Flink SQL

打印 上一主题 下一主题

主题 511|帖子 511|积分 1533

Flink SQL

1、SQL客户端

1.1 基本使用



  • 启动yarn-session
    1. yarn-session.sh -d
    复制代码
  • 启动Flink SQL客户端
    1. sql-client.sh
    2. --退出客户端
    3. exit;
    复制代码
  • 测试
           重启SQL客户端之后,必要重新建表
       
    1. -- 构建Kafka Source
    2. -- 无界流
    3. drop table if exists students_kafka_source;
    4. CREATE TABLE if not exists students_kafka_source (
    5.   `id` BIGINT,
    6.   `name` STRING,
    7.   `age` INT,
    8.   `gender` STRING,
    9.   `clazz` STRING
    10. ) WITH (
    11.   'connector' = 'kafka',
    12.   'topic' = 'students1000',
    13.   'properties.bootstrap.servers' = 'master:9092',
    14.   'properties.group.id' = 'grp1',
    15.   'scan.startup.mode' = 'earliest-offset',
    16.   'format' = 'csv', -- 以 ,分隔的数据
    17.   -- 是否忽略脏数据
    18.   'csv.ignore-parse-errors' = 'true'
    19. );
    20. -- 执行查询,基于KafkaSource是无界流,所以查询时连续变化的
    21. select * from students_kafka_source;
    22. select clazz,count(*) as cnt from students_kafka_source group by clazz;
    23. -- 向Kafka生产数据
    24. kafka-console-producer.sh --broker-list master:9092 --topic students1000
    复制代码
1.2 三种体现模式



  • 表格模式
           SQL客户端默认的结果体现模式
        在内存中实体化结果,并将结果用规则的分页表格可视化展示出来
       
    1. SET 'sql-client.execution.result-mode' = 'table';
    复制代码
  • 变更日志模式
           不会实体化和可视化结果,而是由插入(+)和取消(-)构成的持续查询产生结果流
       
    1. SET 'sql-client.execution.result-mode' = 'changelog';
    复制代码
  • Tableau模式
           更靠近传统的数据库,会将实行的结果(雷同变更日志模式,由插入(+)和取消(-)构成的持续查询产生结果流)以制表的情势直接打在屏幕之上
       
    1. SET 'sql-client.execution.result-mode' = 'tableau';
    复制代码
1.3 差别的实行模式



  • 批处置惩罚
           只能处置惩罚有界流
        结果是固定的
        底层是基于MR模子
        不会出现由插入(+)和取消(-)构成的持续查询产生结果流这种结果,只会出现最闭幕果
       
    1. SET 'execution.runtime-mode' = 'batch';
    复制代码
  • 流处置惩罚
           默认的方式
        既可以处置惩罚无界流,也可以处置惩罚有界流
        结果是连续变化的
        底层是基于持续流模子
       
    1. SET 'execution.runtime-mode' = 'streaming';
    复制代码
2、常用的connector

2.1 Kafka



  • 预备工作
    1. # 下载依赖https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.15.4/flink-sql-connector-kafka-1.15.4.jar# 长传至${FLINK_HOME}/lib# 重启yarn-session.sh# 先找到yarn-session的application idyarn application -list# kill掉yarn-session在Yarn上的进程yarn application -kill application_1722331927004_0007# 再启动yarn-sessionyarn-session.sh -d
    2. # 再启动sql-clientsql-client.sh
    复制代码
  • Source
    1. -- 构建Kafka Source
    2. -- 无界流
    3. drop table if exists students_kafka_source;
    4. CREATE TABLE if not exists students_kafka_source (
    5.   `id` BIGINT,
    6.   `name` STRING,
    7.   `age` INT,
    8.   `gender` STRING,
    9.   `clazz` STRING,
    10.    -- Kafka Source提供的数据之外的数据(元数据)
    11.   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
    12.   `pt` BIGINT METADATA FROM 'partition',
    13.   `offset` BIGINT METADATA FROM 'offset',
    14.   `topic` STRING METADATA FROM 'topic'
    15. ) WITH (
    16.   'connector' = 'kafka',
    17.   'topic' = 'students1000',
    18.   'properties.bootstrap.servers' = 'master:9092',
    19.   'properties.group.id' = 'grp1',
    20.   'scan.startup.mode' = 'earliest-offset',
    21.   'format' = 'csv',
    22.   -- 是否忽略脏数据
    23.   'csv.ignore-parse-errors' = 'true'
    24. );
    25. -- 执行查询
    26. select id,name,event_time,pt,`offset`,`topic` from students_kafka_source limit 10;
    复制代码
  • Sink

    • 结果不带更新的Sink
      1. drop table if exists students_lksb_sink;
      2. CREATE TABLE if not exists students_lksb_sink (
      3.   `id` BIGINT,
      4.   `name` STRING,
      5.   `age` INT,
      6.   `gender` STRING,
      7.   `clazz` STRING
      8. ) WITH (
      9.   'connector' = 'kafka',
      10.   'topic' = 'students_lksb_sink',
      11.   'properties.bootstrap.servers' = 'master:9092',
      12.   'properties.group.id' = 'grp1',
      13.   'scan.startup.mode' = 'earliest-offset',
      14.   'format' = 'csv',
      15.   -- 是否忽略脏数据
      16.   'csv.ignore-parse-errors' = 'true'
      17. );
      18. -- 执行不带更新的查询
      19. insert into students_lksb_sink
      20. select id,name,age,gender,clazz from students_kafka_source where clazz='理科四班';
      复制代码
    • 结果带更新的Sink
                 Kafka只支持追加的写入,不支持更新数据
            故有更新的查询结果无法直接编码,写入Kafka
            固然Kafka支支持append,但是可以将更新流编码成“ +、-”不断追加到Kafka中
            假如有更新,那么往Kafka写两条记录即可表示更新,即:先“-”再“+”
            但是csv这种格式无法表达“-”或“+”操作,故无法在有更新的结果写Kafka时使用
            必要使用:canal-json大概是debezium-json
            canal-json:{“data”:[{“clazz”:“文科六班”,“cnt”:104}],“type”:“DELETE”}
            debezium-json:{“before”:null,“after”:{“clazz”:“理科四班”,“cnt”:94},“op”:“c”}
           
      1. -- 基于Kafka Source 统计班级人数 最终结果写入Kafka
      2. drop table if exists clazz_cnt_sink;
      3. CREATE TABLE if not exists clazz_cnt_sink (
      4.   `clazz` String,
      5.   `cnt` BIGINT
      6. ) WITH (
      7.   'connector' = 'kafka',
      8.   'topic' = 'clazz_cnt_sink_02',
      9.   'properties.bootstrap.servers' = 'master:9092',
      10.   'properties.group.id' = 'grp1',
      11.   'scan.startup.mode' = 'earliest-offset',
      12.   'format' = 'canal-json' -- 或者是指定为debezium-json
      13. );
      14. -- 执行查询并且将查询结果插入到Sink表中
      15. insert into clazz_cnt_sink
      16. select clazz,count(*) as cnt from students_kafka_source group by clazz;
      复制代码

2.2 JDBC

   用于连接数据库,例如:MySQL、Oracle、PG、Derby
  

  • 预备工作
    1. # 下载依赖
    2. https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.15.4/flink-connector-jdbc-1.15.4.jar
    3. # 上传依赖至FLINK的lib目录下,还需要将Linu中MySQL的驱动拷贝一份到lib目录下,可以从Hadoop中进行拷贝
    4. # 重启yarn-session以及sql客户端
    复制代码
  • Source
           有界流,只会查询一次,查询完后直接结束(从jdbc中读取数据是有界流
       
    1. drop table if exists students_mysql_source;CREATE TABLE if not exists students_mysql_source (  `id` BIGINT,  `name` STRING,  `age` INT,  `gender` STRING,  `clazz` STRING,  PRIMARY KEY (id) NOT ENFORCED) WITH (   'connector' = 'jdbc',   'url' = 'jdbc:mysql://master:3306/bigdata30?useSSL=false',   'table-name' = 'students',   'username' = 'root',   'password' = '123456');-- 实行查询select * from students_mysql_source;-- 将模式换成tableau 看结果变化的全过程SET 'sql-client.execution.result-mode' = 'tableau';
    2. -- 默认会以 流处置惩罚的方式 实行,所以可以看到结果连续变化的过程select gender,count(*) as cnt from students_mysql_source group by gender;-- 将运行模式切换成批处置惩罚SET 'execution.runtime-mode' = 'batch';
    3. -- 再试实行,只会看到最终的一个结果,没有变化的过程(这是与流处置惩罚的区别之处)select gender,count(*) as cnt from students_mysql_source group by gender;
    复制代码
  • Sink
           从Kafka接收无界流的学生数据,统计班级人数,将最终的结果写入MySQL
       
    1. -- 创建MySQL的结果表-- 查询库中已有表的建表语句show create table xxx;-- 无主键的MySQL建表语句-- 最终发现写入的结果是有连续变更的过程,并不是直接写入最终的结果drop table if exists `clazz_cnt`;CREATE TABLE if not exists `clazz_cnt`(  `clazz` varchar(255) DEFAULT NULL  ,`cnt` bigint DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- 将班级设置为主键-- 最终写入的结果是可以通过主键进行更新,所以可以展示最终的结果,而且可以实时更新drop table if exists `clazz_cnt`;CREATE TABLE if not exists `clazz_cnt`(  `clazz` varchar(255) NOT NULL,  `cnt` bigint(20) DEFAULT NULL,  PRIMARY KEY (`clazz`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- 创建MySQL的Sink表drop table if exists clazz_cnt_mysql_sink;CREATE TABLE if not exists clazz_cnt_mysql_sink (  `clazz` STRING,  `cnt`        BIGINT,   -- 假如查询的结果有更新,则必要设置主键   PRIMARY KEY (clazz) NOT ENFORCED) WITH (   'connector' = 'jdbc',   'url' = 'jdbc:mysql://master:3306/bigdata30?useSSL=false',   'table-name' = 'clazz_cnt',   'username' = 'root',   'password' = '123456');-- 记得将实行模式切换成流处置惩罚,因为Kafka是无界流SET 'execution.runtime-mode' = 'streaming';
    2. -- 实行查询:实时统计班级人数,将结果写入MySQLinsert into clazz_cnt_mysql_sinkselect clazz,count(*) as cnt from students_kafka_source where clazz is not null group by clazz;
    复制代码
2.3 HDFS



  • Source

    • 有界流
                 默认的方式
           
      1. drop table if exists students_hdfs_source;
      2. CREATE TABLE if not exists students_hdfs_source (
      3.   `id` BIGINT,
      4.   `name` STRING,
      5.   `age` INT,
      6.   `gender` STRING,
      7.   `clazz` STRING,
      8.   `file.path` STRING NOT NULL METADATA
      9. ) WITH (
      10.   'connector' = 'filesystem',
      11.   'path' = 'hdfs://master:9000/bigdata30/students.txt',
      12.   'format' = 'csv',
      13.    -- 是否忽略脏数据
      14.   'csv.ignore-parse-errors' = 'true'
      15. );
      16. -- 查询表中是否有数据
      17. select * from students_hdfs_source limit 100;
      复制代码
    • 无界流
                 同DataStream的FileSource一致
            可以通过设置source.monitor-interval参数,来指定一个监控的间隔时间,例如:5s
            FLink就会定时监控目录的一个变更,有新的文件就可以实时进行读取
            最终得到一个无界流
           
      1. -- 创建HDFS目录
      2. hdfs dfs -mkdir /bigdata30/flink
      3. -- 创建Source表
      4. drop table if exists students_hdfs_unbounded_source;
      5. CREATE TABLE if not exists students_hdfs_unbounded_source (
      6.   `id` BIGINT,
      7.   `name` STRING,
      8.   `age` INT,
      9.   `gender` STRING,
      10.   `clazz` STRING,
      11.   `file.path` STRING NOT NULL METADATA
      12. ) WITH (
      13.   'connector' = 'filesystem',
      14.   'path' = 'hdfs://master:9000/bigdata30/flink',
      15.   'source.monitor-interval' = '5s',
      16.   'format' = 'csv',
      17.    -- 是否忽略脏数据
      18.   'csv.ignore-parse-errors' = 'true'
      19. );
      20. -- 执行查询
      21. select * from students_hdfs_unbounded_source;
      22. -- 向目录上传文件
      23. hdfs dfs -cp /bigdata30/students.txt /bigdata30/flink/students.txt1
      24. hdfs dfs -cp /bigdata30/students.txt /bigdata30/flink/students.txt2
      复制代码

  • Sink

    • 查询结果没有更新,写入数据
      1. drop table if exists students_hdfs_sink;
      2. CREATE TABLE if not exists students_hdfs_sink (
      3.   `id` BIGINT,
      4.   `name` STRING,
      5.   `age` INT,
      6.   `gender` STRING,
      7.   `clazz` STRING,
      8.   `file_path` STRING
      9. ) WITH (
      10.   'connector' = 'filesystem',
      11.   'path' = 'hdfs://master:9000/bigdata30/sink/',
      12.   'format' = 'csv',
      13.    -- 是否忽略脏数据
      14.   'csv.ignore-parse-errors' = 'true'
      15. );
      16. insert into students_hdfs_sink
      17. select * from students_hdfs_source;
      复制代码
    • 查询结果有更新,写入数据
                 同Kafka雷同,HDFS不支持更新数据,故必要将变更的结果编码成canal-json大概是debezium-json的格式才能进行insert
           
      1. drop table if exists clazz_cnt_hdfs_sink;
      2. CREATE TABLE if not exists clazz_cnt_hdfs_sink (
      3.   `clazz` STRING,
      4.   `cnt` BIGINT
      5. ) WITH (
      6.   'connector' = 'filesystem',
      7.   'path' = 'hdfs://master:9000/bigdata30/clazz_cnt/',
      8.   'format' = 'canal-json'
      9. );
      10. -- 使用有界的数据源来写入待更新的计算结果
      11. insert into clazz_cnt_hdfs_sink
      12. select clazz,count(*) as cnt from students_hdfs_source group by clazz;
      复制代码

2.4 HBase

  1. hbase启动顺序:
  2. zk(三台虚拟机都启动)-->hadoop(主从复制:在master端启动即可)-->hbase(在master端启动即可)
  3. hbase关闭顺序:
  4. hbase-->hadoop-->zk
  5. # 启动
  6. start-hbase.sh
  7. #关闭
  8. stop-hbase.sh
  9. # 进入HBase的客户端
  10. hbase shell
复制代码


  • 预备工作
    1. # 下载依赖
    2. https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hbase-2.2/1.15.4/flink-sql-connector-hbase-2.2-1.15.4.jar
    3. # 上传依赖并重启yarn-session及sql客户端
    复制代码
  • Source
           同MySQL雷同,得到是一个有界流
       
    1. drop table if exists students_hbase_source;
    2. CREATE TABLE if not exists students_hbase_source (
    3.     rowkey STRING,
    4.     info ROW<name STRING, age STRING,gender STRING,clazz STRING>,
    5.     PRIMARY KEY (rowkey) NOT ENFORCED
    6. ) WITH (
    7. 'connector' = 'hbase-2.2',
    8. 'table-name' = 'students',
    9. 'zookeeper.quorum' = 'master:2181'
    10. );
    11. select rowkey,info.name,info.age,info.gender,info.clazz from students_hbase_source;
    复制代码
  • Sink
           同MySQL雷同
       
    1. -- 在HBase中建表
    2. create 'stu01','info'
    3. -- 构建HBase Sink表
    4. drop table if exists stu_hbase_sink;
    5. CREATE TABLE if not exists stu_hbase_sink (
    6.     id STRING,
    7.     info ROW<name STRING,clazz STRING>,
    8.     PRIMARY KEY (id) NOT ENFORCED
    9. ) WITH (
    10. 'connector' = 'hbase-2.2',
    11. 'table-name' = 'stu01',
    12. 'zookeeper.quorum' = 'master:2181'
    13. );
    14. -- 丢弃null的数据
    15. set 'table.exec.sink.not-null-enforcer'='DROP';
    16. -- 仅追加的结果写入,由于HBase有rk存在,相同的RK会进行覆盖
    17. insert into stu_hbase_sink
    18. select cast(id as STRING) as id,ROW(name,clazz) as info
    19. from students_kafka_source
    20. ;
    21. -- hbase中遍历数据
    22. scan "stu01",LIMIT => 50
    23. -- 在HBase中建表
    24. create 'clazz_cnt_01','info'
    25. -- 构建HBase Sink表
    26. drop table if exists clazz_cnt_hbase_sink;
    27. CREATE TABLE if not exists clazz_cnt_hbase_sink (
    28.     clazz STRING,
    29.     info ROW<cnt BIGINT>,
    30.     PRIMARY KEY (clazz) NOT ENFORCED
    31. ) WITH (
    32. 'connector' = 'hbase-2.2',
    33. 'table-name' = 'clazz_cnt_01',
    34. 'zookeeper.quorum' = 'master:2181'
    35. );
    36. -- 带更新的查询结果可以实时在HBase中通过RK进行更新
    37. insert into clazz_cnt_hbase_sink
    38. select clazz,ROW(count(*)) as info
    39. from students_kafka_source
    40. group by clazz
    41. ;
    42. -- hbase中遍历数据
    43. scan "clazz_cnt_01",LIMIT => 50
    复制代码
2.5 DataGen

   用于按照指定的规则生成数据,一样寻常用于性能测试
  1. drop table if exists datagen;
  2. CREATE TABLE if not exists datagen (
  3.     id BIGINT
  4.     ,random_id BIGINT
  5.     ,name STRING
  6. ) WITH (
  7. 'connector' = 'datagen',
  8. -- optional options --
  9. 'rows-per-second'='20', -- 设置每秒钟生成的数据量
  10.    
  11.     'fields.id.kind' = 'random',
  12.     'fields.id.min'='10000000',
  13.     'fields.id.max'='99999999',
  14.     'fields.random_id.kind' = 'random',
  15.     'fields.random_id.min'='10000000',
  16.     'fields.random_id.max'='99999999',
  17.    
  18.     'fields.name.length'='5'
  19. );
复制代码
2.6 Blackhole

   用于性能测试,可以作为Sink端
  1. drop table if exists blackhole_table;
  2. CREATE TABLE if not exists  blackhole_table
  3. WITH ('connector' = 'blackhole')
  4. LIKE datagen (EXCLUDING ALL);
  5. insert into blackhole_table
  6. select * from datagen group by name;
  7. drop table if exists blackhole_table;
  8. CREATE TABLE if not exists  blackhole_table(
  9.         name String,
  10.     cnt BIGINT
  11. )
  12. WITH ('connector' = 'blackhole')
  13. ;
  14. insert into blackhole_table
  15. select name,count(*) as cnt from datagen group by name;
复制代码
2.7 Print

   将结果数据在TaskManager中输出
  1. drop table if exists print_table;
  2. CREATE TABLE if not exists print_table (
  3. name STRING,
  4. cnt BIGINT
  5. ) WITH (
  6. 'connector' = 'print'
  7. );
  8. insert into print_table
  9. select name,count(*) as cnt from datagen group by name;
复制代码
3、常用的格式

3.1 CSV

   逗号分隔符文件,并非肯定是.csv文件
  在作为Sink时的format,仅支持写入不带更新的结果
  剖析每条数据是通过次序匹配
  常用参数:
  csv.ignore-parse-errors 默认false,忽略剖析错误,不会导致步伐直接停止
  csv.field-delimiter 默认 逗号,指定数据的列分隔符
  3.2 JSON

3.2.1 json

   平凡的json格式,剖析数据是通过列名进行匹配
  同csv雷同,只支持写入不带更新的结果
  1. drop table if exists cars_json_source;
  2. CREATE TABLE if not exists cars_json_source (
  3.     car String
  4.     ,county_code INT
  5.     ,city_code INT
  6.     ,card BIGINT
  7.     ,camera_id String
  8.     ,orientation String
  9.     ,road_id BIGINT
  10.     ,`time` BIGINT
  11.     ,speed Double
  12. ) WITH (
  13.   'connector' = 'kafka',
  14.   'topic' = 'cars_json',
  15.   'properties.bootstrap.servers' = 'master:9092',
  16.   'properties.group.id' = 'grp1',
  17.   'scan.startup.mode' = 'earliest-offset',
  18.   'format' = 'json'
  19. );
复制代码
3.2.2 canal-json

   一种特殊的JSON格式
  支持写入更新的结果
  {“data”:[{“clazz”:“文科六班”,“cnt”:104}],“type”:“DELETE”}
  3.2.3 debezium-json

   同canal-json,只是数据格式有些许差别
  {“before”:null,“after”:{“clazz”:“理科四班”,“cnt”:94},“op”:“c”}
  3.3 ORC

   一样寻常不消
  3.4 PARQUET

   一样寻常不消
  4、时间属性

4.1 处置惩罚时间

   基于系统的时间
  1. drop table if exists students_kafka_source;
  2. CREATE TABLE if not exists students_kafka_source (
  3.   `id` BIGINT,
  4.   `name` STRING,
  5.   `age` INT,
  6.   `gender` STRING,
  7.   `clazz` STRING,
  8.    -- 通过系统时间给表增加一列,即:处理时间
  9.    proc_time as PROCTIME()
  10. ) WITH (
  11.   'connector' = 'kafka',
  12.   'topic' = 'students1000',
  13.   'properties.bootstrap.servers' = 'master:9092',
  14.   'properties.group.id' = 'grp1',
  15.   'scan.startup.mode' = 'earliest-offset',
  16.   'format' = 'csv',
  17.   -- 是否忽略脏数据
  18.   'csv.ignore-parse-errors' = 'true'
  19. );
  20. select  clazz
  21.         ,count(*) as cnt
  22.         ,tumble_start(proc_time,INTERVAL '5' SECONDS) as window_start
  23.         ,tumble_end(proc_time,INTERVAL '5' SECONDS) as window_end
  24. from students_kafka_source
  25. group by clazz,tumble(proc_time,INTERVAL '5' SECONDS)
  26. ;
  27. -- 向Topic中生产数据
  28. kafka-console-producer.sh --broker-list master:9092 --topic students1000
复制代码
4.2 事件时间

   基于数据自带的时间
  java,2024-08-03 10:41:50
  java,2024-08-03 10:41:51
  java,2024-08-03 10:41:52
  1. drop table if exists words_kafka_source;
  2. CREATE TABLE if not exists words_kafka_source (
  3.   `word` STRING,
  4.    -- 从数据中过来的一列,作为事件时间
  5.    event_time TIMESTAMP(3),
  6.    -- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间
  7.    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
  8. ) WITH (
  9.   'connector' = 'kafka',
  10.   'topic' = 'words_event_time',
  11.   'properties.bootstrap.servers' = 'master:9092',
  12.   'properties.group.id' = 'grp1',
  13.   'scan.startup.mode' = 'latest-offset',
  14.   'format' = 'csv',
  15.   -- 是否忽略脏数据
  16.   'csv.ignore-parse-errors' = 'true'
  17. );
  18. -- 创建topic
  19. kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic words_event_time
  20. -- 执行查询,使用滚动的事件时间窗口进行word count,每5s统计一次
  21. select  word
  22.         ,count(*) as cnt
  23.         ,tumble_start(event_time,INTERVAL '5' SECONDS) as window_start
  24.         ,tumble_end(event_time,INTERVAL '5' SECONDS) as window_end
  25. from words_kafka_source
  26. group by word,tumble(event_time,INTERVAL '5' SECONDS)
  27. ;
  28. -- 向Topic中生产数据
  29. kafka-console-producer.sh --broker-list master:9092 --topic words_event_time
复制代码
5、SQL语法

5.1 Hints

   在SQL查询时动态修改表的参数配置
  1. -- words_kafka_source 默认从最后开始消费
  2. select * from words_kafka_source; // 只能查询到最新的数据,不会从头开始消费
  3. -- 假设现在需要从头开始消费
  4. -- 第一种方案,将words_kafka_source删除重建
  5. -- 第二种方案,通过alter table 对表进行修改
  6. -- 第三种方案,通过hints动态调整表的配置
  7. select * from words_kafka_source /*+OPTIONS('scan.startup.mode' = 'earliest-offset') */ ;
复制代码
5.2 With

   用于将多次实行的同一查询通过with先定义,后面可以进行多次使用,避免重复的SQL
  应用场景:1、多次使用的SQL查询可以缓存提高性能 2、将多级嵌套解开来,降低主SQL的复杂度
  1. drop table if exists students_mysql_source;
  2. CREATE TABLE if not exists students_mysql_source (
  3.   `id` BIGINT,
  4.   `name` STRING,
  5.   `age` INT,
  6.   `gender` STRING,
  7.   `clazz` STRING,
  8.   PRIMARY KEY (id) NOT ENFORCED
  9. ) WITH (
  10.    'connector' = 'jdbc',
  11.    'url' = 'jdbc:mysql://master:3306/bigdata30?useSSL=false',
  12.    'table-name' = 'students',
  13.    'username' = 'root',
  14.    'password' = '123456'
  15. );
  16. select id,name from students_mysql_source where clazz = '理科一班'
  17. union all
  18. select id,name from students_mysql_source where clazz = '理科一班'
  19. ;
  20. -- 通过with可以将多次使用的SQL进行定义
  21. with stu_lkyb as (
  22.   select id,name from students_mysql_source where clazz = '理科一班'
  23. )
  24. select * from stu_lkyb
  25. union all
  26. select * from stu_lkyb
  27. union all
  28. select * from stu_lkyb
  29. ;
复制代码
5.3 Where

   可以进行过滤
  1. select id,name,clazz,age from students_mysql_source where clazz = '理科一班' and age > 20;
  2. -- 找到重复数据并进行过滤
  3. select        id,name,age,gender,clazz
  4. from (
  5. select id,name,age,gender,clazz,count(*) as cnt from students_mysql_source group by id,name,age,gender,clazz
  6. ) t1 where t1.cnt = 1;
  7. -- 聚合后的过滤可以使用Having
  8. select id,name,age,gender,clazz,count(*) as cnt from students_mysql_source group by id,name,age,gender,clazz
  9. having count(*) = 1;
复制代码
5.4 Distinct

   用于去重
  必要对每条差别的数据维护一个状态,状态会无限制的增大,最终使命可能会失败
  无界流是正常可以去重的
  有界流必须在分组之后带上聚合操作才能去重,假如直接distinct大概是groupby不聚合,最终使命里不会产生shuffle,即不会分组,也就无法去重
  1. -- 去重
  2. select id,name,age,gender,clazz from students_mysql_source group by id,name,age,gender,clazz;
  3. -- 等价于distinct
  4. select distinct id,name,age,gender,clazz from students_mysql_source;
  5. select distinct id from students_mysql_source;
复制代码
5.5 Windowing TVFs

   目条件供了三类TVFs窗口操作:TUMBLE、HOP、CUMULATE
  会话SESSION窗口只能通过GROUP WINDOW FUNCTION实现
  计数窗口在FLINK SQL中暂未支持
  5.5.1 Tumble

   必要设置一个滚动时间
  每隔一段时间会触发一次窗口的统计
  1. -- 创建Bid订单表
  2. drop table if exists bid_kafka_source;
  3. CREATE TABLE if not exists bid_kafka_source (
  4.   `item` STRING,
  5.   `price` DOUBLE,
  6.   `bidtime` TIMESTAMP(3),
  7.   `proc_time` as PROCTIME(),
  8.    -- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间
  9.    WATERMARK FOR bidtime AS bidtime
  10. ) WITH (
  11.   'connector' = 'kafka',
  12.   'topic' = 'bid',
  13.   'properties.bootstrap.servers' = 'master:9092',
  14.   'properties.group.id' = 'grp1',
  15.   'scan.startup.mode' = 'earliest-offset',
  16.   'format' = 'csv',
  17.   -- 是否忽略脏数据
  18.   'csv.ignore-parse-errors' = 'true'
  19. );
  20. -- 准备数据
  21. C,4.00,2020-04-15 08:05:00
  22. A,2.00,2020-04-15 08:07:00
  23. D,5.00,2020-04-15 08:09:00
  24. B,3.00,2020-04-15 08:11:00
  25. E,1.00,2020-04-15 08:13:00
  26. F,6.00,2020-04-15 08:17:00
  27. -- 创建Kafka Topic
  28. kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic bid
  29. -- 生产数据
  30. kafka-console-producer.sh --broker-list master:9092 --topic bid
  31. -- 基于事件时间的滚动窗口
  32. SELECT window_start,window_end,sum(price) as sum_price
  33. FROM TABLE(
  34.     -- tumble函数 会给bid表增加三个窗口列:window_start、window_end、window_time
  35.     -- 如果需要基于窗口的统计则按照窗口列分组即可
  36.     TUMBLE(TABLE bid_kafka_source, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
  37. ) group by window_start,window_end
  38. ;
  39. -- 基于处理时间的滚动窗口
  40. SELECT window_start,window_end,sum(price) as sum_price
  41. FROM TABLE(
  42.     -- tumble函数 会给bid表增加三个窗口列:window_start、window_end、window_time
  43.     -- 如果需要基于窗口的统计则按照窗口列分组即可
  44.     TUMBLE(TABLE bid_kafka_source, DESCRIPTOR(proc_time), INTERVAL '10' SECONDS)
  45. ) group by window_start,window_end
  46. ;
复制代码
5.5.2 HOP

   滑动窗口
  必要指定两个时间:滑动的时间、窗口的巨细
  1. -- 基于事件时间的滑动窗口
  2. SELECT window_start,window_end,sum(price) as sum_price
  3. FROM TABLE(
  4.     -- HOP函数 会给bid表增加三个窗口列:window_start、window_end、window_time
  5.     -- 如果需要基于窗口的统计则按照窗口列分组即可
  6.     HOP(TABLE bid_kafka_source, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
  7. ) group by window_start,window_end
  8. ;
  9. -- 基于处理时间的滑动窗口
  10. SELECT window_start,window_end,sum(price) as sum_price
  11. FROM TABLE(
  12.     -- HOP函数 会给bid表增加三个窗口列:window_start、window_end、window_time
  13.     -- 如果需要基于窗口的统计则按照窗口列分组即可
  14.     HOP(TABLE bid_kafka_source, DESCRIPTOR(proc_time), INTERVAL '5' SECONDS, INTERVAL '10' SECONDS)
  15. ) group by window_start,window_end
  16. ;
复制代码
5.5.3 CUMULATE

   累积窗口:起首会按照步长初始化一个窗口巨细,然后按照步长的间隔时间触发窗口的统计,接下来窗口巨细会不断增大,直到到达设置的最大size,然后重复这个过程
  必要指定两个时间间隔:步长、最大的size
  例如:步长为2分钟,size为10分钟
  每隔2分钟会触发一次统计,第一次统计的最近两分钟的数据,第二次统计是最近4分钟的…第5次统计是最近10分钟的数据,第6次统计是最近2分钟的数据…
  1. -- 基于事件时间的累计窗口
  2. SELECT window_start,window_end,sum(price) as sum_price
  3. FROM TABLE(
  4.     -- CUMULATE函数 会给bid表增加三个窗口列:window_start、window_end、window_time
  5.     -- 如果需要基于窗口的统计则按照窗口列分组即可
  6.     CUMULATE(TABLE bid_kafka_source, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)
  7. ) group by window_start,window_end
  8. ;
  9. -- 基于处理时间的累计窗口
  10. SELECT window_start,window_end,sum(price) as sum_price
  11. FROM TABLE(
  12.     -- CUMULATE函数 会给bid表增加三个窗口列:window_start、window_end、window_time
  13.     -- 如果需要基于窗口的统计则按照窗口列分组即可
  14.     CUMULATE(TABLE bid_kafka_source, DESCRIPTOR(proc_time), INTERVAL '2' SECONDS, INTERVAL '10' SECONDS)
  15. ) group by window_start,window_end
  16. ;
复制代码
5.5.4 SESSION

   会话窗口,现在版本不支持TVFs写法
  必要使用老版本的写法:GROUP WINDOW FUNCTION
  间隔一段时间没有数据就会触发窗口的统计
  1. -- 基于事件时间的会话窗口
  2. select session_start(bidtime, INTERVAL '2' MINUTES) as session_start
  3.        ,session_end(bidtime, INTERVAL '2' MINUTES) as session_end
  4.        ,sum(price) as sum_price
  5. from bid_kafka_source
  6. group by session(bidtime, INTERVAL '2' MINUTES)
  7. ;
  8. -- 基于处理时间的会话窗口
  9. select session_start(proc_time, INTERVAL '2' SECONDS) as session_start
  10.        ,session_end(proc_time, INTERVAL '2' SECONDS) as session_end
  11.        ,sum(price) as sum_price
  12. from bid_kafka_source
  13. group by session(proc_time, INTERVAL '2' SECONDS)
  14. ;
复制代码
6、Over聚合

6.1 聚合类

   sum、max、min、count、avg
  sum 比力特殊:假如指定了order By,则表示累加求和,不指定则表示整个窗口求和
  max、min、count、avg 不必要指定order By
  1. -- 准备数据
  2. item,supply_id,price,bidtime
  3. A,001,4.00,2020-04-15 08:05:00
  4. A,002,2.00,2020-04-15 08:06:00
  5. A,001,5.00,2020-04-15 08:07:00
  6. B,002,3.00,2020-04-15 08:08:00
  7. A,001,1.00,2020-04-15 08:09:00
  8. A,002,6.00,2020-04-15 08:10:00
  9. B,001,6.00,2020-04-15 08:11:00
  10. A,001,6.00,2020-04-15 08:12:00
  11. B,002,6.00,2020-04-15 08:13:00
  12. B,002,6.00,2020-04-15 08:14:00
  13. A,001,66.00,2020-04-15 08:15:00
  14. B,001,6.00,2020-04-15 08:16:00
  15. -- 创建order订单表
  16. drop table if exists order_kafka_source;
  17. CREATE TABLE if not exists order_kafka_source (
  18.   `item` STRING,
  19.   `supply_id` STRING,
  20.   `price` DOUBLE,
  21.   `bidtime` TIMESTAMP(3),
  22.    -- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间
  23.    WATERMARK FOR bidtime AS bidtime
  24. ) WITH (
  25.   'connector' = 'kafka',
  26.   'topic' = 'order',
  27.   'properties.bootstrap.servers' = 'master:9092',
  28.   'properties.group.id' = 'grp1',
  29.   'scan.startup.mode' = 'latest-offset',
  30.   'format' = 'csv',
  31.   -- 是否忽略脏数据
  32.   'csv.ignore-parse-errors' = 'true'
  33. );
  34. -- 创建Kafka Topic
  35. kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic order
  36. -- 生产数据
  37. kafka-console-producer.sh --broker-list master:9092 --topic order
  38. -- 聚合类函数在实时的Over窗口上只会产生追加的数据,没有更新
  39. -- 最终需要维护的状态大小同partition by指定的字段有关
  40. -- 1、统计每种商品的累计成交金额
  41. select item
  42.        -- 必须指定order by ,而且必须使用时间列升序,
  43.        -- 如果本条数据的时间小于上一条数据的时间,则本条数据会被忽略,不参与计算
  44.        -- 相当于sum只能做累加求和,来一条数据累加一次,无法做全局聚合
  45.        ,sum(price) over (partition by item order by bidtime) as sum_price
  46. from order_kafka_source
  47. ;
  48. -- 2、统计每种商品的最大成交金额
  49. select item
  50.        -- 必须指定order by ,而且必须使用时间列升序,
  51.        -- 如果本条数据的时间小于上一条数据的时间,则本条数据会被忽略,不参与统计
  52.        -- 来一条数据就会输出一条数据,max会将截止到当前时间戳最大的数据中取最大的一个值
  53.        ,max(price) over (partition by item order by bidtime) as max_price
  54. from order_kafka_source
  55. ;
  56. -- 3、统计每种商品的最小、平均成交金额/交易次数 同上
  57. -- 4、统计最近10分钟内每种商品的累计成交金额
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

张国伟

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表