OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理布局化、半布局化和非布局化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的焦点计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和分析型数据库StarRocks搭建流式湖仓。
方案架构和上风
架构
传统的离线数仓通过定时调理离线作业的方式,存在延时长和成本高两大问题。离线作业的调理通常每小时甚至每天才举行一次,数据的消耗者仅能看到上一小时甚至昨天的数据。同时,数据的更新多以覆写(overwrite)分区的方式举行,须要重新读取分区中原有的数据,才能与奇怪变更合并,产生新的效果数据。
基于实时计算Flink版、流式数据湖仓Paimon(使用DLF 2.0作为元数据存储)和EMR StarRocks的OpenLake方案搭建流式湖仓可以解决上述传统离线数仓存在的问题。利用Flink的实时计算能力,数据可以在数仓分层之间实时流动;利用Paimon高效的更新能力,数据变更可以在分钟级的延时内传递给卑鄙消耗者;末了由StarRocks提供查询分析服务。Paimon与Flink深度集成,提供一体化的流式湖仓团结解决方案,在延时和成本上具有双重上风。本文搭建流式湖仓的方案架构如下:
- Flink将数据源写入Paimon,形成ODS层。
- Flink订阅ODS层的变更数据(Changelog)举行加工,形成DWD层再次写入Paimon。
- Flink订阅DWD层的Changelog举行加工,形成DWS层再次写入Paimon。
- 末了由EMR StarRocks读取Paimon外部表,对外提供应用查询。
上风
该方案有如下上风:
- Paimon的每一层数据都可以在分钟级的延时内将变更传递给卑鄙,将传统离线数仓的延时从小时级甚至天级低落至分钟级。
- Paimon的每一层数据都可以直接担当变更数据,无需覆写分区,极大地低落了传统离线数仓数据更新与订正的成本,解决了中间层数据不易查、不易更新、不易修正的问题。
- 模子同一,架构简化。ETL链路的逻辑是基于Flink SQL实现的;ODS层、DWD层和DWS层的数据同一存储在Paimon中,可以低落架构复杂度,提高数据处置惩罚服从。
该方案依靠于Paimon的三个焦点能力,详情如下表所示。
Paimon焦点能力
| 详情
| 主键表更新
| Paimon底层使用LSM Tree数据布局,可以实现高效的数据更新。
关于Paimon主键表、Paimon底层数据布局的介绍请拜见Primary Key Table和File Layouts。
| 增量数据产生机制(Changelog Producer)
| Paimon可以为任意输入数据流产生完整的增量数据(所有的update_after数据都有对应的update_before数据),保证数据变更可以完整地传递给卑鄙。详情请拜见增量数据产生机制。
| 数据合并机制(Merge Engine)
| 当Paimon主键表收到多条具有相同主键的数据时,为了保持主键的唯一性,Paimon效果表会将这些数据合并成一条数据。Paimon支持去重、部分更新、预聚合等丰富多样的数据合并行为,详情请拜见数据合并机制。
| 实践场景
本文以某个电商平台为例,通过搭建一套流式湖仓,实现数据的加工洗濯,并支持上层应用对数据的查询。流式湖仓实现了数据的分层和复用,并支撑各个业务方的报表查询(交易大屏、行为数据分析、用户画像标签)以及个性化保举等多个业务场景。
- 构建ODS层:业务数据库实时入仓MySQL有orders(订单表),orders_pay(订单支付表)和product_catalog(商品类别字典表)三张业务表,这三张表通过Flink实时写入OSS,并以Paimon格式举行存储,作为ODS层。
- 构建DWD层:主题宽表将订单表、商品类别字典表、订单支付表利用Paimon的部分更新(partial-update)合并机制举行打宽,以分钟级延时生成DWD层宽表并产出变更数据(Changelog)。
- 构建DWS层:指标计算Flink实时消耗宽表的变更数据,利用Paimon的预聚合(aggregation)合并机制产出DWM层dwm_users_shops(用户-商户聚合中间表),并终极产出DWS层dws_users(用户聚合指标表)以及dws_shops(商户聚合指标表)。
条件条件
- 已开通数据湖构建,保举使用DLF 2.0提供作为存储服务,详情请拜见快速使用DLF。
- 已开通Flink工作空间,详情请拜见开通实时计算Flink版。
- 已开通EMR的StarRocks,详情请拜见快速使用存算分离版实例。
说明
StarRocks实例、DLF数据目次须要与Flink工作空间处于相同地域。
使用限定
仅实时计算引擎VVR 8.0.9及以上版本支持该流式湖仓方案。
OpenLake数据摄取
预备MySQL数据源
本文以云数据库RDS MySQL版为例,创建数据库名称为order_dw,并创建三张业务表及数据。
- 快速创建RDS MySQL实例。
紧张
RDS MySQL版实例须要与Flink工作空间处于同一VPC。不在同一VPC下时请拜见怎样访问跨VPC的其他服务?
- 创建数据库和账号。
创建名称为order_dw的数据库,并创建高权限账号或具有数据库order_dw读写权限的普通账号。
创建三张表,并插入相应数据。
- CREATE TABLE `orders` (
- order_id bigint not null primary key,
- user_id varchar(50) not null,
- shop_id bigint not null,
- product_id bigint not null,
- buy_fee bigint not null,
- create_time timestamp not null,
- update_time timestamp not null default now(),
- state int not null
- );
- CREATE TABLE `orders_pay` (
- pay_id bigint not null primary key,
- order_id bigint not null,
- pay_platform int not null,
- create_time timestamp not null
- );
- CREATE TABLE `product_catalog` (
- product_id bigint not null primary key,
- catalog_name varchar(50) not null
- );
- -- 准备数据
- INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
- INSERT INTO orders VALUES
- (100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
- (100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
- (100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
- (100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
- (100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
- (100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
- (100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
- INSERT INTO orders_pay VALUES
- (2001, 100001, 1, '2023-02-15 17:40:56'),
- (2002, 100002, 1, '2023-02-15 17:40:56'),
- (2003, 100003, 0, '2023-02-15 17:40:56'),
- (2004, 100004, 0, '2023-02-15 17:40:56'),
- (2005, 100005, 0, '2023-02-15 18:40:56'),
- (2006, 100006, 0, '2023-02-15 18:40:56'),
- (2007, 100007, 0, '2023-02-15 18:40:56');
复制代码 创建MySQL Catalog
- 在元数据管理页面,单击创建Catalog。
- 在内置Catalog页签,单击MySQL,单击下一步。
- 填写以下参数,单击确定,新建名为mysqlcatalog的MySQL Catalog。
配置项
| 说明
| 是否必填
| 备注
| catalog name
| Catalog名称。
| 是
| 填写为自界说的英文名。本文以mysqlcatalog为例。
| hostname
| MySQL数据库的IP地址大概Hostname。
| 是
| 详情请拜见查看和管理实例毗连地址和端口。由于RDS MySQL版实例和Flink全托管处于相同VPC,此处应填写内网地址。
| port
| MySQL数据库服务的端口号,默认值为3306。
| 否
| 详情请拜见查看和管理实例毗连地址和端口。
| default-database
| 默认的MySQL数据库名称。
| 是
| 本文填写须要同步的数据库名order_dw。
| username
| MySQL数据库服务的用户名。
| 是
| 本文为预备MySQL数据源中创建的账号和密码。
| password
| MySQL数据库服务的密码。
| 是
| 关于MySQL Catalog的更多使用方法详情请拜见管理MySQL Catalog。
创建Paimon Catalog
- 登录实时计算控制台。
- 在左侧导航栏,选择元数据管理页面,单击创建Catalog。
- 在内置Catalog页签,单击Apache Paimon,单击下一步。
- 填写以下参数,选择DLF 2.0作为存储类型,单击确定。
配置项
| 说明
| 是否必填
| 备注
| metastore
| 元数据存储类型。
| 是
| 此示例选择为dlf存储类型。
| catalog name
| DLF数据目次名称。
紧张
使用RAM用户或角色时,请确保拥有DLF数据读写权限,详情请拜见授权管理。
| 是
| 保举使用DLF 2.0,无需您再填写AccessKey等信息,支持快速选择已创建的DLF数据目次,创建数据目次利用请拜见数据目次。
本示例选择名称为paimoncatalog的数据目次。
| 关于Paimon Catalog的更多使用方法详情请拜见管理Paimon Catalog。
构建ODS层:业务数据库实时入仓
基于CREATE DATABASE AS(CDAS)语句功能,可以一次性将ODS层建出来,实现MySQL数据同步至Paimon。
- 创建CDAS同步作业。
- 在实时计算控制台的数据开辟 > ETL页面,新建名为ods的SQL流作业,将如下代码复制到SQL编辑器。
- SET 'execution.checkpointing.max-concurrent-checkpoints' = '3'; -- 减轻检查点长尾的影响。
- SET 'table.exec.sink.upsert-materialize' = 'NONE'; -- 消除无用的SinkMaterialize算子。
- -- Paimon结果表在每次检查点完成之后才会正式提交数据。
- -- 此处将检查点间隔缩短为10s,是为了更快地看到结果。
- -- 在生产环境下,系统检查点的间隔与两次系统检查点之间的最短时间间隔根据业务对延时要求的不同,一般设置为1分钟到10分钟。
- SET 'execution.checkpointing.interval' = '10s';
- SET 'execution.checkpointing.min-pause' = '10s';
- CREATE DATABASE IF NOT EXISTS paimoncatalog.order_dw
- WITH (
- 'changelog-producer' = 'input' -- 因为输入数据就是MySQL产生的binlog,已经是完整的变更数据,所以可以直接把输入数据作为变更数据。
- ) AS DATABASE mysqlcatalog.order_dw INCLUDING all tables; -- 也可以根据需要选择上游数据库需要入仓的表。
复制代码 Paimon写入性能优化请拜见Paimon性能优化。
- 单击右上方的摆设,举行作业摆设。
- 在运维中心 > 作业运维,单击刚刚摆设的ods作业利用列的启动,选择无状态启动启动作业。作业启动配置详情请拜见作业启动。
- 查看MySQL同步到Paimon的三张表的数据。
在实时计算控制台的数据开辟 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片断后单击右上角的运行。
- SELECT * FROM paimoncatalog.order_dw.orders ORDER BY order_id;
复制代码
OpenLake数据ETL加工
构建DWD层:主题宽表
- 创建DWD层Paimon宽表dwd_orders。
在实时计算控制台的数据开辟 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片断后单击右上角的运行。
- CREATE TABLE paimoncatalog.order_dw.dwd_orders (
- order_id BIGINT,
- order_user_id STRING,
- order_shop_id BIGINT,
- order_product_id BIGINT,
- order_product_catalog_name STRING,
- order_fee BIGINT,
- order_create_time TIMESTAMP,
- order_update_time TIMESTAMP,
- order_state INT,
- pay_id BIGINT,
- pay_platform INT COMMENT 'platform 0: phone, 1: pc',
- pay_create_time TIMESTAMP,
- PRIMARY KEY (order_id) NOT ENFORCED
- ) WITH (
- 'merge-engine' = 'partial-update', -- 使用部分更新数据合并机制产生宽表
- 'changelog-producer' = 'lookup' -- 使用lookup增量数据产生机制以低延时产出变更数据
- );
复制代码 返回Query has been executed表现创建乐成。
- 实时消耗ODS层orders、orders_pay表的变更数据。
在实时计算控制台的数据开辟 > ETL页面,新建名为dwd的SQL流作业,并将如下代码复制到SQL编辑器后,摆设作业并无状态启动作业。
通过该SQL作业,orders表会与product_catalog表举行维表关联,关联后的效果将与orders_pay一起写入dwd_orders表中,利用Paimon表的部分更新数据合并机制,将orders表和orders_pay表中order_id相同的数据举行打宽。
- SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
- SET 'table.exec.sink.upsert-materialize' = 'NONE';
- SET 'execution.checkpointing.interval' = '10s';
- SET 'execution.checkpointing.min-pause' = '10s';
- -- Paimon目前暂不支持在同一个作业里通过多条INSERT语句写入同一张表,因此这里使用UNION ALL。
- INSERT INTO paimoncatalog.order_dw.dwd_orders
- SELECT
- o.order_id,
- o.user_id,
- o.shop_id,
- o.product_id,
- dim.catalog_name,
- o.buy_fee,
- o.create_time,
- o.update_time,
- o.state,
- NULL,
- NULL,
- NULL
- FROM
- paimoncatalog.order_dw.orders o
- LEFT JOIN paimoncatalog.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
- ON o.product_id = dim.product_id
- UNION ALL
- SELECT
- order_id,
- NULL,
- NULL,
- NULL,
- NULL,
- NULL,
- NULL,
- NULL,
- NULL,
- pay_id,
- pay_platform,
- create_time
- FROM
- paimoncatalog.order_dw.orders_pay;
复制代码 - 查看宽表dwd_orders的数据。
在实时计算控制台的数据开辟 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片断后单击右上角的运行。
- SELECT * FROM paimoncatalog.order_dw.dwd_orders ORDER BY order_id;
复制代码
构建DWS层:指标计算
- 创建DWS层的聚合表dws_users以及dws_shops。
在实时计算控制台的数据开辟 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片断后单击右上角的运行。
- -- 用户维度聚合指标表。
- CREATE TABLE paimoncatalog.order_dw.dws_users (
- user_id STRING,
- ds STRING,
- payed_buy_fee_sum BIGINT COMMENT '当日完成支付的总金额',
- PRIMARY KEY (user_id, ds) NOT ENFORCED
- ) WITH (
- 'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
- 'fields.payed_buy_fee_sum.aggregate-function' = 'sum' -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
- -- 由于dws_users表不再被下游流式消费,因此无需指定增量数据产生机制
- );
- -- 商户维度聚合指标表。
- CREATE TABLE paimoncatalog.order_dw.dws_shops (
- shop_id BIGINT,
- ds STRING,
- payed_buy_fee_sum BIGINT COMMENT '当日完成支付总金额',
- uv BIGINT COMMENT '当日不同购买用户总人数',
- pv BIGINT COMMENT '当日购买用户总人次',
- PRIMARY KEY (shop_id, ds) NOT ENFORCED
- ) WITH (
- 'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
- 'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
- 'fields.uv.aggregate-function' = 'sum', -- 对 uv 的数据求和产生聚合结果
- 'fields.pv.aggregate-function' = 'sum' -- 对 pv 的数据求和产生聚合结果
- -- 由于dws_shops表不再被下游流式消费,因此无需指定增量数据产生机制
- );
- -- 为了同时计算用户视角的聚合表以及商户视角的聚合表,另外创建一个以用户 + 商户为主键的中间表。
- CREATE TABLE paimoncatalog.order_dw.dwm_users_shops (
- user_id STRING,
- shop_id BIGINT,
- ds STRING,
- payed_buy_fee_sum BIGINT COMMENT '当日用户在商户完成支付的总金额',
- pv BIGINT COMMENT '当日用户在商户购买的次数',
- PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED
- ) WITH (
- 'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
- 'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
- 'fields.pv.aggregate-function' = 'sum', -- 对 pv 的数据求和产生聚合结果
- 'changelog-producer' = 'lookup', -- 使用lookup增量数据产生机制以低延时产出变更数据
- -- dwm层的中间表一般不直接提供上层应用查询,因此可以针对写入性能进行优化。
- 'file.format' = 'avro', -- 使用avro行存格式的写入性能更加高效。
- 'metadata.stats-mode' = 'none' -- 放弃统计信息会增加OLAP查询代价(对持续的流处理无影响),但会让写入性能更加高效。
- );
复制代码 返回Query has been executed表现创建乐成。
- DWD层dwd_orders表的变更数据。
在实时计算控制台数据开辟 > ETL页签,新建名为dwm的SQL流作业,并将如下代码复制到SQL编辑器后,摆设作业并无状态启动作业。
通过该SQL作业,dwd_orders表的数据会写入dwm_users_shops表中,利用Paimon表的预聚合数据合并机制,自动对order_fee求和,算出用户在商户的消耗总额。同时,自动对1求和,也能算出用户在商户的消耗次数。
- SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
- SET 'table.exec.sink.upsert-materialize' = 'NONE';
- SET 'execution.checkpointing.interval' = '10s';
- SET 'execution.checkpointing.min-pause' = '10s';
- INSERT INTO paimoncatalog.order_dw.dwm_users_shops
- SELECT
- order_user_id,
- order_shop_id,
- DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
- order_fee,
- 1 -- 一条输入记录代表一次消费
- FROM paimoncatalog.order_dw.dwd_orders
- WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;
复制代码 - 实时消耗DWM层dwm_users_shops表的变更数据。
在实时计算控制台的数据开辟 > ETL页面,新建名为dws的SQL流作业,并将如下代码复制到SQL编辑器后,摆设作业并无状态启动作业。
通过该SQL作业,dwm_users_shops表的数据会写入dws_users表和dws_shops表中,利用Paimon表的预聚合数据合并机制,在dws_users表中,计算每个用户的总消耗额(payed_buy_fee_sum),在dws_shops表中计算商户的总流水(payed_buy_fee_sum),商户的消耗用户数目(对1求和)和消耗总人次(pv)。
- SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
- SET 'table.exec.sink.upsert-materialize' = 'NONE';
- SET 'execution.checkpointing.interval' = '10s';
- SET 'execution.checkpointing.min-pause' = '10s';
- -- 与dwd不同,此处每一条INSERT语句写入的是不同的Paimon表,可以放在同一个作业中。
- BEGIN STATEMENT SET;
- INSERT INTO paimoncatalog.order_dw.dws_users
- SELECT
- user_id,
- ds,
- payed_buy_fee_sum
- FROM paimoncatalog.order_dw.dwm_users_shops;
- -- 以商户为主键,部分热门商户的数据量可能远高于其他商户。
- -- 因此使用local merge在写入Paimon之前先在内存中进行预聚合,缓解数据倾斜问题。
- INSERT INTO paimoncatalog.order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */
- SELECT
- shop_id,
- ds,
- payed_buy_fee_sum,
- 1, -- 一条输入记录代表一名用户在该商户的所有消费
- pv
- FROM paimoncatalog.order_dw.dwm_users_shops;
- END;
复制代码 - 查看dws_users表和dws_shops表的数据
在实时计算控制台的数据开辟 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片断后单击右上角的运行。
- --查看dws_users表数据
- SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;
复制代码
- --查看dws_shops表数据
- SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;
复制代码
捕获业务数据库的变革
前面已完成了流式湖仓的构建,下面将测试流式湖仓捕获业务数据库变革的能力。
- 向MySQL的order_dw数据库中插入如下数据。
- INSERT INTO orders VALUES
- (100008, 'user_001', 12345, 3, 3000, '2023-02-15 17:40:56', '2023-02-15 18:42:56', 1),
- (100009, 'user_002', 12348, 4, 1000, '2023-02-15 18:40:56', '2023-02-15 19:42:56', 1),
- (100010, 'user_003', 12348, 2, 2000, '2023-02-15 19:40:56', '2023-02-15 20:42:56', 1);
- INSERT INTO orders_pay VALUES
- (2008, 100008, 1, '2023-02-15 18:40:56'),
- (2009, 100009, 1, '2023-02-15 19:40:56'),
- (2010, 100010, 0, '2023-02-15 20:40:56');
复制代码 - 查看dws_users表和dws_shops表的数据。 在实时计算控制台的数据开辟 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片断后单击右上角的运行。
- dws_users表
- SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;
复制代码 -
- dws_shops表
- SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;
复制代码 -
OpenLake数据查询分析
上一小节展示了在Flink中举行Paimon Catalog的创建与Paimon表的写入。本节展示流式湖仓搭建完成后,利用StarRocks举行数据分析的一些简单应用场景。
您须要登录StarRocks实例,并已完成创建oss-paimon的catalog,详情请拜见Paimon数据源。
创建示例
排名查询
对DWS层聚合表举行分析。本文使用StarRocks查询23年2月15日交易额前三高的商户的代码示比方下。
- SELECT ROW_NUMBER() OVER (ORDER BY payed_buy_fee_sum DESC) AS rn, shop_id, payed_buy_fee_sum
- FROM dws_shops
- WHERE ds = '20230215'
- ORDER BY rn LIMIT 3;
复制代码
明细查询
对DWD层宽表举行分析。本文使用StarRocks查询某个客户23年2月特定支付平台支付的订单明细的代码示比方下。
- SELECT * FROM dwd_orders
- WHERE order_create_time >= '2023-02-01 00:00:00' AND order_create_time < '2023-03-01 00:00:00'
- AND order_user_id = 'user_001'
- AND pay_platform = 0
- ORDER BY order_create_time;;
复制代码
数据报表
对DWD层宽表举行分析。本文使用StarRocks查询23年2月内每个品类的订单总量和订单总金额的代码示比方下。
- SELECT
- order_create_time AS order_create_date,
- order_product_catalog_name,
- COUNT(*),
- SUM(order_fee)
- FROM
- dwd_orders
- WHERE
- order_create_time >= '2023-02-01 00:00:00' and order_create_time < '2023-03-01 00:00:00'
- GROUP BY
- order_create_date, order_product_catalog_name
- ORDER BY
- order_create_date, order_product_catalog_name;
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |