Flink+Kafka+Mysql(8.0)

打印 上一主题 下一主题

主题 854|帖子 854|积分 2562

一、前言阐明

Flink版本:1.14.6
Mysql版本:8.0
JDK版本:1.8
本实例主要实现功能如下:
模拟消息生成->Kafka->Flink->Mysql
其中Flink做数据流网络并定时批量写入到Mysql

本例使用Intellij IDEA作为项目开发的IDE。
整个项目结构如图所示:

二、项目代码已提交至 gitee

flink-kafka-mysql: Flink读取Kafka 消息并批量写入到 MySQL8.0
三、项目具体代码

POM文件内容:
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <modelVersion>4.0.0</modelVersion>
  6.     <groupId>org.example</groupId>
  7.     <artifactId>flink-kafka-mysql</artifactId>
  8.     <version>1.0-SNAPSHOT</version>
  9.     <properties>
  10.         <maven.compiler.source>8</maven.compiler.source>
  11.         <maven.compiler.target>8</maven.compiler.target>
  12.         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  13.         <flink.version>1.14.6</flink.version>
  14.         <scala.binary.version>2.11</scala.binary.version>
  15.     </properties>
  16.     <dependencies>
  17.         <dependency>
  18.             <groupId>org.apache.flink</groupId>
  19.             <artifactId>flink-java</artifactId>
  20.             <version>${flink.version}</version>
  21.         </dependency>
  22.         <dependency>
  23.             <groupId>org.apache.flink</groupId>
  24.             <artifactId>flink-clients_${scala.binary.version}</artifactId>
  25.             <version>${flink.version}</version>
  26.         </dependency>
  27.         <dependency>
  28.             <groupId>org.apache.flink</groupId>
  29.             <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
  30.             <version>${flink.version}</version>
  31.         </dependency>
  32.         <dependency>
  33.             <groupId>org.apache.flink</groupId>
  34.             <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
  35.             <version>${flink.version}</version>
  36.         </dependency>
  37.         <dependency>
  38.             <groupId>org.apache.flink</groupId>
  39.             <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
  40.             <version>${flink.version}</version>
  41.         </dependency>
  42.         <dependency>
  43.             <groupId>org.slf4j</groupId>
  44.             <artifactId>slf4j-log4j12</artifactId>
  45.             <version>1.7.25</version>
  46.         </dependency>
  47.         <dependency>
  48.             <groupId>log4j</groupId>
  49.             <artifactId>log4j</artifactId>
  50.             <version>1.2.17</version>
  51.         </dependency>
  52.         <dependency>
  53.             <groupId>mysql</groupId>
  54.             <artifactId>mysql-connector-java</artifactId>
  55.             <version>8.0.23</version>
  56.         </dependency>
  57.         <dependency>
  58.             <groupId>com.alibaba</groupId>
  59.             <artifactId>druid</artifactId>
  60.             <version>1.2.8</version>
  61.         </dependency>
  62.         <dependency>
  63.             <groupId>org.apache.kafka</groupId>
  64.             <artifactId>kafka-clients</artifactId>
  65.             <version>2.1.1</version>
  66.         </dependency>
  67.         <dependency>
  68.             <groupId>com.alibaba</groupId>
  69.             <artifactId>fastjson</artifactId>
  70.             <version>1.2.70</version>
  71.         </dependency>
  72.     </dependencies>
  73.     <build>
  74.         <plugins>
  75.             <!-- 构建主 JAR 文件(瘦包) -->
  76.             <plugin>
  77.                 <groupId>org.apache.maven.plugins</groupId>
  78.                 <artifactId>maven-jar-plugin</artifactId>
  79.                 <version>3.2.0</version>
  80.                 <configuration>
  81.                     <archive>
  82.                         <manifest>
  83.                             <addClasspath>true</addClasspath>
  84.                             <classpathPrefix>lib/</classpathPrefix> <!-- 设置依赖的类路径前缀为 lib/ -->
  85.                             <mainClass>com.slink.StudentRunner</mainClass> <!-- 替换为你的主类 -->
  86.                         </manifest>
  87.                     </archive>
  88.                 </configuration>
  89.             </plugin>
  90.             <!-- 将依赖项复制到 lib 目录 -->
  91.             <plugin>
  92.                 <groupId>org.apache.maven.plugins</groupId>
  93.                 <artifactId>maven-dependency-plugin</artifactId>
  94.                 <version>3.1.2</version>
  95.                 <executions>
  96.                     <execution>
  97.                         <id>copy-dependencies</id>
  98.                         <phase>package</phase>
  99.                         <goals>
  100.                             <goal>copy-dependencies</goal>
  101.                         </goals>
  102.                         <configuration>
  103.                             <!-- 输出目录为 target/lib -->
  104.                             <outputDirectory>${project.build.directory}/lib</outputDirectory>
  105.                             <!-- 指定复制运行时依赖 -->
  106.                             <includeScope>runtime</includeScope>
  107.                         </configuration>
  108.                     </execution>
  109.                 </executions>
  110.             </plugin>
  111.             <!-- 可选:使用 maven-assembly-plugin 打包主 JAR 和 lib 目录 -->
  112.             <plugin>
  113.                 <groupId>org.apache.maven.plugins</groupId>
  114.                 <artifactId>maven-assembly-plugin</artifactId>
  115.                 <version>3.3.0</version>
  116.                 <executions>
  117.                     <execution>
  118.                         <id>make-assembly</id>
  119.                         <phase>package</phase>
  120.                         <goals>
  121.                             <goal>single</goal>
  122.                         </goals>
  123.                         <configuration>
  124.                             <descriptors>
  125.                                 <descriptor>src/main/resources/assembly/assembly.xml</descriptor>
  126.                             </descriptors>
  127.                         </configuration>
  128.                     </execution>
  129.                 </executions>
  130.             </plugin>
  131.         </plugins>
  132.     </build>
  133. </project>
复制代码
application-dev.properties配置文件:
  1. # mysql-jdbc
  2. jdbc.driver=com.mysql.cj.jdbc.Driver
  3. jdbc.url=jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true
  4. jdbc.username=root
  5. jdbc.password=root
  6. jdbc.druidInitialSize=10
  7. jdbc.druidMaxActive=20
  8. jdbc.druidMinIdle=5
  9. jdbc.druidMaxWait=20000
  10. jdbc.druidTimeBetweenEvictionRunsMillis=60000
  11. jdbc.druidMaxEvictableIdleTimeMillis=3600000
  12. jdbc.druidMinEvictableIdleTimeMillis=3240000
  13. jdbc.druidTestWhileIdle=true
  14. jdbc.druidTestOnBorrow=true
  15. jdbc.druidTestOnReturn=false
  16. jdbc.druidPoolPreparedStatements=true
  17. jdbc.druidMaxPoolPreparedStatementPerConnectionSize=10
  18. jdbc.druidFilters=stat,slf4j
  19. jdbc.druidValidationQuery=select 1
  20. # kafka
  21. kafka.servers=127.0.0.1:9092
  22. kafka.consumer.groupId=kafka.consumer.group
  23. kafka.auto.offset.reset=latest
  24. kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
  25. kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
  26. kafka.user.event.topic=testTopic
复制代码
Mysql数据表创建:
  1. CREATE TABLE `student` (
  2.                            `id` bigint(20) NOT NULL AUTO_INCREMENT,
  3.                            `name1` varchar(50),
  4.                            `name2` varchar(50),
  5.                            `name3` varchar(50),
  6.                            `name4` varchar(50),
  7.                            `name5` varchar(50),
  8.                            `name6` varchar(50),
  9.                            `name7` varchar(50),
  10.                            `name8` varchar(50),
  11.                            `name9` varchar(50),
  12.                            `name10` varchar(50),
  13.                            `name11` varchar(50),
  14.                            `name12` varchar(50),
  15.                            `name13` varchar(50),
  16.                            `name14` varchar(50),
  17.                            `name15` varchar(50),
  18.                            `name16` varchar(50),
  19.                            `name17` varchar(50),
  20.                            `name18` varchar(50),
  21.                            `name19` varchar(50),
  22.                            `name20` varchar(50),
  23.                            `name21` varchar(50),
  24.                            `name22` varchar(50),
  25.                            `name23` varchar(50),
  26.                            `name24` varchar(50),
  27.                            `name25` varchar(50),
  28.                            `name26` varchar(50),
  29.                            `name27` varchar(50),
  30.                            `name28` varchar(50),
  31.                            `name29` varchar(50),
  32.                            `name30` varchar(50),
  33.                            `name31` varchar(50),
  34.                            `name32` varchar(50),
  35.                            `name33` varchar(50),
  36.                            `name34` varchar(50),
  37.                            `name35` varchar(50),
  38.                            `name36` varchar(50),
  39.                            `name37` varchar(50),
  40.                            `name38` varchar(50),
  41.                            `name39` varchar(50),
  42.                            `name40` varchar(50),
  43.                            `name41` varchar(50),
  44.                            `name42` varchar(50),
  45.                            `name43` varchar(50),
  46.                            `name44` varchar(50),
  47.                            `name45` varchar(50),
  48.                            `name46` varchar(50),
  49.                            `name47` varchar(50),
  50.                            `name48` varchar(50),
  51.                            `name49` varchar(50),
  52.                            `name50` varchar(50),
  53.                            PRIMARY KEY (`id`)
  54. ) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='学生表';
复制代码
创建Student实体类:
  1. package com.slink.entity;
  2. import java.io.Serializable;
  3. public class Student implements Serializable {
  4.     private long id;
  5.     private String name1;
  6.     private String name2;
  7.     private String name3;
  8.     private String name4;
  9.     private String name5;
  10.     private String name6;
  11.     private String name7;
  12.     private String name8;
  13.     private String name9;
  14.     private String name10;
  15.     private String name11;
  16.     private String name12;
  17.     private String name13;
  18.     private String name14;
  19.     private String name15;
  20.     private String name16;
  21.     private String name17;
  22.     private String name18;
  23.     private String name19;
  24.     private String name20;
  25.     private String name21;
  26.     private String name22;
  27.     private String name23;
  28.     private String name24;
  29.     private String name25;
  30.     private String name26;
  31.     private String name27;
  32.     private String name28;
  33.     private String name29;
  34.     private String name30;
  35.     private String name31;
  36.     private String name32;
  37.     private String name33;
  38.     private String name34;
  39.     private String name35;
  40.     private String name36;
  41.     private String name37;
  42.     private String name38;
  43.     private String name39;
  44.     private String name40;
  45.     private String name41;
  46.     private String name42;
  47.     private String name43;
  48.     private String name44;
  49.     private String name45;
  50.     private String name46;
  51.     private String name47;
  52.     private String name48;
  53.     private String name49;
  54.     private String name50;
  55.     public long getId() {
  56.         return id;
  57.     }
  58.     public void setId(long id) {
  59.         this.id = id;
  60.     }
  61.     public String getName1() {
  62.         return name1;
  63.     }
  64.     public void setName1(String name1) {
  65.         this.name1 = name1;
  66.     }
  67.     public String getName2() {
  68.         return name2;
  69.     }
  70.     public void setName2(String name2) {
  71.         this.name2 = name2;
  72.     }
  73.     public String getName3() {
  74.         return name3;
  75.     }
  76.     public void setName3(String name3) {
  77.         this.name3 = name3;
  78.     }
  79.     public String getName4() {
  80.         return name4;
  81.     }
  82.     public void setName4(String name4) {
  83.         this.name4 = name4;
  84.     }
  85.     public String getName5() {
  86.         return name5;
  87.     }
  88.     public void setName5(String name5) {
  89.         this.name5 = name5;
  90.     }
  91.     public String getName6() {
  92.         return name6;
  93.     }
  94.     public void setName6(String name6) {
  95.         this.name6 = name6;
  96.     }
  97.     public String getName7() {
  98.         return name7;
  99.     }
  100.     public void setName7(String name7) {
  101.         this.name7 = name7;
  102.     }
  103.     public String getName8() {
  104.         return name8;
  105.     }
  106.     public void setName8(String name8) {
  107.         this.name8 = name8;
  108.     }
  109.     public String getName9() {
  110.         return name9;
  111.     }
  112.     public void setName9(String name9) {
  113.         this.name9 = name9;
  114.     }
  115.     public String getName10() {
  116.         return name10;
  117.     }
  118.     public void setName10(String name10) {
  119.         this.name10 = name10;
  120.     }
  121.     public String getName11() {
  122.         return name11;
  123.     }
  124.     public void setName11(String name11) {
  125.         this.name11 = name11;
  126.     }
  127.     public String getName12() {
  128.         return name12;
  129.     }
  130.     public void setName12(String name12) {
  131.         this.name12 = name12;
  132.     }
  133.     public String getName13() {
  134.         return name13;
  135.     }
  136.     public void setName13(String name13) {
  137.         this.name13 = name13;
  138.     }
  139.     public String getName14() {
  140.         return name14;
  141.     }
  142.     public void setName14(String name14) {
  143.         this.name14 = name14;
  144.     }
  145.     public String getName15() {
  146.         return name15;
  147.     }
  148.     public void setName15(String name15) {
  149.         this.name15 = name15;
  150.     }
  151.     public String getName16() {
  152.         return name16;
  153.     }
  154.     public void setName16(String name16) {
  155.         this.name16 = name16;
  156.     }
  157.     public String getName17() {
  158.         return name17;
  159.     }
  160.     public void setName17(String name17) {
  161.         this.name17 = name17;
  162.     }
  163.     public String getName18() {
  164.         return name18;
  165.     }
  166.     public void setName18(String name18) {
  167.         this.name18 = name18;
  168.     }
  169.     public String getName19() {
  170.         return name19;
  171.     }
  172.     public void setName19(String name19) {
  173.         this.name19 = name19;
  174.     }
  175.     public String getName20() {
  176.         return name20;
  177.     }
  178.     public void setName20(String name20) {
  179.         this.name20 = name20;
  180.     }
  181.     public String getName21() {
  182.         return name21;
  183.     }
  184.     public void setName21(String name21) {
  185.         this.name21 = name21;
  186.     }
  187.     public String getName22() {
  188.         return name22;
  189.     }
  190.     public void setName22(String name22) {
  191.         this.name22 = name22;
  192.     }
  193.     public String getName23() {
  194.         return name23;
  195.     }
  196.     public void setName23(String name23) {
  197.         this.name23 = name23;
  198.     }
  199.     public String getName24() {
  200.         return name24;
  201.     }
  202.     public void setName24(String name24) {
  203.         this.name24 = name24;
  204.     }
  205.     public String getName25() {
  206.         return name25;
  207.     }
  208.     public void setName25(String name25) {
  209.         this.name25 = name25;
  210.     }
  211.     public String getName26() {
  212.         return name26;
  213.     }
  214.     public void setName26(String name26) {
  215.         this.name26 = name26;
  216.     }
  217.     public String getName27() {
  218.         return name27;
  219.     }
  220.     public void setName27(String name27) {
  221.         this.name27 = name27;
  222.     }
  223.     public String getName28() {
  224.         return name28;
  225.     }
  226.     public void setName28(String name28) {
  227.         this.name28 = name28;
  228.     }
  229.     public String getName29() {
  230.         return name29;
  231.     }
  232.     public void setName29(String name29) {
  233.         this.name29 = name29;
  234.     }
  235.     public String getName30() {
  236.         return name30;
  237.     }
  238.     public void setName30(String name30) {
  239.         this.name30 = name30;
  240.     }
  241.     public String getName31() {
  242.         return name31;
  243.     }
  244.     public void setName31(String name31) {
  245.         this.name31 = name31;
  246.     }
  247.     public String getName32() {
  248.         return name32;
  249.     }
  250.     public void setName32(String name32) {
  251.         this.name32 = name32;
  252.     }
  253.     public String getName33() {
  254.         return name33;
  255.     }
  256.     public void setName33(String name33) {
  257.         this.name33 = name33;
  258.     }
  259.     public String getName34() {
  260.         return name34;
  261.     }
  262.     public void setName34(String name34) {
  263.         this.name34 = name34;
  264.     }
  265.     public String getName35() {
  266.         return name35;
  267.     }
  268.     public void setName35(String name35) {
  269.         this.name35 = name35;
  270.     }
  271.     public String getName36() {
  272.         return name36;
  273.     }
  274.     public void setName36(String name36) {
  275.         this.name36 = name36;
  276.     }
  277.     public String getName37() {
  278.         return name37;
  279.     }
  280.     public void setName37(String name37) {
  281.         this.name37 = name37;
  282.     }
  283.     public String getName38() {
  284.         return name38;
  285.     }
  286.     public void setName38(String name38) {
  287.         this.name38 = name38;
  288.     }
  289.     public String getName39() {
  290.         return name39;
  291.     }
  292.     public void setName39(String name39) {
  293.         this.name39 = name39;
  294.     }
  295.     public String getName40() {
  296.         return name40;
  297.     }
  298.     public void setName40(String name40) {
  299.         this.name40 = name40;
  300.     }
  301.     public String getName41() {
  302.         return name41;
  303.     }
  304.     public void setName41(String name41) {
  305.         this.name41 = name41;
  306.     }
  307.     public String getName42() {
  308.         return name42;
  309.     }
  310.     public void setName42(String name42) {
  311.         this.name42 = name42;
  312.     }
  313.     public String getName43() {
  314.         return name43;
  315.     }
  316.     public void setName43(String name43) {
  317.         this.name43 = name43;
  318.     }
  319.     public String getName44() {
  320.         return name44;
  321.     }
  322.     public void setName44(String name44) {
  323.         this.name44 = name44;
  324.     }
  325.     public String getName45() {
  326.         return name45;
  327.     }
  328.     public void setName45(String name45) {
  329.         this.name45 = name45;
  330.     }
  331.     public String getName46() {
  332.         return name46;
  333.     }
  334.     public void setName46(String name46) {
  335.         this.name46 = name46;
  336.     }
  337.     public String getName47() {
  338.         return name47;
  339.     }
  340.     public void setName47(String name47) {
  341.         this.name47 = name47;
  342.     }
  343.     public String getName48() {
  344.         return name48;
  345.     }
  346.     public void setName48(String name48) {
  347.         this.name48 = name48;
  348.     }
  349.     public String getName49() {
  350.         return name49;
  351.     }
  352.     public void setName49(String name49) {
  353.         this.name49 = name49;
  354.     }
  355.     public String getName50() {
  356.         return name50;
  357.     }
  358.     public void setName50(String name50) {
  359.         this.name50 = name50;
  360.     }
  361. }
复制代码
创建吸取Kafka消息的实体SinkLog:
  1. package com.slink.entity;
  2. import com.alibaba.fastjson.JSON;
  3. import com.slink.util.EmptyNullUtil;
  4. import org.apache.commons.lang3.StringUtils;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import java.io.Serializable;
  8. public class SinkLog implements Serializable {
  9.     private static final Logger log = LoggerFactory.getLogger(SinkLog.class);
  10.     /**
  11.      * 请求时间(2024-10-28T17:06:51+08:00)
  12.      */
  13.     private String timestamp;
  14.     /**
  15.      * 客户端地址
  16.      */
  17.     private String remoteAddr;
  18.     /**
  19.      * HTTP请求状态
  20.      */
  21.     private int status;
  22.     /**
  23.      * 请求的URI和HTTP协议(POST /sink/ HTTP/1.1)
  24.      */
  25.     private String request;
  26.     /**
  27.      * 后台upstream的地址,即真正提供服务的主机地址(127.0.0.1:80)
  28.      */
  29.     private String upstrAddr;
  30.     /**
  31.      * 发送给客户端文件内容大小
  32.      */
  33.     private long bytes;
  34.     /**
  35.      * 请求参数主体
  36.      */
  37.     private Student requestBody;
  38.     /**
  39.      * 用户终端浏览器等信息(Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; SV1; GTB7.0; .NET4.0C)
  40.      */
  41.     private String agent;
  42.     public String getTimestamp() {
  43.         return timestamp;
  44.     }
  45.     public void setTimestamp(String timestamp) {
  46.         this.timestamp = timestamp;
  47.     }
  48.     public String getRemoteAddr() {
  49.         return remoteAddr;
  50.     }
  51.     public void setRemoteAddr(String remoteAddr) {
  52.         this.remoteAddr = remoteAddr;
  53.     }
  54.     public int getStatus() {
  55.         return status;
  56.     }
  57.     public void setStatus(int status) {
  58.         this.status = status;
  59.     }
  60.     public String getRequest() {
  61.         return request;
  62.     }
  63.     public void setRequest(String request) {
  64.         this.request = request;
  65.     }
  66.     public String getUpstrAddr() {
  67.         return upstrAddr;
  68.     }
  69.     public void setUpstrAddr(String upstrAddr) {
  70.         this.upstrAddr = upstrAddr;
  71.     }
  72.     public long getBytes() {
  73.         return bytes;
  74.     }
  75.     public void setBytes(long bytes) {
  76.         this.bytes = bytes;
  77.     }
  78.     public Student getRequestBody() {
  79.         return requestBody;
  80.     }
  81.     public void setRequestBody(Student requestBody) {
  82.         this.requestBody = requestBody;
  83.     }
  84.     public String getAgent() {
  85.         return agent;
  86.     }
  87.     public void setAgent(String agent) {
  88.         this.agent = agent;
  89.     }
  90.     /**
  91.      * 解析字符串成Student对象
  92.      */
  93.     public static Student build(String val) {
  94.         if (StringUtils.isEmpty(val) || !val.contains("/sink")) {
  95.             return null;
  96.         }
  97.         log.info("消息:{}", val);
  98.         try {
  99.             val = val.replace("\", StringUtils.EMPTY);
  100.             Student student = JSON.parseObject(val, SinkLog.class).getRequestBody();
  101.             EmptyNullUtil.stringNullToEmpty(student);
  102.             return student;
  103.         } catch (Exception e) {
  104.             log.error("解析字符串成Student对象异常:", e);
  105.             return null;
  106.         }
  107.     }
  108. }
复制代码
创建读取配置文件实体(SinkProperties)以及方法:
  1. package com.slink.properties;
  2. import org.apache.commons.lang3.StringUtils;
  3. import java.io.IOException;
  4. import java.io.InputStream;
  5. import java.io.Serializable;
  6. import java.util.Properties;
  7. public class SinkProperties implements Serializable {
  8.     private String jdbcDriver;
  9.     private String jdbcUrl;
  10.     private String jdbcUsername;
  11.     private String jdbcPassword;
  12.     private int jdbcDruidInitialSize;
  13.     private int jdbcDruidMaxActive;
  14.     private int jdbcDruidMinIdle;
  15.     private int jdbcDruidMaxWait;
  16.     private long jdbcDruidTimeBetweenEvictionRunsMillis;
  17.     private long jdbcDruidMaxEvictableIdleTimeMillis;
  18.     private long jdbcDruidMinEvictableIdleTimeMillis;
  19.     private boolean jdbcDruidTestWhileIdle;
  20.     private boolean jdbcDruidTestOnBorrow;
  21.     private boolean jdbcDruidTestOnReturn;
  22.     private boolean jdbcDruidPoolPreparedStatements;
  23.     private int jdbcDruidMaxPoolPreparedStatementPerConnectionSize;
  24.     private String jdbcDruidFilters;
  25.     private String jdbcDruidValidationQuery;
  26.     private String kafkaServer;
  27.     private String kafkaConsumerGroupId;
  28.     private String kafkaKeySerializer;
  29.     private String kafkaValueSerializer;
  30.     private String kafkaUserEventTopic;
  31.     private String kafkaAutoOffsetReset;
  32.     public String getJdbcDriver() {
  33.         return jdbcDriver;
  34.     }
  35.     public void setJdbcDriver(String jdbcDriver) {
  36.         this.jdbcDriver = jdbcDriver;
  37.     }
  38.     public String getJdbcUrl() {
  39.         return jdbcUrl;
  40.     }
  41.     public void setJdbcUrl(String jdbcUrl) {
  42.         this.jdbcUrl = jdbcUrl;
  43.     }
  44.     public String getJdbcUsername() {
  45.         return jdbcUsername;
  46.     }
  47.     public void setJdbcUsername(String jdbcUsername) {
  48.         this.jdbcUsername = jdbcUsername;
  49.     }
  50.     public String getJdbcPassword() {
  51.         return jdbcPassword;
  52.     }
  53.     public void setJdbcPassword(String jdbcPassword) {
  54.         this.jdbcPassword = jdbcPassword;
  55.     }
  56.     public int getJdbcDruidInitialSize() {
  57.         return jdbcDruidInitialSize;
  58.     }
  59.     public void setJdbcDruidInitialSize(int jdbcDruidInitialSize) {
  60.         this.jdbcDruidInitialSize = jdbcDruidInitialSize;
  61.     }
  62.     public int getJdbcDruidMaxActive() {
  63.         return jdbcDruidMaxActive;
  64.     }
  65.     public void setJdbcDruidMaxActive(int jdbcDruidMaxActive) {
  66.         this.jdbcDruidMaxActive = jdbcDruidMaxActive;
  67.     }
  68.     public int getJdbcDruidMinIdle() {
  69.         return jdbcDruidMinIdle;
  70.     }
  71.     public void setJdbcDruidMinIdle(int jdbcDruidMinIdle) {
  72.         this.jdbcDruidMinIdle = jdbcDruidMinIdle;
  73.     }
  74.     public int getJdbcDruidMaxWait() {
  75.         return jdbcDruidMaxWait;
  76.     }
  77.     public void setJdbcDruidMaxWait(int jdbcDruidMaxWait) {
  78.         this.jdbcDruidMaxWait = jdbcDruidMaxWait;
  79.     }
  80.     public long getJdbcDruidTimeBetweenEvictionRunsMillis() {
  81.         return jdbcDruidTimeBetweenEvictionRunsMillis;
  82.     }
  83.     public void setJdbcDruidTimeBetweenEvictionRunsMillis(long jdbcDruidTimeBetweenEvictionRunsMillis) {
  84.         this.jdbcDruidTimeBetweenEvictionRunsMillis = jdbcDruidTimeBetweenEvictionRunsMillis;
  85.     }
  86.     public long getJdbcDruidMaxEvictableIdleTimeMillis() {
  87.         return jdbcDruidMaxEvictableIdleTimeMillis;
  88.     }
  89.     public void setJdbcDruidMaxEvictableIdleTimeMillis(long jdbcDruidMaxEvictableIdleTimeMillis) {
  90.         this.jdbcDruidMaxEvictableIdleTimeMillis = jdbcDruidMaxEvictableIdleTimeMillis;
  91.     }
  92.     public long getJdbcDruidMinEvictableIdleTimeMillis() {
  93.         return jdbcDruidMinEvictableIdleTimeMillis;
  94.     }
  95.     public void setJdbcDruidMinEvictableIdleTimeMillis(long jdbcDruidMinEvictableIdleTimeMillis) {
  96.         this.jdbcDruidMinEvictableIdleTimeMillis = jdbcDruidMinEvictableIdleTimeMillis;
  97.     }
  98.     public boolean isJdbcDruidTestWhileIdle() {
  99.         return jdbcDruidTestWhileIdle;
  100.     }
  101.     public void setJdbcDruidTestWhileIdle(boolean jdbcDruidTestWhileIdle) {
  102.         this.jdbcDruidTestWhileIdle = jdbcDruidTestWhileIdle;
  103.     }
  104.     public boolean isJdbcDruidTestOnBorrow() {
  105.         return jdbcDruidTestOnBorrow;
  106.     }
  107.     public void setJdbcDruidTestOnBorrow(boolean jdbcDruidTestOnBorrow) {
  108.         this.jdbcDruidTestOnBorrow = jdbcDruidTestOnBorrow;
  109.     }
  110.     public boolean isJdbcDruidTestOnReturn() {
  111.         return jdbcDruidTestOnReturn;
  112.     }
  113.     public void setJdbcDruidTestOnReturn(boolean jdbcDruidTestOnReturn) {
  114.         this.jdbcDruidTestOnReturn = jdbcDruidTestOnReturn;
  115.     }
  116.     public boolean isJdbcDruidPoolPreparedStatements() {
  117.         return jdbcDruidPoolPreparedStatements;
  118.     }
  119.     public void setJdbcDruidPoolPreparedStatements(boolean jdbcDruidPoolPreparedStatements) {
  120.         this.jdbcDruidPoolPreparedStatements = jdbcDruidPoolPreparedStatements;
  121.     }
  122.     public int getJdbcDruidMaxPoolPreparedStatementPerConnectionSize() {
  123.         return jdbcDruidMaxPoolPreparedStatementPerConnectionSize;
  124.     }
  125.     public void setJdbcDruidMaxPoolPreparedStatementPerConnectionSize(int jdbcDruidMaxPoolPreparedStatementPerConnectionSize) {
  126.         this.jdbcDruidMaxPoolPreparedStatementPerConnectionSize = jdbcDruidMaxPoolPreparedStatementPerConnectionSize;
  127.     }
  128.     public String getJdbcDruidFilters() {
  129.         return jdbcDruidFilters;
  130.     }
  131.     public void setJdbcDruidFilters(String jdbcDruidFilters) {
  132.         this.jdbcDruidFilters = jdbcDruidFilters;
  133.     }
  134.     public String getJdbcDruidValidationQuery() {
  135.         return jdbcDruidValidationQuery;
  136.     }
  137.     public void setJdbcDruidValidationQuery(String jdbcDruidValidationQuery) {
  138.         this.jdbcDruidValidationQuery = jdbcDruidValidationQuery;
  139.     }
  140.     public String getKafkaServer() {
  141.         return kafkaServer;
  142.     }
  143.     public void setKafkaServer(String kafkaServer) {
  144.         this.kafkaServer = kafkaServer;
  145.     }
  146.     public String getKafkaConsumerGroupId() {
  147.         return kafkaConsumerGroupId;
  148.     }
  149.     public void setKafkaConsumerGroupId(String kafkaConsumerGroupId) {
  150.         this.kafkaConsumerGroupId = kafkaConsumerGroupId;
  151.     }
  152.     public String getKafkaKeySerializer() {
  153.         return kafkaKeySerializer;
  154.     }
  155.     public void setKafkaKeySerializer(String kafkaKeySerializer) {
  156.         this.kafkaKeySerializer = kafkaKeySerializer;
  157.     }
  158.     public String getKafkaValueSerializer() {
  159.         return kafkaValueSerializer;
  160.     }
  161.     public void setKafkaValueSerializer(String kafkaValueSerializer) {
  162.         this.kafkaValueSerializer = kafkaValueSerializer;
  163.     }
  164.     public String getKafkaUserEventTopic() {
  165.         return kafkaUserEventTopic;
  166.     }
  167.     public void setKafkaUserEventTopic(String kafkaUserEventTopic) {
  168.         this.kafkaUserEventTopic = kafkaUserEventTopic;
  169.     }
  170.     public String getKafkaAutoOffsetReset() {
  171.         return kafkaAutoOffsetReset;
  172.     }
  173.     public void setKafkaAutoOffsetReset(String kafkaAutoOffsetReset) {
  174.         this.kafkaAutoOffsetReset = kafkaAutoOffsetReset;
  175.     }
  176.     public void create() throws IOException {
  177.         String active = System.getenv("active");
  178.         active = StringUtils.isEmpty(active) ? "dev" : active;
  179.         String resource = "application-".concat(active).concat(".properties");
  180.         InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(resource);
  181.         Properties properties = new Properties();
  182.         properties.load(inputStream);
  183.         properties.list(System.out);
  184.         this.jdbcDriver = properties.getProperty("jdbc.driver");
  185.         this.jdbcUrl = properties.getProperty("jdbc.url");
  186.         this.jdbcUsername = properties.getProperty("jdbc.username");
  187.         this.jdbcPassword = properties.getProperty("jdbc.password");
  188.         this.jdbcDruidInitialSize = Integer.parseInt(properties.getProperty("jdbc.druidInitialSize"));
  189.         this.jdbcDruidMaxActive = Integer.parseInt(properties.getProperty("jdbc.druidMaxActive"));
  190.         this.jdbcDruidMinIdle = Integer.parseInt(properties.getProperty("jdbc.druidMinIdle"));
  191.         this.jdbcDruidMaxWait = Integer.parseInt(properties.getProperty("jdbc.druidMaxWait"));
  192.         this.jdbcDruidTimeBetweenEvictionRunsMillis = Long.parseLong(properties.getProperty("jdbc.druidTimeBetweenEvictionRunsMillis"));
  193.         this.jdbcDruidMaxEvictableIdleTimeMillis = Long.parseLong(properties.getProperty("jdbc.druidMaxEvictableIdleTimeMillis"));
  194.         this.jdbcDruidMinEvictableIdleTimeMillis = Long.parseLong(properties.getProperty("jdbc.druidMinEvictableIdleTimeMillis"));
  195.         this.jdbcDruidTestWhileIdle = Boolean.valueOf(properties.getProperty("jdbc.druidTestWhileIdle"));
  196.         this.jdbcDruidTestOnBorrow = Boolean.valueOf(properties.getProperty("jdbc.druidTestOnBorrow"));
  197.         this.jdbcDruidTestOnReturn = Boolean.valueOf(properties.getProperty("jdbc.druidTestOnReturn"));
  198.         this.jdbcDruidPoolPreparedStatements = Boolean.valueOf(properties.getProperty("jdbc.druidPoolPreparedStatements"));
  199.         this.jdbcDruidMaxPoolPreparedStatementPerConnectionSize = Integer.parseInt(properties.getProperty("jdbc.druidMaxPoolPreparedStatementPerConnectionSize"));
  200.         this.jdbcDruidFilters = properties.getProperty("jdbc.druidFilters");
  201.         this.jdbcDruidValidationQuery = properties.getProperty("jdbc.druidValidationQuery");
  202.         this.kafkaServer = properties.getProperty("kafka.servers");
  203.         this.kafkaConsumerGroupId = properties.getProperty("kafka.consumer.groupId");
  204.         this.kafkaKeySerializer = properties.getProperty("kafka.key.serializer");
  205.         this.kafkaValueSerializer = properties.getProperty("kafka.value.serializer");
  206.         this.kafkaUserEventTopic = properties.getProperty("kafka.user.event.topic");
  207.         this.kafkaAutoOffsetReset = properties.getProperty("kafka.auto.offset.reset");
  208.     }
  209. }
复制代码

主体函数
主要实现读取kafka消息,使用map和json把消息转换为Student对象数据流
创建10秒的滚动窗口聚合Student数据,末了调用自定义sink存入至于mysql
  1. package com.slink;
  2. import com.slink.entity.Student;
  3. import com.slink.function.StudentProcessWindowFunction;
  4. import com.slink.properties.SinkProperties;
  5. import com.slink.util.Constant;
  6. import com.slink.util.KafkaSourceUtil;
  7. import com.slink.util.SinkFunctionUtil;
  8. import com.slink.util.StreamExecutionEnvironmentUtil;
  9. import org.apache.flink.streaming.api.datastream.DataStream;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.flink.streaming.api.functions.sink.SinkFunction;
  12. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  13. import org.apache.flink.streaming.api.windowing.time.Time;
  14. public class StudentRunner {
  15.     public static void main(String[] args) throws Exception {
  16.         // 创建流处理执行环境
  17.         final StreamExecutionEnvironment env = StreamExecutionEnvironmentUtil.buildStreamExecutionEnvironment();
  18.         // 读取环境变量 jdbc、kafka
  19.         SinkProperties properties = new SinkProperties();
  20.         properties.create();
  21.         // 构建kafka源
  22.         DataStream<Student> kafkaOut = KafkaSourceUtil.buildDataStream(properties, env, Constant.TWO);
  23.         // 构建SinkFunction
  24.         SinkFunction<Student> sinkFunction = SinkFunctionUtil.buildSinkFunction(properties);
  25.         /**
  26.          * 将清洗后的数据通过窗口去聚合(每10秒滚动窗口聚合一次)写入Mysql
  27.          * 在数据清洗完成后,将数据写入Mysql数据库。这里我们设置了写入Mysql的并行度为2(setParallelism(2))。
  28.          * 这意味着将有2个并发任务负责将数据写入到Mysql。由于Mysql的写入通常涉及磁盘I/O操作,设置较低的并行度可以避免I/O争用
  29.          */
  30.         // 全局10秒滚动窗口
  31.         kafkaOut.rebalance().windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  32.                 .process(new StudentProcessWindowFunction())
  33.                 .disableChaining()
  34.                 .addSink(sinkFunction)
  35.                 .name("KafkaFoMysql");
  36. //                .setParallelism(2);
  37.         kafkaOut.print(); //调度输出
  38.         env.execute("flink kafka to Mysql");
  39.     }
  40. }
复制代码
StreamExecutionEnvironmentUtil:
  1. package com.slink.util;
  2. import org.apache.flink.streaming.api.CheckpointingMode;
  3. import org.apache.flink.streaming.api.environment.CheckpointConfig;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. public class StreamExecutionEnvironmentUtil {
  6.     private static final String statebackend_address = "file:/Users/xumingzhong/Desktop/xmz";
  7.     public static StreamExecutionEnvironment buildStreamExecutionEnvironment(){
  8.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  9.         /**
  10.          * 设置全局并行度(默认并行度)所有算子,默认的并行度就都为6。一般不会在程序中设置全局并行度。
  11.          * 因为如果在程序中对全局并行度进行硬编码,会导致无法动态扩容。
  12.          * 注意:由于keyBy不是算子,所以无法对keyBy设置并行度
  13.          */
  14. //        env.setParallelism(1);
  15.         //每隔10s进行启动一个检查点【设置checkpoint的周期】
  16.         env.enableCheckpointing(10000);
  17.         //设置EXACTLY_ONCE语义,默认就是这个
  18.         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  19.         //确保检查点之间有1s的时间间隔【checkpoint最小间隔】
  20.         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
  21.         //检查点必须在60s之内完成,或者被丢弃【checkpoint超时时间】
  22.         env.getCheckpointConfig().setCheckpointTimeout(60000);
  23.         //同一时间只允许进行一次检查点
  24.         env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  25.         //表示一旦Flink程序被cancel后,会保留checkpoint数据,以便根据实际需要恢复到指定的checkpoint
  26.         env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  27.         //设置statebackend,将检查点保存在hdfs上面,默认保存在内存中。这里先保存到本地
  28. //        env.setStateBackend(new FsStateBackend(statebackend_address));
  29.         return env;
  30.     }
  31. }
复制代码
SinkFunctionUtil:
  1. package com.slink.util;
  2. import com.slink.entity.Student;
  3. import com.slink.properties.SinkProperties;
  4. import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
  5. import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
  6. import org.apache.flink.connector.jdbc.JdbcSink;
  7. import org.apache.flink.streaming.api.functions.sink.SinkFunction;
  8. import java.sql.PreparedStatement;
  9. public class SinkFunctionUtil {
  10.     /**
  11.      * 插入Student表SQL语句
  12.      */
  13.     private static final String STUDENT_INSERT = "insert into student(name1, name2, name3, name4, name5, name6, name7, name8, name9, name10, name11, name12, name13, name14, name15, name16, name17, name18, name19, name20, name21, name22, name23, name24, name25, name26, name27, name28, name29, name30, name31, name32, name33, name34, name35, name36, name37, name38, name39, name40, name41, name42, name43, name44, name45, name46, name47, name48, name49, name50) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
  14.     /**
  15.      * 构建SinkFunction
  16.      */
  17.     public static SinkFunction<Student> buildSinkFunction(SinkProperties properties){
  18.         return JdbcSink.sink(STUDENT_INSERT,   // SQL 插入语句
  19.                 (PreparedStatement ps, Student student) -> {
  20.                     ps.setString(1, student.getName1());
  21.                     ps.setString(2, student.getName2());
  22.                     ps.setString(3, student.getName3());
  23.                     ps.setString(4, student.getName4());
  24.                     ps.setString(5, student.getName5());
  25.                     ps.setString(6, student.getName6());
  26.                     ps.setString(7, student.getName7());
  27.                     ps.setString(8, student.getName8());
  28.                     ps.setString(9, student.getName9());
  29.                     ps.setString(10, student.getName10());
  30.                     ps.setString(11, student.getName11());
  31.                     ps.setString(12, student.getName12());
  32.                     ps.setString(13, student.getName13());
  33.                     ps.setString(14, student.getName14());
  34.                     ps.setString(15, student.getName15());
  35.                     ps.setString(16, student.getName16());
  36.                     ps.setString(17, student.getName17());
  37.                     ps.setString(18, student.getName18());
  38.                     ps.setString(19, student.getName19());
  39.                     ps.setString(20, student.getName20());
  40.                     ps.setString(21, student.getName21());
  41.                     ps.setString(22, student.getName22());
  42.                     ps.setString(23, student.getName23());
  43.                     ps.setString(24, student.getName24());
  44.                     ps.setString(25, student.getName25());
  45.                     ps.setString(26, student.getName26());
  46.                     ps.setString(27, student.getName27());
  47.                     ps.setString(28, student.getName28());
  48.                     ps.setString(29, student.getName29());
  49.                     ps.setString(30, student.getName30());
  50.                     ps.setString(31, student.getName31());
  51.                     ps.setString(32, student.getName32());
  52.                     ps.setString(33, student.getName33());
  53.                     ps.setString(34, student.getName34());
  54.                     ps.setString(35, student.getName35());
  55.                     ps.setString(36, student.getName36());
  56.                     ps.setString(37, student.getName37());
  57.                     ps.setString(38, student.getName38());
  58.                     ps.setString(39, student.getName39());
  59.                     ps.setString(40, student.getName40());
  60.                     ps.setString(41, student.getName41());
  61.                     ps.setString(42, student.getName42());
  62.                     ps.setString(43, student.getName43());
  63.                     ps.setString(44, student.getName44());
  64.                     ps.setString(45, student.getName45());
  65.                     ps.setString(46, student.getName46());
  66.                     ps.setString(47, student.getName47());
  67.                     ps.setString(48, student.getName48());
  68.                     ps.setString(49, student.getName49());
  69.                     ps.setString(50, student.getName50());
  70.                 },
  71.                 getJdbcExecutionOptions(properties),
  72.                 getJdbcConnectionOptions(properties)
  73.         );
  74.     }
  75.     /**
  76.      * 设置jdbc批处理
  77.      */
  78.     private static JdbcExecutionOptions getJdbcExecutionOptions(SinkProperties properties) {
  79.         return JdbcExecutionOptions.builder()
  80.                 .withBatchSize(5000)            // 设置批量插入大小
  81.                 .withBatchIntervalMs(200)       // 设置批量插入的时间间隔-毫秒
  82.                 .withMaxRetries(3)            // 设置最大重试次数
  83.                 .build();
  84.     }
  85.     /**
  86.      * 设置jdbc连接
  87.      */
  88.     public static JdbcConnectionOptions getJdbcConnectionOptions(SinkProperties properties){
  89.         return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  90.                 .withUrl(properties.getJdbcUrl())
  91.                 .withDriverName(properties.getJdbcDriver())
  92.                 .withUsername(properties.getJdbcUsername())
  93.                 .withPassword(properties.getJdbcPassword())
  94.                 //.withDataSource(dataSource)   1.17.X版本支持数据池连接配置,需JDK11
  95.                 .build();
  96.     }
  97. }
复制代码
KafkaSourceUtil:
  1. package com.slink.util;
  2. import com.slink.entity.SinkLog;
  3. import com.slink.entity.Student;
  4. import com.slink.properties.SinkProperties;
  5. import org.apache.flink.api.common.functions.MapFunction;
  6. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  7. import org.apache.flink.streaming.api.datastream.DataStream;
  8. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  10. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  11. import java.util.Properties;
  12. public class KafkaSourceUtil {
  13.     /**
  14.      * 设置Kafka源
  15.      * Kafka 消费并行度:通过 setParallelism(x) 为从 Kafka 读取数据的操作设置了并行度为x。
  16.      * 也就是说,Flink 将会启动 x 个并行任务来从Kafka 的 kafkaTopic 主题中消费数据。
  17.      * 这个并行度可以根据 Kafka 分区的数量调整。如果 Kafka 有 x 个分区,那么设置并行度为 x 是合理的,
  18.      * 这样可以保证每个分区都有一个并发实例进行处理
  19.      */
  20.     public static DataStream<Student> buildDataStream(SinkProperties properties, StreamExecutionEnvironment env, int parallelism) {
  21.         // 构建kafka环境变量对象
  22.         Properties props = new Properties();
  23.         props.put("bootstrap.servers", properties.getKafkaServer());
  24.         props.put("group.id", properties.getKafkaConsumerGroupId());
  25.         props.put("auto.offset.reset", properties.getKafkaAutoOffsetReset());
  26.         return env.addSource(new FlinkKafkaConsumer<>(
  27.                         properties.getKafkaUserEventTopic(),
  28.                         new SimpleStringSchema(),
  29.                         props)).setParallelism(parallelism)
  30.                 .map(new MapFunction<String, Student>() {
  31.                     @Override
  32.                     public Student map(String val) {
  33.                         // 解析字符串转换Student对象
  34.                         return SinkLog.build(val);
  35.                     }
  36.                 }).filter(v -> v != null);
  37.     }
  38. }
复制代码
工具类:EmptyNullUtil
  1. package com.slink.util;
  2. import org.apache.commons.lang3.StringUtils;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import java.lang.reflect.Field;
  6. import java.lang.reflect.Method;
  7. public class EmptyNullUtil {
  8.     private static final Logger log = LoggerFactory.getLogger(EmptyNullUtil.class);
  9.     /**
  10.      * 将属性类型为String,值为null的设置为 ""
  11.      */
  12.     public static <T> void stringNullToEmpty(T t) {
  13.         if (null == t) {
  14.             return;
  15.         }
  16.         Field[] declaredFields = t.getClass().getDeclaredFields();
  17.         for (Field field : declaredFields) {
  18.             field.setAccessible(true);
  19.             if (field.getType().equals(String.class)) {
  20.                 // 将属性的首字母大写
  21.                 String methodName = field.getName().replaceFirst(field.getName().substring(0, 1), field.getName().substring(0, 1).toUpperCase());
  22.                 try {
  23.                     Method methodGet = t.getClass().getMethod("get" + methodName);
  24.                     // 调用getter方法获取属性值
  25.                     String str = (String) methodGet.invoke(t);
  26.                     if (StringUtils.isBlank(str)) {
  27.                         // 如果为null的String类型的属性则重新复制为空字符串
  28.                         field.set(t, field.getType().getConstructor(field.getType()).newInstance(StringUtils.EMPTY));
  29.                     }
  30.                 } catch (Exception e) {
  31.                     log.warn("[EmptyNullUtil.stringBlankToNull] e:{}", e);
  32.                 }
  33.             }
  34.         }
  35.     }
  36. }
复制代码
窗口函数自定义处理类:StudentProcessWindowFunction
  1. package com.slink.function;
  2. import com.slink.StudentRunner;
  3. import com.slink.entity.Student;
  4. import org.apache.commons.compress.utils.Lists;
  5. import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
  6. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  7. import org.apache.flink.util.Collector;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import java.util.List;
  11. import java.util.stream.Collectors;
  12. import java.util.stream.StreamSupport;
  13. public class StudentProcessWindowFunction extends ProcessAllWindowFunction<Student, Student, TimeWindow> {
  14.     private static final Logger log = LoggerFactory.getLogger(StudentRunner.class);
  15.     @Override
  16.     public void process(ProcessAllWindowFunction<Student, Student, TimeWindow>.Context context, Iterable<Student> iterable, Collector<Student> out) {
  17.         log.info("窗口聚合数据条数:{}", StreamSupport.stream(iterable.spliterator(), Boolean.FALSE).collect(Collectors.toList()).size());
  18.         // 直接将窗口内的每个 Student 输出
  19.         for (Student order : iterable) {
  20.             out.collect(order);
  21.         }
  22.     }
  23. }
复制代码
四、运行测试

模拟数据每秒推送一条数据至Kafka:
  1. package com.slink;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import org.apache.kafka.common.serialization.StringSerializer;
  6. import java.util.Properties;
  7. public class Test2 {
  8.     public static void main(String[] args) {
  9.         String value = "{ "timestamp": "2024-11-05T17:38:56+08:00", "remoteAddr": "61.132.71.10","costime": "0.000","realtime": "0.000","status": 405,"xForwarded": "","referer": "","request": "POST /sink HTTP/1.1","upstrAddr": "127.0.0.1:80","bytes":157,"requestBody":{"name39":"拓扑丝路39号","name38":"拓扑丝路38号","name37":"拓扑丝路37号","name36":"拓扑丝路36号","name35":"拓扑丝路35号","name34":"拓扑丝路34号","name33":"拓扑丝路33号","name32":"拓扑丝路32号","name31":"拓扑丝路31号","name30":"拓扑丝路30号","name29":"拓扑丝路29号","name28":"拓扑丝路28号","name27":"拓扑丝路27号","name6":"拓扑丝路6号","name26":"拓扑丝路26号","name5":"拓扑丝路5号","name25":"拓扑丝路25号","name4":"拓扑丝路4号","name24":"拓扑丝路24号","name3":"拓扑丝路3号","name23":"拓扑丝路23号","name22":"拓扑丝路22号","name9":"拓扑丝路9号","name21":"拓扑丝路21号","name8":"拓扑丝路8号","name20":"拓扑丝路20号","name7":"拓扑丝路7号","name2":"拓扑丝路2号","name1":"拓扑丝路1号","name19":"拓扑丝路19号","name18":"拓扑丝路18号","name17":"拓扑丝路17号","name16":"拓扑丝路16号","name15":"拓扑丝路15号","name14":"拓扑丝路14号","name13":"拓扑丝路13号","name12":"拓扑丝路12号","name11":"拓扑丝路11号","name10":"拓扑丝路10号","name50":"拓扑丝路50号","name49":"拓扑丝路49号","name48":"拓扑丝路48号","name47":"拓扑丝路47号","name46":"拓扑丝路46号","name45":"拓扑丝路45号","name44":"拓扑丝路44号","name43":"拓扑丝路43号","name42":"拓扑丝路42号","name41":"拓扑丝路41号","name40":"拓扑丝路40号"},"agent": "PostmanRuntime/7.42.0" }";
  10.         //创建生产者
  11.         Properties properties = new Properties();
  12.         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
  13.         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  14.         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  15.         //优化参数
  16.         properties.put(ProducerConfig.BATCH_SIZE_CONFIG,1024*1024);//生产者尝试缓存记录,为每一个分区缓存一个mb的数据
  17.         properties.put(ProducerConfig.LINGER_MS_CONFIG,500);//最多等待0.5秒.
  18.         KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
  19.         for(int i=0; i<10000; i++){
  20.             ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "key001", value);
  21.             kafkaProducer.send(record);
  22.             try {
  23.                 Thread.sleep(1000);
  24.             } catch (InterruptedException e) {
  25.                 e.printStackTrace();
  26.             }
  27.         }
  28.         kafkaProducer.flush();
  29.         kafkaProducer.close();
  30.     }
  31. }
复制代码
运行主体函数:StudentRunner

查询Mysql日记
​​​​​​​
实例运行乐成
五、总结

本文实例了实现了从Kafka及时读取数据,根据定制化处理数据,通过flink窗口模式批量写入数据库mysql,可根据自身需求写入其他存储(ES、Redis等)。该实例适合在对数据库及时性要求不高,大概是准及时数据分析时的场景。如若数据量大的环境下,聚合十秒钟数据达万条,那么这样批量写会比单条性能进步很多倍。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

我爱普洱茶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表