Flink SQL实践

打印 上一主题 下一主题

主题 658|帖子 658|积分 1974

环境准备

方式1:基于Standalone Flink集群的SQL Client
启动Flink集群
  1. [hadoop@node2 ~]$ start-cluster.sh
  2. [hadoop@node2 ~]$ sql-client.sh
  3. ...
  4. 省略若干日志输出
  5. ...
  6. Flink SQL>  
复制代码

方式2:基于Yarn Session Flink集群的SQL Client
启动hadoop集群
  1. [hadoop@node2 ~]$ myhadoop.sh start
复制代码
使用Yarn Session启动Flink集群
  1. [hadoop@node2 ~]$ yarn-session.sh -d
复制代码
启动一个基于yarn-session的sql-client
  1. [hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session
  2. ...
  3. 省略若干日志输出
  4. ...
  5. Flink SQL>
复制代码
看到“Flink SQL>”提示符,说明成功开启了Flink的SQL客户端,此时就可以举行SQL相关操作了。

注意:以上选择此中一种方式举行后续操作。


数据库操作

  1. Flink SQL> show databases;
  2. +------------------+
  3. |    database name |
  4. +------------------+
  5. | default_database |
  6. +------------------+
  7. 1 row in set
  8. Flink SQL> create database mydatabase;
  9. [INFO] Execute statement succeed.
  10. Flink SQL> show databases;
  11. +------------------+
  12. |    database name |
  13. +------------------+
  14. | default_database |
  15. |       mydatabase |
  16. +------------------+
  17. 2 rows in set
  18. Flink SQL> show current database;
  19. +-----------------------+
  20. | current database name |
  21. +-----------------------+
  22. |      default_database |
  23. +-----------------------+
  24. 1 row in set
  25. 切换当前数据库
  26. Flink SQL> use mydatabase;
  27. [INFO] Execute statement succeed.
  28. Flink SQL> show current database;
  29. +-----------------------+
  30. | current database name |
  31. +-----------------------+
  32. |            mydatabase |
  33. +-----------------------+
  34. 1 row in set
  35. Flink SQL> quit;
  36. ...
  37. ...
  38. ...
  39. [hadoop@node2 ~]$
复制代码

表DDL操作

创建表

CREATE TABLE方式

创建test表
  1. CREATE TABLE test(
  2.     id INT,
  3.     ts BIGINT,
  4.     vc INT
  5. ) WITH (
  6. 'connector' = 'print'
  7. );
复制代码
LIKE方式

基于test表创建test1,并添加value字段
  1. CREATE TABLE test1 (
  2.     `value` STRING
  3. )
  4. LIKE test;
复制代码
查看表 
  1. show tables;
复制代码
 查看test表结构
  1. desc test;
复制代码
查看test1表结构
  1. desc test1;
复制代码

操作过程
  1. Flink SQL> CREATE TABLE test(>     id INT, >     ts BIGINT, >     vc INT> ) WITH (> 'connector' = 'print'> );[INFO] Execute statement succeed.​Flink SQL> CREATE TABLE test1 (>     `value` STRING> )> LIKE test;[INFO] Execute statement succeed.​Flink SQL> show tables;
  2. +------------+| table name |+------------+|       test ||      test1 |+------------+2 rows in set​Flink SQL> desc test;+------+--------+------+-----+--------+-----------+| name |   type | null | key | extras | watermark |+------+--------+------+-----+--------+-----------+|   id |    INT | TRUE |     |        |           ||   ts | BIGINT | TRUE |     |        |           ||   vc |    INT | TRUE |     |        |           |+------+--------+------+-----+--------+-----------+3 rows in set​​Flink SQL> desc test1;+-------+--------+------+-----+--------+-----------+|  name |   type | null | key | extras | watermark |+-------+--------+------+-----+--------+-----------+|    id |    INT | TRUE |     |        |           ||    ts | BIGINT | TRUE |     |        |           ||    vc |    INT | TRUE |     |        |           || value | STRING | TRUE |     |        |           |+-------+--------+------+-----+--------+-----------+4 rows in set​
复制代码
CTAS方式

CTAS:CREATE TABLE AS SELECT
  1. create table test2 as select id, ts from test;
复制代码
但这种方式不支持是print的连接器。因为print只能当作sink,不能当作source。
  1. Flink SQL> create table test2 as select id, ts from test;
  2. [ERROR] Could not execute SQL statement. Reason:
  3. org.apache.flink.table.api.ValidationException: Connector 'print' can only be used as a sink. It cannot be used as a source.
复制代码

修改表

修改表名

  1. alter table test1 rename to test11;
复制代码
操作过程
  1. Flink SQL> alter table test1 rename to test11;[INFO] Execute statement succeed.​Flink SQL> show tables;
  2. +------------+| table name |+------------+|       test ||     test11 |+------------+2 rows in set​
复制代码

添加表字段

创建test2表
  1. CREATE TABLE test2(
  2.     id INT,
  3.     ts BIGINT,
  4.     vc INT
  5. ) WITH (
  6. 'connector' = 'print'
  7. );
复制代码
查看test2表结构 
  1. desc test2;
复制代码
添加表字段,并放在第一个字段
  1. ALTER TABLE test2 ADD `status` INT COMMENT 'status descriptor' FIRST;
复制代码
 查看test2表结构
  1. desc test2;
复制代码
操作过程
  1. Flink SQL> CREATE TABLE test2(
  2. >     id INT,
  3. >     ts BIGINT,
  4. >     vc INT
  5. > ) WITH (
  6. > 'connector' = 'print'
  7. > );
  8. [INFO] Execute statement succeed.
  9. Flink SQL> desc test2;
  10. +------+--------+------+-----+--------+-----------+
  11. | name |   type | null | key | extras | watermark |
  12. +------+--------+------+-----+--------+-----------+
  13. |   id |    INT | TRUE |     |        |           |
  14. |   ts | BIGINT | TRUE |     |        |           |
  15. |   vc |    INT | TRUE |     |        |           |
  16. +------+--------+------+-----+--------+-----------+
  17. 3 rows in set
  18. Flink SQL> ALTER TABLE test2 ADD `status` INT COMMENT 'status descriptor' FIRST;
  19. [INFO] Execute statement succeed.
  20. Flink SQL> desc test2;
  21. +--------+--------+------+-----+--------+-----------+-------------------+
  22. |   name |   type | null | key | extras | watermark |           comment |
  23. +--------+--------+------+-----+--------+-----------+-------------------+
  24. | status |    INT | TRUE |     |        |           | status descriptor |
  25. |     id |    INT | TRUE |     |        |           |                   |
  26. |     ts | BIGINT | TRUE |     |        |           |                   |
  27. |     vc |    INT | TRUE |     |        |           |                   |
  28. +--------+--------+------+-----+--------+-----------+-------------------+
  29. 4 rows in set
复制代码
修改表字段

修改表字段
  1. ALTER TABLE test2 MODIFY (vc DOUBLE NOT NULL, status STRING COMMENT 'status desc');
复制代码
查看表结构
  1. desc test2;
复制代码
操作过程
  1. Flink SQL> ALTER TABLE test2 MODIFY (vc DOUBLE NOT NULL, status STRING COMMENT 'status desc');
  2. [INFO] Execute statement succeed.
  3. Flink SQL> desc test2;
  4. +--------+--------+-------+-----+--------+-----------+-------------+
  5. |   name |   type |  null | key | extras | watermark |     comment |
  6. +--------+--------+-------+-----+--------+-----------+-------------+
  7. | status | STRING |  TRUE |     |        |           | status desc |
  8. |     id |    INT |  TRUE |     |        |           |             |
  9. |     ts | BIGINT |  TRUE |     |        |           |             |
  10. |     vc | DOUBLE | FALSE |     |        |           |             |
  11. +--------+--------+-------+-----+--------+-----------+-------------+
  12. 4 rows in set
复制代码
删除表字段

删除表字段
  1. ALTER TABLE test2 DROP (ts, status);
复制代码
查看表结构
  1. desc test2;
复制代码
操作过程
  1. Flink SQL> ALTER TABLE test2 DROP (ts, status);
  2. [INFO] Execute statement succeed.
  3. Flink SQL> desc test2;
  4. +------+--------+-------+-----+--------+-----------+
  5. | name |   type |  null | key | extras | watermark |
  6. +------+--------+-------+-----+--------+-----------+
  7. |   id |    INT |  TRUE |     |        |           |
  8. |   vc | DOUBLE | FALSE |     |        |           |
  9. +------+--------+-------+-----+--------+-----------+
  10. 2 rows in set
复制代码

删除表

语法
  1. DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
复制代码
案例
  1. drop table if exists test2;
复制代码
操作过程
  1. Flink SQL> drop table if exists test2;
  2. [INFO] Execute statement succeed.
复制代码


表DML查询操作

Select

select

SELECT测试及结果显示模式设置

  1. SELECT 'Hello World', 'It''s me';
复制代码
注意:SELECT后面的字符串必须用单引号括起来,假如字符串里面包罗有单引号,则再多用一个单引号(如:'It's me'写成'It''s me')。
结果如下:

按q键返回命令行。

设置结果显示模式
可以看到,结果显示模式默认table,还可以设置为tableau、changelog。



  • 结果显示模式设置为tableau
  1. SET sql-client.execution.result-mode=tableau;
复制代码
操作过程
  1. Flink SQL> SET sql-client.execution.result-mode=tableau;
  2. [INFO] Execute statement succeed.
  3. Flink SQL> SELECT 'Hello World', 'It''s me';
  4. ...
  5. 省略若干日志输出
  6. ...
  7. +----+--------------------------------+--------------------------------+
  8. | op |                         EXPR$0 |                         EXPR$1 |
  9. +----+--------------------------------+--------------------------------+
  10. | +I |                    Hello World |                        It's me |
  11. +----+--------------------------------+--------------------------------+
  12. Received a total of 1 row
复制代码
 效果如下




  • 显示模式设置为changelog 
  1. SET sql-client.execution.result-mode=changelog;
复制代码
操作过程
  1. Flink SQL> SET sql-client.execution.result-mode=changelog;
  2. [INFO] Execute statement succeed.
  3. Flink SQL> SELECT 'Hello World', 'It''s me';
  4. ​...
  5. 省略若干日志输出
  6. ...
复制代码
显示结果如下:

根据个人喜好,设置此中一种结果显示模式。

Source表

通过数据天生器创建source表
  1. CREATE TABLE source (
  2.     id INT,
  3.     ts BIGINT,
  4.     vc INT
  5. ) WITH (
  6.     'connector' = 'datagen',
  7.     'rows-per-second'='1',
  8.     'fields.id.kind'='random',
  9.     'fields.id.min'='1',
  10.     'fields.id.max'='10',
  11.     'fields.ts.kind'='sequence',
  12.     'fields.ts.start'='1',
  13.     'fields.ts.end'='1000000',
  14.     'fields.vc.kind'='random',
  15.     'fields.vc.min'='1',
  16.     'fields.vc.max'='100'
  17. );
复制代码
查询source表数据
  1. select * from source;
复制代码
查询结果

按住ctrl + c 竣事查询。


  1. SELECT id, vc + 10 FROM source;
复制代码

 执行效果如下




Sink表

创建sink表
  1. CREATE TABLE sink (
  2.     id INT,
  3.     ts BIGINT,
  4.     vc INT
  5. ) WITH (
  6. 'connector' = 'print'
  7. );
复制代码
查询source表数据插入sink表
  1. INSERT INTO sink select  * from source;
复制代码
直接查询sink表数据,报错如下:
  1. Flink SQL> select * from sink;
  2. [ERROR] Could not execute SQL statement. Reason:
  3. org.apache.flink.table.api.ValidationException: Connector 'print' can only be used as a sink. It cannot be used as a source.
复制代码
正确查询方式,通过8088进入Application Master进入Web UI,看到一个Running Job


通过这个Runnig Job的Task Manager查看结果


取消作业


 select where

  1. SELECT id FROM source WHERE id >5;
复制代码


With子句

WITH提供了一种编写辅助语句的方法,以便在较大的查询中使用。这些语句通常被称为公共表表达式(Common Table Expression, CTE),可以以为它们定义了仅为一个查询而存在的暂时视图
  1. WITH source_with_total AS (
  2.     SELECT id, vc+10 AS total
  3.     FROM source
  4. )
  5. SELECT id, SUM(total)
  6. FROM source_with_total
  7. GROUP BY id;
复制代码
 执行效果如下


分组聚合

  1. SELECT vc, COUNT(*) as cnt FROM source GROUP BY vc;
复制代码

-U是撤回流

创建source1表
  1. CREATE TABLE source1 (
  2. dim STRING,
  3. user_id BIGINT,
  4. price BIGINT,
  5. row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
  6. WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
  7. ) WITH (
  8. 'connector' = 'datagen',
  9. 'rows-per-second' = '10',
  10. 'fields.dim.length' = '1',
  11. 'fields.user_id.min' = '1',
  12. 'fields.user_id.max' = '100000',
  13. 'fields.price.min' = '1',
  14. 'fields.price.max' = '100000'
  15. );
复制代码
创建sink1表
  1. CREATE TABLE sink1 (
  2. dim STRING,
  3. pv BIGINT,
  4. sum_price BIGINT,
  5. max_price BIGINT,
  6. min_price BIGINT,
  7. uv BIGINT,
  8. window_start bigint
  9. ) WITH (
  10. 'connector' = 'print'
  11. );
复制代码
查询对source1表举行分组聚归并插入到sink1表中
  1. insert into sink1
  2. select dim,
  3. count(*) as pv,
  4. sum(price) as sum_price,
  5. max(price) as max_price,
  6. min(price) as min_price,
  7. -- 计算 uv 数
  8. count(distinct user_id) as uv,
  9. cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
  10. from source1
  11. group by
  12. dim,
  13. -- UNIX_TIMESTAMP得到秒的时间戳,将秒级别时间戳 / 60 转化为 1min,
  14. cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint);
复制代码
查看结果


在Web UI中取消作业。

多维分析
Group 聚合也支持 Grouping sets 、Rollup 、Cube,如下案例是Grouping sets:
  1. SELECT
  2.   supplier_id
  3. , rating
  4. , product_id
  5. , COUNT(*)
  6. FROM (
  7. VALUES
  8.   ('supplier1', 'product1', 4),
  9.   ('supplier1', 'product2', 3),
  10.   ('supplier2', 'product3', 3),
  11.   ('supplier2', 'product4', 4)
  12. )
  13. -- 供应商id、产品id、评级
  14. AS Products(supplier_id, product_id, rating)  
  15. GROUP BY GROUPING SETS(
  16.   (supplier_id, product_id, rating),
  17.   (supplier_id, product_id),
  18.   (supplier_id, rating),
  19.   (supplier_id),
  20.   (product_id, rating),
  21.   (product_id),
  22.   (rating),
  23.   ()
  24. );
复制代码
运行结果 


分组窗口聚合

准备数据
  1. CREATE TABLE ws (
  2.   id INT,
  3.   vc INT,
  4.   pt AS PROCTIME(), --处理时间
  5.   et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间
  6.   WATERMARK FOR et AS et - INTERVAL '5' SECOND   --watermark
  7. ) WITH (
  8.   'connector' = 'datagen',
  9.   'rows-per-second' = '10',
  10.   'fields.id.min' = '1',
  11.   'fields.id.max' = '3',
  12.   'fields.vc.min' = '1',
  13.   'fields.vc.max' = '100'
  14. );
复制代码


滚动窗口

滚动窗口(时间属性字段,窗口长度)
  1. select  
  2. id,
  3. TUMBLE_START(et, INTERVAL '5' SECOND)  wstart,
  4. TUMBLE_END(et, INTERVAL '5' SECOND)  wend,
  5. sum(vc) sumVc
  6. from ws
  7. group by id, TUMBLE(et, INTERVAL '5' SECOND);
复制代码

观察结果,可以看到按id分组举行统计,窗口长度(wend-wstart)为5秒,按Q退出查询。

滑动窗口

滑动窗口(时间属性字段,滑动步长,窗口长度)
  1. select  
  2. id,
  3. HOP_START(pt, INTERVAL '3' SECOND,INTERVAL '5' SECOND)   wstart,
  4. HOP_END(pt, INTERVAL '3' SECOND,INTERVAL '5' SECOND)  wend,
  5. sum(vc) sumVc
  6. from ws
  7. group by id, HOP(pt, INTERVAL '3' SECOND,INTERVAL '5' SECOND);
复制代码

从结果中看到,窗口长度是5秒,同一id与上一个窗口滑动的步长为3秒。

会话窗口

会话窗口(时间属性字段,会话间隔)
  1. select  
  2. id,
  3. SESSION_START(et, INTERVAL '5' SECOND)  wstart,
  4. SESSION_END(et, INTERVAL '5' SECOND)  wend,
  5. sum(vc) sumVc
  6. from ws
  7. group by id, SESSION(et, INTERVAL '5' SECOND);
复制代码

因为数据源源不停天生,以是不满意5s没有数据的会话间隔。

注意:分组窗口根本被更增强大的TVF窗口替代。

窗口表值函数(TVF)聚合

对比分组窗口(GroupWindow),TVF窗口更有用和强大。包罗:


  • 提供更多的性能优化本领
  • 支持GroupingSets语法
  • 可以在window聚合中使用TopN
  • 提供累积窗口
对于窗口表值函数,窗口本身返回的是就是一个表,以是窗口会出如今FROM后面,GROUP BY后面的则是窗口新增的字段window_start和window_end
  1. FROM TABLE(
  2. 窗口类型(TABLE 表名, DESCRIPTOR(时间字段),INTERVAL时间…)
  3. )
  4. GROUP BY [window_start,][window_end,] --可选
复制代码

滚动窗口

  1. SELECT
  2. window_start,
  3. window_end,
  4. id , SUM(vc)
  5. sumVC
  6. FROM TABLE(
  7.   TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS))
  8. GROUP BY window_start, window_end, id;
复制代码

从结果来看,第一个id为2的窗口时间范围是[35,40),第二个id为2的窗口时间范围是[40,45),正是长度为5秒的滚动窗口。

滑动窗口

要求: 窗口长度=滑动步长的整数倍(底层会优化成多个小滚动窗口)
  1. SELECT window_start, window_end, id , SUM(vc) sumVC
  2. FROM TABLE(
  3.   HOP(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS , INTERVAL '10' SECONDS))
  4. GROUP BY window_start, window_end, id;
复制代码

观察相同id的窗口数据,例如:id为2,时间范围[55,05),[00,10),...  
数据符合窗口长度为10秒、滑动步长为5秒的滑动窗口。

累积窗口


累积窗口会在肯定的统计周期内举行累积计算。累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)。所谓的最大窗口长度其实就是我们所说的“统计周期”,最终目的就是统计这段时间内的数据。
注意: 窗口最大长度 = 累积步长的整数倍
  1. SELECT
  2. window_start,
  3. window_end,
  4. id ,
  5. SUM(vc) sumVC
  6. FROM TABLE(
  7.   CUMULATE(TABLE ws, DESCRIPTOR(et), INTERVAL '2' SECONDS , INTERVAL '6' SECONDS))
  8. GROUP BY window_start, window_end, id;
复制代码

观察结果,id为1的窗口时间数据:[36,38),[36,40),[36,42),[42,44),...  
符合累计窗口的特点。

多维分析

  1. SELECT
  2. window_start,
  3. window_end,
  4. id ,
  5. SUM(vc) sumVC
  6. FROM TABLE(
  7.   TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS))
  8. GROUP BY window_start, window_end,
  9. rollup( (id) )
  10. --  cube( (id) )
  11. --  grouping sets( (id),()  )
  12. ;
复制代码


rollup在多维分析中是“上卷”的意思,即将数据按某种指定的粒度举行进一步聚合,获得更粗粒度的聚合数据。
从以上结果中,截取[00,05)的数据

可以看到基于id汇总,id=1  聚合值为860,id=2  聚合值为907,id=3  聚合值为727,上卷为更粗粒度(不区分id了,id在这里为NULL)的聚合数据得到2494(860+907+727=2494)。

Over 聚合

OVER聚合为一系列有序行的每个输入行计算一个聚合值。与GROUP BY聚合相比,OVER聚合不会将每个组的结果行数减少为一行。相反,OVER聚合为每个输入行天生一个聚合值。 可以在变乱时间或处置惩罚时间,以及指定为时间间隔、或行计数的范围内,定义Over windows。
语法
  1. SELECT
  2.   agg_func(agg_col) OVER (
  3.     [PARTITION BY col1[, col2, ...]]
  4.     ORDER BY time_col
  5.     range_definition),
  6.   ...
  7. FROM ...
复制代码
ORDER BY:必须是时间戳列,只能升序
range_definition:标识聚合窗口的聚合数据范围,有两种指定命据范围的方式,1.按照行数聚合,2.按照时间区间聚合
案例
按照时间区间聚合 统计每个传感器前10秒到如今收到的水位数据(vc)条数。
  1. SELECT
  2.     id,
  3.     et,
  4.     vc,
  5.     count(vc) OVER (
  6.         PARTITION BY id
  7.         ORDER BY et
  8.         RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
  9.   ) AS cnt
  10. FROM ws;
复制代码


也可以用WINDOW子句来在SELECT外部单独定义一个OVER窗口,便于重复使用:
  1. SELECT
  2.     id,
  3.     et,
  4.     vc,
  5. count(vc) OVER w AS cnt,
  6. sum(vc) OVER w AS sumVC
  7. FROM ws
  8. WINDOW w AS (
  9.     PARTITION BY id
  10.     ORDER BY et
  11.     RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
  12. );
复制代码
 


按照行数聚合 统计每个传感器前5条到如今数据的平均水位
  1. SELECT
  2.     id,
  3.     et,
  4.     vc,
  5.     avg(vc) OVER (
  6.             PARTITION BY id
  7.             ORDER BY et
  8.             ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  9. ) AS avgVC
  10. FROM ws;
复制代码

也可以用WINDOW子句来在SELECT外部单独定义一个OVER窗口:
  1. SELECT
  2.     id,
  3.     et,
  4.     vc,
  5. avg(vc) OVER w AS avgVC,
  6. count(vc) OVER w AS cnt
  7. FROM ws
  8. WINDOW w AS (
  9.     PARTITION BY id
  10.     ORDER BY et
  11.     ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  12. );
复制代码
 


特殊语法TOP-N
ROW_NUMBER() :对数据举行排序标记,标记该行数据在排序后的编号
WHERE rownum <= N:TopN 的查询
  1. select
  2.     id,
  3.     et,
  4.     vc,
  5.     rownum
  6. from
  7. (
  8.     select
  9.         id,
  10.         et,
  11.         vc,
  12.         row_number() over(
  13.             partition by id
  14.             order by vc desc
  15.         ) as rownum
  16.     from ws
  17. )
  18. where rownum<=3;
复制代码
 


特殊语法Deduplication去重
去重,也即上文介绍到的TopN 中 row_number = 1 的场景,但排序列必须是时间属性的列。
对每个传感器的水位值去重
  1. select
  2.     id,
  3.     et,
  4.     vc,
  5.     rownum
  6. from
  7. (
  8.     select
  9.         id,
  10.         et,
  11.         vc,
  12.         row_number() over(
  13.             partition by id,vc
  14.             order by et
  15.         ) as rownum
  16.     from ws
  17. )
  18. where rownum=1;
复制代码



联结(Join)查询

常规联结查询

再准备一张表用于join
  1. CREATE TABLE ws1 (
  2.   id INT,
  3.   vc INT,
  4.   pt AS PROCTIME(), --处理时间
  5.   et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间
  6.   WATERMARK FOR et AS et - INTERVAL '0.001' SECOND   --watermark
  7. ) WITH (
  8.   'connector' = 'datagen',
  9.   'rows-per-second' = '1',
  10.   'fields.id.min' = '3',
  11.   'fields.id.max' = '5',
  12.   'fields.vc.min' = '1',
  13.   'fields.vc.max' = '100'
  14. );
复制代码
等值内联结(INNER Equi-JOIN) 内联结用INNER JOIN来定义,会返回两表中符合联接条件的所有行的组合,也就是所谓的笛卡尔积(Cartesian product)。现在仅支持等值联结条件。
  1. SELECT *
  2. FROM ws
  3. INNER JOIN ws1
  4. ON ws.id = ws1.id;
复制代码

等值外联结(OUTER Equi-JOIN)
与内联结类似,外联结也会返回符合联结条件的所有行的笛卡尔积;别的,还可以将某一侧表中找不到任何匹配的行也单独返回。Flink SQL支持左外(LEFT JOIN)、右外(RIGHT JOIN)和全外(FULL OUTER JOIN),分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行返回。
  1. SELECT *
  2. FROM ws
  3. LEFT JOIN ws1
  4. ON ws.id = ws1.id;
复制代码
 


 间隔联结查询

  1. SELECT *
  2. FROM ws,ws1
  3. WHERE ws.id = ws1. id
  4. AND ws.et BETWEEN ws1.et - INTERVAL '2' SECOND AND ws1.et + INTERVAL '2' SECOND;
复制代码
查看Web UI Running Job


控制台结果


Order by 和 Limit

  1. SELECT *
  2. FROM ws
  3. ORDER BY et, id desc;
复制代码


  1. SELECT *
  2. FROM ws
  3. LIMIT 3;
复制代码


SQL Hints

在执行查询时,可以在表名后面添加SQL Hints来暂时修改表属性,对当前job生效。
  1. select * from ws1/*+ OPTIONS('rows-per-second'='10')*/;
复制代码

集合操作

并集

1)UNION 和 UNION ALL
UNION:将集合归并而且去重
UNION ALL:将集合归并,不做去重
  1. (SELECT id FROM ws) UNION (SELECT id FROM ws1);
复制代码

  1. (SELECT id FROM ws) UNION ALL (SELECT id FROM ws1);
复制代码
 

交集

Intersect 和 Intersect All
Intersect:交集而且去重
Intersect ALL:交集不做去重
  1. (SELECT id FROM ws) INTERSECT (SELECT id FROM ws1);
复制代码
 


  1. (SELECT id FROM ws) INTERSECT ALL (SELECT id FROM ws1);
复制代码

差集

Except 和 Except All
Except:差集而且去重
Except ALL:差集不做去重
  1. (SELECT id FROM ws) EXCEPT (SELECT id FROM ws1);
复制代码


  1. (SELECT id FROM ws) EXCEPT ALL (SELECT id FROM ws1);
复制代码


In 子查询
In 子查询的结果集只能有一列
  1. SELECT id, vc
  2. FROM ws
  3. WHERE id IN (
  4. SELECT id FROM ws1
  5. );
复制代码
 



系统函数

系统函数(System Functions)也叫内置函数(Built-in Functions),是在系统中预先实现好的功能模块。我们可以通过固定的函数名直接调用,实现想要的转换操作。Flink SQL提供了大量的系统函数,险些支持所有的标准SQL中的操作,这为我们使用SQL编写流处置惩罚程序提供了极大的方便。
查看Flink有哪些内置函数。
  1. show functions;
复制代码
Flink SQL中的系统函数又主要可以分为两大类:标量函数(Scalar Functions)和聚合函数(Aggregate Functions)。
1)标量函数(Scalar Functions)
标量函数指的就是只对输入数据做转换操作、返回一个值的函数。 标量函数是最常见、也最简朴的一类系统函数,数目非常庞大,许多在标准SQL中也有定义。以是我们这里只对一些常见类型列举部分函数,做一个简朴概述,具体应用可以查看官网的完备函数列表。
比较函数(Comparison Functions) 比较函数其实就是一个比较表达式,用来判断两个值之间的关系,返回一个布尔类型的值。这个比较表达式可以是用 <、>、= 等符号连接两个值,也可以是用关键字定义的某种判断。例如:
(1)value1 = value2 判断两个值相等; (2)value1 <> value2 判断两个值不相等 (3)value IS NOT NULL 判断value不为空
逻辑函数(Logical Functions)
逻辑函数就是一个逻辑表达式,也就是用与(AND)、或(OR)、非(NOT)将布尔类型的值连接起来,也可以用判断语句(IS、IS NOT)举行真值判断;返回的还是一个布尔类型的值。例如: (1)boolean1 OR boolean2 布尔值boolean1与布尔值boolean2取逻辑或 (2)boolean IS FALSE 判断布尔值boolean是否为false (3)NOT boolean 布尔值boolean取逻辑非
算术函数(Arithmetic Functions)
举行算术计算的函数,包罗用算术符号连接的运算,和复杂的数学运算。例如:
(1)numeric1 + numeric2 两数相加 (2)POWER(numeric1, numeric2) 幂运算,取数numeric1的numeric2次方 (3)RAND() 返回(0.0, 1.0)区间内的一个double类型的伪随机数
字符串函数(String Functions)
举行字符串处置惩罚的函数。例如: (1)string1 || string2 两个字符串的连接 (2)UPPER(string) 将字符串string转为全部大写 (3)CHAR_LENGTH(string) 计算字符串string的长度

时间函数(Temporal Functions)
举行与时间相关操作的函数。例如: (1)DATE string 按格式"yyyy-MM-dd"解析字符串string,返回类型为SQL Date (2)TIMESTAMP string 按格式"yyyy-MM-dd HH:mm:ss[.SSS]"解析,返回类型为SQL timestamp (3)CURRENT_TIME 返回当地时区的当前时间,类型为SQL time(与LOCALTIME等价) (4)INTERVAL string range 返回一个时间间隔。

2)聚合函数(Aggregate Functions)
聚合函数是以表中多个行作为输入,提取字段举行聚合操作的函数,会将唯一的聚合值作为结果返回。聚合函数应用非常广泛,岂论分组聚合、窗口聚合还是开窗(Over)聚合,对数据的聚合操作都可以用相同的函数来定义。 标准SQL中常见的聚合函数Flink SQL都是支持的,现在也在不停扩展,为流处置惩罚应用提供更强大的功能。例如:
(1)COUNT(*) 返回所有行的数目,统计个数。 (2)SUM([ ALL | DISTINCT ] expression) 对某个字段举行求和操作。默认环境下省略了关键字ALL,表示对所有行求和;假如指定DISTINCT,则会对数据举行去重,每个值只叠加一次。 (3)RANK() 返回当前值在一组值中的排名。 (4)ROW_NUMBER() 对一组值排序后,返回当前值的行号。 此中,RANK()和ROW_NUMBER()一样平常用在OVER窗口中。
具体可以参考:
Flink官网系统函数





Module操作

Module 允许 Flink 扩展函数能力。它是可插拔的,Flink 官方本身已经提供了一些 Module,用户也可以编写本身的 Module。
现在 Flink 包罗了以下三种 Module:


  • CoreModule:CoreModule 是 Flink 内置的 Module,其包罗了现在 Flink 内置的所有 UDF,Flink 默认开启的 Module 就是 CoreModule,我们可以直接使用此中的 UDF
  • HiveModule:HiveModule 可以将 Hive 内置函数作为 Flink 的系统函数提供给 SQL\Table API 用户举使用用,比如 get_json_object 这类 Hive 内置函数(Flink 默认的 CoreModule 是没有的)
  • 用户自定义 Module:用户可以实现 Module 接口实现本身的 UDF 扩展 Module 使用 LOAD 子句去加载 Flink SQL 体系内置的大概用户自定义的 Module,UNLOAD 子句去卸载 Flink SQL 体系内置的大概用户自定义的 Module。
1)语法
  1. -- 加载
  2. LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]
  3. -- 卸载
  4. UNLOAD MODULE module_name
  5. -- 查看
  6. SHOW MODULES;
  7. SHOW FULL MODULES;
复制代码

在 Flink 中,Module 可以被 加载、启用 、禁用 、卸载 Module,当加载Module 之后,默认就是开启的。同时支持多个 Module 的,而且根据加载 Module 的次序去按次序查找和解析 UDF,先查到的先解析使用。
此外,Flink 只会解析已经启用了的 Module。那么当两个 Module 中出现两个同名的函数且都启用时, Flink 会根据加载 Module 的次序举行解析,结果就是会使用次序为第一个的 Module 的 UDF,可以使用下面语法更改次序:
  1. USE MODULE hive,core;
复制代码
USE是启用module,没有被use的为禁用(禁用不是卸载),除此之外还可以实现调整次序的效果。上面的语句会将 Hive Module 设为第一个使用及解析的 Module。

操作
到mvn中央仓库,下载flink-sql连接hive的jar包,下载所在

选择flink对应版本的下载,例如:1.17.1
(1)上传jar包到flink的lib中
上传hive connector
  1. [hadoop@node2 ~]$ cp flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar $FLINK_HOME/lib
复制代码
注意:拷贝hadoop的包,解决依赖辩论题目
  1. [hadoop@node2 ~]$ cp $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar $FLINK_HOME/lib
复制代码

(2)重启flink集群和sql-client
关闭sql-client
  1. Flink SQL> quit;
复制代码
关闭flink集群(这里用的yarn session)


启动yarn session
  1. [hadoop@node2 ~]$ yarn-session.sh -d
复制代码

启动sql-client
  1. [hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session
复制代码

(3)加载hive module
  1. Flink SQL> load module hive with ('hive-version'='3.1.3');
  2. [INFO] Execute statement succeed.
  3. Flink SQL> show modules;
  4. +-------------+
  5. | module name |
  6. +-------------+
  7. |        core |
  8. |        hive |
  9. +-------------+
  10. 2 rows in set
  11. Flink SQL> show functions;
  12. 发现查到的函数数量变多了,说明加载到了hive的函数
复制代码
测试使用hive的内置函数
  1. select split('a:b', ':');
复制代码


常用 Connector 读写

kafka
file
jdbc
代码中使用FlinkSQL

我们想要在代码中使用Table API,必须引入相关的依赖。
  1. <dependency>
  2.    <groupId>org.apache.flink</groupId>
  3.    <artifactId>flink-table-api-java-bridge</artifactId>
  4.    <version>${flink.version}</version>
  5. </dependency>
复制代码
这里的依赖是一个Java的“桥接器”(bridge),主要就是负责Table API和下层DataStream API的连接支持,按照不同的语言分为Java版和Scala版。
假如我们盼望在当地的集成开发环境(IDE)里运行Table API和SQL,还须要引入以下依赖:
  1. <dependency>
  2.    <groupId>org.apache.flink</groupId>
  3.    <artifactId>flink-table-planner-loader</artifactId>
  4.    <version>${flink.version}</version>
  5. </dependency>
  6. <dependency>
  7.    <groupId>org.apache.flink</groupId>
  8.    <artifactId>flink-table-runtime</artifactId>
  9.    <version>${flink.version}</version>
  10. </dependency>
  11. <dependency>
  12.    <groupId>org.apache.flink</groupId>
  13.    <artifactId>flink-connector-files</artifactId>
  14.    <version>${flink.version}</version>
  15. </dependency>
复制代码
案例1
新建一个名为sql的包(package)来存放Flink SQL相关Java代码,代码所在的包,例如:org.example.sql
  1. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  2. import org.apache.flink.table.api.Table;
  3. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  4. import static org.apache.flink.table.api.Expressions.$;
  5. public class SqlDemo {
  6.     public static void main(String[] args) {
  7.         // 创建流执行环境
  8.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  9.         // 创建表环境
  10.         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  11.         // 创建表
  12.         tableEnv.executeSql("CREATE TABLE source(\n" +
  13.                 "id INT, \n" +
  14.                 "ts BIGINT, \n"+
  15.                 "vc INT\n"+
  16.                 ")WITH(\n" +
  17.                 "    'connector' = 'datagen', \n" +
  18.                 "    'rows-per-second'='1', \n" +
  19.                 "    'fields.id.kind'='random', \n" +
  20.                 "    'fields.id.min'='1', \n" +
  21.                 "    'fields.id.max'='10', \n" +
  22.                 "    'fields.ts.kind'='sequence', \n" +
  23.                 "    'fields.ts.start'='1', \n" +
  24.                 "    'fields.ts.end'='1000000', \n" +
  25.                 "    'fields.vc.kind'='random', \n" +
  26.                 "    'fields.vc.min'='1', \n" +
  27.                 "    'fields.vc.max'='100'\n" +
  28.                 ");\n");
  29.         tableEnv.executeSql("CREATE TABLE sink (\n" +
  30.                 "    id INT, \n" +
  31.                 "    sumVC INT \n" +
  32.                 ") WITH (\n" +
  33.                 "'connector' = 'print'\n" +
  34.                 ");\n");
  35.         // 执行查询
  36.         //    1.使用sql查询
  37.         Table table = tableEnv.sqlQuery("select id, sum(vc) as sumVC from source where id>5 group by id;");
  38.         //   把table对象注册成表名
  39.         tableEnv.createTemporaryView("tmp", table);
  40.         tableEnv.sqlQuery("select * from tmp where id>7");
  41.         //    2.使用table api查询
  42. //        Table source = tableEnv.from("source");
  43. //        Table result = source
  44. //                .where($("id").isGreater(5))
  45. //                .groupBy($("id"))
  46. //                .aggregate($("vc").sum().as("sumVC"))
  47. //                .select($("id"), $("sumVC"));
  48.         // 输出表
  49.         // sql写法
  50.         tableEnv.executeSql("insert into sink select * from tmp");
  51.         // table api写法
  52. //        result.executeInsert("sink");
  53.     }
  54. }
复制代码
在IDEA运行程序,部分运行结果如下

颠末分析验证,发现输出结果是由tableEnv.executeSql("insert into sink select * from tmp")输出的。

案例2
  1. import java.util.Objects;
  2. public class WaterSensor {
  3.     public String id;//水位传感器id
  4.     public Long ts;//传感器记录时间戳
  5.     public Integer vc;//水位记录值
  6.     public WaterSensor() {
  7.     }
  8.     public WaterSensor(String id, Long ts, Integer vc) {
  9.         this.id = id;
  10.         this.ts = ts;
  11.         this.vc = vc;
  12.     }
  13.     public String getId() {
  14.         return id;
  15.     }
  16.     public void setId(String id) {
  17.         this.id = id;
  18.     }
  19.     public Long getTs() {
  20.         return ts;
  21.     }
  22.     public void setTs(Long ts) {
  23.         this.ts = ts;
  24.     }
  25.     public Integer getVc() {
  26.         return vc;
  27.     }
  28.     public void setVc(Integer vc) {
  29.         this.vc = vc;
  30.     }
  31.     @Override
  32.     public String toString() {
  33.         return "WaterSensor{" +
  34.                 "id='" + id + '\'' +
  35.                 ", ts=" + ts +
  36.                 ", vc=" + vc +
  37.                 '}';
  38.     }
  39.     @Override
  40.     public boolean equals(Object o) {
  41.         if (this == o) {
  42.             return true;
  43.         }
  44.         if (o == null || getClass() != o.getClass()) {
  45.             return false;
  46.         }
  47.         WaterSensor that = (WaterSensor) o;
  48.         return Objects.equals(id, that.id) &&
  49.                 Objects.equals(ts, that.ts) &&
  50.                 Objects.equals(vc, that.vc);
  51.     }
  52.     @Override
  53.     public int hashCode() {
  54.         return Objects.hash(id, ts, vc);
  55.     }
  56. }
复制代码
​​​​​​​
  1. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. import org.apache.flink.table.api.Table;
  4. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  5. public class TableStreamDemo {
  6.     public static void main(String[] args) throws Exception {
  7.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  8.         DataStreamSource<WaterSensor> sensorDS = env.fromElements(
  9.                 new WaterSensor("s1", 1L, 1),
  10.                 new WaterSensor("s1", 2L, 2),
  11.                 new WaterSensor("s2", 2L, 2),
  12.                 new WaterSensor("s3", 3L, 3),
  13.                 new WaterSensor("s3", 4L, 4)
  14.         );
  15.         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  16.         // TODO 1. 流转表
  17.         Table sensorTable = tableEnv.fromDataStream(sensorDS);
  18.         tableEnv.createTemporaryView("sensor", sensorTable);
  19.         Table filterTable = tableEnv.sqlQuery("select id,ts,vc from sensor where ts>2");
  20.         Table sumTable = tableEnv.sqlQuery("select id,sum(vc) from sensor group by id");
  21.         // TODO 2. 表转流
  22.         // 2.1 追加流
  23.         tableEnv.toDataStream(filterTable, WaterSensor.class).print("filter");
  24.         // 2.2 changelog流(结果需要更新)
  25.         tableEnv.toChangelogStream(sumTable ).print("sum");
  26.         // 只要代码中调用了 DataStreamAPI,就需要 execute,否则不需要
  27.         env.execute();
  28.     }
  29. }
复制代码
运行结果


案例3
  1. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. import org.apache.flink.table.annotation.DataTypeHint;
  4. import org.apache.flink.table.annotation.FunctionHint;
  5. import org.apache.flink.table.api.Table;
  6. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  7. import org.apache.flink.table.functions.TableFunction;
  8. import org.apache.flink.types.Row;
  9. import static org.apache.flink.table.api.Expressions.$;
  10. public class MyTableFunctionDemo {
  11.     public static void main(String[] args) throws Exception {
  12.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  13.         DataStreamSource<String> strDS = env.fromElements(
  14.                 "hello flink",
  15.                 "hello world hi",
  16.                 "hello java"
  17.         );
  18.         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  19.         Table sensorTable = tableEnv.fromDataStream(strDS, $("words"));
  20.         tableEnv.createTemporaryView("str", sensorTable);
  21.         // TODO 2.注册函数
  22.         tableEnv.createTemporaryFunction("SplitFunction", SplitFunction.class);
  23.         // TODO 3.调用 自定义函数
  24.         // 3.1 交叉联结
  25.         tableEnv
  26.                 // 3.1 交叉联结(笛卡尔积)
  27. //                .sqlQuery("select words,word,length from str,lateral table(SplitFunction(words))")
  28.                 // 3.2 带 on  true 条件的 左联结
  29. //                .sqlQuery("select words,word,length from str left join lateral table(SplitFunction(words)) on true")
  30.                 // 重命名侧向表中的字段
  31.                 .sqlQuery("select words,newWord,newLength from str left join lateral table(SplitFunction(words))  as T(newWord,newLength) on true")
  32.                 .execute()
  33.                 .print();
  34.     }
  35.     // TODO 1.继承 TableFunction<返回的类型>
  36.     // 类型标注: Row包含两个字段:word和length
  37.     @FunctionHint(output = @DataTypeHint("ROW<word STRING,length INT>"))
  38.     public static class SplitFunction extends TableFunction<Row> {
  39.         // 返回是 void,用 collect方法输出
  40.         public void eval(String str) {
  41.             for (String word : str.split(" ")) {
  42.                 collect(Row.of(word, word.length()));
  43.             }
  44.         }
  45.     }
  46. }
复制代码
运行结果


案例4
从门生的分数表ScoreTable中计算每个门生的加权平均分。
  1. import org.apache.flink.api.java.tuple.Tuple2;
  2. import org.apache.flink.api.java.tuple.Tuple3;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.table.api.Table;
  6. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  7. import org.apache.flink.table.functions.AggregateFunction;
  8. import static org.apache.flink.table.api.Expressions.$;
  9. public class MyAggregateFunctionDemo {
  10.     public static void main(String[] args) throws Exception {
  11.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.         
  13.         //  姓名,分数,权重
  14.         DataStreamSource<Tuple3<String,Integer, Integer>> scoreWeightDS = env.fromElements(
  15.                 Tuple3.of("zs",80, 3),
  16.                 Tuple3.of("zs",90, 4),
  17.                 Tuple3.of("zs",95, 4),
  18.                 Tuple3.of("ls",75, 4),
  19.                 Tuple3.of("ls",65, 4),
  20.                 Tuple3.of("ls",85, 4)
  21.         );
  22.         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  23.         Table scoreWeightTable = tableEnv.fromDataStream(scoreWeightDS, $("f0").as("name"),$("f1").as("score"), $("f2").as("weight"));
  24.         tableEnv.createTemporaryView("scores", scoreWeightTable);
  25.         // TODO 2.注册函数
  26.         tableEnv.createTemporaryFunction("WeightedAvg", WeightedAvg.class);
  27.         // TODO 3.调用 自定义函数
  28.         tableEnv
  29.                 .sqlQuery("select name,WeightedAvg(score,weight)  from scores group by name")
  30.                 .execute()
  31.                 .print();
  32.     }
  33.    
  34.     // TODO 1.继承 AggregateFunction< 返回类型,累加器类型<加权总和,权重总和> >
  35.     public static class WeightedAvg extends AggregateFunction<Double, Tuple2<Integer, Integer>> {
  36.         @Override
  37.         public Double getValue(Tuple2<Integer, Integer> integerIntegerTuple2) {
  38.             return integerIntegerTuple2.f0 * 1D / integerIntegerTuple2.f1;
  39.         }
  40.         @Override
  41.         public Tuple2<Integer, Integer> createAccumulator() {
  42.             return Tuple2.of(0, 0);
  43.         }
  44.         /**
  45.          * 累加计算的方法,每来一行数据都会调用一次
  46.          * @param acc 累加器类型
  47.          * @param score 第一个参数:分数
  48.          * @param weight 第二个参数:权重
  49.          */
  50.         public void accumulate(Tuple2<Integer, Integer> acc,Integer score,Integer weight){
  51.             acc.f0 += score * weight;  // 加权总和 =  分数1 * 权重1 + 分数2 * 权重2 +....
  52.             acc.f1 += weight;         // 权重和 = 权重1 + 权重2 +....
  53.         }
  54.     }
  55. }
复制代码
 运行结果


案例5
 表聚合函数
用户自定义表聚合函数(UDTAGG)可以把一行或多行数据(也就是一个表)聚合成另一张表,结果表中可以有多行多列。很显着,这就像表函数和聚合函数的联合体,是一个“多对多”的转换。
  1. import org.apache.flink.api.java.tuple.Tuple2;
  2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.table.api.Table;
  5. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  6. import org.apache.flink.table.functions.TableAggregateFunction;
  7. import org.apache.flink.util.Collector;
  8. import static org.apache.flink.table.api.Expressions.$;
  9. import static org.apache.flink.table.api.Expressions.call;
  10. public class MyTableAggregateFunctionDemo {
  11.     public static void main(String[] args) throws Exception {
  12.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  13.         //  姓名,分数,权重
  14.         DataStreamSource<Integer> numDS = env.fromElements(3, 6, 12, 5, 8, 9, 4);
  15.         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  16.         Table numTable = tableEnv.fromDataStream(numDS, $("num"));
  17.         // TODO 2.注册函数
  18.         tableEnv.createTemporaryFunction("Top2", Top2.class);
  19.         // TODO 3.调用 自定义函数: 只能用 Table API
  20.         numTable
  21.                 .flatAggregate(call("Top2", $("num")).as("value", "rank"))
  22.                 .select( $("value"), $("rank"))
  23.                 .execute().print();
  24.     }
  25.    
  26.     // TODO 1.继承 TableAggregateFunction< 返回类型,累加器类型<加权总和,权重总和> >
  27.     // 返回类型 (数值,排名) =》 (12,1) (9,2)
  28.     // 累加器类型 (第一大的数,第二大的数) ===》 (12,9)
  29.     public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
  30.         @Override
  31.         public Tuple2<Integer, Integer> createAccumulator() {
  32.             return Tuple2.of(0, 0);
  33.         }
  34.         /**
  35.          * 每来一个数据调用一次,比较大小,更新 最大的前两个数到 acc中
  36.          *
  37.          * @param acc 累加器
  38.          * @param num 过来的数据
  39.          */
  40.         public void accumulate(Tuple2<Integer, Integer> acc, Integer num) {
  41.             if (num > acc.f0) {
  42.                 // 新来的变第一,原来的第一变第二
  43.                 acc.f1 = acc.f0;
  44.                 acc.f0 = num;
  45.             } else if (num > acc.f1) {
  46.                 // 新来的变第二,原来的第二不要了
  47.                 acc.f1 = num;
  48.             }
  49.         }
  50.         
  51.         /**
  52.          * 输出结果: (数值,排名)两条最大的
  53.          *
  54.          * @param acc 累加器
  55.          * @param out 采集器<返回类型>
  56.          */
  57.         public void emitValue(Tuple2<Integer, Integer> acc, Collector<Tuple2<Integer, Integer>> out) {
  58.             if (acc.f0 != 0) {
  59.                 out.collect(Tuple2.of(acc.f0, 1));
  60.             }
  61.             if (acc.f1 != 0) {
  62.                 out.collect(Tuple2.of(acc.f1, 2));
  63.             }
  64.         }
  65.     }
  66. }
复制代码
运行结果
  1. +----+-------------+-------------+
  2. | op |       value |        rank |
  3. +----+-------------+-------------+
  4. | +I |           3 |           1 |
  5. | -D |           3 |           1 |
  6. | +I |           6 |           1 |
  7. | +I |           3 |           2 |
  8. | -D |           6 |           1 |
  9. | -D |           3 |           2 |
  10. | +I |          12 |           1 |
  11. | +I |           6 |           2 |
  12. | -D |          12 |           1 |
  13. | -D |           6 |           2 |
  14. | +I |          12 |           1 |
  15. | +I |           6 |           2 |
  16. | -D |          12 |           1 |
  17. | -D |           6 |           2 |
  18. | +I |          12 |           1 |
  19. | +I |           8 |           2 |
  20. | -D |          12 |           1 |
  21. | -D |           8 |           2 |
  22. | +I |          12 |           1 |
  23. | +I |           9 |           2 |
  24. | -D |          12 |           1 |
  25. | -D |           9 |           2 |
  26. | +I |          12 |           1 |
  27. | +I |           9 |           2 |
  28. +----+-------------+-------------+
  29. 24 rows in set
复制代码


完成!enjoy it!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

曂沅仴駦

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表