FlinkSql之TableAPI详解

打印 上一主题 下一主题

主题 533|帖子 533|积分 1599

一、FlinkSql的概念

核心概念

Flink 的 Table APISQL 是流批统一的 API。 这意味着 Table API & SQL 在无论有限的批式输入还是无限的流式输入下,都具有相同的语义。 因为传统的关系代数以及 SQL 最开始都是为了批式处理而设计的, 关系型查询在流式场景下不如在批式场景下容易理解.
动态表和连续查询

动态表(Dynamic Tables) 是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。
与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个连续查询(Continuous Query)。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。
TableAPI
首先需要导入依赖
  1.  <dependency><br>     <groupId>org.apache.flink</groupId><br>     <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><br>     <version>${flink.version}</version><br>     <scope>provided</scope><br> </dependency><br> <dependency><br>     <groupId>org.apache.flink</groupId><br>     <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><br>     <version>${flink.version}</version><br>     <scope>provided</scope><br> </dependency><br> <dependency><br>     <groupId>org.apache.flink</groupId><br>     <artifactId>flink-csv</artifactId><br>     <version>${flink.version}</version><br> </dependency><br> <dependency><br>     <groupId>org.apache.flink</groupId><br>     <artifactId>flink-json</artifactId><br>     <version>${flink.version}</version><br> </dependency><br> ​<br> <br> <dependency><br>     <groupId>org.apache.commons</groupId><br>     <artifactId>commons-compress</artifactId><br>     <version>1.21</version><br> </dependency>
复制代码
  1.  /**<br>  * 使用TableAPI的基本套路:<br>  * 1.创建表的执行环境<br>  * 2.创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取<br>  * 3.对动态表进行查询<br>  * 4.把动态表转换为流<br>  */
复制代码
这里需要注意的问题:
1.TableAPI 中将动态表转换为流时有两种方法
  1.  DataStream<Row> rowDataStream = tableEnvironment.toAppendStream(result, Row.class);
复制代码
toAppendStream方法只能在查询时使用,不能使用包含聚合函数等更新语句
  1.  DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(select, Row.class);
复制代码
toRetractStream则可以使用
2.上述两种方法内传入的参数Row.class,表示将表中查询出的数据封装为行类型,也就是对每行进行封装,解决查询出的数据列少于或者多于原表。如何能够确保所查询的数据与之前封装的Bean有完全一致的结构则也可以封装为原Bean.class
代码实现:
  1.  package net.cyan.FlinkSql;
  2.  ​
  3.  import net.cyan.POJO.WaterSensor;
  4.  import org.apache.flink.api.common.functions.FilterFunction;
  5.  import org.apache.flink.api.java.tuple.Tuple2;
  6.  import org.apache.flink.configuration.Configuration;
  7.  import org.apache.flink.streaming.api.datastream.DataStream;
  8.  import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9.  import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  10.  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11.  import org.apache.flink.table.api.Table;
  12.  import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  13.  import org.apache.flink.types.Row;
  14.  ​
  15.  import static org.apache.flink.table.api.Expressions.$;
  16.  ​
  17.  /**<br>  * 使用TableAPI的基本套路:<br>  * 1.创建表的执行环境<br>  * 2.创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取<br>  * 3.对动态表进行查询<br>  * 4.把动态表转换为流<br>  */
  18.  public class Demo1 {
  19.      public static void main(String[] args) {
  20.          Configuration configuration=new Configuration();
  21.          configuration.setInteger("rest.port",3333);
  22.          //创建执行环境
  23.          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
  24.          env.setParallelism(1);
  25.          //模拟数据
  26.          DataStreamSource WaterSensorSource = env.fromElements(
  27.                  new WaterSensor("S1", 1000L, 10),
  28.                  new WaterSensor("S1", 1000L, 10),
  29.                  new WaterSensor("S2", 2000L, 20),
  30.                  new WaterSensor("S3", 3000L, 30),
  31.                  new WaterSensor("S4", 4000L, 40),
  32.                  new WaterSensor("S5", 5000L, 50)
  33.          );
  34.          //创建表的执行环境
  35.          StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
  36.          //创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取
  37.          Table table = tableEnvironment.fromDataStream(WaterSensorSource);
  38.          //对表进行查询
  39.          //1、过时的查询书写
  40.          Table result = table
  41.                  .where("id='S1'")
  42.                  .select("*");
  43.          //2、不过时的书写
  44.          Table result1 = table
  45.  //                .where($("id").isEqual("S1"))
  46.                  .select($("id"), $("ts"), $("vc"));
  47.          //3.聚合函数
  48.          Table select = table
  49.                  .groupBy($("id"))
  50.                  .aggregate($("vc").sum().as("sum_vc"))
  51.                  .select($("id"), $("sum_vc"));
  52.          //把动态表转换为流,使用到了之前创建的表运行环境
  53.  ​
  54.          SingleOutputStreamOperator tuple2DataStream = tableEnvironment
  55.                  .toRetractStream(select, Row.class)
  56.                  .filter(t -> t.f0)
  57.                  .map(t -> t.f1);
  58.  //        DataStream rowDataStream = tableEnvironment.toAppendStream(result, Row.class);
  59.  //        DataStream rowDataStream1 = tableEnvironment.toAppendStream(result1, Row.class);
  60.  //        rowDataStream.print();
  61.  //        rowDataStream1.print();
  62.          tuple2DataStream.print();
  63.  ​
  64.  ​
  65.          try {
  66.              //启动执行环境
  67.              env.execute();
  68.          } catch (Exception e) {
  69.              e.printStackTrace();
  70.          }
  71.  ​
  72.  ​
  73.  ​
  74.      }
  75.  }
复制代码
 
二、TableAPI读取文件

使用TableAPI读取文件时,我们首先需要知道去哪里读取也就是文件路径、读取文件的格式、读取出来的数据的结构也就是结果表的表结构及表名
  1.  package net.cyan.FlinkSql;<br> ​<br> import org.apache.flink.configuration.Configuration;<br> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;<br> ​<br> import org.apache.flink.table.api.DataTypes;<br> import org.apache.flink.table.api.Table;<br> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;<br> import org.apache.flink.table.descriptors.Csv;<br> import org.apache.flink.table.descriptors.FileSystem;<br> import org.apache.flink.table.descriptors.Schema;<br> import org.apache.flink.table.types.DataType;<br> ​<br> import static org.apache.flink.table.api.Expressions.$;<br> ​<br> public class Demo2_readWriteText {<br>     public static void main(String[] args) {<br>         //创建执行环境<br> //        Configuration configuration = new Configuration();<br> //        configuration.setInteger("rest.port", 3333);<br>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();<br>         env.setParallelism(1);<br>         StreamTableEnvironment talEnv = StreamTableEnvironment.create(env);<br>         //创建查询的数据结果封装类型<br>         Schema schema = new Schema()<br>                 .field("id", DataTypes.STRING())<br>                 .field("ts", DataTypes.BIGINT())<br>                 .field("vc", DataTypes.INT());<br>         talEnv<br>                 .connect(new FileSystem().path("input/sensor.txt"))  //读取文件路径<br>                 .withFormat(new Csv()) //读取文件的数据格式<br>                 .withSchema(schema) //读取出来的数据格式<br>                 .createTemporaryTable("sensor");//定义结果表名<br> ​<br>         //进行查询<br>         Table select = talEnv.from("sensor")<br>                 .where($("id").isEqual("sensor_1"))<br>                 .select($("id"), $("ts"), $("vc"));<br> ​<br> ​<br>         //将查询结果写入到新文件中<br>         //同样建立一个动态表连接<br>         talEnv<br>                 .connect(new FileSystem().path("input/b.txt"))  //写入路径<br>                 .withFormat(new Csv()) //写入文件的数据格式<br>                 .withSchema(schema) //写入的数据格式<br>                 .createTemporaryTable("abc");//定义写入表名<br>         //进行写入操作<br> ​<br>         select.executeInsert("abc");<br> ​<br> //        try {<br> //            //启动执行环境<br> //            env.execute();<br> //        } catch (Exception e) {<br> //            e.printStackTrace();<br> //        }<br> ​<br>     }<br> }
复制代码
 
三、TableAPI 读取、写入Kakfa

基本流程
1>需要创建表的运行环境
  1.  //创建表的运行环境<br> StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
复制代码
2>创建查询出的数据写出结构
  1.  //创建表结构<br> Schema schema=new Schema()<br>         .field("id",DataTypes.STRING())<br>         .field("ts",DataTypes.BIGINT())<br>         .field("vc",DataTypes.INT());
复制代码
3> 创建kafka连接
  1.  //创建kafka连接<br> tabEnv.connect(<br>         new Kafka()<br>         .version("universal")// 版本号<br>         .property("bootstrap.servers","hadoop102:9092")//地址<br>         .property("group.id","cy")//消费者组<br>         .topic("first")//消费主题<br> ​<br>  )<br>         .withFormat(new Json())//写入的格式<br>         .withSchema(schema)<br>         .createTemporaryTable("a");//临时表
复制代码
4> 进行查询
  1.  //创建表<br> Table select = tabEnv.from("a").select("*");
复制代码
5> 创建写入kafka连接
  1.  //创建写入主题<br> tabEnv.connect(<br>         new Kafka()<br>                 .version("universal")// 版本号<br>                 .property("bootstrap.servers","hadoop102:9092")//地址<br>                 .topic("first1")//消费主题<br>                 .sinkPartitionerRoundRobin()//随机分区<br> ​<br> )<br>         .withFormat(new Json())//写入的格式<br>         .withSchema(schema)<br>         .createTemporaryTable("c");
复制代码
6> 写入
  1.  //写入<br> select.executeInsert("c");
复制代码
完整代码如下
  1.  package net.cyan.FlinkSql;<br> ​<br> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;<br> import org.apache.flink.table.api.DataTypes;<br> import org.apache.flink.table.api.Table;<br> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;<br> import org.apache.flink.table.descriptors.Json;<br> import org.apache.flink.table.descriptors.Kafka;<br> import org.apache.flink.table.descriptors.Schema;<br> ​<br> public class Demo5_readWriteKafka {<br>     public static void main(String[] args) {<br>        //创建执行环境<br>        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();<br>         //创建表的运行环境<br>         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);<br>         //创建表结构<br>         Schema schema=new Schema()<br>                 .field("id",DataTypes.STRING())<br>                 .field("ts",DataTypes.BIGINT())<br>                 .field("vc",DataTypes.INT());<br>         //创建kafka连接<br>         tabEnv.connect(<br>                 new Kafka()<br>                 .version("universal")// 版本号<br>                 .property("bootstrap.servers","hadoop102:9092")//地址<br>                 .property("group.id","cy")//消费者组<br>                 .topic("first")//消费主题<br> ​<br>          )<br>                 .withFormat(new Json())//写入的格式<br>                 .withSchema(schema)<br>                 .createTemporaryTable("a");<br>         //创建表<br>         Table select = tabEnv.from("a").select("*");<br>         //创建写入主题<br>         tabEnv.connect(<br>                 new Kafka()<br>                         .version("universal")// 版本号<br>                         .property("bootstrap.servers","hadoop102:9092")//地址<br>                         .topic("first1")//消费主题<br>                         .sinkPartitionerRoundRobin()//随即分区<br> ​<br>         )<br>                 .withFormat(new Json())//写入的格式<br>                 .withSchema(schema)<br>                 .createTemporaryTable("c");<br> ​<br>         //写入<br>         select.executeInsert("c");<br> ​<br> ​<br>     }<br> }
复制代码
 
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

水军大提督

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表