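[flink-cdc] Starting a flink-cdc 3.x pipeline job in debug mode, MySQL to Doris
Official documentation and GitHub repository address
Debugging Flink CDC with dynamically changing table schemas
In my testing, adding JARs through the "Modify classpath" option of the run configuration is error-prone and easily leads to ClassNotFoundException and similar errors.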
https://i-blog.csdnimg.cn/direct/28735d2ef772488cb752e2f025f58085.png
1. Build the project
flink-cdc version: 3.2.1
mvn clean package "-Dmaven.test.skip=true" "-Drat.skip=true" "-Dcheckstyle.skip"
# install works as well
mvn clean install "-Dmaven.test.skip=true" "-Drat.skip=true" "-Dcheckstyle.skip"
2. Create a module for testing
Create a new module named flink-cdc-test inside the flink-cdc project.
Add the required Flink CDC connector dependencies to its pom:
<properties>
    <flink.version>1.18.1</flink.version>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cdc-cli</artifactId>
        <version>${project.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cdc-pipeline-connector-mysql</artifactId>
        <version>${project.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cdc-pipeline-connector-kafka</artifactId>
        <version>${project.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cdc-pipeline-connector-doris</artifactId>
        <version>${project.version}</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.27</version>
    </dependency>
</dependencies>
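Create the pipeline YAML
Create the Flink CDC pipeline configuration file mysql-to-doris.yaml:
Note: the Doris port in fenodes is 8030 (the FE HTTP port), not 9030 (the MySQL-protocol query port).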
source:
  type: mysql
  name: MySQL-Source
  hostname: 192.168.100.123
  port: 3306
  username: root
  password: 123456
  tables: test.student
  server-id: 5401-5404
  server-time-zone: UTC

sink:
  type: doris
  name: Doris-Sink
  fenodes: 192.168.101:8030
  username: root
  password: 123456

pipeline:
  name: MySQL to Doris Schema Evolution
  parallelism: 1
Create the pipeline start entrypoint
The test class must be placed under the src/test directory:
import org.apache.flink.cdc.cli.CliFrontend;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;

public class MysqlPipelineTest {

    @Test
    public void testMysql() throws Exception {
        List<String> args = new ArrayList<>();
        // Absolute path to the pipeline definition file
        args.add("D:\\flink-cdc-release-3.2.1\\flink-cdc-test\\pipelines\\mysql-to-doris.yaml");
        // Run the job on a local mini cluster so it can be debugged in the IDE
        args.add("--use-mini-cluster");
        args.add("true");
        // toArray needs an array instance (new String[0]), not a bare type name
        CliFrontend.main(args.toArray(new String[0]));
    }
}
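The hard-coded D:\ path in the test only works on one machine. Below is a minimal sketch of building the same three arguments from a module directory instead. PipelineArgs and build are hypothetical names introduced here, and the sketch assumes the YAML file sits in a pipelines folder under the module, as the path in the test suggests.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of flink-cdc.
final class PipelineArgs {

    private PipelineArgs() {}

    /**
     * Builds the arguments used by the test: the pipeline YAML path
     * followed by "--use-mini-cluster true".
     */
    static String[] build(Path moduleDir, String yamlName) {
        // Assumption: pipeline definitions live in <module>/pipelines
        Path yaml = moduleDir.resolve("pipelines")
                .resolve(yamlName)
                .toAbsolutePath()
                .normalize();

        List<String> args = new ArrayList<>();
        args.add(yaml.toString());
        args.add("--use-mini-cluster");
        args.add("true");
        return args.toArray(new String[0]);
    }

    public static void main(String[] unused) {
        // user.dir is the working directory of the run configuration
        Path moduleDir = Paths.get(System.getProperty("user.dir"));
        for (String arg : build(moduleDir, "mysql-to-doris.yaml")) {
            System.out.println(arg);
        }
    }
}
```

In the test, the result could be passed straight to CliFrontend.main(...), provided the run configuration's working directory is the flink-cdc-test module.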
Edit the run configuration and add the FLINK_HOME environment variable:
https://i-blog.csdnimg.cn/direct/949029f1c8904c45be61faf41643ec58.png
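A missing or mistyped FLINK_HOME is easy to overlook in an IDE run configuration. The sketch below is a small pre-flight check that could be called at the top of the test. FlinkHomeCheck is a hypothetical helper, and the check for a conf subdirectory is an assumption based on the layout of a standard Flink distribution, not something flink-cdc is confirmed to require in this exact form.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of flink-cdc.
final class FlinkHomeCheck {

    private FlinkHomeCheck() {}

    /** Returns a list of problems found; an empty list means the value looks usable. */
    static List<String> problems(String flinkHome) {
        List<String> problems = new ArrayList<>();
        if (flinkHome == null || flinkHome.trim().isEmpty()) {
            problems.add("FLINK_HOME is not set");
            return problems;
        }
        Path home = Paths.get(flinkHome);
        if (!Files.isDirectory(home)) {
            problems.add("FLINK_HOME is not a directory: " + home);
            return problems;
        }
        // Assumption: a standard Flink distribution keeps its configuration in conf/
        if (!Files.isDirectory(home.resolve("conf"))) {
            problems.add("No conf directory under FLINK_HOME: " + home);
        }
        return problems;
    }

    public static void main(String[] args) {
        List<String> found = problems(System.getenv("FLINK_HOME"));
        if (found.isEmpty()) {
            System.out.println("FLINK_HOME looks usable");
        } else {
            found.forEach(System.out::println);
        }
    }
}
```

Failing fast with one of these messages is usually easier to diagnose than an error raised later during job startup.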
Start the job:
https://i-blog.csdnimg.cn/direct/2e5cdbbb64ab446dbeb3304a9c94dfbe.png
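Since the goal is to debug schema changes, the running job needs a DDL statement on the source table to react to. The following sketch issues one over JDBC, using the mysql-connector-java dependency already in the pom. SchemaChangeDriver, addColumnSql, and the column name debug_note are hypothetical; the real columns of test.student are not given in this post. Whether and how the change reaches Doris depends on the connector's schema evolution support, which this sketch does not verify.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical helper, not part of flink-cdc.
final class SchemaChangeDriver {

    private SchemaChangeDriver() {}

    /** Builds an ADD COLUMN statement after a basic identifier check. */
    static String addColumnSql(String table, String column) {
        if (!table.matches("[A-Za-z0-9_.]+") || !column.matches("[A-Za-z0-9_]+")) {
            throw new IllegalArgumentException("Unexpected identifier");
        }
        return "ALTER TABLE " + table + " ADD COLUMN " + column + " VARCHAR(64) NULL";
    }

    /** Executes a single statement against the source database. */
    static void execute(String jdbcUrl, String user, String password, String sql)
            throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
             Statement stmt = conn.createStatement()) {
            stmt.execute(sql);
        }
    }

    public static void main(String[] args) throws SQLException {
        // Host, port, database, and credentials mirror the source block of mysql-to-doris.yaml
        String url = "jdbc:mysql://192.168.100.123:3306/test";
        // debug_note is a made-up column name used only for this illustration
        execute(url, "root", "123456", addColumnSql("test.student", "debug_note"));
    }
}
```

Running main while the pipeline test is paused at a breakpoint or already streaming gives a schema change event to step through.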