说明,本文章只提供借鉴,博主也是初次接触flink,请包涵
flink版本1.17.0,flink cdc版本2.3.0,百分百可以及时同步,亲测可用
起首看下我的程序结构吧
准备
1、依赖
2、打包插件
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>3.1.1</version>
- <executions>
- <!-- Run shade goal on package phase -->
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <artifactSet>
- <excludes>
- <exclude>org.apache.flink:force-shading</exclude>
- <exclude>com.google.code.findbugs:jsr305</exclude>
- <!-- <exclude>org.slf4j:*</exclude>-->
- <!-- <exclude>org.apache.logging.log4j:*</exclude>-->
- </excludes>
- </artifactSet>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- <transformers combine.children="append">
- <!-- TODO:这个防止多个connector的相同类名覆盖-->
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
- <!--指定 主类-->
- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>com.hbos.FlinkCDCApplication</mainClass>
- </transformer>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- <includes>
- <include>application.yml</include>
- <include>**/*.ftl</include>
- <include>**/*.xml</include>
- <include>**/*.properties</include>
- </includes>
- <filtering>true</filtering>
- </resource>
- </resources>
- </build>
复制代码 编写主类
- @SpringBootApplication(excludeName = {"org.springframework.security"})
- @Slf4j
- public class FlinkCDCApplication {
- public static void main(String[] args){
- SpringApplication.run(FlinkCDCApplication.class,args);
- log.info("CDC服务启动成功");
- System.out.println("=========================="+"\n"+
- "(♥◠‿◠)ノ゙CDC服务启动成功"+"\n"
- +"==========================");
- }
- }
复制代码 编写源表SourceConfig 类设置
- package com.hbos.cdc.config;
- import com.hbos.cdc.entity.SourceDataProperties;
- import com.hbos.cdc.listener.DataChangeSink;
- import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
- import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
- import com.ververica.cdc.debezium.DebeziumSourceFunction;
- import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.kafka.connect.json.DecimalFormat;
- import org.apache.kafka.connect.json.JsonConverterConfig;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Properties;
- @Configuration
- public class SourceConfig {
- @Bean(name = "sourceInfo")
- @ConfigurationProperties(prefix = "source")
- public SourceDataProperties sourceInfo() {
- return new SourceDataProperties();
- }
- @Bean
- public StreamExecutionEnvironment source(@Qualifier("sourceInfo") SourceDataProperties sourceInfo) throws Exception {
- Map config = new HashMap();
- config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
- JsonDebeziumDeserializationSchema jdd = new JsonDebeziumDeserializationSchema(false, config);
- //通过FlinkCDC构建SourceFunction
- DebeziumSourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
- .hostname(sourceInfo.getHost())
- .port(sourceInfo.getPort())
- .username(sourceInfo.getUsername())
- .password(sourceInfo.getPassword())
- .database(sourceInfo.getDatabase()) //设置捕获的库名
- .tableList(sourceInfo.getTableList()) //设置捕获的表,也可以去掉表,那就证明监控某库下所有的表
- .deserializer(jdd) //转换成JSON
- /*
- * initial初始化快照,即全量导入后增量导入(检测更新数据写入)
- * latest:只进行增量导入(不读取历史变化)
- */
- .startupOptions(StartupOptions.initial())//启动模式
- .build();
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(5); //全局并行度为5
- env.addSource(sourceFunction).addSink(new DataChangeSink()).setParallelism(2);
- //4、启动任务
- env.execute("JHYGCDC1");
- return env;
- }
- }
复制代码 由于flink的结构和springboot的结构不一样,以是要想把bean交给spring管理那就必要手动去创建
创建bean工具,也就是SpringBeanUtil类
- @Component
- public class SpringBeanUtil implements ApplicationContextAware {
- private static ApplicationContext applicationContext;
- public void setApplicationContext(ApplicationContext applicationContext)
- throws BeansException {
- SpringBeanUtil.applicationContext = applicationContext;
- }
- public static <T> T getBean(Class<T> clazz) {
- return applicationContext.getBean(clazz);
- }
- public static Object getBean(String name) throws BeansException {
- return applicationContext.getBean(name);
- }
- }
复制代码 创建源表实体和目标表实体
源实体SourceDataProperties类
- //@Component
- @Data
- //@ConfigurationProperties(prefix = "source")
- public class SourceDataProperties {
- private String driverClassName;
- private String url;
- private String host;
- private int port;
- private String username;
- private String password;
- private String database;
- private String tableList;
- }
复制代码 目标实体TargetDataProperties 类
- @Component
- @Data
- @ConfigurationProperties(prefix = "target")
- public class TargetDataProperties {
- private String driverClassName;
- private String url;
- private String host;
- private String port;
- private String username;
- private String password;
- private String database;
- // @Value("${CDC.DataSource.target.schema}")
- // private String schema;
- //
- // @Value("${CDC.DataSource.target.tableList}")
- // private String tableList;
- }
复制代码 sink类
也就是处置处罚业务逻辑的代码,有点多我就不粘贴出来了,你们可以自己处置处罚你们自己的业务逻辑
也可以借鉴我的,地址如下
flink cdc业务逻辑层处置处罚-CSDN博客https://blog.csdn.net/weixin_50448162/article/details/140635326?spm=1001.2014.3001.5502
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |