金歌 发表于 2025-4-12 19:20:42

Kafka 的 ACK 机制详解

1.ACK 机制的基本概念

ACK(Acknowledgment)即确认机制,是 Kafka 生产者和 Broker 之间的一种消息确认约定。生产者在发送消息后,需要等候 Broker 端简直认信号,以此判定消息是否成功发送。这一机制主要是为了包管消息不会由于网络问题或 Broker 故障而丢失,确保数据的可靠性。
2.ACK 的级别

acks = 0:生产者发送消息后,不需要等候 Broker 的任何确认,直接继续发送下一条消息。这种模式下,消息发送的吞吐量最高,但可靠性最低。由于假如在消息发送过程中发生网络问题,生产者并不知道消息是否成功到达 Broker,消息很可能会丢失。适用于对消息可靠性要求不高,且追求极致性能的场景,比如一些日志数据的采集。
acks = 1:生产者发送消息后,只要 Leader 副本成功接收消息,就会向生产者返回确认信号。这种模式下,消息的可靠性得到了肯定程度的保障,只要 Leader 副本存活,消息就不会丢失。不过,假如在 Leader 副本将消息写入日志后,还未来得及将消息同步给 Follower 副本时,Leader 发生故障,那么这条消息就会丢失。适用于对消息可靠性有肯定要求,但又希望有较高吞吐量的场景,比如一些实时监控数据的传输。
acks = -1(或 acks = all):生产者发送消息后,需要等候 ISR(In-Sync Replicas,同步副本聚集)中的所有副本都成功接收消息,才会收到确认信号。这种模式下,消息的可靠性最高,几乎不会丢失。但由于需要等候所有 ISR 副本简直认,消息发送的耽误会增加,吞吐量也会相应降低。适用于对消息可靠性要求极高的场景,比如金融交易数据的传输。
3.影响 ACK 机制的因素

ISR 聚集:ISR 聚集中的副本是与 Leader 副本保持同步的 Follower 副本。当 Leader 发生故障时,会从 ISR 聚集中选举出新的 Leader。假如 ISR 聚集中只有一个副本(即 Leader 本身),那么 acks = -1 就等同于 acks = 1,由于只有 Leader 成功接收消息就会返回确认。
Broker 的配置:Broker 的一些配置参数,如replica.lag.time.max.ms(Follower 副本与 Leader 副本的最大耽误时间)会影响 ISR 聚集的成员。假如某个 Follower 副本的耽误超过这个时间,就会被移出 ISR 聚集,这可能会改变消息确认的条件,进而影响 ACK 机制。
4.实际应用中的留意事项

性能与可靠性的平衡:在选择 ACK 级别时,需要根据业务场景,衡量消息的可靠性和体系的性能。假如对可靠性要求高,就选择 acks = -1;假如对性能要求高,且能接受肯定的消息丢失,可选择 acks = 0 或 acks = 1。
监控与调优:要实时监控 Kafka 集群的状态,包括 ISR 聚集的变革、消息的发送耽误和吞吐量等。根据监控数据,对 Kafka 的配置参数举行调优,以达到最佳的性能和可靠性。
5.示例代码

下面是利用 Java 编写的 Kafka 生产者设置不同 ACK 级别的示例代码,假设已经导入了必要的 Kafka 依赖。
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
      // Kafka集群地址
      String bootstrapServers = "localhost:9092";
      // 主题名称
      String topic = "test-topic";

      // 配置生产者属性
      Properties props = new Properties();
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

      // 设置acks = 0
      props.put(ProducerConfig.ACKS_CONFIG, "0");
      Producer<String, String> producer = new KafkaProducer<>(props);
      sendMessage(producer, topic, "Message with acks = 0");
      producer.close();

      // 设置acks = 1
      props.put(ProducerConfig.ACKS_CONFIG, "1");
      producer = new KafkaProducer<>(props);
      sendMessage(producer, topic, "Message with acks = 1");
      producer.close();

      // 设置acks = -1 (或acks = all)
      props.put(ProducerConfig.ACKS_CONFIG, "-1");
      producer = new KafkaProducer<>(props);
      sendMessage(producer, topic, "Message with acks = -1");
      producer.close();
    }

    private static void sendMessage(Producer<String, String> producer, String topic, String message) {
      ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
      producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception!= null) {
                  System.err.println("Failed to send message: " + exception.getMessage());
                } else {
                  System.out.println("Message sent successfully: " + metadata);
                }
            }
      });
    }
}
上述代码中,通过设置ProducerConfig.ACKS_CONFIG属性来配置不同的 ACK 级别,然后分别发送消息,并在发送消息时设置了回调函数,用于处理消息发送成功或失败的环境。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Kafka 的 ACK 机制详解