MySQL到Doris的StreamingETL实现(Flink CDC 3.0)

打印 上一主题 下一主题

主题 517|帖子 517|积分 1551

MySQL到Doris的StreamingETL实现(Flink CDC 3.0)

1 情况准备

1)安装FlinkCDC
  1. [root@hadoop1 software]$ tar -zxvf flink-cdc-3.0.0-bin.tar.gz -C /opt/module/
复制代码
2)拖入MySQL以及Doris依靠包
将flink-cdc-pipeline-connector-doris-3.0.0.jar以及flink-cdc-pipeline-connector-mysql-3.0.0.jar防止在FlinkCDC的lib目次下
2 同步变动

2)编写MySQL到Doris的同步变动设置文件
在FlinkCDC目次下创建job文件夹,并在该目次下创建同步变动设置文件
  1. [root@hadoop1 flink-cdc-3.0.0]$ mkdir job/
  2. [root@hadoop1 flink-cdc-3.0.0]$ cd job
  3. [root@hadoop1 job]$ vim mysql-to-doris.yaml
  4. source:
  5. type: mysql
  6. hostname: hadoop3
  7. port: 3306
  8. username: root
  9. password: "123456"
  10. tables: test.\.
  11. server-id: 5400-5404
  12. server-time-zone: UTC+8
  13. sink:
  14. type: doris
  15. fenodes: hadoop1:7030
  16. username: root
  17. password: "123456"
  18. table.create.properties.light_schema_change: true
  19. table.create.properties.replication_num: 1
  20. pipeline:
  21. name: Sync MySQL Database to Doris
  22. parallelism: 1
复制代码
3)启动任务并测试
(1)开启Flink集群,注意:在Flink设置信息中打开CheckPoint
  1. [root@hadoop3 flink-1.18.0]$ vim conf/flink-conf.yaml
复制代码
添加如下设置信息
  1. execution.checkpointing.interval: 5000
复制代码
分发该文件至其他Flink呆板并启动Flink集群
  1. [root@hadoop3 flink-1.18.0]$ bin/start-cluster.sh
复制代码
(2)开启Doris FE
  1. [root@hadoop1 fe]$ bin/start_fe.sh
复制代码
(3)开启Doris BE
  1. [root@hadoop1 be]$ bin/start_be.sh
复制代码
(4)在Doris中创建test数据库
  1. [root@hadoop3 doris]$ mysql -uroot -p123456 -P9030 -hhadoop1
  2. mysql: [Warning] Using a password on the command line interface can be insecure.
  3. Welcome to the MySQL monitor.  Commands end with ; or g.
  4. Your MySQL connection id is 0
  5. Server version: 5.7.99 Doris version doris-1.2.4-1-Unknown
  6. Copyright (c) 2000, 2022, Oracle and/or its affiliates.
  7. Oracle is a registered trademark of Oracle Corporation and/or its
  8. affiliates. Other names may be trademarks of their respective
  9. owners.
  10. Type 'help;' or 'h' for help. Type 'c' to clear the current input statement.
  11. mysql> create database test;
  12. Query OK, 1 row affected (0.00 sec)
  13. mysql> create database doris_test_route;
  14. Query OK, 1 row affected (0.00 sec)
复制代码
(5)启动FlinkCDC同步变动任务
  1. [root@hadoop3 flink-cdc-3.0.0]$ bin/flink-cdc.sh job/mysql-to-doris.yaml
复制代码
(6)刷新Doris中test数据库观察结果
(7)在MySQL的test数据中对应的几张表举行新增、修改数据以及新增列操作,并刷新Doris中test数据库观察结果
3 路由变动

1)编写MySQL到Doris的路由变动设置文件
  1. source:
  2. type: mysql
  3. hostname: hadoop3
  4. port: 3306
  5. username: root
  6. password: "123456"
  7. tables: test_route.\.
  8. server-id: 5400-5404
  9. server-time-zone: UTC+8
  10. sink:
  11. type: doris
  12. fenodes: hadoop1:7030
  13. benodes: hadoop1:7040
  14. username: root
  15. password: "123456"
  16. table.create.properties.light_schema_change: true
  17. table.create.properties.replication_num: 1
  18. route:
  19. - source-table: test_route.t1
  20.   sink-table: doris_test_route.doris_t1
  21. - source-table: test_route.t2
  22.   sink-table: doris_test_route.doris_t1
  23. - source-table: test_route.t3
  24.   sink-table: doris_test_route.doris_t3
  25. pipeline:
  26. name: Sync MySQL Database to Doris
  27. parallelism: 1
复制代码
2)启动任务并测试
  1. [root@hadoop3 flink-cdc-3.0.0]$ bin/flink-cdc.sh job/mysql-to-doris-route.yaml
复制代码
3)刷新Doris中test_route数据库观察结果
4)在MySQL的test_route数据中对应的几张表举行新增、修改数据操作,并刷新Doris中doris_test_route数据库观察结果

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

我爱普洱茶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表