ToB企服应用市场:ToB评测及商务社交产业平台

标题: Dinky教程--Flink CDC pipline整库同步Doris [打印本页]

作者: 圆咕噜咕噜    时间: 2024-12-11 09:47
标题: Dinky教程--Flink CDC pipline整库同步Doris
概述

Dinky 是一个基于 Apache Flink 的实时计算平台,它提供了一站式的 Flink 任务开发、运维、监控等功能。
本教程一步一步的教你怎样使用dinky运行CDC pipline任务实现整库同步Doris并自动建表功能。Starrocks同理
原文阅读:【巨人肩膀社区·专栏·分享】Dinky教程--Flink CDC pipline整库同步Doris
   项目地址:https://github.com/DataLinkDC/dinky
  如果以为项目不错接待前去点下 Star, 感谢您的支持!
前置条件

   如果还没有准备好dinky与flink集群,可以参考我以前的文章或官网举行摆设
  快速摆设Doris与Mysql测试环境

Flink CDC为我们提供了可快速摆设的docker-compose yaml文件,我们可以很方便的创建一个测试环境出来
   如果你mysql与Doris环境都已经具备,那么可以跳过此章节
  yaml 复制代码 
  1. version: '2.1'
  2. services:
  3.   doris:
  4.     image: yagagagaga/doris-standalone
  5.     ports:
  6.       - "8030:8030"
  7.       - "8040:8040"
  8.       - "9030:9030"
  9.   mysql:
  10.     image: debezium/example-mysql:1.1
  11.     ports:
  12.       - "3306:3306"
  13.     environment:
  14.       - MYSQL_ROOT_PASSWORD=123456
  15.       - MYSQL_USER=mysqluser
  16.       - MYSQL_PASSWORD=mysqlpw
复制代码
在 docker-compose.yml 地点目次下实验下面的命令来启动本教程需要的组件:
bash 复制代码 
  1. docker-compose up -d
复制代码
该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动了,也可以通过访问http://localhost:8030/ 来查看 Doris 是否运行正常。
准备数据

进入 MySQL 容器
bash 复制代码 
  1. docker-compose exec mysql mysql -uroot -p123456
复制代码
创建数据库 app_db 和表 orders,products,shipments,并插入数据
sql 复制代码 
  1. [/code] Doris 暂时不支持自动创建数据库,需要先创建写入表对应的数据库。
  2. 进入 Doris Web UI。
  3. http://localhost:8030/
  4. 默认的用户名为 root,默认密码为空。
  5. 通过 Web UI 创建 app_db 数据库
  6. sql 复制代码 
  7. [code]create database app_db;
复制代码


image.png
下载CDC相关依赖

flink-cdc-3.1.0-bin.tar.gz
MySQL pipeline connector 3.1.0
Apache Doris pipeline connector 3.1.0
   上述依赖下载完成后,把flink-cdc-pipeline-connector-doris-3.1.0.jar 与 flink-cdc-pipeline-connector-mysql-3.1.0.jar 放到dinky的依赖目次下(dinky/extends 或者 docker摆设的customJar下面)
  办理CDC依赖辩论题目

   如果直接在dinky使用flink-cdc-dist-3.1.0.jar 会有java.lang.NoSuchMethodError: org.apache.calcite.tools.FrameworkConfig.getTraitDefs()Lorg/apache/flink/calcite/shaded/com/google/common/collect/ImmutableList;错误,所以我们需要先处理一下
  sh 复制代码 
  1. # 解压 flink-cdc-3.1.0-bin.tar.gz
  2. tar -zxvf flink-cdc-3.1.0-bin.tar.gz      
  3. cd flink-cdc-3.1.0/lib/
  4. # 解压jar文件·
  5. jar -xvf flink-cdc-dist-3.1.0.jar
  6. # 删除冲突包
  7. rm -rf org/apache/calcite
  8. # 重新打包
  9. jar -cvf  flink-cdc-dist-3.1.0-new.jar *
复制代码
把新打包的flink-cdc-dist-3.1.0-new.jar文件放到dinky依赖目次下,重启dinky
开始运行

打开dinky页面,新建Flink Sql任务,输入以下代码,注意把相关IP替换成你本身的
Flink集群需要本身提前注册好,选择对应集群
sql 复制代码 
  1. SET 'execution.checkpointing.interval' = '30s';
  2. EXECUTE PIPELINE WITHYAML (
  3. source:
  4.   type: mysql
  5.   hostname: localhost
  6.   port: 3306
  7.   username: root
  8.   password: '123456'
  9.   tables: app_db.\.*
  10.   server-id: 5400-5404
  11. sink:
  12.   type: doris
  13.   fenodes: localhost:8030
  14.   username: root
  15.   password: ''
  16.   table.create.properties.light_schema_change: true
  17.   table.create.properties.replication_num: 1
  18. pipeline:
  19.   name: Sync MySQL Database to Doris
  20.   parallelism: 1
  21. )
复制代码


image.png
运行并验证

点击运行提交到Flink集群运行

前往运维中央查询任务状态,可以看到正常起来了

去Doris验证数据,可以看到表已经自动建好了,数据也同步过来了


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4