FlinkSQL学习笔记(二)表界说详解

打印 上一主题 下一主题

主题 547|帖子 547|积分 1641

写在前面

本小节主要阐明了FlinkSQL在界说表时间的一些根本规则,其中包罗:
1、Catalog为核心的临时表、永久表、视图的关系
2、Table对象和SQL界说表的方式
3、表界说过程中的schema、format、watermark、connector的根本利用方式
4、以kafka connector为例,具体阐明了如何建表并获取元数据的过程
【这边并没有按照视频推导,而是按照官方文档自己一步步完成的,这个可以点个赞】
【碰到题目要学会看日记,无论是否为SQL,日记很重要】
难度自己不大,主要在于如何灵活运用,实在本质上在于对于官方文档的利用。
1、表的概述以及类别



  • 表的表示结构
    catalog name:元数据空间,常用于标识不同的“源”,比如hive catalog,inner catalog等;使得Flink里面创建的表hive中能查到,但是不一定可以取数,原因在于这里不同的“源”的界说在hive中没有,不一定可以查到。更多细节参考补充阐明。
    database name:通常语义中的“库”
    table name:通常语义中的“表”
  • 表与视图
    FlinkSQL中的表,可以是Virtual的(view视图)和regular的(table通例表)
    table形貌了一个物理上的外部数据源,如文件、数据库表、kafka消息topic
    view则基于表创建,代表一个或多个表上的一段计算逻辑(就是对一段查询筹划的逻辑封装)
    (不管是table照旧view,在tableAPI中得到的都是table对象)
  • 临时表与永久表
    临时表(视图):创建时带temporary关键字(create temporary view,create temporary table);表 schema 只维护在所属 flink session 运行时内存中;当所属的 flink session 结束后表信息将不复存在;且该表无法在 flink session 间共享;
    永久表(视图):创建时不带temporary关键字(create view,create table);表 schema 可记载在外部持久化的元数据管理器中(比如 hive 的 metastore);当所属 flink session 结束后,该表信息不会丢失;且在不同 flink session 中都可访问到该表的信息。
    注:永久表的元数据如果不持久化,也没有办法持久。
2、表的的界说概述

下面内容简单相识即可,本质上照旧对建表API的利用,实际运用过程中注意Stream、Table、SQL之间的切换方式即可。
2.1、基于TableAPI创建[Table对象]



  • 从已存在的表
  1. Table table = tableEnv.from("test-table");//通过在env的catalog中注册的表名,获取Table对象//通过在env的catalog中注册的表名,获取Table对象
复制代码


  • 从 TableDescriptor(毗连器/format/schema/options),本质上照旧from方法
  1. Table table = tableEnv.from(TableDescriptor
  2.         .forConnector("kafka")
  3.         .schema(Schema.newBuilder()
  4.                 .column("id", DataTypes.INT())
  5.                 .column("name", DataTypes.STRING())
  6.                 .column("age", DataTypes.INT())
  7.                 .column("gender", DataTypes.STRING())
  8.                 .build())
  9.         .format("json")
  10.         .option("topic", "testTopic")
  11.         .option("properties.bootstrap.servers", "192.168.247.129:9092")
  12.         .option("properties.group.id", "testGroup")
  13.         .option("scan.startup.mode", "earliest-offset")
  14.         .option("json.fail-on-missing-field", "false")
  15.         .option("json.ignore-parse-errors", "true")
  16.         .build());
复制代码


  • 从 DataStream获取
    这里 主动推断 schema(反射手段),如果必要自界说的话,看看Schema的利用大概是一个很不错的选择,通过SQL创建的篇章里面给了一个简单的例子。
  1. DataBean bean1 = new DataBean(1, "s1", "e1", "pg1", 1000);
  2. DataBean bean2 = new DataBean(2, "s2", "e3", "pg1", 1000);
  3. DataStreamSource<DataBean> dataStream1 = env.fromElements(bean1, bean2);
复制代码


  • 从 Table对象上的查询 api生成
    通过 Table上调用查询 api,生成新的 Table对象(本质上就是 view)
  1. Table table = table3.select($("guid"), $("uuid"));
复制代码


  • 从测试数据
  1. Table table2 = tableEnv.fromValues(
  2.        DataTypes.ROW(
  3.                DataTypes.FIELD("id", DataTypes.INT()),
  4.                DataTypes.FIELD("name", DataTypes.STRING())),
  5.        Row.of(1, "jack")
  6. );
复制代码
2.2、基于TableSQL创建[不返回Table对象]



  • 从已存在的dataStream注册
  1. tableEnv.createTemporaryView("t1",table2);
复制代码


  • 从已存在的Table对象注册
  1. tableEnv.createTemporaryView("t1",table2);
复制代码


  • 从TableDescriptor(毗连器)注册
  1. DataBean bean1 = new DataBean(1, "s1", "e1", "pg1", 1000);
  2. DataBean bean2 = new DataBean(1, "s1", "e1", "pg1", 1000);
  3. DataStreamSource<DataBean> dataStream1 = tableEnv.fromElements(bean1, bean2);
  4. Schema schema = Schema.Builder.column...build();
  5. tenv.createTemporaryView("t1",dataStream1,schema);
复制代码


  • 通过Connector注册
  1. tenv.createTable("t1", TableDescriptor.forConnector("filesystem")
  2. .option("path", "file:///d:/a.txt")
  3. .format("csv")
  4. .schema(Schema.newBuilder()
  5. .column("guid",DataTypes.STRING())
  6. .column("name",DataTypes.STRING())
  7. .column("age",DataTypes.STRING())
  8. .build())
  9. .build());
复制代码


  • 实验SQL的DDL语句注册
  1. tableEnv.executeSql(
  2.          "        CREATE TABLE KafkaTable (                               "
  3.           + "          `user_id` BIGINT,                                     "
  4.           + "          `item_id` BIGINT,                                     "
  5.           + "          `behavior` STRING,                                    "
  6.           + "          `ts` TIMESTAMP(3) METADATA FROM 'timestamp'           "
  7.           + "        ) WITH (                                                "
  8.           + "          'connector' = 'kafka',                                "
  9.           + "          'topic' = 'user_behavior',                            "
  10.           + "          'properties.bootstrap.servers' = 'localhost:9092',    "
  11.           + "          'properties.group.id' = 'testGroup',                  "
  12.           + "          'scan.startup.mode' = 'earliest-offset',              "
  13.           + "          'format' = 'csv'                                      "
  14.           + "        )                                                       "
  15. );
复制代码
3、catalog详解

3.1、什么是catalog

一句话:catalog就是一个元数据空间,简单说就是记载、获取元数据(表界说信息)的实体;
flinkSQL在运行时,可以拥有多个catalog,它们由catalogManager模块来注册、管理;
CatalogManager中可以注册多个元数据空间;
flinkSQL环境创建之初,就会初始化一个默认的元数据空间
空间名称:default_catalog
空间实现类:GenericInMemoryCatalog,默认的元数据空间对象


元数据空间管理对象:CatalogManager
①:用于记载Session中所有的Catalog
②:初始化一个默认的Catalog
③:初始化用于记载Session中注册的临时表

3.2、深入测试catalog


  • 注册一个HiveCatalog
  1. // 创建一个hive元数据空间的实现对象
  2. HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "conf");
  3. // 将hive元数据空间对象注册到环境中
  4. tableEnv.registerCatalog("myHiveCatalog",hiveCatalog);
复制代码

  • 实验分别在不同的Catalog中创建表、视图、临时表
  1. //1、尝试在HiveCatalog中建表
  2. tableEnv.executeSql("CREATE TABLE IF NOT EXISTS myHiveCatalog.`default`.t_kafka (...)");
  3. //2、默认在DefaultCatalog中建表
  4. tableEnv.executeSql("CREATE TABLE t_kafka2 (...)  ");
  5. //3、在HiveCatalog中创建视图
  6. tableEnv.executeSql("CREATE VIEW IF NOT EXISTS myHiveCatalog.`default`.t_kafka_view ...");
  7. //4、创建临时表,指定HiveCatalog
  8. tableEnv.executeSql("create temporary table myHiveCatalog.`default`.test_temporary_hive...");
复制代码


  • 查看CatalogManager,得到Catalog的结果,临时表单独存在在temporaryTables

3.3、临时表与永久表的底层差异

结论1:如果利用hive元数据空间窗口表、视图,则:


  • 永久表(视图)的元信息,都会被写入 hive的元数据管理器中,从而可以实现永久存在
  • 临时表(视图)的元信息,并不会写入 hive的元数据管理其中,而是放在 catalogManager的一个 temporaryTables的内存 hashmap中记载
  • 临时表空间中的表名(全名)如果与 hive空间中的表名雷同,则查询时会优先选择临时表空间的表
结论 2:如果选择 GenericInMemoryCatalog元数据空间来创建表、视图,则:


  • 永久表(视图)的元信息,都会被写入 GenericInMemoryCatalog的元数据管理器中(内存中)
  • 临时表(视图)的元信息,放在 catalogManager的一个 temporaryTables的内存 hashmap中记载
  • 无论永久照旧临时,当 flink的运行 session结束后,所创建的表(永久、临时)都将不复存在
3.4、如何理解hive catalog

注:本质区别,这也阐明之前为啥有的时间进行查询的时间必要切换查询引擎
flinksql利用 hive catalog来建表(查询、修改、删除表),本质上只是利用了 hive的 metastore服务;
更具体来说,flinksql只是把 flinksql的表界说信息,按照 hive元数据的情势,托管到 hive的 metastore中而已!
固然,hive中也能看到这些托管的表信息,但是,并不能利用它底层的 mapreduce大概 spark引擎来查询这些表;
因为 mapreduce大概 spark引擎,并不能理解 flinksql表界说中的信息,也无法为这些界说信息提供相应的组件去读取数据(比如,mr大概 spark就没有 flinksql中的各种 connector组件)
4、表界说详解

4.1、schema字段界说详解



  • physical column,物理字段:源自于“外部存储”体系自己 schema中的字段
    如 kafka消息的 key、value(json格式)中的字段;mysql表中的字段;hive表中的字段;parquet文件中的字段……
  • computed column,表达式字段(逻辑字段):在物理字段上施加一个 sql表达式,并将表达式结果界说为一个字段
  • metadata column,元数据字段:泉源于 connector从外部存储体系中获取到的“外部体系元信息”。比如,kafka的消息,通常意义上的数据内容是在 record的 key和 value中的,而实质上(底层角度来看),kafka中的每一条 record,不但带了 key和 value数据内容,还带了这条 record所属的 topic,所属的 partition,地点的 offset,以及 record的 timetamp和 timestamp范例等“元信息”,而 flink的 connector可以获取并暴露这些元信息,并允许用户将这些信息界说成 flinksql表中的字段;
  • 主键约束:flinkSQL自己也支持主键约束,这个目前没有效到,感觉应该可以类别Mysql的主键约束理解。
4.2、format组件

connector毗连器在对接外部存储时,根据外部存储中的数据格式不同,必要用到不同的 format组件;
format组件的作用就是:告诉毗连器,如何分析外部存储中的数据及映射到表 schema
这里枚举常见的两种format
json


  • flink官网文档:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/json/
  • 假定有如下的json格式的数据:{"id":1,"friends":[{"name":"a","info":{"addr":"bj","gender":"male"}},{"name":"b","info":{"addr":"sh","gender":"female"}}]}
  • flinkSQL语句
  1. tableEnvironment.executeSql(
  2.          " CREATE TABLE t_json(                                                                                   "
  3.                 + "         id        INT,                                                   "
  4.                 + "         friends        ARRAY<ROW<name STRING,info MAP<STRING,STRING>>>    "
  5.                 + "                                                                "
  6.                 + " )WITH(                                                         "
  7.                 + "         'connector' = 'filesystem',                                "
  8.                 + "         'path' = 'input/json',                                     "
  9.                 + "         'format' = 'json'                                          "
  10.                 + " )                                                              "
  11. );
  12. tableEnvironment.executeSql("DESC t_json").print();
  13. tableEnvironment.executeSql(
  14.          "  SELECT id                                               "
  15.                 + "           , friend.name                                      "
  16.                 + "           , friend.info['gender']                            "
  17.                 + "           , friend.info['addr']                              "
  18.                 + "    FROM t_json,                                          "
  19.                 + "    UNNEST(friends)   AS friend                           "
  20. ).print();
复制代码


  • 控制台输出

csv


  • flink官网文档:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/csv/
  • 假定命据格式如下:
    |1|,|zs|,|18|
    #哈哈哈哈
    |2|,|ls|,|20|
    |3|,|ww|,\N
  • flinkSQL语句
  1. tableEnvironment.executeSql(
  2.          " CREATE TABLE t_csv (                                                                                   "
  3.                 + "         id         INT,                                                  "
  4.                 + "         name STRING,                                               "
  5.                 + "         age  INT                                                   "
  6.                 + "                                                                "
  7.                 + " )WITH(                                                         "
  8.                 + "         'connector' = 'filesystem',                                "
  9.                 + "         'path' = 'input/csv',                                      "
  10.                 + "         'format' = 'csv',                                          "
  11.                 + "         'csv.quote-character' = '|',                               "
  12.                 + "         'csv.ignore-parse-errors' = 'true',                        "
  13.                 + "         'csv.allow-comments' = 'true',                             "
  14.                 + "         'csv.null-literal' = '\\N',                                "
  15.                 + "         'format' = 'csv'                                           "
  16.                 + " )                                                              "
  17. );
  18. tableEnvironment.executeSql("DESC t_csv").print();
  19. tableEnvironment.executeSql(
  20.          "  SELECT id                                               "
  21.                 + "           , name                                             "
  22.                 + "           , age                                              "
  23.                 + "    FROM t_csv                                            "
  24. ).print();
复制代码


  • 控制台输出

4.3、 watermark与time属性详解

时间属性界说,主要是用于各类基于时间的运算操纵(如基于时间窗口的查询计算)
4.3.1、界说水位线



  • 表界说
    注:这里的时间范例必须为Timestamp

  • 查询结果

4.3.2、表与流之间 WaterMark

WaterMark和转换之前的WaterMark计算规则保持一致


  • 流转表
    流转表的过程中,无论“源流”是否存在 watermark,都不会主动传递 watermark
    如需时间运算(如时间窗口等),必要在转换界说中显式声明 watermark策略

    • 先设法界说一个 timestamp(3)大概 timestamp_ltz(3)范例的字段(可以来自于数据字段,也可以来自于一个元数据: rowtime

    • 然后基于该字段,用 watermarkExpression声明 watermark策略


  • 表转流
    前提:源表界说了 wartermark策略;
    则将表转成流时,将会主动传递源表的 watermark;

4.4、connector详解

connector常是用于对接外部存储建表(源表或目标表)时的映射器、桥接器;
connector本质上是对 flink的 table source /table sink算子的封装;参考链接:https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/overview/
连机器利用的核心要素:


  • 导入毗连器 jar包依赖
  • 指定毗连器范例名
  • 指定毗连器所需的参数(不同毗连器有不同的参数设置),如:format
  • 获取毗连器所提供的元数据,如:schema
Flink支持的毗连器有很多种,包罗:Filesystem、Elasticsearch、MongoDB、Apache Kafka、Apache HBase…
以kafka毗连器举例阐明其在FlinkSQL中的过程

  • 可以获取的元数据

  • 假定kafka中存在如下数据
  1. // 创建生产者实例
  2. Producer<String, String> producer = new KafkaProducer(props);
  3. // 创建消息,并添加头部信息
  4. ProducerRecord<String, String> record = new ProducerRecord("connector_test",
  5.         "{"k1":13,"k2":23}",
  6.         "{"k1":"value_1","k2":"value_2","eventID":"002","eventTime":1708759402246}");
  7. record.headers().add("head1", "headValue1".getBytes());
  8. // 发送消息
  9. producer.send(record);
  10. System.out.println(record);
  11. // 关闭生产者
  12. producer.close();
复制代码
存在的题目:


  • kafka的消息中有 json格式的 key(key内容必要映射到表 schema中)
  • kafka的消息中有 json格式的 value(value内容必要映射到表 schema中)
  • key和 value的 json数据内容中还有同名的字段
  • kafka的消息中有 header(header内容必要映射到表 schema中)

  • 创建FlinkSQL表
    不带key开头的format默认只对value生效:

  1. CREATE TABLE KafkaSourceTable (                                                 
  2.   `meta_time` TIMESTAMP(3) METADATA FROM 'timestamp',   
  3.   `partition` BIGINT METADATA VIRTUAL,                  
  4.   `offset` BIGINT METADATA VIRTUAL,                     
  5.   `headers` MAP<STRING,BYTES> METADATA FROM 'headers'   
  6.   `inKey_k1` STRING,                                    
  7.   `inKey_k2` STRING,                                    
  8.   `k1` STRING,                                          
  9.   `K2` STRING,                                          
  10.   `eventTime` BIGINT,                                   
  11.   `eventID` BIGINT                                      
  12. ) WITH (                                                
  13.   'connector' = 'kafka',                                
  14.   'topic' = 'connector_test',                           
  15.   'properties.bootstrap.servers' = '123.56.100.37:9092',
  16.   'properties.group.id' = 'testGroup',                  
  17.   'scan.startup.mode' = 'earliest-offset',              
  18.   'format' = 'json',                                    
  19.   'json.ignore-parse-errors' = 'true',                  
  20.   'json.fail-on-missing-field' = 'true',               
  21.   'key.fields'='inKey_k1;inKey_k2',                     
  22.   'key.fields-prefix' = 'inKey_',                       
  23.   'value.fields-include' = 'EXCEPT_KEY'                 
  24. );   
复制代码

  • 实验结果


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

杀鸡焉用牛刀

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表