Spring Boot项目集成Kafka

打印 上一主题 下一主题

主题 908|帖子 908|积分 2724

在Spring Boot项目中集成Kafka,可以按照以下步骤进行:
1. 添加依赖

首先,在pom.xml文件中添加Kafka的依赖:
  1. <dependency>
  2.     <groupId>org.springframework.kafka</groupId>
  3.     <artifactId>spring-kafka</artifactId>
  4.     <version>2.8.0</version> <!-- 请根据需要选择合适的版本 -->
  5. </dependency>
复制代码
2. 设置Kafka

在application.yml或application.properties文件中设置Kafka的相关属性:
application.yml

  1. spring:
  2.   kafka:
  3.     bootstrap-servers: localhost:9092
  4.     consumer:
  5.       group-id: my-group
  6.       auto-offset-reset: earliest
  7.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  8.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  9.     producer:
  10.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  11.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
复制代码
application.properties

  1. spring.kafka.bootstrap-servers=localhost:9092
  2. spring.kafka.consumer.group-id=my-group
  3. spring.kafka.consumer.auto-offset-reset=earliest
  4. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  5. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  6. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  7. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
复制代码
3. 创建Kafka生产者

创建一个Kafka生产者类,用于发送消息:
  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.kafka.core.KafkaTemplate;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class KafkaProducer {
  6.     @Autowired
  7.     private KafkaTemplate<String, String> kafkaTemplate;
  8.     public void sendMessage(String topic, String message) {
  9.         kafkaTemplate.send(topic, message);
  10.     }
  11. }
复制代码
4. 创建Kafka消耗者

创建一个Kafka消耗者类,用于接收消息:
  1. import org.springframework.kafka.annotation.KafkaListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class KafkaConsumer {
  5.     @KafkaListener(topics = "my-topic", groupId = "my-group")
  6.     public void listen(String message) {
  7.         System.out.println("Received Message: " + message);
  8.     }
  9. }
复制代码
5. 测试

创建一个控制器或测试类,用于测试生产者和消耗者的功能:
  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.web.bind.annotation.GetMapping;
  3. import org.springframework.web.bind.annotation.RequestParam;
  4. import org.springframework.web.bind.annotation.RestController;
  5. @RestController
  6. public class KafkaController {
  7.     @Autowired
  8.     private KafkaProducer kafkaProducer;
  9.     @GetMapping("/send")
  10.     public String send(@RequestParam("message") String message) {
  11.         kafkaProducer.sendMessage("my-topic", message);
  12.         return "Message sent to the Kafka Topic";
  13.     }
  14. }
复制代码
6. 启动应用

启动Spring Boot应用,访问/send接口发送消息,查看控制台输出,确认消息是否被乐成消耗。
7. 异常处置惩罚

为了更好地处置惩罚异常情况,可以在生产者和消耗者中添加异常处置惩罚逻辑:
生产者异常处置惩罚

  1. import org.apache.kafka.clients.producer.ProducerRecord;
  2. import org.apache.kafka.clients.producer.RecordMetadata;
  3. import org.springframework.kafka.support.SendResult;
  4. import org.springframework.util.concurrent.ListenableFuture;
  5. import org.springframework.util.concurrent.ListenableFutureCallback;
  6. @Service
  7. public class KafkaProducer {
  8.     @Autowired
  9.     private KafkaTemplate<String, String> kafkaTemplate;
  10.     public void sendMessage(String topic, String message) {
  11.         ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(new ProducerRecord<>(topic, message));
  12.         future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
  13.             @Override
  14.             public void onSuccess(SendResult<String, String> result) {
  15.                 RecordMetadata metadata = result.getRecordMetadata();
  16.                 System.out.println("Message sent successfully to topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
  17.             }
  18.             @Override
  19.             public void onFailure(Throwable ex) {
  20.                 System.err.println("Unable to send message due to : " + ex.getMessage());
  21.             }
  22.         });
  23.     }
  24. }
复制代码
消耗者异常处置惩罚

  1. import org.apache.kafka.clients.consumer.ConsumerRecord;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.kafka.listener.ListenerExecutionFailedException;
  4. import org.springframework.stereotype.Service;
  5. @Service
  6. public class KafkaConsumer {
  7.     @KafkaListener(topics = "my-topic", groupId = "my-group")
  8.     public void listen(ConsumerRecord<String, String> record) {
  9.         try {
  10.             System.out.println("Received Message: " + record.value());
  11.         } catch (Exception e) {
  12.             System.err.println("Error processing message: " + e.getMessage());
  13.         }
  14.     }
  15. }
复制代码
通过以上步骤,可以在Spring Boot项目中乐成集成Kafka,并实现消息的生产和消耗。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

反转基因福娃

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表