Kafka 如何包管消息不丢失

锦通  金牌会员 | 2024-12-6 14:47:08 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 828|帖子 828|积分 2484

目录
Kafka 如何包管消息不丢失
一、生产者层面包管消息不丢失
(一)acks 参数设置
(二)重试机制
二、Kafka 集群层面包管消息不丢失
(一)副本机制
(二)ISR(In - Sync Replicas)机制
三、消耗者层面包管消息不丢失
(一)手动提交偏移量

在消息中心件领域,Kafka 是一款广泛使用的分布式消息体系,在众多大数据和实时数据处理场景中有着紧张职位。包管消息不丢失是 Kafka 可靠性的关键部门,这在很多对数据正确性要求高的业务场景中至关紧张,比如金融生意业务数据传输、日记收团体系等。下面我们来详细探究 Kafka 是如何做到这一点的。

一、生产者层面包管消息不丢失


(一)acks 参数设置


Kafka 生产者发送消息时,可以通过配置 acks 参数来控制消息的确认机制。



  • acks = 0:生产者在发送消息后不会等候任何来自 Kafka 集群的确认。这种模式下,消息可能在发送过程中丢失,比如网络问题导致消息根本没到达 Kafka 服务器,但它能提供最高的吞吐量,适用于对消息丢失不太敏感的场景。
  • acks = 1:生产者发送消息后,只要消息乐成写入 Kafka 分区的主副本(leader replica),就会收到确认。不过,假如在消息写入主副本后,但还没来得及同步到其他副本(follower replica)时主副本地点节点宕机,消息可能会丢失。示例代码(Java):

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "your_kafka_servers");
  3. props.put("acks", "1");
  4. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  5. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  6. Producer<String, String> producer = new KafkaProducer<>(props);
  7. ProducerRecord<String, String> record = new ProducerRecord<>("your_topic", "your_message");
  8. producer.send(record);
  9. producer.close();
复制代码



  • acks = -1(或 all):生产者会等候所有同步副本(in - sync replicas)都乐成写入消息后才收到确认。这是最安全的模式,能最大程度包管消息不丢失,但会影响吞吐量。

(二)重试机制


当消息发送失败时,生产者可以设置重试机制。在 Java 中,可以通过以下方式配置:

  1. props.put("retries", 3); // 设置重试次数为3次
复制代码

同时,可以联合自界说的错误处理逻辑,比方:

  1. producer.send(record, (metadata, exception) -> {
  2.     if (exception!= null) {
  3.         // 处理发送失败的逻辑,比如记录到日志
  4.         System.err.println("Message send failed: " + exception.getMessage());
  5.     }
  6. });
复制代码

二、Kafka 集群层面包管消息不丢失


(一)副本机制


Kafka 通过副本(replica)来实现数据冗余。每个分区(partition)可以有多个副本,其中一个是主副本(leader replica),其余是从副本(follower replica)。主副本负责处理读写请求,从副本则定期从主副本同步数据。当主副本不可用时,会从从副本中推举出新的主副本。



  • 在配置 Kafka 集群时,可以通过以下参数设置副本数目(假设使用 Kafka 的配置文件,以服务器端配置为例,可使用 Python 脚本等方式修改配置文件):

  1. # 假设配置文件名为server.properties
  2. with open('server.properties', 'r') as file:
  3.     lines = file.readlines()
  4. new_lines = []
  5. for line in lines:
  6.     if line.startswith('default.replication.factor'):
  7.         new_lines.append('default.replication.factor=3\n') # 设置副本数为3
  8.     else:
  9.         new_lines.append(line)
  10. with open('server.properties', 'w') as file:
  11.     file.writelines(new_lines)
复制代码

(二)ISR(In - Sync Replicas)机制


ISR 是与主副本保持同步的副本集合。只有在 ISR 中的副本都乐成写入消息后,生产者才会收到确认(当 acks=-1 或 all 时)。假如一个副本长时间未与主副本同步(可通过参数 replica.lag.time.max.ms 配置),它会被移出 ISR。

三、消耗者层面包管消息不丢失


(一)手动提交偏移量


Kafka 消耗者可以通过手动提交偏移量(offset)来精确控制消息的消耗进度。在消耗者乐成处理消息后,手动提交偏移量,确保消息不会被重复消耗或丢失。以下是 Java 示例:

  1. import org.apache.kafka.clients.consumer.ConsumerRecord;
  2. import org.apache.kafka.clients.consumer.ConsumerRecords;
  3. import org.apache.kafka.clients.consumer.KafkaConsumer;
  4. import java.time.Duration;
  5. import java.util.Collections;
  6. import java.util.Properties;
  7. public class KafkaConsumerExample {
  8.     public static void main(String[] args) {
  9.         Properties props = new Properties();
  10.         props.put("bootstrap.servers", "your_kafka_servers");
  11.         props.put("group.id", "your_consumer_group");
  12.         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  13.         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  14.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  15.         consumer.subscribe(Collections.singletonList("your_topic"));
  16.         try {
  17.             while (true) {
  18.                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  19.                 for (ConsumerRecord<String, String> record : records) {
  20.                     // 处理消息的业务逻辑
  21.                     System.out.println("Received message: " + record.value());
  22.                 }
  23.                 // 手动提交偏移量
  24.                 consumer.commitSync();
  25.             }
  26.         } finally {
  27.             consumer.close();
  28.         }
  29.     }
  30. }
复制代码

通过以上在生产者、Kafka 集群和消耗者三个层面的机制,可以有用包管 Kafka 消息不丢失,确保整个消息传递体系的可靠性和数据完整性。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

锦通

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表