用户名
Email
论坛
潜水/灌水快乐,沉淀知识,认识更多同行。
ToB圈子
加入IT圈,遇到更多同好之人。
朋友圈
看朋友圈动态,了解ToB世界。
ToB门户
了解全球最新的ToB事件
博客
Blog
排行榜
Ranklist
文库
业界最专业的IT文库,上传资料也可以赚钱
下载
分享
Share
导读
Guide
相册
Album
记录
Doing
应用中心
帖子
本版
文章
帖子
ToB圈子
用户
免费入驻
产品入驻
解决方案入驻
公司入驻
案例入驻
登录
·
注册
只需一步,快速开始
账号登录
立即注册
找回密码
用户名
自动登录
找回密码
密码
登录
立即注册
首页
找靠谱产品
找解决方案
找靠谱公司
找案例
找对的人
专家智库
悬赏任务
圈子
SAAS
IT评测·应用市场-qidao123.com技术社区
»
论坛
›
数据库
›
分布式数据库
›
【flink-cdc】flink-cdc 3版本debug启动pipeline任务,m ...
【flink-cdc】flink-cdc 3版本debug启动pipeline任务,mysql-doris ...
兜兜零元
论坛元老
|
2025-1-4 17:34:39
|
显示全部楼层
|
阅读模式
楼主
主题
1756
|
帖子
1756
|
积分
5268
官方文档
github堆栈地址
Flink cdc debug调试动态变动表结构
颠末测试利用,在启动任务设置Modify classpath添加jar的方式,容易出错classNotFoundException等等。
一、build project
flink-cdc版本:3.2.1
mvn clean package "-Dmaven.test.skip=true" "-Drat.skip=true" "-Dcheckstyle.skip"
# 当然install 也可以
mvn clean install "-Dmaven.test.skip=true" "-Drat.skip=true" "-Dcheckstyle.skip"
复制代码
二、create module for test
在flink-cdc项目中新建一个module flink-cdc-test
将必要的flink cdc的connector依靠添加至pom
<properties>
<flink.version>1.18.1</flink.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.18.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.18.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-cli</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-mysql</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-kafka</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-doris</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
</dependencies>
复制代码
create pipeline yaml
创建flink cdc pipeline的设置文件mysql-to-doris.yaml:
注意:drois的端口号不是9030,而是8030
source:
type: mysql
name: MySQL-Source
hostname: 192.168.100.123
port: 3306
username: root
password: 123456
tables: test.student
server-id: 5401-5404
server-time-zone: UTC
sink:
type: doris
name: Doris-Sink
fenodes: 192.168.101:8030
username: root
password: 123456
pipeline:
name: MySQL to Doris Schema Evolution
parallelism: 1
复制代码
create pipeline start entrypoint
测试类,需在src/test目次下:
import org.apache.flink.cdc.cli.CliFrontend;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
public class MysqlPipelineTest {
@Test
public void testMysql() throws Exception {
List<String> args= new ArrayList<>();
args.add("D:\\flink-cdc-release-3.2.1\\flink-cdc-test\\pipelines\\mysql-to-doris.yaml");
args.add("--use-mini-cluster");
args.add("true");
CliFrontend.main(args.toArray(new String[0]));
}
}
复制代码
修改启动设置添加FLINK_HOME情况变量:
启动任务:
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
本帖子中包含更多资源
您需要
登录
才可以下载或查看,没有账号?
立即注册
x
回复
举报
0 个回复
倒序浏览
返回列表
快速回复
高级模式
B
Color
Image
Link
Quote
Code
Smilies
您需要登录后才可以回帖
登录
or
立即注册
本版积分规则
发表回复
回帖并转播
回帖后跳转到最后一页
发新帖
回复
兜兜零元
论坛元老
这个人很懒什么都没写!
楼主热帖
数据库的建立、增、删、改、查 ...
深入解析kubernetes中的选举机制 ...
【黄啊码】MySQL入门—4、掌握这些数据 ...
clang-format的使用
MySQL安装配置
Oracle调度器Scheduler
V Rising 服务器搭建
02-MySQL高级
2万多条健康网站文章大全ACCESS\EXCEL ...
【黄啊码】MySQL入门—5、数据库小技巧 ...
标签云
集成商
AI
运维
CIO
存储
服务器
浏览过的版块
程序人生
登录参与点评抽奖加入IT实名职场社区
下次自动登录
忘记密码?点此找回!
登陆
新用户注册
用其它账号登录:
关闭
快速回复
返回顶部
返回列表