兜兜零元 发表于 2025-1-4 17:34:39

【flink-cdc】flink-cdc 3版本debug启动pipeline任务,mysql-doris

官方文档
github堆栈地址
Flink cdc debug调试动态变动表结构
颠末测试利用,在启动任务设置Modify classpath添加jar的方式,容易出错classNotFoundException等等。
https://i-blog.csdnimg.cn/direct/28735d2ef772488cb752e2f025f58085.png
一、build project

flink-cdc版本:3.2.1
mvn clean package "-Dmaven.test.skip=true" "-Drat.skip=true" "-Dcheckstyle.skip"
# 当然install 也可以
mvn clean install "-Dmaven.test.skip=true" "-Drat.skip=true" "-Dcheckstyle.skip"
二、create module for test

在flink-cdc项目中新建一个module flink-cdc-test
将必要的flink cdc的connector依靠添加至pom

    <properties>      
      <flink.version>1.18.1</flink.version>
      <maven.compiler.source>8</maven.compiler.source>
      <maven.compiler.target>8</maven.compiler.target>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>      
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
      </dependency>

      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
      </dependency>

      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
      </dependency>

      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
      </dependency>

      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.18.1</version>
            <scope>test</scope>
      </dependency>

      <dependency>            
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.18.1</version>
            <scope>test</scope>
      </dependency>

      <dependency>            
      <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-cli</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
      </dependency>

      <dependency>            
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-pipeline-connector-mysql</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
      </dependency>

      <dependency>            
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-pipeline-connector-kafka</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
      </dependency>

      <dependency>            
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-pipeline-connector-doris</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
      </dependency>

      <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
      <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.27</version>
      </dependency>
   
    </dependencies>

create pipeline yaml

创建flink cdc pipeline的设置文件mysql-to-doris.yaml:
注意:drois的端口号不是9030,而是8030
source:
type: mysql
name: MySQL-Source
hostname: 192.168.100.123
port: 3306
username: root
password: 123456
tables: test.student
server-id: 5401-5404
server-time-zone: UTC

sink:
type: doris
name: Doris-Sink
fenodes: 192.168.101:8030
username: root
password: 123456


pipeline:
name: MySQL to Doris Schema Evolution
parallelism: 1
create pipeline start entrypoint

测试类,需在src/test目次下:
import org.apache.flink.cdc.cli.CliFrontend;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;

public class MysqlPipelineTest {

    @Test
    public void testMysql() throws Exception {
      List<String> args= new ArrayList<>();
      args.add("D:\\flink-cdc-release-3.2.1\\flink-cdc-test\\pipelines\\mysql-to-doris.yaml");
      args.add("--use-mini-cluster");
      args.add("true");
      CliFrontend.main(args.toArray(new String));
    }
}
修改启动设置添加FLINK_HOME情况变量:
https://i-blog.csdnimg.cn/direct/949029f1c8904c45be61faf41643ec58.png
启动任务:
https://i-blog.csdnimg.cn/direct/2e5cdbbb64ab446dbeb3304a9c94dfbe.png

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 【flink-cdc】flink-cdc 3版本debug启动pipeline任务,mysql-doris