3.7 Spring Boot整合Kafka:消息次序性与消耗幂等性保障
在Spring Boot中整合Kafka并保障消息次序性与消耗幂等性,可以通过以下步骤实现:一、消息次序性保障
1. 生产者设置
[*]相同Key写入同一分区:Kafka保证同一分区内消息的次序性,生产者发送消息时指定相同Key,确保相关消息进入同一分区。 java
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String key, String message) {
kafkaTemplate.send("my-topic", key, message);
}
[*]设置重试与飞行哀求:防止网络重试导致消息乱序。 properties
spring.kafka.producer.properties.retries=3
spring.kafka.producer.properties.max.in.flight.requests.per.connection=1
2. 消耗者设置
[*]单线程按分区消耗:确保每个分区由单独线程处理,避免并发消耗同一分区。 java
@KafkaListener(topics = "my-topic", concurrency = "3") // 与分区数一致
public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
processOrderly(record.value());
ack.acknowledge(); // 手动提交偏移量
}
[*]设置手动提交偏移量:处理完成后提交,避免消息丢失或重复。 properties
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual
二、消耗幂等性保障
1. 生产者端幂等性
启用Kafka生产者幂等性,防止网络重试导致消息重复:
properties
spring.kafka.producer.enable-idempotence=true 2. 消耗者端幂等性处理
[*]唯一标识检查:利用业务唯一标识(如订单ID)进行重复判定。 java
@KafkaListener(topics = "order-topic")
public void processOrder(ConsumerRecord<String, Order> record, Acknowledgment ack) {
String orderId = record.value().getId();
if (orderService.isOrderProcessed(orderId)) {
ack.acknowledge();
return;
}
orderService.saveOrder(record.value());
ack.acknowledge();
}
[*]数据库唯一束缚:通过数据库唯一索引或插入前检查实现。 sql
CREATE TABLE orders (
id VARCHAR(50) PRIMARY KEY,
-- 其他字段
);
三、完备设置示例
1. 依赖引入
xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency> 2. 生产者设置(application.yml)
yaml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
enable-idempotence: true
properties:
max.in.flight.requests.per.connection: 1
retries: 3 3. 消耗者设置(application.yml)
yaml
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
group-id: my-group
enable-auto-commit: false
auto-offset-reset: earliest
listener:
ack-mode: manual
concurrency: 3 四、非常处理与优化
[*]消耗者重试计谋:使用RetryTemplate处理瞬时故障。 java
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
return retryTemplate;
}
[*]死信队列(DLQ):处理多次重试失败的消息。 java
@KafkaListener(topics = "my-topic")
@RetryableTopic(attempts = "3", dltTopicSuffix = "-dlt")
public void handleMessage(ConsumerRecord<String, String> record) {
// 业务处理
}
五、测试验证
[*]次序性测试:发送一连消息(相同Key),观察消耗次序是否一致。
[*]幂等性测试:重复发送相同消息,检查是否仅处理一次。
通过以上步骤,Spring Boot应用能够确保Kafka消息的次序性和消耗的幂等性,实用于订单处理、状态更新等场景。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]