ToB企服应用市场:ToB评测及商务社交产业平台

标题: Flink cdc 实现源表sqlserver到目标表sqlserver或者mysql [打印本页]

作者: 麻花痒    时间: 2024-10-30 18:38
标题: Flink cdc 实现源表sqlserver到目标表sqlserver或者mysql
说明,本文章只提供借鉴,博主也是初次接触flink,请包涵
flink版本1.17.0,flink cdc版本2.3.0,百分百可以及时同步,亲测可用

起首看下我的程序结构吧

准备

1、依赖

  1.     <properties>
  2.         <flink.version>1.17.0</flink.version>
  3.         <flink.cdc.version>2.3.0</flink.cdc.version>
  4.         <logback-classic.version>1.1.11</logback-classic.version>
  5.         <slf4j-api.version>1.7.25</slf4j-api.version>
  6.     </properties>
  7.     <!-- 依赖器 -->
  8.     <dependencyManagement>
  9.         <dependencies>
  10.             <!-- SpringCloud 微服务 -->
  11.             <dependency>
  12.                 <groupId>org.springframework.cloud</groupId>
  13.                 <artifactId>spring-cloud-dependencies</artifactId>
  14.                 <version>2021.0.4</version>
  15.                 <type>pom</type>
  16.                 <scope>import</scope>
  17.             </dependency>
  18.             <!-- SpringCloud Alibaba 微服务 -->
  19.             <dependency>
  20.                 <groupId>com.alibaba.cloud</groupId>
  21.                 <artifactId>spring-cloud-alibaba-dependencies</artifactId>
  22.                 <version>2021.0.4.0</version>
  23.                 <type>pom</type>
  24.                 <scope>import</scope>
  25.             </dependency>
  26.             <!-- SpringBoot 依赖配置 -->
  27.             <dependency>
  28.                 <groupId>org.springframework.boot</groupId>
  29.                 <artifactId>spring-boot-dependencies</artifactId>
  30.                 <version>2.7.3</version>
  31.                 <type>pom</type>
  32.                 <scope>import</scope>
  33.             </dependency>
  34.         </dependencies>
  35.     </dependencyManagement>
  36.     <dependencies>
  37.         <!-- bootstrap 启动器 -->
  38.         <dependency>
  39.             <groupId>org.springframework.cloud</groupId>
  40.             <artifactId>spring-cloud-starter-bootstrap</artifactId>
  41.         </dependency>
  42.         <dependency>
  43.             <groupId>org.springframework.boot</groupId>
  44.             <artifactId>spring-boot-starter-web</artifactId>
  45.             <version>2.7.3</version>
  46.         </dependency>
  47.         <dependency>
  48.             <groupId>org.springframework.boot</groupId>
  49.             <artifactId>spring-boot-starter-freemarker</artifactId>
  50.             <version>2.4.4</version>
  51.         </dependency>
  52.         <dependency>
  53.             <groupId>org.mybatis</groupId>
  54.             <artifactId>mybatis</artifactId>
  55.             <version>3.5.9</version>
  56.         </dependency>
  57.         <dependency>
  58.             <groupId>com.microsoft.sqlserver</groupId>
  59.             <artifactId>mssql-jdbc</artifactId>
  60.             <version>9.4.1.jre8</version>
  61.         </dependency>
  62.         <dependency>
  63.             <groupId>com.google.code.gson</groupId>
  64.             <artifactId>gson</artifactId>
  65.             <version>2.8.2</version>
  66.         </dependency>
  67.         <dependency>
  68.             <groupId>org.apache.flink</groupId>
  69.             <artifactId>flink-java</artifactId>
  70.             <version>${flink.version}</version>
  71.         </dependency>
  72.         <dependency>
  73.             <groupId>org.apache.flink</groupId>
  74.             <artifactId>flink-streaming-java</artifactId>
  75.             <version>${flink.version}</version>
  76.         </dependency>
  77.         
  78.         <dependency>
  79.             <groupId>com.ververica</groupId>
  80.             <artifactId>flink-connector-sqlserver-cdc</artifactId>
  81.             <version>${flink.cdc.version}</version>
  82.         </dependency>
  83.         <dependency>
  84.             <groupId>com.ververica</groupId>
  85.             <artifactId>flink-connector-oracle-cdc</artifactId>
  86.             <version>${flink.cdc.version}</version>
  87.         </dependency>
  88.         <dependency>
  89.             <groupId>mysql</groupId>
  90.             <artifactId>mysql-connector-java</artifactId>
  91.             <version>8.0.33</version>
  92.         </dependency>
  93.         <dependency>
  94.             <groupId>org.springframework</groupId>
  95.             <artifactId>spring-context-support</artifactId>
  96.             <version>5.3.22</version>
  97.         </dependency>
  98.         <dependency>
  99.             <groupId>org.apache.flink</groupId>
  100.             <artifactId>flink-runtime-web</artifactId>
  101.             <version>${flink.version}</version>
  102.         </dependency>
  103.         <dependency>
  104.             <groupId>com.oracle.database.jdbc</groupId>
  105.             <artifactId>ojdbc8</artifactId>
  106.             <version>12.2.0.1</version>
  107.         </dependency>
  108.         <dependency>
  109.             <groupId>com.ibm.db2</groupId>
  110.             <artifactId>jcc</artifactId>
  111.             <version>11.5.0.0</version>
  112.         </dependency>
  113.         <dependency>
  114.             <groupId>org.projectlombok</groupId>
  115.             <artifactId>lombok</artifactId>
  116.             <version>1.18.26</version>
  117.             <scope>compile</scope>
  118.         </dependency>
  119.         <dependency>
  120.             <groupId>org.springframework.boot</groupId>
  121.             <artifactId>spring-boot-configuration-processor</artifactId>
  122.             <optional>true</optional>
  123.         </dependency>
  124.     </dependencies>
复制代码
2、打包插件

  1.     <build>
  2.             <plugins>
  3.                 <plugin>
  4.                     <groupId>org.apache.maven.plugins</groupId>
  5.                     <artifactId>maven-compiler-plugin</artifactId>
  6.                     <version>3.1</version>
  7.                     <configuration>
  8.                         <source>1.8</source>
  9.                         <target>1.8</target>
  10.                     </configuration>
  11.                 </plugin>
  12.                 <plugin>
  13.                     <groupId>org.apache.maven.plugins</groupId>
  14.                     <artifactId>maven-shade-plugin</artifactId>
  15.                     <version>3.1.1</version>
  16.                     <executions>
  17.                         <!-- Run shade goal on package phase -->
  18.                         <execution>
  19.                             <phase>package</phase>
  20.                             <goals>
  21.                                 <goal>shade</goal>
  22.                             </goals>
  23.                             <configuration>
  24.                                 <artifactSet>
  25.                                     <excludes>
  26.                                         <exclude>org.apache.flink:force-shading</exclude>
  27.                                         <exclude>com.google.code.findbugs:jsr305</exclude>
  28. <!--                                        <exclude>org.slf4j:*</exclude>-->
  29. <!--                                        <exclude>org.apache.logging.log4j:*</exclude>-->
  30.                                     </excludes>
  31.                                 </artifactSet>
  32.                                 <filters>
  33.                                     <filter>
  34.                                         <artifact>*:*</artifact>
  35.                                         <excludes>
  36.                                             <exclude>META-INF/*.SF</exclude>
  37.                                             <exclude>META-INF/*.DSA</exclude>
  38.                                             <exclude>META-INF/*.RSA</exclude>
  39.                                         </excludes>
  40.                                     </filter>
  41.                                 </filters>
  42.                                 <transformers combine.children="append">
  43.                                     <!-- TODO:这个防止多个connector的相同类名覆盖-->
  44.                                     <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
  45.                                     <!--指定 主类-->
  46.                                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  47.                                         <mainClass>com.hbos.FlinkCDCApplication</mainClass>
  48.                                     </transformer>
  49.                                 </transformers>
  50.                             </configuration>
  51.                         </execution>
  52.                     </executions>
  53.                 </plugin>
  54.             </plugins>
  55.             <resources>
  56.                 <resource>
  57.                     <directory>src/main/resources</directory>
  58.                     <includes>
  59.                         <include>application.yml</include>
  60.                         <include>**/*.ftl</include>
  61.                         <include>**/*.xml</include>
  62.                         <include>**/*.properties</include>
  63.                     </includes>
  64.                     <filtering>true</filtering>
  65.                 </resource>
  66.             </resources>
  67.         </build>
复制代码
编写主类

  1. @SpringBootApplication(excludeName = {"org.springframework.security"})
  2. @Slf4j
  3. public class FlinkCDCApplication {
  4.     public static void main(String[] args){
  5.         SpringApplication.run(FlinkCDCApplication.class,args);
  6.         log.info("CDC服务启动成功");
  7.         System.out.println("=========================="+"\n"+
  8.                 "(♥◠‿◠)ノ゙CDC服务启动成功"+"\n"
  9.                 +"==========================");
  10.     }
  11. }
复制代码
编写源表SourceConfig 类设置

  1. package com.hbos.cdc.config;
  2. import com.hbos.cdc.entity.SourceDataProperties;
  3. import com.hbos.cdc.listener.DataChangeSink;
  4. import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
  5. import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
  6. import com.ververica.cdc.debezium.DebeziumSourceFunction;
  7. import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9. import org.apache.kafka.connect.json.DecimalFormat;
  10. import org.apache.kafka.connect.json.JsonConverterConfig;
  11. import org.springframework.beans.factory.annotation.Qualifier;
  12. import org.springframework.boot.context.properties.ConfigurationProperties;
  13. import org.springframework.context.annotation.Bean;
  14. import org.springframework.context.annotation.Configuration;
  15. import java.util.HashMap;
  16. import java.util.Map;
  17. import java.util.Properties;
  18. @Configuration
  19. public class SourceConfig {
  20.     @Bean(name = "sourceInfo")
  21.     @ConfigurationProperties(prefix = "source")
  22.     public SourceDataProperties sourceInfo() {
  23.         return new SourceDataProperties();
  24.     }
  25.     @Bean
  26.     public StreamExecutionEnvironment source(@Qualifier("sourceInfo") SourceDataProperties sourceInfo) throws Exception {
  27.         Map config = new HashMap();
  28.         config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
  29.         JsonDebeziumDeserializationSchema jdd = new JsonDebeziumDeserializationSchema(false, config);
  30.         //通过FlinkCDC构建SourceFunction
  31.         DebeziumSourceFunction<String> sourceFunction  = SqlServerSource.<String>builder()
  32.                 .hostname(sourceInfo.getHost())
  33.                 .port(sourceInfo.getPort())
  34.                 .username(sourceInfo.getUsername())
  35.                 .password(sourceInfo.getPassword())
  36.                 .database(sourceInfo.getDatabase()) //设置捕获的库名
  37.                 .tableList(sourceInfo.getTableList()) //设置捕获的表,也可以去掉表,那就证明监控某库下所有的表
  38.                 .deserializer(jdd) //转换成JSON
  39.                 /*
  40.                  * initial初始化快照,即全量导入后增量导入(检测更新数据写入)
  41.                  * latest:只进行增量导入(不读取历史变化)
  42.                  */
  43.                 .startupOptions(StartupOptions.initial())//启动模式
  44.                 .build();
  45.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  46.         env.setParallelism(5); //全局并行度为5
  47.         env.addSource(sourceFunction).addSink(new DataChangeSink()).setParallelism(2);
  48.         //4、启动任务
  49.         env.execute("JHYGCDC1");
  50.         return env;
  51.     }
  52. }
复制代码
由于flink的结构和springboot的结构不一样,以是要想把bean交给spring管理那就必要手动去创建
创建bean工具,也就是SpringBeanUtil类

  1. @Component
  2. public class SpringBeanUtil implements ApplicationContextAware {
  3.     private static ApplicationContext applicationContext;
  4.     public void setApplicationContext(ApplicationContext applicationContext)
  5.       throws BeansException {
  6.         SpringBeanUtil.applicationContext = applicationContext;
  7.     }
  8.     public static <T> T getBean(Class<T> clazz) {
  9.         return applicationContext.getBean(clazz);
  10.     }
  11.     public static Object getBean(String name) throws BeansException {
  12.         return applicationContext.getBean(name);
  13.     }
  14. }
复制代码
创建源表实体和目标表实体

源实体SourceDataProperties类
  1. //@Component
  2. @Data
  3. //@ConfigurationProperties(prefix = "source")
  4. public class SourceDataProperties {
  5.     private String driverClassName;
  6.     private String url;
  7.     private String host;
  8.     private int port;
  9.     private String username;
  10.     private String password;
  11.     private String database;
  12.     private String tableList;
  13. }
复制代码
目标实体TargetDataProperties 类
  1. @Component
  2. @Data
  3. @ConfigurationProperties(prefix = "target")
  4. public class TargetDataProperties {
  5.     private String driverClassName;
  6.     private String url;
  7.     private String host;
  8.     private String port;
  9.     private String username;
  10.     private String password;
  11.     private String database;
  12. //    @Value("${CDC.DataSource.target.schema}")
  13. //    private String schema;
  14. //
  15. //    @Value("${CDC.DataSource.target.tableList}")
  16. //    private String tableList;
  17. }
复制代码
sink类

也就是处置处罚业务逻辑的代码,有点多我就不粘贴出来了,你们可以自己处置处罚你们自己的业务逻辑
也可以借鉴我的,地址如下
flink cdc业务逻辑层处置处罚-CSDN博客
https://blog.csdn.net/weixin_50448162/article/details/140635326?spm=1001.2014.3001.5502


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4