Flink CDC系列之:调研应用Flink CDC将 ELT 从 MySQL 流式传输到 StarRocks ...

打印 上一主题 下一主题

主题 811|帖子 811|积分 2433

本教程将展示怎样利用 Flink CDC 快速构建从 MySQL 到 StarRocks 的 Streaming ELT 作业,包括同步一个数据库的所有表、模式变更演变和将分片表同步到一张表的功能。
本教程中的所有练习都在 Flink CDC CLI 中实行,整个过程利用标准 SQL 语法,无需一行 Java/Scala 代码或 IDE 安装。
预备

预备一台安装了 Docker 的 Linux 或 MacOS 电脑。
预备 Flink Standalone 集群

下载 Flink 1.18.0 ,解压得到 flink-1.18.0 目录。
利用以下下令进入 Flink 目录,并将 FLINK_HOME 设置为 flink-1.18.0 所在的目录。
  1. cd flink-1.18.0
复制代码
通过将以下参数附加到 conf/flink-conf.yaml 配置文件来启用检查点,每 3 秒实行一次检查点。
  1. execution.checkpointing.interval: 3000
复制代码
利用以下下令启动 Flink 集群。
  1. ./bin/start-cluster.sh
复制代码
如果启动乐成,你就可以通过http://localhost:8081/访问Flink Web UI,如下所示。

多次实行start-cluster.sh可以启动多个TaskManager。
预备 docker compose

以下教程将利用 docker-compose 预备所需的组件。利用下面提供的内容创建 docker-compose.yml 文件:
  1. version: '2.1'
  2. services:
  3.    StarRocks:
  4.       image: starrocks/allin1-ubuntu:3.2.6
  5.       ports:
  6.          - "8080:8080"
  7.          - "9030:9030"
  8.    MySQL:
  9.       image: debezium/example-mysql:1.1
  10.       ports:
  11.          - "3306:3306"
  12.       environment:
  13.          - MYSQL_ROOT_PASSWORD=123456
  14.          - MYSQL_USER=mysqluser
  15.          - MYSQL_PASSWORD=mysqlpw
复制代码
Docker Compose 应包含以下服务(容器):


  • MySQL:包含一个名为 app_db 的数据库
  • StarRocks:存储来自 MySQL 的表
  1. docker-compose up -d
复制代码
该下令会主动以分离模式启动 Docker Compose 配置中界说的所有容器。运行 docker ps 检查这些容器是否正常运行。您也可以访问 http://localhost:8030/ 检查 StarRocks 是否正在运行。
为 MySQL 预备记录

进入 MySQL 容器
  1. docker-compose exec mysql mysql -uroot -p123456
复制代码
创建 app_db 数据库和订单、产物、发货表,然后插入记录
  1. -- create database
  2. CREATE DATABASE app_db;
  3. USE app_db;
  4. -- create orders table
  5. CREATE TABLE `orders` (
  6. `id` INT NOT NULL,
  7. `price` DECIMAL(10,2) NOT NULL,
  8. PRIMARY KEY (`id`)
  9. );
  10. -- insert records
  11. INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
  12. INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);
  13. -- create shipments table
  14. CREATE TABLE `shipments` (
  15. `id` INT NOT NULL,
  16. `city` VARCHAR(255) NOT NULL,
  17. PRIMARY KEY (`id`)
  18. );
  19. -- insert records
  20. INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
  21. INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');
  22. -- create products table
  23. CREATE TABLE `products` (
  24. `id` INT NOT NULL,
  25. `product` VARCHAR(255) NOT NULL,
  26. PRIMARY KEY (`id`)
  27. );
  28. -- insert records
  29. INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
  30. INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
  31. INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
复制代码
利用 Flink CDC CLI 提交作业



  • 下载下面列出的二进制压缩包并解压到目录 flink cdc-3.1.0’:
    flink-cdc-3.1.0-bin.tar.gz flink-cdc-3.1.0 目录下会包含四个目录:bin、lib、log、conf。
  • 下载下面列出的连接器包并移动到 lib 目录
    下载链接只针对稳固版本,SNAPSHOT 依赖必要自行基于 master 或 release 分支构建。请注意,必要将 jar 移动到 Flink CDC Home 的 lib 目录,而不是 Flink Home 的 lib 目录。

    • MySQL 管道连接器 3.1.0
    • StarRocks 管道连接器 3.1.0

您还必要将 MySQL 连接器放入 Flink lib 文件夹或利用 --jar 参数传递它,因为它们不再与 CDC 连接器一起打包:


  • MySQL Connector Java
编写任务配置yaml文件。下面是同步整个数据库的示例文件mysql-to-starrocks.yaml:
  1. ################################################################################
  2. # Description: Sync MySQL all tables to StarRocks
  3. ################################################################################
  4. source:
  5.   type: mysql
  6.   hostname: localhost
  7.   port: 3306
  8.   username: root
  9.   password: 123456
  10.   tables: app_db.\.*
  11.   server-id: 5400-5404
  12.   server-time-zone: UTC
  13. sink:
  14.   type: starrocks
  15.   name: StarRocks Sink
  16.   jdbc-url: jdbc:mysql://127.0.0.1:9030
  17.   load-url: 127.0.0.1:8080
  18.   username: root
  19.   password: ""
  20.   table.create.properties.replication_num: 1
  21. pipeline:
  22.   name: Sync MySQL Database to StarRocks
  23.   parallelism: 2
复制代码
注意:


  • source 中的 tables: app_db..* 通过正则匹配同步 app_db 中的所有表。
  • sink 中的 table.create.properties.replication_num 是因为 Docker 镜像中只有一个 StarRocks BE 节点。
最后,利用Cli将作业提交到Flink Standalone集群。
  1. bash bin/flink-cdc.sh mysql-to-starrocks.yaml
复制代码
提交乐成后返回信息如下:
  1. Pipeline has been submitted to cluster.
  2. Job ID: 02a31c92f0e7bc9a1f4c0051980088a0
  3. Job Description: Sync MySQL Database to StarRocks
复制代码
我们可以通过 Flink Web UI 找到一个名为“Sync MySQL Database to StarRocks“的作业正在运行。

通过Dbeaver等数据库连接工具利用mysql://127.0.0.1:9030连接jdbc,可以在StarRocks中检察写入三张表的数据。

同步架构和数据更改

进入MySQL容器
  1. docker-compose exec mysql mysql -uroot -p123456
复制代码
然后修改MySQL中的schema和记录,Doris的表也会实时改变:
在MySQL中的orders中插入一条记录:
  1. INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
复制代码
在 MySQL 的订单中添加一列:
  1. ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
复制代码
从 MySQL 更新订单中的一条记录:
  1. UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
复制代码
从 MySQL 中删除订单中的一条记录:
  1. DELETE FROM app_db.orders WHERE id=2;
复制代码
每实行一步革新一下Dbeaver,可以看到StarRocks中展示的订单表会实时更新,如下图:

同样的,通过修改shipping和products表,你也可以在StarRocks中实时看到同步修改的结果。
路由变更

Flink CDC 提供了将源表的表布局/数据路由到其他表名的配置。
利用此功能,我们可以实现表名、数据库名替换、全库同步等功能。以下是利用路由功能的示例文件:
  1. ################################################################################
  2. # Description: Sync MySQL all tables to StarRocks
  3. ################################################################################
  4. source:
  5.    type: mysql
  6.    hostname: localhost
  7.    port: 3306
  8.    username: root
  9.    password: 123456
  10.    tables: app_db.\.*
  11.    server-id: 5400-5404
  12.    server-time-zone: UTC
  13. sink:
  14.    type: starrocks
  15.    jdbc-url: jdbc:mysql://127.0.0.1:9030
  16.    load-url: 127.0.0.1:8030
  17.    username: root
  18.    password: ""
  19.    table.create.properties.replication_num: 1
  20. route:
  21.    - source-table: app_db.orders
  22.      sink-table: ods_db.ods_orders
  23.    - source-table: app_db.shipments
  24.      sink-table: ods_db.ods_shipments
  25.    - source-table: app_db.products
  26.      sink-table: ods_db.ods_products
  27. pipeline:
  28.    name: Sync MySQL Database to StarRocks
  29.    parallelism: 2
复制代码
通过上面的路由配置,我们可以将app_db.orders的表布局和数据同步到ods_db.ods_orders中,从而实现数据库迁徙的功能。具体来说,source-table支持正则匹配多表来同步分库分表,如下:
  1. route:
  2.   - source-table: app_db.order\.*
  3.     sink-table: ods_db.ods_orders
复制代码
这样我们就可以将app_db.order01、app_db.order02、app_db.order03中分片表同步到一张ods_db.ods_orders表中了。
注意,目前还不支持多张表存在雷同主键数据的场景,后续版本会支持。
清理

完成教程后,运行以下下令制止docker-compose.yml目录中的所有容器:
  1. docker-compose down
复制代码
在Flink flink-1.18.0目录下,实行以下下令制止Flink集群:
  1. ./bin/stop-cluster.sh
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

光之使者

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表