【flink-cdc】flink-cdc 3版本debug启动pipeline任务,mysql-doris ...

打印 上一主题 下一主题

主题 802|帖子 802|积分 2406

官方文档
github堆栈地址
Flink cdc debug调试动态变动表结构
颠末测试利用,在启动任务设置Modify classpath添加jar的方式,容易出错classNotFoundException等等。

一、build project

flink-cdc版本:3.2.1
  1. mvn clean package "-Dmaven.test.skip=true" "-Drat.skip=true" "-Dcheckstyle.skip"
  2. # 当然install 也可以
  3. mvn clean install "-Dmaven.test.skip=true" "-Drat.skip=true" "-Dcheckstyle.skip"
复制代码
二、create module for test

在flink-cdc项目中新建一个module flink-cdc-test
将必要的flink cdc的connector依靠添加至pom
  1.     <properties>        
  2.         <flink.version>1.18.1</flink.version>  
  3.         <maven.compiler.source>8</maven.compiler.source>  
  4.         <maven.compiler.target>8</maven.compiler.target>  
  5.         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  
  6.     </properties>  
  7.   
  8.     <dependencies>        
  9.         <dependency>  
  10.             <groupId>org.apache.flink</groupId>  
  11.             <artifactId>flink-table-common</artifactId>  
  12.             <version>${flink.version}</version>  
  13.             <scope>test</scope>  
  14.         </dependency>  
  15.   
  16.         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->  
  17.         <dependency>  
  18.             <groupId>org.apache.flink</groupId>  
  19.             <artifactId>flink-table-runtime</artifactId>  
  20.             <version>${flink.version}</version>  
  21.             <scope>test</scope>  
  22.         </dependency>  
  23.   
  24.         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->  
  25.         <dependency>  
  26.             <groupId>org.apache.flink</groupId>  
  27.             <artifactId>flink-table-api-java-bridge</artifactId>  
  28.             <version>${flink.version}</version>  
  29.             <scope>test</scope>  
  30.         </dependency>  
  31.   
  32.         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->  
  33.         <dependency>  
  34.             <groupId>org.apache.flink</groupId>  
  35.             <artifactId>flink-core</artifactId>  
  36.             <version>${flink.version}</version>  
  37.             <scope>test</scope>  
  38.         </dependency>  
  39.   
  40.         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->  
  41.         <dependency>  
  42.             <groupId>org.apache.flink</groupId>  
  43.             <artifactId>flink-streaming-java</artifactId>  
  44.             <version>1.18.1</version>  
  45.             <scope>test</scope>  
  46.         </dependency>  
  47.   
  48.         <dependency>            
  49.             <groupId>org.apache.flink</groupId>  
  50.             <artifactId>flink-clients</artifactId>  
  51.             <version>1.18.1</version>  
  52.             <scope>test</scope>  
  53.         </dependency>  
  54.   
  55.         <dependency>            
  56.         <groupId>org.apache.flink</groupId>  
  57.             <artifactId>flink-cdc-cli</artifactId>  
  58.             <version>${project.version}</version>  
  59.             <scope>test</scope>  
  60.         </dependency>  
  61.   
  62.         <dependency>            
  63.             <groupId>org.apache.flink</groupId>  
  64.             <artifactId>flink-cdc-pipeline-connector-mysql</artifactId>  
  65.             <version>${project.version}</version>  
  66.             <scope>test</scope>  
  67.         </dependency>  
  68.   
  69.         <dependency>            
  70.             <groupId>org.apache.flink</groupId>  
  71.             <artifactId>flink-cdc-pipeline-connector-kafka</artifactId>  
  72.             <version>${project.version}</version>  
  73.             <scope>test</scope>  
  74.         </dependency>  
  75.   
  76.         <dependency>            
  77.             <groupId>org.apache.flink</groupId>  
  78.             <artifactId>flink-cdc-pipeline-connector-doris</artifactId>  
  79.             <version>${project.version}</version>  
  80.             <scope>test</scope>  
  81.         </dependency>  
  82.   
  83.         <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->  
  84.         <dependency>  
  85.             <groupId>mysql</groupId>  
  86.             <artifactId>mysql-connector-java</artifactId>  
  87.             <version>8.0.27</version>  
  88.         </dependency>  
  89.    
  90.     </dependencies>  
复制代码
create pipeline yaml

创建flink cdc pipeline的设置文件mysql-to-doris.yaml:
注意:drois的端口号不是9030,而是8030
  1. source:  
  2.   type: mysql  
  3.   name: MySQL-Source  
  4.   hostname: 192.168.100.123
  5.   port: 3306  
  6.   username: root  
  7.   password: 123456  
  8.   tables: test.student  
  9.   server-id: 5401-5404  
  10.   server-time-zone: UTC  
  11.   
  12. sink:  
  13.   type: doris  
  14.   name: Doris-Sink  
  15.   fenodes: 192.168.101:8030
  16.   username: root
  17.   password: 123456  
  18.   
  19.   
  20. pipeline:  
  21.   name: MySQL to Doris Schema Evolution  
  22.   parallelism: 1
复制代码
create pipeline start entrypoint

测试类,需在src/test目次下:
  1. import org.apache.flink.cdc.cli.CliFrontend;  
  2. import org.junit.jupiter.api.Test;  
  3.   
  4. import java.util.ArrayList;  
  5. import java.util.List;  
  6.   
  7. public class MysqlPipelineTest {  
  8.   
  9.     @Test  
  10.     public void testMysql() throws Exception {  
  11.         List<String> args= new ArrayList<>();  
  12.         args.add("D:\\flink-cdc-release-3.2.1\\flink-cdc-test\\pipelines\\mysql-to-doris.yaml");  
  13.         args.add("--use-mini-cluster");  
  14.         args.add("true");  
  15.         CliFrontend.main(args.toArray(new String[0]));  
  16.     }  
  17. }
复制代码
修改启动设置添加FLINK_HOME情况变量:

启动任务:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

兜兜零元

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表