在Spring Boot项目中集成Kafka,可以按照以下步骤进行:
1. 添加依赖
首先,在pom.xml文件中添加Kafka的依赖:
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <version>2.8.0</version> <!-- 请根据需要选择合适的版本 -->
- </dependency>
2. 配置Kafka
在application.yml或application.properties文件中配置Kafka的相关属性:
application.yml
- # Kafka broker address plus String (de)serializers for the consumer and producer.
- # The nesting below corresponds one-to-one to the spring.kafka.* property names.
- spring:
-   kafka:
-     bootstrap-servers: localhost:9092
-     consumer:
-       group-id: my-group
-       auto-offset-reset: earliest
-       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-     producer:
-       key-serializer: org.apache.kafka.common.serialization.StringSerializer
-       value-serializer: org.apache.kafka.common.serialization.StringSerializer
application.properties
- spring.kafka.bootstrap-servers=localhost:9092
- spring.kafka.consumer.group-id=my-group
- spring.kafka.consumer.auto-offset-reset=earliest
- spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
3. 创建Kafka生产者
创建一个Kafka生产者类,用于发送消息:
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Service;
- /**
-  * Thin publishing service that forwards text messages to Kafka through the injected template.
-  */
- @Service
- public class KafkaProducer {
-     /** Template used for every send; populated by Spring through field injection. */
-     @Autowired
-     private KafkaTemplate<String, String> template;
-     /**
-      * Hands {@code message} to the template for the given {@code topic}.
-      * Whatever the template returns is discarded, so the caller learns nothing about the outcome.
-      *
-      * @param topic   name of the destination topic
-      * @param message payload to send
-      */
-     public void sendMessage(String topic, String message) {
-         this.template.send(topic, message);
-     }
- }
4. 创建Kafka消费者
创建一个Kafka消费者类,用于接收消息:
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Service;
- /**
-  * Listener bean that prints every message it receives from {@code my-topic}.
-  */
- @Service
- public class KafkaConsumer {
-     /**
-      * Writes the received payload to standard output, prefixed with a fixed label.
-      *
-      * @param message payload handed to the listener method
-      */
-     @KafkaListener(groupId = "my-group", topics = "my-topic")
-     public void listen(String message) {
-         final String line = "Received Message: " + message;
-         System.out.println(line);
-     }
- }
5. 测试
创建一个控制器或测试类,用于测试生产者和消费者的功能:
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestParam;
- import org.springframework.web.bind.annotation.RestController;
- /**
-  * REST controller that publishes the {@code message} request parameter to {@code my-topic}.
-  */
- @RestController
- public class KafkaController {
-     /** Topic that every request is published to. */
-     private static final String TOPIC = "my-topic";
-     /** Fixed confirmation text returned to the HTTP caller. */
-     private static final String SENT_REPLY = "Message sent to the Kafka Topic";
-     /** Producer service used to publish; populated by Spring through field injection. */
-     @Autowired
-     private KafkaProducer producer;
-     /**
-      * Handles GET requests mapped to {@code /send}: passes the parameter to the producer
-      * and then returns a fixed confirmation string.
-      *
-      * @param message value of the {@code message} request parameter
-      * @return the fixed confirmation text
-      */
-     @GetMapping("/send")
-     public String send(@RequestParam("message") String message) {
-         producer.sendMessage(TOPIC, message);
-         return SENT_REPLY;
-     }
- }
6. 启动应用
启动Spring Boot应用,访问/send接口发送消息,查看控制台输出,确认消息是否被成功消费。
7. 异常处理
为了更好地处理异常情况,可以在生产者和消费者中添加异常处理逻辑:
生产者异常处理
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
- import org.springframework.kafka.support.SendResult;
- import org.springframework.util.concurrent.ListenableFuture;
- import org.springframework.util.concurrent.ListenableFutureCallback;
- @Service
- public class KafkaProducer {
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
- public void sendMessage(String topic, String message) {
- ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(new ProducerRecord<>(topic, message));
- future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
- @Override
- public void onSuccess(SendResult<String, String> result) {
- RecordMetadata metadata = result.getRecordMetadata();
- System.out.println("Message sent successfully to topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
- }
- @Override
- public void onFailure(Throwable ex) {
- System.err.println("Unable to send message due to : " + ex.getMessage());
- }
- });
- }
- }
消费者异常处理
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.kafka.listener.ListenerExecutionFailedException;
- import org.springframework.stereotype.Service;
- @Service
- public class KafkaConsumer {
- @KafkaListener(topics = "my-topic", groupId = "my-group")
- public void listen(ConsumerRecord<String, String> record) {
- try {
- System.out.println("Received Message: " + record.value());
- } catch (Exception e) {
- System.err.println("Error processing message: " + e.getMessage());
- }
- }
- }
通过以上步骤,可以在Spring Boot项目中成功集成Kafka,并实现消息的生产和消费。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com(ToB企服之家,中国第一个企服评测及商务社交产业平台)。