ToB企服应用市场:ToB评测及商务社交产业平台
标题:
Spring Boot项目集成Kafka
[打印本页]
作者:
反转基因福娃
时间:
2025-2-20 20:00
标题:
Spring Boot项目集成Kafka
在Spring Boot项目中集成Kafka,可以按照以下步骤进行:
1. 添加依赖
首先,在pom.xml文件中添加Kafka的依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.0</version> <!-- 请根据需要选择合适的版本 -->
</dependency>
复制代码
2. 设置Kafka
在application.yml或application.properties文件中设置Kafka的相关属性:
application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
复制代码
application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
复制代码
3. 创建Kafka生产者
创建一个Kafka生产者类,用于发送消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
复制代码
4. 创建Kafka消耗者
创建一个Kafka消耗者类,用于接收消息:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
System.out.println("Received Message: " + message);
}
}
复制代码
5. 测试
创建一个控制器或测试类,用于测试生产者和消耗者的功能:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
@Autowired
private KafkaProducer kafkaProducer;
@GetMapping("/send")
public String send(@RequestParam("message") String message) {
kafkaProducer.sendMessage("my-topic", message);
return "Message sent to the Kafka Topic";
}
}
复制代码
6. 启动应用
启动Spring Boot应用,访问/send接口发送消息,查看控制台输出,确认消息是否被乐成消耗。
7. 异常处置惩罚
为了更好地处置惩罚异常情况,可以在生产者和消耗者中添加异常处置惩罚逻辑:
生产者异常处置惩罚
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Service
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(new ProducerRecord<>(topic, message));
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
RecordMetadata metadata = result.getRecordMetadata();
System.out.println("Message sent successfully to topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
}
@Override
public void onFailure(Throwable ex) {
System.err.println("Unable to send message due to : " + ex.getMessage());
}
});
}
}
复制代码
消耗者异常处置惩罚
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(ConsumerRecord<String, String> record) {
try {
System.out.println("Received Message: " + record.value());
} catch (Exception e) {
System.err.println("Error processing message: " + e.getMessage());
}
}
}
复制代码
通过以上步骤,可以在Spring Boot项目中乐成集成Kafka,并实现消息的生产和消耗。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4