详解 Flink Catalog 在 ChunJun 中的实践之路

打印 上一主题 下一主题

主题 697|帖子 697|积分 2091

我们知道 Flink 有Table(表)、View(视图)、Function(函数/算子)、Database(数据库)的概念,相对于这些耳熟能详的概念,Flink 里还有一个 Catalog(目录) 的概念。
本文将为大家带来 Flink Catalog 的介绍以及 Flink Catalog 在 ChunJun 中的实践之路。
Flink Catalog 简介

Catalog 提供元数据,如数据库、表、分区、视图,以及访问存储在数据库或其他外部系统中的数据所需的函数和信息。
Flink Catalog 作用

数据处理中最关键的一个方面是管理元数据:
· 可能是暂时性的元数据,如临时表,或针对表环境注册的 UDFs;
· 或者是永久性的元数据,比如 Hive 元存储中的元数据。
Catalog 提供了一个统一的 API 来管理元数据,并使其可以从表 API 和 SQL 查询语句中来访问。
Catalog 使用户能够引用他们数据系统中的现有元数据,并自动将它们映射到 Flink 的相应元数据。例如,Flink 可以将 JDBC 表自动映射到 Flink 表,用户不必在 Flink 中手动重写 DDL。Catalog 大大简化了用户现有系统开始使用 Flink 所需的步骤,并增强了用户体验。
Flink Catalog 的结构

● Flink Catalog 原生结构
• GenericInMemoryCatalog:基于内存实现的 Catalog
• Jdbc Catalog:可以将 Flink 通过 JDBC 协议连接到关系数据库,目前 Flink 在1.12和1.13中有不同的实现,包括 MySql Catalog 和 Postgres Catalog
• Hive Catalog:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口
● Flink Iceberg Catalog
● Flink Hudi Catalog
HoodieCatalog、HoodieHiveCatalog


Flink Catalog 详解

GenericInMemoryCatalog
  1. final CatalogManager catalogManager =
  2.         CatalogManager.newBuilder()
  3.                 .classLoader(userClassLoader)
  4.                 .config(tableConfig)
  5.                 .defaultCatalog(
  6.                         settings.getBuiltInCatalogName(),
  7.                         new GenericInMemoryCatalog(
  8.                                 settings.getBuiltInCatalogName(),
  9.                                 settings.getBuiltInDatabaseName()))
  10.                 .build();
  11.         
  12. defaultCatalog =
  13.                     new GenericInMemoryCatalog(
  14.                             defaultCatalogName, settings.getBuiltInDatabaseName());
  15. CatalogManager catalogManager =
  16.                 builder.defaultCatalog(defaultCatalogName, defaultCatalog).build();
复制代码
GenericInMemoryCatalog 所有的数据都保存在 HashMap 里面,无法持久化。
JDBC Catalog
  1. CREATE CATALOG my_catalog WITH(
  2.     'type' = 'jdbc',
  3.     'default-database' = '...',
  4.     'username' = '...',
  5.     'password' = '...',
  6.     'base-url' = '...'
  7. );
  8. USE CATALOG my_catalog;
复制代码
如果创建并使用 Postgres Catalog 或 MySQL Catalog,请配置 JDBC 连接器和相应的驱动。
JDBC Catalog 支持以下参数:
• name:必填,Catalog 的名称
• default-database:必填,默认要连接的数据库
• username:必填,Postgres/MySQL 账户的用户名
• password:必填,账户的密码
• base-url: 必填,(不应该包含数据库名)
对于 Postgres Catalog base-url 应为 "jdbc:postgresql://:" 的格式
对于 MySQL   Catalog base-url 应为 "jdbc:mysql://:" 的格式
Hive Catalog
  1. CREATE CATALOG myhive WITH (
  2.     'type' = 'hive',
  3.     'default-database' = 'mydatabase',
  4.     'hive-conf-dir' = '/opt/hive-conf'
  5. );
  6. -- set the HiveCatalog as the current catalog of the session
  7. USE CATALOG myhive;
复制代码

Iceberg Catalog

● Hive Catalog 管理 Iceberg 表
  1. (Flink) default_database.flink_table ->
  2. (Iceberg) default_database.flink_table
  3. CREATE TABLE flink_table (
  4.     id   BIGINT,
  5.     data STRING
  6. ) WITH (
  7.     'connector'='iceberg',
  8.     'catalog-name'='hive_prod',
  9.     'uri'='thrift://localhost:9083',
  10.     'warehouse'='hdfs://nn:8020/path/to/warehouse'
  11. );
  12. (Flink)default_database.flink_table ->
  13. (Iceberg) hive_db.hive_iceberg_table
  14. CREATE TABLE flink_table (
  15.     id   BIGINT,
  16.     data STRING
  17. ) WITH (
  18.     'connector'='iceberg',
  19.     'catalog-name'='hive_prod',
  20.     'catalog-database'='hive_db',
  21.     'catalog-table'='hive_iceberg_table',
  22.     'uri'='thrift://localhost:9083',
  23.     'warehouse'='hdfs://nn:8020/path/to/warehouse'
  24. );
复制代码
● Hadoop Catalog 管理 Iceberg 表
  1. CREATE TABLE flink_table (
  2.     id   BIGINT,
  3.     data STRING
  4. ) WITH (
  5.     'connector'='iceberg',
  6.     'catalog-name'='hadoop_prod',
  7.     'catalog-type'='hadoop',
  8.     'warehouse'='hdfs://nn:8020/path/to/warehouse'
  9. );
复制代码
● 自定义 Catalog 管理 Iceberg 表
  1. CREATE TABLE flink_table (
  2.     id   BIGINT,
  3.     data STRING
  4. ) WITH (
  5.     'connector'='iceberg',
  6.     'catalog-name'='custom_prod',
  7.     'catalog-impl'='com.my.custom.CatalogImpl',
  8.      -- More table properties for the customized catalog
  9.     'my-additional-catalog-config'='my-value',
  10.      ...
  11. );
复制代码
• connector:iceberg
• catalog-name:用户指定的目录名称,这是必须的,因为连接器没有任何默认值
• catalog-type:内置目录的 hive 或 hadoop(默认为hive),或者对于使用 catalog-impl 的自定义目录实现,不做设置
• catalog-impl:自定义目录实现的全限定类名,如果 catalog-type 没有被设置,则必须被设置,更多细节请参见自定义目录
• catalog-database: 后台目录中的 iceberg 数据库名称,默认使用当前的 Flink 数据库名称
• catalog-table: 后台目录中的冰山表名,默认使用 Flink CREATE TABLE 句子中的表名
Hudi Catalog
  1. create catalog hudi with(
  2. 'type' = 'hudi',
  3. 'mode' = 'hms',
  4.   'hive.conf.dir'='/etc/hive/conf'
  5. );
  6. --- 创建数据库供hudi使用
  7. create database hudi.hudidb;
  8. --- order表
  9. CREATE TABLE hudi.hudidb.orders_hudi(
  10.   uuid INT,
  11.   ts INT,
  12.   num INT,
  13.   PRIMARY KEY(uuid) NOT ENFORCED
  14. ) WITH (
  15.   'connector' = 'hudi',
  16.   'table.type' = 'MERGE_ON_READ'
  17. );
  18. select * from hudi.hudidb.orders_hudi;
复制代码


Flink Catalog 在 ChunJun 中的实践

下面将为大家介绍本文的重头戏,Flink Catalog 在 ChunJun 中的实践之路。
直接引入开源 Catalog

ChunJun 目前的所有 Catalog 为以下四种:

● Hive Catalog 需要的依赖

● Iceberg Catalog 需要的依赖

● JDBC Catalog
JDBC 因为 Flink 1.12 和 1.13 API 有变化,因此需要涉及源码的改动,改动一些 API 后,从源码引入。
● DT Catalog
结合内部业务,自定义的一种 Catalog ,下文将会进行详细介绍。
DT Catalog -存储元数据表设计

● 创建 mysql 元数据表 database_info
  1. -- 创建表的 sql
  2. create table database_info
  3. (
  4.     `id`            bigint PRIMARY KEY NOT NULL AUTO_INCREMENT COMMENT '项目ID',-- database id
  5.     `catalog_name`  varchar(255) COMMENT 'catalog 名字',
  6.     `database_name` varchar(255) COMMENT 'database 名字',
  7.     `catalog_type`  varchar(30) COMMENT 'catalog 类型, eg: mysql,oracle...',
  8.     `project_id`    int(11)            NOT NULL COMMENT '项目ID',
  9.     `tenant_id`     int(11)            NOT NULL COMMENT '租户ID'
  10. ) ENGINE = InnoDB
  11.   DEFAULT CHARSET = utf8;
  12. -- 创建索引
  13. CREATE INDEX idx_catalog_name_database_name_project_id_tenant_id ON database_info (`catalog_name`, `database_name`, `project_id`, `tenant_id`);
复制代码
● 创建 mysql 元数据表 table_info
  1. -- 创建表的 sql
  2. create table table_info
  3. (
  4.     `id`            bigint PRIMARY KEY NOT NULL AUTO_INCREMENT,
  5.     `database_id`    bigint COMMENT 'database_info 表的 id',
  6.     `table_name`  varchar(255) COMMENT '表名',
  7.     `project_id`    int(11)            NOT NULL COMMENT '项目ID',
  8.     `tenant_id`     int(11)            NOT NULL COMMENT '租户ID'
  9. ) ENGINE = InnoDB
  10.   DEFAULT CHARSET = utf8;
  11. -- 创建索引
  12. CREATE INDEX idx_catalog_id_project_id_tenant_id ON table_info (`database_id`, `project_id`, `tenant_id`);
  13. CREATE INDEX idx_database_id_table_name_project_id_tenant_id ON table_info (`database_id`, `table_name`, `project_id`, `tenant_id`);
复制代码
● 创建 mysql 元数据表 properties_info
  1. create table properties_info
  2. (
  3.     `id`       bigint PRIMARY KEY NOT NULL AUTO_INCREMENT ,
  4.     `table_id` bigint(20) COMMENT 'table_info 表的 id',
  5.     `key`      varchar(255) COMMENT '表的属性 key',
  6.     `value`    varchar(255) COMMENT '表的属性 value'
  7. ) ENGINE = InnoDB
  8.   DEFAULT CHARSET = utf8;
  9. CREATE INDEX idx_table_id ON properties_info (table_id);
复制代码
● properties_info 里面存了什么?
  1. schema.0.name=id,
  2.   schema.0.data-type=INT NOT NULL,
  3.   schema.1.name=name,
  4.   schema.1.data-type=VARCHAR(2147483647)
  5.   schema.2.name=age,
  6.   schema.2.data-type=BIGINT,
  7.   schema.primary-key.name=PK_3386,
  8.   schema.primary-key.columns=id,
  9.   connector=jdbc,
  10.   url=jdbc:mysql: //172.16.83.218:3306/wujuan?useSSL=false,
  11.   username=drpeco,
  12.   password=DT@Stack#123,
  13.   comment=,
  14.   scan.auto-commit=true,
  15.   lookup.cache.max-rows=20000,
  16.   scan.fetch-size=10,
  17.   lookup.cache.ttl=700000
  18.   table-name=t2,
复制代码
使用 DT Catalog

● 创建 DT Catalog
  1. CREATE CATALOG catalog1
  2. WITH (
  3.     'type' = 'dt',
  4.     'default-database' = 'default_database',
  5.     'driver' = 'com.mysql.cj.jdbc.Driver',
  6.     'url' = 'jdbc:mysql://xxx:3306/catalog_default',
  7.     'username' = 'drpeco',
  8.     'password' = 'DT@Stack#123',
  9.     'project-id' = '1',
  10.     'tenant-id' = '1'
  11.   );
复制代码

● 创建 Database
  1. DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
  2. Drop a database with the given database name. If the database to drop does not exist, an exception is thrown.
  3. IF EXISTS
  4. If the database does not exist, nothing happens.
  5. RESTRICT
  6. Dropping a non-empty database triggers an exception. Enabled by default.
  7. CASCADE
  8. Dropping a non-empty database also drops all associated tables and functions.
  9. create database if not exists catalog1.database1
  10. drop database if exists catalog1.database1
  11. -- 删除非空数据库,连通数据库中的所有表也一起删除
  12. drop database if exists catalog1.database1 CASCADE
复制代码
● 创建 Table
1)Rename Table
  1. ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
  2. Rename the given table name to another new table name
复制代码
2)Set or Alter Table Properties
  1. ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)
  2. Set one or more properties in the specified table. If a particular property is already set in the table, override the old value with the new one.
复制代码
  1. -- 创建表
  2. CREATE TABLE if not exists catalog1.default_database.table1
  3. (
  4.     id      int,
  5.     name    string,
  6.     age     bigint,
  7.     primary key ( id) not enforced
  8. ) with (
  9.     'connector' = 'jdbc',
  10.     'url' = 'jdbc:mysql://172.16.83.218:3306/wujuan?useSSL=false',
  11.     'table-name' = 't2',
  12.     'username' = 'drpeco',
  13.     'password' = 'DT@Stack#123'
  14. );
复制代码
  1. -- 删除表
  2. drop table if exists mysql_catalog2.wujuan_database2.wujuan_table
  3. -- 重命名表名
  4. ALTER TABLE catalog1.default_database.table1 RENAME TO table2;
  5. -- 设置表属性
  6. ALTER TABLE catalog1.default_database.table1
  7. SET (
  8. 'tablename'='t2',
  9. 'url'='dbc:mysql://172.16.83.218:3306/wujuan?useSSL=false'
  10. )
复制代码
使用 DTCatalog 的具体场景和实现原理

● 全部是 DDL,只有 Catalog 的创建
  1. CREATE CATALOG catalog1
  2. WITH (
  3.     'type' = 'DT',
  4.     'default-database' = 'default_database',
  5.     'driver' = 'com.mysql.cj.jdbc.Driver',
  6.     'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default?autoReconnect=true&failOverReadOnly=false',
  7.     'username' = 'drpeco',
  8.     'password' = 'DT@Stack#123',
  9.     'project-id' = '1',
  10.     'tenant-id' = '1'
  11.   );
  12.         ```
  13.        
  14. · 可以执行,但是没有意义,ChunJun 不会存储 Catalog 信息,只有平台存储;
  15. · 不支持语法校验。
  16. ● 全部是 DDL,包含 Catalog、Database、Table 的创建
复制代码
-- 初始化 Catalog
CREATE CATALOG catalog1
WITH (
'type' = 'dt',
'default-database' = 'default_database',
'driver' = 'com.mysql.cj.jdbc.Driver',
'url' = 'jdbc:mysql://172.16.100.186:3306/catalog_default',
'username' = 'drpeco',
'password' = 'DT@Stack#123',
'project-id' = '1',
'tenant-id' = '1'
);
-- 创建数据库
create database if not exists database1
-- 创建表
CREATE TABLE if not exists catalog1.default_database.table1
(
id      int,
name    string,
age     bigint,
primary key ( id) not enforced
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.16.83.218:3306/wujuan?useSSL=false',
'table-name' = 't2',
'username' = 'drpeco',
'password' = 'DT@Stack#123'
);
  1. · 无论创建数据库、表,删除数据库、表,必须包含 create catalog 语句;
  2. · 可以执行,可以创建数据库和表;
  3. · 不支持语法校验。
复制代码
// 抛出异常的逻辑
StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tEnv);
TableResult execute = statementSet.execute();       -->
tableEnvironment.executeInternal(operations);        -->
Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);     -->
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(getExecutionEnvironment(), transformations); -->

// 抛出异常的方法
public static StreamGraph generateStreamGraph(StreamExecutionEnvironment execEnv, List

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

种地

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表