目录
Kafka 如何包管消息不丢失
一、生产者层面包管消息不丢失
(一)acks 参数设置
(二)重试机制
二、Kafka 集群层面包管消息不丢失
(一)副本机制
(二)ISR(In - Sync Replicas)机制
三、消耗者层面包管消息不丢失
(一)手动提交偏移量
在消息中心件领域,Kafka 是一款广泛使用的分布式消息体系,在众多大数据和实时数据处理场景中有着紧张职位。包管消息不丢失是 Kafka 可靠性的关键部门,这在很多对数据正确性要求高的业务场景中至关紧张,比如金融生意业务数据传输、日记收团体系等。下面我们来详细探究 Kafka 是如何做到这一点的。
一、生产者层面包管消息不丢失
(一)acks 参数设置
Kafka 生产者发送消息时,可以通过配置 acks 参数来控制消息的确认机制。
- acks = 0:生产者在发送消息后不会等候任何来自 Kafka 集群的确认。这种模式下,消息可能在发送过程中丢失,比如网络问题导致消息根本没到达 Kafka 服务器,但它能提供最高的吞吐量,适用于对消息丢失不太敏感的场景。
- acks = 1:生产者发送消息后,只要消息乐成写入 Kafka 分区的主副本(leader replica),就会收到确认。不过,假如在消息写入主副本后,但还没来得及同步到其他副本(follower replica)时主副本地点节点宕机,消息可能会丢失。示例代码(Java):
- Properties props = new Properties();
- props.put("bootstrap.servers", "your_kafka_servers");
- props.put("acks", "1");
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- Producer<String, String> producer = new KafkaProducer<>(props);
- ProducerRecord<String, String> record = new ProducerRecord<>("your_topic", "your_message");
- producer.send(record);
- producer.close();
复制代码
- acks = -1(或 all):生产者会等候所有同步副本(in - sync replicas)都乐成写入消息后才收到确认。这是最安全的模式,能最大程度包管消息不丢失,但会影响吞吐量。
(二)重试机制
当消息发送失败时,生产者可以设置重试机制。在 Java 中,可以通过以下方式配置:
- props.put("retries", 3); // 设置重试次数为3次
复制代码
同时,可以联合自界说的错误处理逻辑,比方:
- producer.send(record, (metadata, exception) -> {
- if (exception!= null) {
- // 处理发送失败的逻辑,比如记录到日志
- System.err.println("Message send failed: " + exception.getMessage());
- }
- });
复制代码
二、Kafka 集群层面包管消息不丢失
(一)副本机制
Kafka 通过副本(replica)来实现数据冗余。每个分区(partition)可以有多个副本,其中一个是主副本(leader replica),其余是从副本(follower replica)。主副本负责处理读写请求,从副本则定期从主副本同步数据。当主副本不可用时,会从从副本中推举出新的主副本。
- 在配置 Kafka 集群时,可以通过以下参数设置副本数目(假设使用 Kafka 的配置文件,以服务器端配置为例,可使用 Python 脚本等方式修改配置文件):
- # 假设配置文件名为server.properties
- with open('server.properties', 'r') as file:
- lines = file.readlines()
- new_lines = []
- for line in lines:
- if line.startswith('default.replication.factor'):
- new_lines.append('default.replication.factor=3\n') # 设置副本数为3
- else:
- new_lines.append(line)
- with open('server.properties', 'w') as file:
- file.writelines(new_lines)
复制代码
(二)ISR(In - Sync Replicas)机制
ISR 是与主副本保持同步的副本集合。只有在 ISR 中的副本都乐成写入消息后,生产者才会收到确认(当 acks=-1 或 all 时)。假如一个副本长时间未与主副本同步(可通过参数 replica.lag.time.max.ms 配置),它会被移出 ISR。
三、消耗者层面包管消息不丢失
(一)手动提交偏移量
Kafka 消耗者可以通过手动提交偏移量(offset)来精确控制消息的消耗进度。在消耗者乐成处理消息后,手动提交偏移量,确保消息不会被重复消耗或丢失。以下是 Java 示例:
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import java.time.Duration;
- import java.util.Collections;
- import java.util.Properties;
- public class KafkaConsumerExample {
- public static void main(String[] args) {
- Properties props = new Properties();
- props.put("bootstrap.servers", "your_kafka_servers");
- props.put("group.id", "your_consumer_group");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Collections.singletonList("your_topic"));
- try {
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<String, String> record : records) {
- // 处理消息的业务逻辑
- System.out.println("Received message: " + record.value());
- }
- // 手动提交偏移量
- consumer.commitSync();
- }
- } finally {
- consumer.close();
- }
- }
- }
复制代码
通过以上在生产者、Kafka 集群和消耗者三个层面的机制,可以有用包管 Kafka 消息不丢失,确保整个消息传递体系的可靠性和数据完整性。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |