Kafka 消息不丢失:全方位保障策略

打印 上一主题 下一主题

主题 1014|帖子 1014|积分 3042


Kafka 消息不丢失:全方位保障策略

引言

在现代分布式体系中,Kafka 作为一款高性能、高可扩展性的消息队列,被广泛应用于数据传输、日志收集、实时流处置惩罚等场景。然而,消息丢失是使用 Kafka 时大概面临的一个严重标题,这大概会导致数据不同等、业务逻辑错误等后果。因此,确保 Kafka 消息不丢失至关紧张。本文将从生产者、Broker 和消耗者三个层面详细先容保障 Kafka 消息不丢失的方法。
生产者层面保障

确认机制(acks)

生产者在发送消息时,acks 参数决定了消息简直认机制,它有三个可选值:


  • acks = 0:生产者发送消息后,不等待 Broker 简直认,直接认为消息发送成功。这种方式虽然吞吐量最高,但存在消息丢失的风险。由于如果消息在传输过程中丢失,生产者无法得知。
  • acks = 1:生产者发送消息后,等待 Leader 分区简直认。只要 Leader 分区接收到消息并写入当地日志,就会给生产者返回确认信息。此方式能保证消息在 Leader 分区不丢失,但如果 Leader 分区在将消息同步到 Follower 分区之前发生故障,大概会导致消息丢失。
  • acks = all 或 -1:生产者发送消息后,等待 Leader 分区和所有 ISR(In - Sync Replicas,同步副本聚集)中的 Follower 分区都确认收到消息后,才认为消息发送成功。这种方式能最大水平保证消息不丢失,但会降低吞吐量。
以下是 Java 代码示例:
  1. import org.apache.kafka.clients.producer.*;
  2. import java.util.Properties;
  3. public class KafkaProducerExample {
  4.     public static void main(String[] args) {
  5.         Properties props = new Properties();
  6.         props.put("bootstrap.servers", "localhost:9092");
  7.         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  8.         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  9.         // 设置 acks 为 all
  10.         props.put("acks", "all");
  11.         Producer<String, String> producer = new KafkaProducer<>(props);
  12.         ProducerRecord<String, String> record = new ProducerRecord<>("test - topic", "key", "value");
  13.         producer.send(record, new Callback() {
  14.             @Override
  15.             public void onCompletion(RecordMetadata metadata, Exception exception) {
  16.                 if (exception != null) {
  17.                     System.err.println("Failed to send message: " + exception.getMessage());
  18.                 } else {
  19.                     System.out.println("Message sent successfully. Offset: " + metadata.offset());
  20.                 }
  21.             }
  22.         });
  23.         producer.close();
  24.     }
  25. }
复制代码
重试机制

生产者可以设置 retries 参数来指定消息发送失败时的重试次数。当消息发送失败时,生产者会自动进行重试,直到达到重试次数上限。示例代码如下:
  1. props.put("retries", 3);
复制代码
Broker 层面保障

多副本机制

Kafka 通过多副本机制保证消息的可靠性。每个分区可以有多个副本,其中一个是 Leader 副本,其余是 Follower 副本。生产者和消耗者只与 Leader 副本进行交互,Follower 副本会从 Leader 副本同步消息。
通过设置 replication.factor 参数来指定每个分区的副本数,建议将其设置为大于 1 的值,比方在 server.properties 中设置:
  1. default.replication.factor = 3
复制代码
最小同步副本数(min.insync.replicas)

设置 min.insync.replicas 参数可以指定 ISR 聚集中至少需要多少个副本同步消息,生产者才能认为消息发送成功。比方:
  1. min.insync.replicas = 2
复制代码
当 acks = all 时,只有当 ISR 聚集中至少有 min.insync.replicas 个副本同步了消息,生产者才会收到确认信息。
刷盘策略

可以通过调整 log.flush.interval.messages 和 log.flush.interval.ms 参数来控制消息刷盘的频率。比方:
  1. # 每 10000 条消息刷一次盘
  2. log.flush.interval.messages = 10000
  3. # 每 1000 毫秒刷一次盘
  4. log.flush.interval.ms = 1000
复制代码
消耗者层面保障

手动提交偏移量

消耗者可以通过设置 enable.auto.commit 为 false 来关闭自动提交偏移量功能,然后手动提交偏移量。只有在消息处置惩罚完成后,才提交偏移量,这样可以避免在消息处置惩罚过程中出现非常导致消息丢失。
以下是 Java 代码示例:
  1. import org.apache.kafka.clients.consumer.*;
  2. import java.time.Duration;
  3. import java.util.Collections;
  4. import java.util.Properties;
  5. public class KafkaConsumerExample {
  6.     public static void main(String[] args) {
  7.         Properties props = new Properties();
  8.         props.put("bootstrap.servers", "localhost:9092");
  9.         props.put("group.id", "test - group");
  10.         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  11.         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  12.         // 关闭自动提交偏移量
  13.         props.put("enable.auto.commit", "false");
  14.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  15.         consumer.subscribe(Collections.singletonList("test - topic"));
  16.         try {
  17.             while (true) {
  18.                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  19.                 for (ConsumerRecord<String, String> record : records) {
  20.                     System.out.printf("Received message: key = %s, value = %s%n", record.key(), record.value());
  21.                     // 处理消息
  22.                 }
  23.                 // 手动提交偏移量
  24.                 consumer.commitSync();
  25.             }
  26.         } finally {
  27.             consumer.close();
  28.         }
  29.     }
  30. }
复制代码
处置惩罚消耗非常

在消耗者处置惩罚消息的过程中,需要捕获并处置惩罚大概出现的非常,确保在非常环境下不会丢失消息。比方,在处置惩罚消息时可以使用事务机制,保证消息处置惩罚的原子性。
总结

要确保 Kafka 消息不丢失,需要从生产者、Broker 和消耗者三个层面进行综合思量和设置。生产者通过合理设置确认机制和重试机制,Broker 使用多副本、最小同步副本数和刷盘策略,消耗者接纳手动提交偏移量和非常处置惩罚等方法,全方位保障消息的可靠传输。在实际应用中,需要根据详细的业务场景和性能要求,机动调整这些设置,以达到消息可靠性和体系性能的平衡。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

火影

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表