ToB企服应用市场:ToB评测及商务社交产业平台

标题: kafka怎样包管消息消费的顺序性 [打印本页]

作者: 农民    时间: 2025-1-10 06:09
标题: kafka怎样包管消息消费的顺序性

一、Kafka 消息顺序性的根本概念

1.1 什么是消息顺序性?

消息顺序性是指在生产者将消息发送到 Kafka 后,消费者按照消息发送的顺序进行消费。

1.2 Kafka 的消息顺序性特点



二、Kafka 消息顺序性的挑衅


三、Kafka 包管消息顺序性的办理方案

3.1 分区级顺序性

Kafka 原生支持分区级顺序性,具体方法:

实现方法
3.2 消费者串行处置惩罚


3.3 分区与业务绑定

将业务 ID 或某些属性作为 Key,确保同一业务的消息落在同一分区,从而包管业务内的顺序性。
3.4 消费失败的处置惩罚策略



四、Kafka 消息顺序性实践

以下通过一个示例展示怎样在生产者和消费者中实现分区级的消息顺序消费。
4.1 项目依赖

在 pom.xml 中添加 Kafka 依赖:
  1. <dependency>
  2.     <groupId>org.apache.kafka</groupId>
  3.     <artifactId>kafka-clients</artifactId>
  4.     <version>3.5.0</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>org.springframework.kafka</groupId>
  8.     <artifactId>spring-kafka</artifactId>
  9.     <version>3.0.0</version>
  10. </dependency>
复制代码

4.2 设置 Kafka

1. Kafka 设置文件

在 application.yml 中设置 Kafka:
  1. spring:
  2.   kafka:
  3.     bootstrap-servers: localhost:9092
  4.     consumer:
  5.       group-id: test-group
  6.       enable-auto-commit: false
  7.       auto-offset-reset: earliest
  8.     producer:
  9.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  10.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
  11.     consumer:
  12.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  13.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
复制代码

4.3 生产者实现

通过指定消息的 Key,确保同一类型的消息进入同一分区。
  1. package com.example.kafka;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.springframework.stereotype.Service;
  5. import java.util.Properties;
  6. @Service
  7. public class KafkaMessageProducer {
  8.     private final KafkaProducer<String, String> producer;
  9.     public KafkaMessageProducer() {
  10.         Properties props = new Properties();
  11.         props.put("bootstrap.servers", "localhost:9092");
  12.         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  13.         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  14.         this.producer = new KafkaProducer<>(props);
  15.     }
  16.     public void sendMessage(String topic, String key, String value) {
  17.         ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
  18.         producer.send(record, (metadata, exception) -> {
  19.             if (exception != null) {
  20.                 System.err.println("Message failed to send: " + exception.getMessage());
  21.             } else {
  22.                 System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset());
  23.             }
  24.         });
  25.     }
  26. }
复制代码

4.4 消费者实现

通过 @KafkaListener 确保一个分区内只有一个消费者。
  1. package com.example.kafka;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.springframework.kafka.annotation.KafkaListener;
  4. import org.springframework.stereotype.Service;
  5. @Service
  6. public class KafkaMessageConsumer {
  7.     @KafkaListener(topics = "test-topic", groupId = "test-group", concurrency = "1")
  8.     public void consumeMessage(ConsumerRecord<String, String> record) {
  9.         System.out.println("Consumed message: Key=" + record.key() + ", Value=" + record.value()
  10.                 + ", Partition=" + record.partition() + ", Offset=" + record.offset());
  11.         // 模拟顺序处理
  12.         processMessage(record.value());
  13.     }
  14.     private void processMessage(String message) {
  15.         System.out.println("Processing message: " + message);
  16.     }
  17. }
复制代码

4.5 测试

1. 发送测试消息

调用生产者发送消息:
  1. package com.example.kafka;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.boot.CommandLineRunner;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class KafkaTestRunner implements CommandLineRunner {
  7.     @Autowired
  8.     private KafkaMessageProducer producer;
  9.     @Override
  10.     public void run(String... args) throws Exception {
  11.         for (int i = 1; i <= 10; i++) {
  12.             String key = "user" + (i % 3); // 模拟将相同用户的消息分配到同一分区
  13.             String message = "Message " + i;
  14.             producer.sendMessage("test-topic", key, message);
  15.         }
  16.     }
  17. }
复制代码
2. 观察消费顺序

运行消费者,输出:
  1. Consumed message: Key=user0, Value=Message 3, Partition=2, Offset=0
  2. Processing message: Message 3
  3. Consumed message: Key=user1, Value=Message 1, Partition=1, Offset=0
  4. Processing message: Message 1
  5. Consumed message: Key=user2, Value=Message 2, Partition=0, Offset=0
  6. Processing message: Message 2
  7. ...
复制代码
通过 Key 包管了相同用户的消息进入同一分区,而且按顺序消费。

五、Kafka 消息顺序性的留意事项


六、总结

Kafka 默认包管分区内的消息顺序,但无法实现全局顺序性。通过合理设置分区策略、确保单分区消费者处置惩罚,以及计划幂等性操作,可以有效实现业务上的消息顺序消费。本文通过 Java 示例展示了怎样利用分区和 Key 来实现分区级的顺序消费。在实际项目中,应结合业务需求权衡性能温顺序性,计划出最优的办理方案。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4