IT评测·应用市场-qidao123.com

标题: 基于SpringBoot 使用 Flink 收发Kafka消息 [打印本页]

作者: 用多少眼泪才能让你相信    时间: 2023-1-6 18:52
标题: 基于SpringBoot 使用 Flink 收发Kafka消息
前言

这周学习下Flink相关的知识,学习到一个读写Kafka消息的示例, 自己动手实践了一下,别人示例使用的是普通的Java Main方法,没有用到spring boot. 我们在实际工作中会使用spring boot。 因此我做了些加强, 把流程打通了,过程记录下来。
准备工作

首先我们通过docker安装一个kafka服务,参照Kafka的官方知道文档
https://developer.confluent.io/tutorials/kafka-console-consumer-producer-basics/kafka.html
主要的是有个docker-compose.yml文件
  1. ---
  2. version: '2'
  3. services:
  4.   zookeeper:
  5.     image: confluentinc/cp-zookeeper:7.3.0
  6.     hostname: zookeeper
  7.     container_name: zookeeper
  8.     ports:
  9.       - "2181:2181"
  10.     environment:
  11.       ZOOKEEPER_CLIENT_PORT: 2181
  12.       ZOOKEEPER_TICK_TIME: 2000
  13.   broker:
  14.     image: confluentinc/cp-kafka:7.3.0
  15.     hostname: broker
  16.     container_name: broker
  17.     depends_on:
  18.       - zookeeper
  19.     ports:
  20.       - "29092:29092"
  21.     environment:
  22.       KAFKA_BROKER_ID: 1
  23.       KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
  24.       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
  25.       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
  26.       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  27.       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
复制代码
docker compose up -d
就可以把kafka docker 环境搭起来,
使用以下命令,创建一个flink.kafka.streaming.source的topic
docker exec -t broker kafka-topics --create --topic flink.kafka.streaming.source --bootstrap-server broker:9092
然后使用命令,就可以进入到kafka机器的命令行
docker exec -it broker bash
官方文档示例中没有-it, 运行后没有进入broker的命令行,加上来才可以。这里说明下
Flink我们打算直接采用开发工具运行,暂时未搭环境,以体验为主。
开发阶段

首先需要引入的包POM文件
  1.     <properties>
  2.         <jdk.version>1.8</jdk.version>
  3.         <maven.compiler.source>8</maven.compiler.source>
  4.         <maven.compiler.target>8</maven.compiler.target>
  5.         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  6.         <spring-boot.version>2.7.7</spring-boot.version>
  7.         <flink.version>1.16.0</flink.version>
  8.     </properties>
  9.     <dependencyManagement>
  10.         <dependencies>
  11.             <dependency>
  12.                 <groupId>org.springframework.boot</groupId>
  13.                 <artifactId>spring-boot-dependencies</artifactId>
  14.                 <version>${spring-boot.version}</version>
  15.                 <type>pom</type>
  16.                 <scope>import</scope>
  17.             </dependency>
  18.         </dependencies>
  19.     </dependencyManagement>
  20.     <dependencies>
  21.         <dependency>
  22.             <groupId>org.springframework.boot</groupId>
  23.             <artifactId>spring-boot-starter</artifactId>
  24.         </dependency>
  25.         <dependency>
  26.             <groupId>org.projectlombok</groupId>
  27.             <artifactId>lombok</artifactId>
  28.             <optional>true</optional>
  29.         </dependency>
  30.         <dependency>
  31.             <groupId>org.apache.flink</groupId>
  32.             <artifactId>flink-java</artifactId>
  33.             <version>${flink.version}</version>
  34.             <scope>provided</scope>
  35.         </dependency>
  36.         <dependency>
  37.             <groupId>org.apache.flink</groupId>
  38.             <artifactId>flink-clients</artifactId>
  39.             <version>${flink.version}</version>
  40.             <scope>provided</scope>
  41.         </dependency>
  42.         <dependency>
  43.             <groupId>org.apache.flink</groupId>
  44.             <artifactId>flink-streaming-java</artifactId>
  45.             <version>${flink.version}</version>
  46.             <scope>provided</scope>
  47.         </dependency>
  48.         <dependency>
  49.             <groupId>org.apache.flink</groupId>
  50.             <artifactId>flink-connector-kafka</artifactId>
  51.             <version>${flink.version}</version>
  52.             <scope>provided</scope>
  53.         </dependency>
  54.     </dependencies>
复制代码
这里我们使用Java8, 本来想使用Spring Boot 3的,但是Spring Boot 3 最低需要Java17了, 目前Flink支持Java8和Java11,所以我们使用Spring Boot 2, Java 8来开发。
spring-boot-starter 我们就一个命令行程序,所以用这个就够了
lombok  用来定义model
flink-java, flink-clients, flink-streaming-java 是使用基本组件, 缺少flink-clients编译阶段不会报错,运行的时候会报java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
flink-connector-kafka 是连接kafka用
我们这里把provided, 打包的时候不用打包flink相关组件,由运行环境提供。但是IDEA运行的时候会报java.lang.NoClassDefFoundError: org/apache/flink/streaming/util/serialization/DeserializationSchema,
在运行的configuration上面勾选上“add dependencies with provided scope to classpath”可以解决这个问题。
主要代码
  1. @Component
  2. @Slf4j
  3. public class KafkaRunner implements ApplicationRunner
  4. {
  5.     @Override
  6.     public void run(ApplicationArguments args) throws Exception {
  7.         try{
  8.             /****************************************************************************
  9.              *                 Setup Flink environment.
  10.              ****************************************************************************/
  11.             // Set up the streaming execution environment
  12.             final StreamExecutionEnvironment streamEnv
  13.                     = StreamExecutionEnvironment.getExecutionEnvironment();
  14.             /****************************************************************************
  15.              *                  Read Kafka Topic Stream into a DataStream.
  16.              ****************************************************************************/
  17.             //Set connection properties to Kafka Cluster
  18.             Properties properties = new Properties();
  19.             properties.setProperty("bootstrap.servers", "localhost:29092");
  20.             properties.setProperty("group.id", "flink.learn.realtime");
  21.             //Setup a Kafka Consumer on Flnk
  22.             FlinkKafkaConsumer<String> kafkaConsumer =
  23.                     new FlinkKafkaConsumer<>
  24.                             ("flink.kafka.streaming.source", //topic
  25.                                     new SimpleStringSchema(), //Schema for data
  26.                                     properties); //connection properties
  27.             //Setup to receive only new messages
  28.             kafkaConsumer.setStartFromLatest();
  29.             //Create the data stream
  30.             DataStream<String> auditTrailStr = streamEnv
  31.                     .addSource(kafkaConsumer);
  32.             //Convert each record to an Object
  33.             DataStream<Tuple2<String, Integer>> userCounts
  34.                     = auditTrailStr
  35.                     .map(new MapFunction<String,Tuple2<String,Integer>>() {
  36.                         @Override
  37.                         public Tuple2<String,Integer> map(String auditStr) {
  38.                             System.out.println("--- Received Record : " + auditStr);
  39.                             AuditTrail at = new AuditTrail(auditStr);
  40.                             return new Tuple2<String,Integer>(at.getUser(),at.getDuration());
  41.                         }
  42.                     })
  43.                     .keyBy(0)  //By user name
  44.                     .reduce((x,y) -> new Tuple2<String,Integer>( x.f0, x.f1 + y.f1));
  45.             //Print User and Durations.
  46.             userCounts.print();
  47.             /****************************************************************************
  48.              *                  Setup data source and execute the Flink pipeline
  49.              ****************************************************************************/
  50.             //Start the Kafka Stream generator on a separate thread
  51.             System.out.println("Starting Kafka Data Generator...");
  52.             Thread kafkaThread = new Thread(new KafkaStreamDataGenerator());
  53.             kafkaThread.start();
  54.             // execute the streaming pipeline
  55.             streamEnv.execute("Flink Windowing Example");
  56.         }
  57.         catch(Exception e) {
  58.             e.printStackTrace();
  59.         }
  60.     }
  61. }
复制代码
简单说明下程序
DataStream auditTrailStr = streamEnv
.addSource(kafkaConsumer);
就是接通了Kafka Source
  1.         Thread kafkaThread = new Thread(new KafkaStreamDataGenerator());
  2.         kafkaThread.start();
复制代码
这段代码是另外开一个线程往kafka里面去发送文本消息
我们在这个示例中就是一个线程发,然后flink就读出来,然后统计出每个用户的操作时间。
auditTrailStr.map 就是来进行统计操作。
运行效果


可以看到Kafka一边发送,然后我们就一边读出来,然后就统计出了每个用户的时间。
总结

本文只是简单的打通了几个环节,对于flink的知识没有涉及太多,算是一个环境入门。后面学习更多的以后我们再深入些来记录flink. 示例代码会放到 https://github.com/dengkun39/redisdemo.git  spring-boot-flink 文件夹。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4