ToB Enterprise Services Application Market (ToB企服应用市场): ToB reviews and business social networking platform

Title: Flink-StarRocks Explained, Part 6: An Ad Hoc Query Case Study (Day 56)

Author: 种地    Date: 2024-8-9 14:13

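Preface

This article continues the Flink-StarRocks Explained series. It walks through a StarRocks data warehouse scenario: a full ad hoc query case study.
6. Data Warehouse Scenario: Ad Hoc Query Case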

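This article uses an example to show how to build an ad hoc query solution for data warehouse scenarios on top of the view capability of EMR Serverless StarRocks.
6.1 Scenario Overview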

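With the adoption of vectorization, the CBO (Cost Based Optimizer), and single-node multi-core scheduling, the compute capability of StarRocks has steadily improved. When StarRocks is used for layered data warehouse modeling, data is in most cases modeled up to the DWD layer (basic integration layer) or the DWS layer (wide dimension layer). In real workloads, StarRocks' compute capability lets you query DWD or DWS data directly and run flexible, interactive ad hoc queries.
6.2 Solution Architecture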

The basic architecture for ad hoc queries in a data warehouse scenario with StarRocks is shown in the figure below.

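The overall data flow is as follows:
(1) Flink cleans the logs ingested into Kafka, or the Flink-CDC-StarRocks tool reads the MySQL Binlog, and the data is loaded into StarRocks. Choose among the Duplicate Key, Aggregate, Unique Key, or Primary Key table models as needed. Only the ODS layer (format normalization layer) is physically materialized.
(2) The upper layers are built with StarRocks views. StarRocks' vectorized query engine and CBO handle complex SQL such as multi-table joins and nested subqueries, and metric results are computed at query time. This keeps roll-up and drill-down metrics derived from the same source and therefore consistent.
6.3 Solution Characteristics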

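The main characteristic of this solution is that the computation logic lives on the StarRocks side and runs at query time. It suits scenarios where the business database is updated frequently. Physical data is stored only in the ODS or DWD layer.
 Advantages
 Highly flexible: views can be adjusted at any time as the business logic changes.
 Metric corrections are simple: the upper layers are all logic encapsulated in views, so only the base table data needs to be updated.
 Disadvantages
 When the view logic is complex and the data volume is large, query performance is low.
 Applicable scenarios
 The data comes from databases and event-tracking systems, QPS requirements are modest, flexibility requirements are high, and compute resources are ample.
 Real-time requirements are very high: data must be queryable as soon as it is written, and updates must be reflected immediately. This fits cases with ad hoc query needs, ample resources, and low query complexity.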
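To make the flexibility point concrete, here is a minimal sketch of adjusting a view in place. It assumes the flink_cdc.dws_agg_by_region view created in section 6.4.2 already exists. The order_avg_revenue column is a hypothetical addition for illustration and is not part of the original case.

```sql
-- Redefine the DWS view to expose one more metric.
-- Only the view definition changes; no table data is rewritten.
ALTER VIEW flink_cdc.dws_agg_by_region (
  order_region,
  order_cnt,
  order_total_revenue,
  order_avg_revenue   -- hypothetical extra metric, not in the original case
)
AS
SELECT order_region, count(order_region), sum(order_revenue), avg(order_revenue)
FROM flink_cdc.dwd_order_customer_valid
GROUP BY order_region;
```

Because the metric is computed at query time, the new column is available on the next query without any backfill.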
6.4 Procedure

6.4.1 Step 1: Create the MySQL source tables

(1) Log in to DMS.
(2) Create the database and tables.
  CREATE DATABASE IF NOT EXISTS flink_cdc;
  -- Orders source table
  CREATE TABLE flink_cdc.orders (
     order_id INT NOT NULL AUTO_INCREMENT,
     order_revenue FLOAT NOT NULL,
     order_region VARCHAR(40) NOT NULL,
     customer_id INT NOT NULL,
     PRIMARY KEY ( order_id )
  );
  -- Customers source table
  CREATE TABLE flink_cdc.customers (
     customer_id INT NOT NULL,
     customer_age INT NOT NULL,
     customer_name VARCHAR(40) NOT NULL,
     PRIMARY KEY ( customer_id )
  );
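Since the pipeline later reads the MySQL Binlog through CDC, it can be worth confirming that binary logging is enabled before continuing. The statements below are standard MySQL. The note about row-based format reflects a general requirement of MySQL CDC readers and is an assumption here, not something stated in the original steps.

```sql
-- Run against the MySQL source instance.
SHOW VARIABLES LIKE 'log_bin';        -- binary logging should be ON
SHOW VARIABLES LIKE 'binlog_format';  -- CDC readers generally expect ROW
```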
6.4.2 Step 2: Create the StarRocks tables

(1) Log in to EMR StarRocks Manager.
(2) Create the database and tables.
  CREATE DATABASE IF NOT EXISTS `flink_cdc`;
  -- ODS table for customers (Primary Key model)
  CREATE TABLE IF NOT EXISTS `flink_cdc`.`customers` (
    `customer_id` INT NOT NULL  COMMENT "",
    `customer_age` FLOAT NOT NULL  COMMENT "",
    `customer_name` STRING NOT NULL  COMMENT ""
  ) ENGINE=olap
  PRIMARY KEY(`customer_id`)
  COMMENT ""
  DISTRIBUTED BY HASH(`customer_id`) BUCKETS 1
  PROPERTIES (
    "replication_num" = "1"
  );
  -- ODS table for orders (Primary Key model)
  CREATE TABLE IF NOT EXISTS `flink_cdc`.`orders` (
    `order_id` INT NOT NULL  COMMENT "",
    `order_revenue` FLOAT NOT NULL  COMMENT "",
    `order_region` STRING NOT NULL  COMMENT "",
    `customer_id` INT NOT NULL  COMMENT ""
  ) ENGINE=olap
  PRIMARY KEY(`order_id`)
  COMMENT ""
  DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
  PROPERTIES (
    "replication_num" = "1"
  );
(3) Create the DWD view on top of the ODS tables.
  -- Join orders with customers and filter out invalid customer IDs
  CREATE VIEW flink_cdc.dwd_order_customer_valid (
    order_id,
    order_revenue,
    order_region,
    customer_id,
    customer_age,
    customer_name
  )
  AS
  SELECT o.order_id, o.order_revenue, o.order_region, c.customer_id, c.customer_age, c.customer_name
  FROM flink_cdc.customers c JOIN flink_cdc.orders o
  ON c.customer_id = o.customer_id
  WHERE c.customer_id != -1;
(4) Create the DWS view on top of the DWD view.
  -- Aggregate order count and total revenue per region
  CREATE VIEW flink_cdc.dws_agg_by_region (
    order_region,
    order_cnt,
    order_total_revenue)
  AS
  SELECT order_region, count(order_region), sum(order_revenue)
  FROM flink_cdc.dwd_order_customer_valid
  GROUP BY order_region;
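With both views in place, ad hoc queries can go straight to the DWD view whenever the DWS aggregation is too coarse. The following is an illustrative sketch of such a drill-down. The grouping and the column aliases are chosen for this example and do not come from the original case.

```sql
-- Ad hoc drill-down: order count and revenue per customer within each region,
-- computed at query time from the DWD view.
SELECT
  order_region,
  customer_name,
  count(order_id)    AS order_cnt,
  sum(order_revenue) AS order_total_revenue
FROM flink_cdc.dwd_order_customer_valid
GROUP BY order_region, customer_name
ORDER BY order_total_revenue DESC;
```

Prefixing the statement with EXPLAIN shows the plan StarRocks would use, which helps judge whether a view has become too complex for interactive use.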
6.4.3 Step 3: Run the Flink job to start the data flow

(1) Open the Alibaba Cloud Flink console.
(2) Create the MySQL CDC mapping tables.
Note: adjust hostname and the other parameters to match your own environment.
  CREATE DATABASE IF NOT EXISTS `vvp`.`flinkcdc`;
  -- CDC source mapped to flink_cdc.customers in MySQL
  CREATE TABLE IF NOT EXISTS `vvp`.`flinkcdc`.`customers_src` (
    `customer_id` INT NOT NULL,
    `customer_age` FLOAT NOT NULL,
    `customer_name` STRING NOT NULL,
    PRIMARY KEY(`customer_id`) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql',
    'hostname' = 'rm-cn-x0r3fp1lj000qa.rwlb.rds.aliyuncs.com',
    'port' = '3306',
    'username' = 'xxxxxx',
    'password' = 'xxxxxx',
    'database-name' = 'flink_cdc',
    'table-name' = 'customers'
  );
  -- CDC source mapped to flink_cdc.orders in MySQL
  CREATE TABLE IF NOT EXISTS `vvp`.`flinkcdc`.`orders_src` (
    `order_id` INT NOT NULL,
    `order_revenue` FLOAT NOT NULL,
    `order_region` STRING NOT NULL,
    `customer_id` INT NOT NULL,
    PRIMARY KEY(`order_id`) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql',
    'hostname' = 'rm-cn-x0r3fp1lj000qa.rwlb.rds.aliyuncs.com',
    'port' = '3306',
    'username' = 'xxxxxx',
    'password' = 'xxxxxx',
    'database-name' = 'flink_cdc',
    'table-name' = 'orders'
  );
(3) Create the StarRocks mapping tables.
Note: adjust jdbc-url, load-url, and the other parameters to match your own environment. The values can be found in the EMR console under StarRocks > click the instance > Instance Details.
  -- Sink mapped to flink_cdc.customers in StarRocks
  CREATE TABLE IF NOT EXISTS `vvp`.`flinkcdc`.`customers_sink` (
    `customer_id` INT NOT NULL,
    `customer_age` FLOAT NOT NULL,
    `customer_name` STRING NOT NULL,
    PRIMARY KEY(`customer_id`) NOT ENFORCED
  ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://fe-c-838fd7a4db1550a6-internal.starrocks.aliyuncs.com:9030',
    'load-url' = 'fe-c-838fd7a4db1550a6-internal.starrocks.aliyuncs.com:8030',
    'database-name' = 'flink_cdc',
    'table-name' = 'customers',
    'username' = 'xxxxxx',
    'password' = 'xxxxxx',
    'sink.buffer-flush.interval-ms' = '5000',
    'sink.semantic' = 'exactly-once'
  );
  -- Sink mapped to flink_cdc.orders in StarRocks
  CREATE TABLE IF NOT EXISTS `vvp`.`flinkcdc`.`orders_sink` (
    `order_id` INT NOT NULL,
    `order_revenue` FLOAT NOT NULL,
    `order_region` STRING NOT NULL,
    `customer_id` INT NOT NULL,
    PRIMARY KEY(`order_id`) NOT ENFORCED
  ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://fe-c-838fd7a4db1550a6-internal.starrocks.aliyuncs.com:9030',
    'load-url' = 'fe-c-838fd7a4db1550a6-internal.starrocks.aliyuncs.com:8030',
    'database-name' = 'flink_cdc',
    'table-name' = 'orders',
    'username' = 'xxxxxx',
    'password' = 'xxxxxx',
    'sink.buffer-flush.interval-ms' = '5000',
    'sink.semantic' = 'exactly-once'
  );
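 Parameter meanings
 connector: the connector type, fixed to starrocks.
 jdbc-url: address of the StarRocks FE MySQL-protocol service (port 9030 in this example).
 load-url: address of the StarRocks FE HTTP service (port 8030 in this example), used for loading data.
 database-name, table-name: the target StarRocks database and table.
 username, password: the StarRocks account credentials.
 sink.buffer-flush.interval-ms: the interval, in milliseconds, at which buffered data is flushed to StarRocks.
 sink.semantic: the delivery guarantee of the sink, either at-least-once or exactly-once.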

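(4) Insert the MySQL data into StarRocks.
Put the following statements into a single streaming job, then deploy and run it.
The job must be started statelessly, with the checkpoint interval set to 5 seconds.

  -- Run both sync pipelines in one job
  BEGIN STATEMENT SET;
  INSERT INTO `vvp`.`flinkcdc`.`customers_sink` SELECT * FROM `vvp`.`flinkcdc`.`customers_src`;
  INSERT INTO `vvp`.`flinkcdc`.`orders_sink` SELECT * FROM `vvp`.`flinkcdc`.`orders_src`;
  END;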
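On the Alibaba Cloud console the checkpoint interval is typically set in the deployment configuration rather than in SQL. For readers trying the same job on an open-source Flink SQL client, a rough equivalent is sketched below. This is an assumption about a different environment, not part of the original steps. The statement would be issued before the statement set above.

```sql
-- Assumption: open-source Flink SQL client syntax; on the managed console,
-- set the checkpoint interval in the deployment configuration instead.
SET 'execution.checkpointing.interval' = '5s';
```

The interval matters because an exactly-once sink generally commits data when a checkpoint completes, so a 5-second interval roughly bounds how long a change takes to become visible in StarRocks.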
6.4.4 Step 4: Verify the data

(1) In the RDS database window, run the following statements to insert data into the orders and customers tables.
  INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(1,10,'beijing',1);
  INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(2,10,'beijing',1);
  INSERT INTO flink_cdc.customers(customer_id,customer_age,customer_name) VALUES(1, 22, 'emr_test');
(2) Run queries in EMR StarRocks Manager.
1) View the orders table.
  SELECT * FROM flink_cdc.orders;

2) View the customers table.
  SELECT * FROM flink_cdc.customers;

3) Query the DWD layer.
  SELECT * FROM flink_cdc.dwd_order_customer_valid;

4) Query the DWS layer.
  SELECT * FROM flink_cdc.dws_agg_by_region;

(3) In the RDS database window, run the following statement to delete one record from the orders table.
  DELETE FROM flink_cdc.orders WHERE order_id = 2;
(4) Query again in EMR StarRocks Manager and observe the changes.
1) Query the DWD layer.
  SELECT * FROM flink_cdc.dwd_order_customer_valid;

2) Query the DWS layer.
  SELECT * FROM flink_cdc.dws_agg_by_region;
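As a quick sanity check on what these queries should return, the sketch below annotates the DWS query with values worked out from the three rows inserted in step (1). It assumes the tables contain only those rows and that the CDC job has caught up. The exact numeric formatting of the FLOAT sum may differ.

```sql
-- Values derived from the sample data in step (1):
--   before the DELETE: beijing, order_cnt = 2, order_total_revenue = 20
--   after  the DELETE: beijing, order_cnt = 1, order_total_revenue = 10
SELECT order_region, order_cnt, order_total_revenue
FROM flink_cdc.dws_agg_by_region
WHERE order_region = 'beijing';
```

The aggregate changes without any recomputation job because the DWS view is evaluated against the current ODS rows at query time.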

