Kafka 的 ACK 机制详解

金歌  论坛元老 | 2025-4-12 19:20:42 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1694|帖子 1694|积分 5082

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
1.ACK 机制的基本概念

ACK(Acknowledgment)即确认机制,是 Kafka 生产者和 Broker 之间的一种消息确认约定。生产者在发送消息后,需要等候 Broker 端简直认信号,以此判定消息是否成功发送。这一机制主要是为了包管消息不会由于网络问题或 Broker 故障而丢失,确保数据的可靠性。
2.ACK 的级别

acks = 0:生产者发送消息后,不需要等候 Broker 的任何确认,直接继续发送下一条消息。这种模式下,消息发送的吞吐量最高,但可靠性最低。由于假如在消息发送过程中发生网络问题,生产者并不知道消息是否成功到达 Broker,消息很可能会丢失。适用于对消息可靠性要求不高,且追求极致性能的场景,比如一些日志数据的采集。
acks = 1:生产者发送消息后,只要 Leader 副本成功接收消息,就会向生产者返回确认信号。这种模式下,消息的可靠性得到了肯定程度的保障,只要 Leader 副本存活,消息就不会丢失。不过,假如在 Leader 副本将消息写入日志后,还未来得及将消息同步给 Follower 副本时,Leader 发生故障,那么这条消息就会丢失。适用于对消息可靠性有肯定要求,但又希望有较高吞吐量的场景,比如一些实时监控数据的传输。
acks = -1(或 acks = all):生产者发送消息后,需要等候 ISR(In-Sync Replicas,同步副本聚集)中的所有副本都成功接收消息,才会收到确认信号。这种模式下,消息的可靠性最高,几乎不会丢失。但由于需要等候所有 ISR 副本简直认,消息发送的耽误会增加,吞吐量也会相应降低。适用于对消息可靠性要求极高的场景,比如金融交易数据的传输。
3.影响 ACK 机制的因素

ISR 聚集:ISR 聚集中的副本是与 Leader 副本保持同步的 Follower 副本。当 Leader 发生故障时,会从 ISR 聚集中选举出新的 Leader。假如 ISR 聚集中只有一个副本(即 Leader 本身),那么 acks = -1 就等同于 acks = 1,由于只有 Leader 成功接收消息就会返回确认。
Broker 的配置:Broker 的一些配置参数,如replica.lag.time.max.ms(Follower 副本与 Leader 副本的最大耽误时间)会影响 ISR 聚集的成员。假如某个 Follower 副本的耽误超过这个时间,就会被移出 ISR 聚集,这可能会改变消息确认的条件,进而影响 ACK 机制。
4.实际应用中的留意事项

性能与可靠性的平衡:在选择 ACK 级别时,需要根据业务场景,衡量消息的可靠性和体系的性能。假如对可靠性要求高,就选择 acks = -1;假如对性能要求高,且能接受肯定的消息丢失,可选择 acks = 0 或 acks = 1。
监控与调优:要实时监控 Kafka 集群的状态,包括 ISR 聚集的变革、消息的发送耽误和吞吐量等。根据监控数据,对 Kafka 的配置参数举行调优,以达到最佳的性能和可靠性。
5.示例代码

下面是利用 Java 编写的 Kafka 生产者设置不同 ACK 级别的示例代码,假设已经导入了必要的 Kafka 依赖。
  1. import org.apache.kafka.clients.producer.*;
  2. import java.util.Properties;
  3. public class KafkaProducerExample {
  4.     public static void main(String[] args) {
  5.         // Kafka集群地址
  6.         String bootstrapServers = "localhost:9092";
  7.         // 主题名称
  8.         String topic = "test-topic";
  9.         // 配置生产者属性
  10.         Properties props = new Properties();
  11.         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  12.         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  13.         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  14.         // 设置acks = 0
  15.         props.put(ProducerConfig.ACKS_CONFIG, "0");
  16.         Producer<String, String> producer = new KafkaProducer<>(props);
  17.         sendMessage(producer, topic, "Message with acks = 0");
  18.         producer.close();
  19.         // 设置acks = 1
  20.         props.put(ProducerConfig.ACKS_CONFIG, "1");
  21.         producer = new KafkaProducer<>(props);
  22.         sendMessage(producer, topic, "Message with acks = 1");
  23.         producer.close();
  24.         // 设置acks = -1 (或acks = all)
  25.         props.put(ProducerConfig.ACKS_CONFIG, "-1");
  26.         producer = new KafkaProducer<>(props);
  27.         sendMessage(producer, topic, "Message with acks = -1");
  28.         producer.close();
  29.     }
  30.     private static void sendMessage(Producer<String, String> producer, String topic, String message) {
  31.         ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
  32.         producer.send(record, new Callback() {
  33.             @Override
  34.             public void onCompletion(RecordMetadata metadata, Exception exception) {
  35.                 if (exception!= null) {
  36.                     System.err.println("Failed to send message: " + exception.getMessage());
  37.                 } else {
  38.                     System.out.println("Message sent successfully: " + metadata);
  39.                 }
  40.             }
  41.         });
  42.     }
  43. }
复制代码
上述代码中,通过设置ProducerConfig.ACKS_CONFIG属性来配置不同的 ACK 级别,然后分别发送消息,并在发送消息时设置了回调函数,用于处理消息发送成功或失败的环境。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

金歌

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表