Kafka 如何包管消息不丢失
目录Kafka 如何包管消息不丢失
一、生产者层面包管消息不丢失
(一)acks 参数设置
(二)重试机制
二、Kafka 集群层面包管消息不丢失
(一)副本机制
(二)ISR(In - Sync Replicas)机制
三、消耗者层面包管消息不丢失
(一)手动提交偏移量
在消息中心件领域,Kafka 是一款广泛使用的分布式消息体系,在众多大数据和实时数据处理场景中有着紧张职位。包管消息不丢失是 Kafka 可靠性的关键部门,这在很多对数据正确性要求高的业务场景中至关紧张,比如金融生意业务数据传输、日记收团体系等。下面我们来详细探究 Kafka 是如何做到这一点的。
一、生产者层面包管消息不丢失
(一)acks 参数设置
Kafka 生产者发送消息时,可以通过配置 acks 参数来控制消息的确认机制。
[*]acks = 0:生产者在发送消息后不会等候任何来自 Kafka 集群的确认。这种模式下,消息可能在发送过程中丢失,比如网络问题导致消息根本没到达 Kafka 服务器,但它能提供最高的吞吐量,适用于对消息丢失不太敏感的场景。
[*]acks = 1:生产者发送消息后,只要消息乐成写入 Kafka 分区的主副本(leader replica),就会收到确认。不过,假如在消息写入主副本后,但还没来得及同步到其他副本(follower replica)时主副本地点节点宕机,消息可能会丢失。示例代码(Java):
Properties props = new Properties();
props.put("bootstrap.servers", "your_kafka_servers");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("your_topic", "your_message");
producer.send(record);
producer.close();
[*]acks = -1(或 all):生产者会等候所有同步副本(in - sync replicas)都乐成写入消息后才收到确认。这是最安全的模式,能最大程度包管消息不丢失,但会影响吞吐量。
(二)重试机制
当消息发送失败时,生产者可以设置重试机制。在 Java 中,可以通过以下方式配置:
props.put("retries", 3); // 设置重试次数为3次
同时,可以联合自界说的错误处理逻辑,比方:
producer.send(record, (metadata, exception) -> {
if (exception!= null) {
// 处理发送失败的逻辑,比如记录到日志
System.err.println("Message send failed: " + exception.getMessage());
}
});
二、Kafka 集群层面包管消息不丢失
(一)副本机制
Kafka 通过副本(replica)来实现数据冗余。每个分区(partition)可以有多个副本,其中一个是主副本(leader replica),其余是从副本(follower replica)。主副本负责处理读写请求,从副本则定期从主副本同步数据。当主副本不可用时,会从从副本中推举出新的主副本。
[*]在配置 Kafka 集群时,可以通过以下参数设置副本数目(假设使用 Kafka 的配置文件,以服务器端配置为例,可使用 Python 脚本等方式修改配置文件):
# 假设配置文件名为server.properties
with open('server.properties', 'r') as file:
lines = file.readlines()
new_lines = []
for line in lines:
if line.startswith('default.replication.factor'):
new_lines.append('default.replication.factor=3\n') # 设置副本数为3
else:
new_lines.append(line)
with open('server.properties', 'w') as file:
file.writelines(new_lines)
(二)ISR(In - Sync Replicas)机制
ISR 是与主副本保持同步的副本集合。只有在 ISR 中的副本都乐成写入消息后,生产者才会收到确认(当 acks=-1 或 all 时)。假如一个副本长时间未与主副本同步(可通过参数 replica.lag.time.max.ms 配置),它会被移出 ISR。
三、消耗者层面包管消息不丢失
(一)手动提交偏移量
Kafka 消耗者可以通过手动提交偏移量(offset)来精确控制消息的消耗进度。在消耗者乐成处理消息后,手动提交偏移量,确保消息不会被重复消耗或丢失。以下是 Java 示例:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "your_kafka_servers");
props.put("group.id", "your_consumer_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("your_topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息的业务逻辑
System.out.println("Received message: " + record.value());
}
// 手动提交偏移量
consumer.commitSync();
}
} finally {
consumer.close();
}
}
}
通过以上在生产者、Kafka 集群和消耗者三个层面的机制,可以有用包管 Kafka 消息不丢失,确保整个消息传递体系的可靠性和数据完整性。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]