SpringBoot Kafka发送消息与接收消息实例

打印 上一主题 下一主题

主题 847|帖子 847|积分 2541

媒介 
Kafka的根本工作原理 

 我们将消息的发布(publish)称作 producer(生产者),将消息的订阅(subscribe)表述为 consumer(消费者),将中间的存储阵列称作 broker(署理),这样就可以大抵描画出这样一个局面:
生产者将数据生产出来,交给 broker 进行存储,消费者需要消费数据了,就从broker中去拿出数据来,然后完成一系列对数据的处理操纵。 
 1.引入spring-kafka的jar包

在pom.xml里面导入spring-kafka包
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4.     <modelVersion>4.0.0</modelVersion>
  5.     <parent>
  6.         <groupId>org.springframework.boot</groupId>
  7.         <artifactId>spring-boot-starter-parent</artifactId>
  8.         <version>2.7.4</version>
  9.         <relativePath/> <!-- lookup parent from repository -->
  10.     </parent>
  11.     <groupId>com.example</groupId>
  12.     <artifactId>SpringBootKafka</artifactId>
  13.     <version>0.0.1-SNAPSHOT</version>
  14.     <name>SpringBootKafka</name>
  15.     <description>SpringBootKafka</description>
  16.     <properties>
  17.         <java.version>1.8</java.version>
  18.     </properties>
  19.     <dependencies>
  20.         <dependency>
  21.             <groupId>org.springframework.boot</groupId>
  22.             <artifactId>spring-boot-starter-web</artifactId>
  23.         </dependency>
  24.         <dependency>
  25.             <groupId>org.springframework.boot</groupId>
  26.             <artifactId>spring-boot-starter-test</artifactId>
  27.             <scope>test</scope>
  28.         </dependency>
  29.         <!-- pom.xml -->
  30.         <dependency>
  31.             <groupId>org.springframework.kafka</groupId>
  32.             <artifactId>spring-kafka</artifactId>
  33.         </dependency>
  34.         <dependency>
  35.             <groupId>log4j</groupId>
  36.             <artifactId>log4j</artifactId>
  37.             <version>1.2.17</version>
  38.         </dependency>
  39.         <dependency>
  40.             <groupId>org.projectlombok</groupId>
  41.             <artifactId>lombok</artifactId>
  42.         </dependency>
  43.         <dependency>
  44.             <groupId>org.apache.kafka</groupId>
  45.             <artifactId>kafka-streams</artifactId>
  46.         </dependency>
  47.     </dependencies>
  48.     <repositories>
  49.         <repository>
  50.             <id>central</id>
  51.             <name>aliyun maven</name>
  52.             <url>https://maven.aliyun.com/repository/public/</url>
  53.             <layout>default</layout>
  54.             <!-- 是否开启发布版构件下载 -->
  55.             <releases>
  56.                 <enabled>true</enabled>
  57.             </releases>
  58.             <!-- 是否开启快照版构件下载 -->
  59.             <snapshots>
  60.                 <enabled>false</enabled>
  61.             </snapshots>
  62.         </repository>
  63.     </repositories>
  64.     <build>
  65.         <plugins>
  66.             <plugin>
  67.                 <groupId>org.springframework.boot</groupId>
  68.                 <artifactId>spring-boot-maven-plugin</artifactId>
  69.             </plugin>
  70.         </plugins>
  71.     </build>
  72. </project>
复制代码
2.编写配置文件

在src/main/resources/application.yml里面编写kafka的配置,包括生成者和消费者 
  1. spring:
  2.   kafka:
  3.     bootstrap-servers: 192.168.110.105:9092
  4.     #streams:
  5.       #application-id: my-streams-app
  6.     consumer:
  7.       group-id: myGroupId
  8.       auto-offset-reset: latest
  9.       enable-auto-commit: true
  10.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  11.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  12.     producer:
  13.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  14.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
  15.       retries: 5
复制代码
3.编写生产者

使用org.springframework.kafka.core.KafkaTemplate来发送消息,这里采用了异步方式,获取了消息的处理结果
  1. package com.example.springbootkafka.service;
  2. import com.example.springbootkafka.entity.User;
  3. import com.fasterxml.jackson.core.JsonProcessingException;
  4. import com.fasterxml.jackson.databind.ObjectMapper;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.kafka.core.KafkaTemplate;
  8. import org.springframework.kafka.support.SendResult;
  9. import org.springframework.stereotype.Service;
  10. import org.springframework.util.concurrent.ListenableFuture;
  11. import java.util.concurrent.CompletableFuture;
  12. import java.util.concurrent.Future;
  13. @Slf4j
  14. @Service
  15. public class KafkaProducer {
  16.     private final KafkaTemplate<String, String> kafkaTemplate;
  17.     private final ObjectMapper objectMapper;
  18.     @Autowired
  19.     public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {
  20.         this.kafkaTemplate = kafkaTemplate;
  21.         this.objectMapper = objectMapper;
  22.     }
  23.     public void sendMessage(String message) {
  24.         log.info("KafkaProducer message:{}", message);
  25.         //kafkaTemplate.send("test", message).addCallback();
  26.         Future<SendResult<String, String>> future = kafkaTemplate.send("test", message);
  27.         CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
  28.             try {
  29.                 future.get(); // 等待原始future完成
  30.             } catch (Exception e) {
  31.                 throw new RuntimeException(e);
  32.             }
  33.         });
  34. // 使用whenComplete方法
  35.         completableFuture.whenComplete((result, ex) -> {
  36.             if (ex != null) {
  37.                 System.out.println("Error occurred: " + ex.getMessage());
  38.                 // 成功发送
  39.             } else {
  40.                 System.out.println("Completed successfully");
  41.             }
  42.         });
  43.         /*future.whenComplete((result, ex) -> {
  44.             if (ex == null) {
  45.                 // 成功发送
  46.                 RecordMetadata metadata = result.getRecordMetadata();
  47.                 System.out.println("Message sent successfully with offset: " + metadata.offset());
  48.             } else {
  49.                 // 发送失败
  50.                 System.err.println("Failed to send message due to: " + ex.getMessage());
  51.             }
  52.         });*/
  53.     }
  54.     public void sendUser(User user) throws JsonProcessingException {
  55.         //final ProducerRecord<String, String> record = createRecord(data);
  56.         //ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", message);
  57.         //ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", user);
  58.         String userJson = objectMapper.writeValueAsString(user);
  59.         ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", userJson);
  60.         /*future.addCallback(
  61.                 success -> System.out.println("Message sent successfully: " + userJson),
  62.                 failure -> System.err.println("Failed to send message: " + failure.getMessage())
  63.         );*/
  64.         CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
  65.             try {
  66.                 future.get(); // 等待原始future完成
  67.             } catch (Exception e) {
  68.                 throw new RuntimeException(e);
  69.             }
  70.         });
  71.         completableFuture.whenComplete((result, ex) -> {
  72.             if (ex != null) {
  73.                 System.out.println("Error occurred: " + ex.getMessage());
  74.                 // 成功发送
  75.             } else {
  76.                 System.out.println("Completed successfully");
  77.             }
  78.         });
  79.     }
  80. }
复制代码
 4.编写消费者

通过org.springframework.kafka.annotation.KafkaListener来监听消息
  1. package com.example.springbootkafka.service;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.log4j.Logger;
  4. import org.springframework.kafka.annotation.KafkaListener;
  5. import org.springframework.stereotype.Service;
  6. @Slf4j
  7. @Service
  8. public class KafkaConsumer {
  9.     @KafkaListener(topics = "test", groupId = "myGroupId")
  10.     public void consume(String message) {
  11.         System.out.println("Received message: " + message);
  12.         log.info("KafkaConsumer message:{}", message);
  13.     }
  14. }
复制代码
5.测试消息的生成与发送

  1. package com.example.springbootkafka.controller;
  2. import com.example.springbootkafka.entity.User;
  3. import com.example.springbootkafka.service.KafkaProducer;
  4. import com.fasterxml.jackson.core.JsonProcessingException;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. @Slf4j
  10. @RestController
  11. public class MessageController {
  12.     private final KafkaProducer producer;
  13.     @Autowired
  14.     public MessageController(KafkaProducer producer) {
  15.         this.producer = producer;
  16.     }
  17.     @GetMapping("/send-message")
  18.     public String sendMessage() {
  19.         log.info("MessageController sendMessage start!");
  20.         producer.sendMessage("hello, Kafka!");
  21.         log.info("MessageController sendMessage end!");
  22.         return "Message sent successfully.";
  23.     }
  24.     @GetMapping("/send")
  25.     public String sendMessage1() {
  26.         log.info("MessageController sendMessage1 start!");
  27.         User user = User.builder().name("xuxin").dept("IT/DevlopMent").build();
  28.         try {
  29.             producer.sendUser(user);
  30.         } catch (JsonProcessingException e) {
  31.             throw new RuntimeException(e);
  32.         }
  33.         log.info("MessageController sendMessage1 end!");
  34.         return "Message sendMessage1 successfully.";
  35.     }
  36. }
复制代码
 6.检察结果:


 

具体代码见https://gitee.com/dylan_2017/springboot-kafka.git

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

小小小幸运

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表