3.7 Spring Boot整合Kafka:消息次序性与消耗幂等性保障

打印 上一主题 下一主题

主题 1599|帖子 1599|积分 4797

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
在Spring Boot中整合Kafka并保障消息次序性与消耗幂等性,可以通过以下步骤实现:
一、消息次序性保障

1. 生产者设置



  • 相同Key写入同一分区:Kafka保证同一分区内消息的次序性,生产者发送消息时指定相同Key,确保相关消息进入同一分区。
    1. [/code] java
    2. [code]@Autowired
    3. private KafkaTemplate<String, String> kafkaTemplate;
    4. public void sendMessage(String key, String message) {
    5.     kafkaTemplate.send("my-topic", key, message);
    6. }
    复制代码
  • 设置重试与飞行哀求:防止网络重试导致消息乱序。
    1. [/code] properties
    2. [code]spring.kafka.producer.properties.retries=3
    3. spring.kafka.producer.properties.max.in.flight.requests.per.connection=1
    复制代码
2. 消耗者设置



  • 单线程按分区消耗:确保每个分区由单独线程处理,避免并发消耗同一分区。
    1. [/code] java
    2. [code]@KafkaListener(topics = "my-topic", concurrency = "3") // 与分区数一致
    3. public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
    4.     processOrderly(record.value());
    5.     ack.acknowledge(); // 手动提交偏移量
    6. }
    复制代码
  • 设置手动提交偏移量:处理完成后提交,避免消息丢失或重复。
    1. [/code] properties
    2. [code]spring.kafka.consumer.enable-auto-commit=false
    3. spring.kafka.listener.ack-mode=manual
    复制代码
二、消耗幂等性保障

1. 生产者端幂等性

启用Kafka生产者幂等性,防止网络重试导致消息重复:
  1. [/code] properties
  2. [code]spring.kafka.producer.enable-idempotence=true
复制代码
2. 消耗者端幂等性处理



  • 唯一标识检查:利用业务唯一标识(如订单ID)进行重复判定。
    1. [/code] java
    2. [code]@KafkaListener(topics = "order-topic")
    3. public void processOrder(ConsumerRecord<String, Order> record, Acknowledgment ack) {
    4.     String orderId = record.value().getId();
    5.     if (orderService.isOrderProcessed(orderId)) {
    6.         ack.acknowledge();
    7.         return;
    8.     }
    9.     orderService.saveOrder(record.value());
    10.     ack.acknowledge();
    11. }
    复制代码
  • 数据库唯一束缚:通过数据库唯一索引或插入前检查实现。
    1. [/code] sql
    2. [code]CREATE TABLE orders (
    3.     id VARCHAR(50) PRIMARY KEY,
    4.     -- 其他字段
    5. );
    复制代码
三、完备设置示例

1. 依赖引入

  1. [/code] xml
  2. [code]<dependency>
  3.     <groupId>org.springframework.kafka</groupId>
  4.     <artifactId>spring-kafka</artifactId>
  5. </dependency>
复制代码
2. 生产者设置(application.yml)

  1. [/code] yaml
  2. [code]spring:
  3.   kafka:
  4.     bootstrap-servers: localhost:9092
  5.     producer:
  6.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  7.       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  8.       enable-idempotence: true
  9.       properties:
  10.         max.in.flight.requests.per.connection: 1
  11.         retries: 3
复制代码
3. 消耗者设置(application.yml)

  1. [/code] yaml
  2. [code]spring:
  3.   kafka:
  4.     bootstrap-servers: localhost:9092
  5.     consumer:
  6.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  7.       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
  8.       group-id: my-group
  9.       enable-auto-commit: false
  10.       auto-offset-reset: earliest
  11.     listener:
  12.       ack-mode: manual
  13.       concurrency: 3
复制代码
四、非常处理与优化



  • 消耗者重试计谋:使用RetryTemplate处理瞬时故障。
    1. [/code] java
    2. [code]@Bean
    3. public RetryTemplate retryTemplate() {
    4.     RetryTemplate retryTemplate = new RetryTemplate();
    5.     retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
    6.     return retryTemplate;
    7. }
    复制代码
  • 死信队列(DLQ)​:处理多次重试失败的消息。
    1. [/code] java
    2. [code]@KafkaListener(topics = "my-topic")
    3. @RetryableTopic(attempts = "3", dltTopicSuffix = "-dlt")
    4. public void handleMessage(ConsumerRecord<String, String> record) {
    5.     // 业务处理
    6. }
    复制代码
五、测试验证


  • 次序性测试:发送一连消息(相同Key),观察消耗次序是否一致。
  • 幂等性测试:重复发送相同消息,检查是否仅处理一次。
通过以上步骤,Spring Boot应用能够确保Kafka消息的次序性和消耗的幂等性,实用于订单处理、状态更新等场景。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

水军大提督

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表