Flink 中 Table API 和 SQL
实行介绍
在 Spark 中,最底子的编程模子是 RDD 编程。但是并不是全部的步伐员都可以大概用 RDD 很好地处理数据,以是 Spark 社区在 RDD 的底子上增长了关系型编程接口 Spark DataFrame 和 Spark SQL。Spark DataFrame 和 Spark SQL 的出现,大大降低了 Spark 的利用门槛,使那些并不擅长 Scala 以及只会 SQL 的步伐员和数据分析师也能利用 Spark 的分析本领举行大数据分析。在 Flink 中也有类似的编程接口,就是本节实行中的 Table API 和 SQL。
知识点
- Maven 依赖
- Table API
- Flink SQL
Flink Table API 和 SQL 介绍
在 Flink1.9 之前,开发职员如果必要处理批盘算和流盘算,必要同时掌握两种编程接口,对应的业务代码也是两套。一直到 2019 年阿里巴巴 Blink 团队在 Blink 中实现了 Table API 和 SQL,并将 Blink 贡献给 Flink 社区之后,这一题目才得以解决。由于 Table API 和 SQL 出现的时间较晚,以是功能尚不完善,但是已有功能已经可以解决开发职员的很多困难。
根据上图我们可以看到,Flink 中最底层的编程接口是 Stateful Stream Processing,在其的上面一层就是 DataStream/DataSet API,实际上我们在前面的实行中所利用的就是 DataStream/DataSet API,分别对应流处理 API 和批处理 API。再往上就是 Table API 和 SQL。越往上层的接口利用越简单方便,越往底层的接口利用更加灵活,但是利用也更加困难,对编程职员的编码本领要求也越高。Table API 和 SQL 的出现,使得我们可以通过简单的 API 调用和在代码中加入 SQL 就可以完成结构化数据处理,大大进步了开发效率。
环境搭建
要利用 Flink Table API 和 Flink SQL,必要在 pom.xml 文件中新加入两个依赖:
- [/code] xml
- 代码解读
- 复制代码
- <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>1.17.2</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala_2.12</artifactId> <version>1.17.2</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>1.17.2</version> </dependency>
-
- [list]
- [*]flink-table-planner:planner 筹划器。是 Table API 最主要的部门,提供了运行时环境和天生步伐执行筹划的 planner。
- [*]flink-table-api-scala-bridge:bridge 桥接器。主要负责 Table API 和 DataStream/DataSet API 的连接支持,按照语言分 java 和 scala 版本。
- [/list] 注意:在引入 Table API 和 SQL 的依赖时间的版本为1.17.2,此时 Flink 中的核心依赖版本也应该修改为对应的版本。
- [code]
复制代码 xml
代码解读
复制代码
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.12</artifactId> <version>1.17.2</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.17.2</version> </dependency>
和 DataStream API 以及 DataSet API 一样,Table API 和 SQL 也有相似的编程模子。以是要在代码中利用 Table API 和 SQL 就必须先创建其所必要的执行环境 TableEnviroment 对象。在 Flink 1.9 之前可以通过下面这种方式创建:
- [/code] ini
- 代码解读
- 复制代码
- // 创建流环境 val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment // 基于流环境创建Table环境 val tableEvn = StreamTableEnvironment.create(streamEnv)
- 在 Flink 1.9 之后还可以利用下面这种方式创建:
- [code]
复制代码 scss
代码解读
复制代码
// 创建流环境 val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment // 创建EnvironmentSettings对象 val envSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build() // 创建Table环境 val tableEnv = StreamTableEnvironment.create(streamEnv, envSettings)
自从 Blink 加入之后,Flink 中就保存了两套 Planner,Flink Planner 被称为 Old Planner,新加入的被称为 Blink Planner。由于 blink 不支持表和 DataSet 之间的转换等,以是官方推荐利用 Old Planner。
Table API
创建 Table
在 Flink 中创建表有两种方法:
- 从文件创建(批盘算)
- 从 DataStream 创建(流盘算)
一般只有批盘算,我们才会从文件从文件中创建。在 /home/vlab 路径下创建 userlog.log 文件来表示用户日志,并加入如下内容:
- [/code] 代码解读
- 复制代码
- 20230403121533,login,北京,118.128.11.31,0001 20230403121536,login,上海,10.90.113.150,0002 20230403121544,login,成都,112.112.31.33,0003 20230403121559,login,成都,101.132.93.24,0004 20230403121612,login,上海,189.112.89.78,0005 20230403121638,login,北京,113.52.101.50,0006
- 以上内容的每一行表示一条用户登录的日志,以逗号分隔,从左往右分别表示登录时间、用户行为、登录城市、登录 IP、UserID,IP 地址和城市不对应,请忽略!然后在 com.vlab.table 包下创建 TableTest object,代码如下:
- [code]
复制代码 less
代码解读
复制代码
package com.vlab.table import org.apache.flink.table.api.bridge.java.StreamTableEnvironment import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment /** * @projectName FlinkLearning * @package com.vlab.table * @className com.vlab.table.TableTest * @description 示例代码展示了怎样利用Flink的Table API读取CSV文件。 * @author pblh123 * @date 2025/2/10 10:10 * @version 1.0 * */ object TableTest { def main(args: Array[String]): Unit = { // 参数数量判定 if (args.length != 1) { System.err.println("Usage: TableTest <input path>") System.exit(5) } val inputPath = args(0) // 利用Scala的流处理环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 初始化Table API的上下文环境 val tableEnv = StreamTableEnvironment.create(env) // 利用DDL语句创建一个临时表用于读取CSV文件数据 tableEnv.executeSql(s""" |CREATE TABLE user5 ( | `time` BIGINT, | `action` STRING, | `city` STRING, | `ip` STRING, | `uid` BIGINT |) WITH ( | 'connector' = 'filesystem', | 'path' = '$inputPath', | 'format' = 'csv' |) """.stripMargin) // 确保用户表已注册并已加载 val table = tableEnv.from("user5") // 打印schema信息 table.printSchema() // 创建一个控制台输出表 tableEnv.executeSql( s""" CREATE TABLE console_output ( `time` BIGINT, `action` STRING, `city` STRING, `ip` STRING, `uid` BIGINT ) WITH ( 'connector' = 'print' ) """.stripMargin) // 创建查询操作,将数据插入到输出表 val query = tableEnv.sqlQuery("SELECT * FROM user5 WHERE city = '北京'") // 将查询结果插入到控制台输出表 query.executeInsert("console_output") } }
上面代码中的 env 对象和 tableEnv 对象的类型,都是批盘算才会用到的。创建好 tableEnv 之后,读取了 /home/vlab/userlog.log 文件,并通过sql指定命据源格式为 csv。注意,csv 类型的文件指的是以英文逗号分隔的文本文件,并非必须是 .csv 扩展名。通过DDL语句指定了 Table 的结构信息,最后打印了 user5 表的结构信息,如下所示:

通过流盘算创建 Table 的代码如下所示:
- [/code] scss
- 代码解读
- 复制代码
- package com.vlab.table import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.api.bridge.java.StreamTableEnvironment object StreamTableTest { /** * 用户日志案例类 * 用于表示用户日志变乱,包罗时间、操作、城市、IP地址和用户ID */ case class UserLog(time: Long, action: String, city: String, ip: String, user_id: Long) /** * 主步伐入口 * 该步伐从socket流中读取用户日志数据,处理并注册为表,然后查询并打印 * * @param args 下令行参数,必要主机名和端口号 */ def main(args: Array[String]): Unit = { // 检查下令行参数数量是否正确 if (args.length != 2) { System.err.println("Usage: TableTest <hostname> <port>") System.exit(5) } // 初始化流执行环境和表环境 val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) // 解析下令行参数 val hostname = args(0) val port = args(1).toInt // 从socket流中读取数据,过滤空行,解析并映射为UserLog对象 val data = env.socketTextStream(hostname, port) .filter(_.nonEmpty) .map(line => { val fields = line.split(",") if (fields.length != 5) { throw new IllegalArgumentException(s"Invalid input: $line") } UserLog(fields(0).toLong, fields(1), fields(2), fields(3), fields(4).toLong) }) // 注册为表 tableEnv.createTemporaryView("user_log", data) // 直接查询表 val table = tableEnv.sqlQuery("SELECT * FROM user_log") // 转换为 DataStream 并打印 tableEnv.toDataStream(table).print() // 执行流处理作业 env.execute("Stream Table Test") } }
- 和批盘算的方式相比,流盘算我们利用了 StreamExecutionEnvironment 和 StreamTableEnvironment ,在终端运行 nc -l -p 9999 之后打印的结果如下所示:
- [align=center][img=1189,590]https://i-blog.csdnimg.cn/direct/ab4c00c8251c4215b8995c8cd7745679.png[/img][/align]
- 有同学可能会好奇,我们并没有通过 Socket 发送数据,它怎么就知道表结构并打印了呢?因为我们在 map 方法中已经将其转换为 UserLog 类型了,以是 Table 的结构和 UserLog 是一致的。
- [size=1]修改字段名[/size]
- Flink 中支持按照字段的位置举行字段重定名和通过 as 关键字举行字段重定名,但是无论通过哪种方式都必要导入 Flink Table 的隐式转换:
- [code]
复制代码 arduino
代码解读
复制代码
import org.apache.flink.table.api.scala._
基于位置的方式:
- [/code] rust
- 代码解读
- 复制代码
- val table = tableEnv.fromDataStream(userLogStream, 'time, 'action as 'action2, 'city, 'ip, 'user_id)
- 完整代码如下:
- [code]
复制代码 scss
代码解读
复制代码
package com.vlab.table import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment object StreamTableTest { case class UserLog(time: Long, action: String, city: String, ip: String, user_id: Long) def main(args: Array[String]): Unit = { if (args.length != 2) { System.err.println("Usage: TableTest <hostname> <port>") System.exit(5) } val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) val hostname = args(0) val port = args(1).toInt // 利用 flatMap 过滤无效数据 val userLogStream = env.socketTextStream(hostname, port) .flatMap { line => try { val tokens = line.split(",") // 1. 检查字段数量 if (tokens.length != 5) { println(s"Invalid data format: $line") None } // 2. 检查数值字段是否为空 else if (tokens(0).trim.isEmpty || tokens(4).trim.isEmpty) { println(s"Empty numeric field in: $line") None } // 3. 尝试转换数值类型 else { Some(UserLog( tokens(0).trim.toLong, tokens(1).trim, tokens(2).trim, tokens(3).trim, tokens(4).trim.toLong )) } } catch { case e: NumberFormatException => println(s"Number conversion failed for line: $line (${e.getMessage})") None case e: Exception => println(s"Unexpected error parsing line: $line") None } } // 改字段名 val table = tableEnv.fromDataStream(userLogStream, 'time, 'action as 'action2, 'city, 'ip, 'user_id) tableEnv.createTemporaryView("user_log_table", table) // 打印表结构信息 tableEnv.sqlQuery("SELECT * FROM user_log_table").printSchema() // 过滤城市北京,上海 val result2 = tableEnv.sqlQuery( """ |SELECT * |FROM user_log_table |WHERE city IN ('北京', '上海') |""".stripMargin) result2.execute().print() env.execute("User Log Processing Example") } }
运行结果如下:
查询
假设我们要过滤出城市为北京和成都的用户,并分别统计这两个城市中的用户数量,利用 SQL 应该是这样的:
- [/code] csharp
- 代码解读
- 复制代码
- select city, count(user_id) as cnt from temp_userlog where city = '北京' or city = '成都' group by city
- 对应到 Flink Table API 应该是:
- [code]
复制代码 kotlin
代码解读
复制代码
package com.vlab.table import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment /** * @projectName FlinkLearning * @package com.vlab.table * @className com.vlab.table.StreamTableOperation * @description ${description} * @author pblh123 * @date 2025/2/10 12:59 * @version 1.0 */ object StreamTableOperation { case class UserLog(time: Long, action: String, city: String, ip: String, user_id: Long) def main(args: Array[String]): Unit = { if (args.length != 2) { System.err.println("Usage: TableTest <hostname> <port>") System.exit(5) } val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) val hostname = args(0) val port = args(1).toInt // 数据洗濯转换流(利用之前修复的版本) val userLogStream = env.socketTextStream(hostname, port) .flatMap { line => try { val tokens = line.split(",") if (tokens.length != 5 || tokens(0).trim.isEmpty || tokens(4).trim.isEmpty) None else Some(UserLog( tokens(0).trim.toLong, tokens(1).trim, tokens(2).trim, tokens(3).trim, tokens(4).trim.toLong )) } catch { case _: Exception => None } } // 转换为Table val table = tableEnv.fromDataStream(userLogStream) // 实现分组统计逻辑 val res = table .filter($"city" === "北京" || $"city" === "成都") // 等价于 where .groupBy($"city") .select( $"city", $"user_id".count.as("cnt") ) // 打印结果模式 res.printSchema() // 执行并打印结果 res.execute().print() env.execute("City User Count") } }
此中
where('city === "北京" || 'city === "成都")
等同于
filter('city === "北京" || 'city === "成都"),
groupBy('city)
等同于
groupBy("city")
在终端执行 nc -l -p 9999 并运行以上代码,然后在终端输入以下内容:
- [/code] 代码解读
- 复制代码
- 20230403121533,login,北京,118.128.11.31,0001 20230403121536,login,上海,10.90.113.150,0002 20230403121544,login,成都,112.112.31.33,0003 20230403121559,login,成都,101.132.93.24,0004 20230403121612,login,上海,189.112.89.78,0005 20230403121638,login,北京,113.52.101.50,0006
- 运行结果如下:
- [align=center][img=1146,761]https://i-blog.csdnimg.cn/direct/b8c4425ca5524457baad6c5bd9be32a6.png[/img][/align]
- [b]回撤流(Retract Stream)机制[/b]
- [align=center][img=534,299]https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fp6-xtjj-sign.byteimg.com%2Ftos-cn-i-73owjymdk6%2F4133e003c2af43589efb03628c60a14d~tplv-73owjymdk6-jj-mark-v1%3A0%3A0%3A0%3A0%3A5o6Y6YeR5oqA5pyv56S-5Yy6IEAg6Iiq6YeN5ZCN5LqGNzc5NDg%3D%3Aq75.awebp&pos_id=dJSEFjon[/img][/align]
- 最终输出的类型为 (Boolean, T),最前面的布尔值代表的是数据更新类型,True 对应的是 Insert 操作更新的数据,而 False 对应的是 Delete 操作更新的数据。当第一条北京的数据出现时,属于 Insert 操作,当第二条北京的数据出现时,后面的统计结果由 1 变为 2,以是之前为 1 的那条数据就会被 Delete,以是会对应到 False 输出一次;成都对应的数据也是同理,当第一条成都的数据出现时,属于 Insert 操作,而当第二条数据出现的时间,原来的统计结果为 1 的数据会被 Delete,以是会对应为 False。
-
- [list]
- [*] [b]实用场景[/b]:当Table的盘算结果必要支持更新时(如GROUP BY聚合)
- [*] 数据结构
- :每个元素是二元组
- [code]
复制代码 sql
代码解读
复制代码
(Boolean, Row)
- [/code] javascript
- 代码解读
- 复制代码
- Boolean 表示操作类型:
- [list]
- [*]true:表示新增或更新记录(等价INSERT或UPDATE)
- [*]false:表示撤回之前的记录(等价DELETE)
- [/list]
- [*] Row 是实际数据内容
- [/list]
- [/list] [b](2)为什么必要过滤 _._1 == true[/b]
-
- [list]
- [*] 在流式盘算中,聚合结果可能不断更新
- [*] 示例场景:
- [code]
复制代码 scss
代码解读
复制代码
输入数据: 1001,北京 1002,北京 1003,成都 输出过程: (+I, 北京, 1) // 第一次出现北京 (-U, 北京, 1) // 撤回旧值 (+U, 北京, 2) // 更新为最新值 (+I, 成都, 1) // 新增成都记录
- 过滤后只保存最终有效结果:
- [/code] scss
- 代码解读
- 复制代码
- (+I, 北京, 1) (+U, 北京, 2) (+I, 成都, 1)
- [/list] 可以做如下修改:
- [code]
复制代码 scss
代码解读
复制代码
package com.vlab.table import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.types.Row /** * @projectName FlinkLearning * @package com.vlab.table * @className com.vlab.table.StreamTableOperation * @description ${description} * @author pblh123 * @date 2025/2/10 12:59 * @version 1.0 */ object StreamTableOperation { case class UserLog(time: Long, action: String, city: String, ip: String, user_id: Long) def main(args: Array[String]): Unit = { if (args.length != 2) { System.err.println("Usage: TableTest <hostname> <port>") System.exit(5) } val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) val hostname = args(0) val port = args(1).toInt // 数据洗濯转换流(利用之前修复的版本) val userLogStream = env.socketTextStream(hostname, port) .flatMap { line => try { val tokens = line.split(",") if (tokens.length != 5 || tokens(0).trim.isEmpty || tokens(4).trim.isEmpty) None else Some(UserLog( tokens(0).trim.toLong, tokens(1).trim, tokens(2).trim, tokens(3).trim, tokens(4).trim.toLong )) } catch { case _: Exception => None } } // 转换为Table val table = tableEnv.fromDataStream(userLogStream) // 实现分组统计逻辑 val res = table .filter($"city" === "北京" || $"city" === "成都") // 等价于 where .groupBy($"city") .select( $"city", $"user_id".count.as("cnt") ) // 打印结果模式 res.printSchema() // 转换为回撤流并处理 // 添加类型声明的版本 tableEnv .toRetractStream[Row](res) .filter(_._1) .map { (t: (Boolean, Row)) => // 显式声明输入类型 t match { case (_, row) => val city = row.getFieldAs[String](0) val count = row.getFieldAs[Long](1) s"【实时统计】城市:$city, 用户数:$count" } } .print() env.execute("City User Count") } }
重新在控制台输入相同的日志数据,运行结果如下:
Flink SQL
如果你觉得上面的 Table API 利用很不习惯,没关系,你同样可以用 Flink SQL 来处理数据。Flink SQL 底层利用 Apache Calcite 框架,将标准的 SQL 语句转为 Flink 底层的 API 算子,并会自动基于 SQL 的逻辑举行性能优化。你只必要关心自己的业务逻辑,并将业务逻辑转换为标准的 SQL 语句,剩下的 Flink 可以帮你搞定。事实上,在开发过程中,开发职员经常会将 Table API 和 Flink SQL 搭配利用。
我们在 com.vlab.table 包下创建 MyFlinkSql object。照旧针对”过滤出城市为北京和成都的用户,并分别统计这两个城市中的用户数量“这个业务逻辑,对应到 Flink SQL 中的语法为:
- [/code] scss
- 代码解读
- 复制代码
- package com.vlab.table import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.types.Row /** * @projectName FlinkLearning * @package com.vlab.table * @className com.vlab.table.SqlUserCount * @description ${description} * @author pblh123 * @date 2025/2/10 13:30 * @version 1.0 */ object SqlUserCount { case class UserLog(time: Long, action: String, city: String, ip: String, user_id: Long) def main(args: Array[String]): Unit = { if (args.length != 2) { System.err.println("Usage: SQLUserCount <hostname> <port>") System.exit(1) } // 1. 初始化环境 val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) // 2. 读取并处理数据源 val userLogStream = env.socketTextStream(args(0), args(1).toInt) .flatMap { line => try { val tokens = line.split(",") if (tokens.length == 5 && tokens(0).nonEmpty && tokens(4).nonEmpty) { Some(UserLog( tokens(0).trim.toLong, tokens(1).trim, tokens(2).trim, tokens(3).trim, tokens(4).trim.toLong )) } else None } catch { case _: Exception => None } } // 3. 注册表 tableEnv.createTemporaryView("user_logs", userLogStream) // 4. 执行SQL查询 val resultTable = tableEnv.sqlQuery( """ |SELECT | city, | COUNT(user_id) AS user_count |FROM user_logs |WHERE city IN ('北京', '成都') |GROUP BY city |""".stripMargin) // 5. 转换为数据流并输出 tableEnv .toRetractStream[Row](resultTable) .filter(_._1) // 只保存新增/更新记录 .map { (t: (Boolean, Row)) => // 显式声明输入类型 t match { case (_, row) => val city = row.getFieldAs[String](0) val count = row.getFieldAs[Long](1) s"【实时统计】城市:$city, 用户数:$count" } } .print() env.execute("SQL City User Count") } }
- 第二种写法为:
- [code]
复制代码 scss
代码解读
复制代码
val resultTable = tableEnv.sqlQuery( """ |SELECT | city, | COUNT(user_id) FILTER (WHERE city IN ('北京', '成都')) AS user_count |FROM user_logs |GROUP BY city |HAVING city IN ('北京', '成都') |""".stripMargin) // 5. 转换为数据流并输出 tableEnv .toRetractStream[Row](resultTable) .filter(_._1) // 只保存新增/更新记录 .map { (t: (Boolean, Row)) => // 显式声明输入类型 t match { case (_, row) => val city = row.getFieldAs[String](0) val count = row.getFieldAs[Long](1) s"【实时统计】城市:$city, 用户数:$count" } } .print()
在终端执行 nc -l -p 9999,然后运行以上任意一种方式(推荐利用第一种),并在终端发送以下日志:
[code][/code] 代码解读
复制代码
20230403121533,login,北京,118.128.11.31,0001 20230403121536,login,上海,10.90.113.150,0002 20230403121544,login,成都,112.112.31.33,0003 20230403121559,login,成都,101.132.93.24,0004 20230403121612,login,上海,189.112.89.78,0005 20230403121638,login,北京,113.52.101.50,0006
运行结果如下:

总结
本节实行我们介绍了 Flink 中的 Table API 和 SQL 的利用,Table API 和 SQL 在处理结构化数据时,相对于算子而言有绝对的优势,固定的接⼝ API 和标准的 SQL 语句⼤⼤降低开发⼈员的⼯作量,并提升开发效率,也⽅便后期的维护。虽然 Flink 中的 Table API 和 SQL 还不算完善,但就现在所提供的功能已经可以满足我们大部门的需求了。关于后续的新特性,大家可以关注 Flink 社区的动态。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
|