我爱普洱茶 发表于 2024-8-4 11:35:49

MySQL到Doris的StreamingETL实现(Flink CDC 3.0)

MySQL到Doris的StreamingETL实现(Flink CDC 3.0)

1 情况准备

1)安装FlinkCDC
$ tar -zxvf flink-cdc-3.0.0-bin.tar.gz -C /opt/module/
2)拖入MySQL以及Doris依靠包
将flink-cdc-pipeline-connector-doris-3.0.0.jar以及flink-cdc-pipeline-connector-mysql-3.0.0.jar防止在FlinkCDC的lib目次下
2 同步变动

2)编写MySQL到Doris的同步变动设置文件
在FlinkCDC目次下创建job文件夹,并在该目次下创建同步变动设置文件
$ mkdir job/

$ cd job

$ vim mysql-to-doris.yaml



source:

type: mysql

hostname: hadoop3

port: 3306

username: root

password: "123456"

tables: test.\.

server-id: 5400-5404

server-time-zone: UTC+8



sink:

type: doris

fenodes: hadoop1:7030

username: root

password: "123456"

table.create.properties.light_schema_change: true

table.create.properties.replication_num: 1



pipeline:

name: Sync MySQL Database to Doris

parallelism: 1
3)启动任务并测试
(1)开启Flink集群,注意:在Flink设置信息中打开CheckPoint
$ vim conf/flink-conf.yaml
添加如下设置信息
execution.checkpointing.interval: 5000
分发该文件至其他Flink呆板并启动Flink集群
$ bin/start-cluster.sh
(2)开启Doris FE
$ bin/start_fe.sh
(3)开启Doris BE
$ bin/start_be.sh
(4)在Doris中创建test数据库
$ mysql -uroot -p123456 -P9030 -hhadoop1

mysql: Using a password on the command line interface can be insecure.

Welcome to the MySQL monitor.Commands end with ; or g.

Your MySQL connection id is 0

Server version: 5.7.99 Doris version doris-1.2.4-1-Unknown



Copyright (c) 2000, 2022, Oracle and/or its affiliates.



Oracle is a registered trademark of Oracle Corporation and/or its

affiliates. Other names may be trademarks of their respective

owners.



Type 'help;' or 'h' for help. Type 'c' to clear the current input statement.



mysql> create database test;

Query OK, 1 row affected (0.00 sec)



mysql> create database doris_test_route;

Query OK, 1 row affected (0.00 sec)
(5)启动FlinkCDC同步变动任务
$ bin/flink-cdc.sh job/mysql-to-doris.yaml
(6)刷新Doris中test数据库观察结果
(7)在MySQL的test数据中对应的几张表举行新增、修改数据以及新增列操作,并刷新Doris中test数据库观察结果
3 路由变动

1)编写MySQL到Doris的路由变动设置文件
source:

type: mysql

hostname: hadoop3

port: 3306

username: root

password: "123456"

tables: test_route.\.

server-id: 5400-5404

server-time-zone: UTC+8



sink:

type: doris

fenodes: hadoop1:7030

benodes: hadoop1:7040

username: root

password: "123456"

table.create.properties.light_schema_change: true

table.create.properties.replication_num: 1



route:

- source-table: test_route.t1

sink-table: doris_test_route.doris_t1

- source-table: test_route.t2

sink-table: doris_test_route.doris_t1

- source-table: test_route.t3

sink-table: doris_test_route.doris_t3



pipeline:

name: Sync MySQL Database to Doris

parallelism: 1
2)启动任务并测试
$ bin/flink-cdc.sh job/mysql-to-doris-route.yaml
3)刷新Doris中test_route数据库观察结果
4)在MySQL的test_route数据中对应的几张表举行新增、修改数据操作,并刷新Doris中doris_test_route数据库观察结果

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: MySQL到Doris的StreamingETL实现(Flink CDC 3.0)