Kafka消息丢失全解析!原因、预防与解决方案

打印 上一主题 下一主题

主题 1556|帖子 1556|积分 4668

作为一名高并发系统开辟工程师,在使用消息中间件的过程中,无法避免碰到系统中消息丢失的题目,而Kafka作为主流的消息队列系统,消息丢失题目尤为常见。
在这篇文章中,将深入浅出地分析Kafka消息丢失的各种情况,并提供真实且实用的解决方案。
一、Kafka基础知识

Kafka是什么?

Kafka是一个分布式的流处理平台,被广泛用于构建实时数据管道和流式应用。它具有高吞吐量、可靠性、可扩展性的特点,已成为大数据生态系统中不可或缺的组件。
Kafka的焦点概念


在上面的架构图中,Kafka的几个重要组件包括:

  • Broker - Kafka服务器,负责吸收、存储消息
  • Producer - 生产者,负责发送消息到Kafka
  • Consumer - 消费者,负责从Kafka获取消息并处理
  • Topic - 主题,Kafka消息的逻辑分类
  • Partition - 分区,每个Topic可以有多个分区,分布在差别Broker上
  • Zookeeper - 管理Kafka集群元数据和和谐
二、Kafka消息丢失的情况分析

Kafka虽然以高可靠性著称,但在三个环节仍大概发生消息丢失:

  • 生产者端:消息发送失败或确认机制配置不当
  • Broker端:服务器宕机或磁盘故障
  • 消费者端:提交偏移量后处理消息失败
1. 生产者端消息丢失

生产者端消息丢失通常发生在以下情况:

生产者端丢失原因分析

  • Fire and Forget(发后即忘)模式

    • 当使用acks=0配置时,生产者不等待服务器的确认就认为消息发送乐成
    • 如果网络出现题目或Broker宕机,消息会丢失但生产者不会感知

  • 异步发送未处理回调

    • 使用异步发送时,如果回调中未精确处剃头送失败的情况
    • 消息发送失败但程序继承实行,导致消息丢失

  • 重试次数不足

    • 当网络抖动或Broker临时不可用,默认重试次数不足大概导致消息丢失

2. Broker端消息丢失

Broker端的消息丢失重要与以下因素有关:

Broker端丢失原因分析

  • 副本因子(replication.factor)设置过低

    • 当副本因子为1时,表现数据只存在于一个Broker上
    • 如果该Broker宕机,数据将完全丢失

  • 最小同步副本(min.insync.replicas)配置不当

    • 如果配置为1,则只要Leader副本确认就会返回ack
    • 如果在数据同步到follower前Leader宕机,数据会丢失

  • 允许非同步副本推举为Leader

    • 默认配置unclean.leader.election.enable=false
    • 如果设置为true,大概推举落后的副本作为新Leader,导致数据丢失

3. 消费者端消息丢失

消费者端丢失重要发生在以下情况:

消费者端丢失原因分析

  • 主动提交偏移量

    • 默认情况下,消费者会主动提交偏移量(enable.auto.commit=true)
    • 如果提交了偏移量但处理消息过程中失败,重启后会从已提交的偏移量开始消费,导致消息丢失

  • 先提交偏移量再处理消息

    • 如果手动提交偏移量时,在消息处理完成前就提交
    • 处理过程中出现异常,消息实际未处理乐成但偏移量已提交

三、如何防止Kafka消息丢失

现在让我们来看看如安在各环节防止消息丢失。
1. 生产者端防止消息丢失

  1. // 1. 生产者配置示例
  2. Properties props = new Properties();
  3. // 设置broker集群地址
  4. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092");
  5. // 设置key/value序列化器
  6. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  7. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  8. // 设置确认机制为all,所有ISR副本都确认才算成功
  9. props.put(ProducerConfig.ACKS_CONFIG, "all");
  10. // 设置重试次数
  11. props.put(ProducerConfig.RETRIES_CONFIG, 10);
  12. // 设置重试间隔
  13. props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
  14. // 幂等性,避免消息重复
  15. props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
  16. // 设置拦截器(可选)
  17. props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.example.kafka.MyProducerInterceptor");
  18. // 创建生产者
  19. KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  20. // 2. 同步发送示例
  21. try {
  22.     ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
  23.     // 同步发送,等待服务器确认
  24.     RecordMetadata metadata = producer.send(record).get();
  25.     System.out.println("Message sent successfully to partition " + metadata.partition() + " offset " + metadata.offset());
  26. } catch (Exception e) {
  27.     // 处理发送异常
  28.     System.err.println("Error sending message: " + e.getMessage());
  29.     // 可以进行重试或其他补偿措施
  30. }
  31. // 3. 异步发送示例(带回调处理)
  32. ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
  33. producer.send(record, new Callback() {
  34.     @Override
  35.     public void onCompletion(RecordMetadata metadata, Exception exception) {
  36.         if (exception != null) {
  37.             // 发送失败,进行错误处理
  38.             System.err.println("Failed to send message: " + exception.getMessage());
  39.             // 可以将消息保存到本地,稍后重试
  40.             saveToLocalStorage(record);
  41.         } else {
  42.             // 发送成功
  43.             System.out.println("Message sent successfully to partition " + metadata.partition() + " offset " + metadata.offset());
  44.         }
  45.     }
  46. });
  47. // 关闭生产者
  48. producer.close();
复制代码
关键配置

  • acks=all:确保消息被全部同步副本(ISR)吸收才视为乐成
  • retries:设置足够的重试次数,避免因网络抖动导致的发送失败
  • enable.idempotence=true:启用幂等性,避免消息重复
  • 使用同步发送或精确处理异步回调:捕获并处剃头送异常
最佳实践


  • 使用带回调的异步发送进步性能,同时确保在回调中精确处理异常
  • 对关键业务消息,可使用本地消息表或变乱消息
  • 实现自界说拦截器来记录发送失败的消息
2. Broker端防止消息丢失

  1. # broker端配置示例 (server.properties)
  2. ############################# 副本配置 #############################
  3. # 设置Topic默认副本数为3
  4. default.replication.factor=3
  5. # 设置最小同步副本数
  6. min.insync.replicas=2
  7. # 禁止非同步副本成为Leader(默认值为false)
  8. unclean.leader.election.enable=false
  9. ############################# 数据可靠性配置 #############################
  10. # 设置数据刷盘策略,有以下选项:
  11. # 0: 由操作系统控制,性能最好但风险最高
  12. # 1: 每秒刷盘一次,平衡性能和可靠性
  13. # -1: 每次写入都刷盘,可靠性最高但性能最差
  14. log.flush.interval.messages=1000
  15. log.flush.interval.ms=1000
  16. # 指定分区在内存中的保留时间,默认为-1(不受限制)
  17. log.retention.ms=604800000  # 7天
  18. ############################# Topic默认配置 #############################
  19. # 创建topic时的默认分区数
  20. num.partitions=3
  21. ############################# 其他重要配置 #############################
  22. # 控制器failover超时时间,大型集群可适当增加此值
  23. controller.socket.timeout.ms=30000
  24. # Topic创建命令示例(针对特别重要的Topic单独配置)
  25. bin/kafka-topics.sh --create --bootstrap-server broker1:9092 --topic important-topic \
  26.   --partitions 6 --replication-factor 3 \
  27.   --config min.insync.replicas=2 \
  28.   --config flush.messages=1 \
  29.   --config retention.ms=1209600000
复制代码
关键配置

  • replication.factor=3:设置足够的副本数,通常为3
  • min.insync.replicas=2:要求至少2个副本同步乐成才认为写入乐成
  • unclean.leader.election.enable=false:禁止未同步的副本成为leader
  • log.flush.interval.ms:控制数据刷盘频率,衡量性能和可靠性
最佳实践


  • 至少3个Broker的集群,保证高可用性
  • 对重要Topic增长副本因子,进步可靠性
  • 定期监控ISR副本数量,确保副本健康
  • 公道配置刷盘计谋,避免因服务器宕机导致数据丢失
3. 消费者端防止消息丢失

  1. // 1. 消费者配置示例
  2. Properties props = new Properties();
  3. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092");
  4. props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
  5. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  6. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  7. // 关闭自动提交
  8. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  9. // 如果使用自动提交,设置较大的提交间隔
  10. // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
  11. // 从最早的偏移量开始消费(可选,首次消费时使用)
  12. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  13. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  14. consumer.subscribe(Arrays.asList("my-topic"));
  15. // 2. 手动确认 - 先处理消息再提交偏移量
  16. try {
  17.     while (true) {
  18.         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  19.         if (!records.isEmpty()) {
  20.             // 处理消息批次
  21.             processRecords(records);
  22.             
  23.             // 成功处理后再提交偏移量
  24.             consumer.commitSync();
  25.         }
  26.     }
  27. } catch (Exception e) {
  28.     // 处理异常
  29. } finally {
  30.     consumer.close();
  31. }
  32. // 3. 更精细的偏移量控制 - 按分区提交偏移量
  33. try {
  34.     while (true) {
  35.         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  36.         
  37.         // 按分区处理记录
  38.         Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
  39.         
  40.         for (TopicPartition partition : records.partitions()) {
  41.             List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
  42.             for (ConsumerRecord<String, String> record : partitionRecords) {
  43.                 // 处理单条消息
  44.                 processRecord(record);
  45.             }
  46.             
  47.             // 获取该分区最后一条记录的偏移量
  48.             long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
  49.             // 存储该分区的偏移量
  50.             currentOffsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
  51.         }
  52.         
  53.         // 提交处理完的分区偏移量
  54.         if (!currentOffsets.isEmpty()) {
  55.             consumer.commitSync(currentOffsets);
  56.         }
  57.     }
  58. } catch (Exception e) {
  59.     // 处理异常
  60. } finally {
  61.     consumer.close();
  62. }
  63. // 4. 处理消息的事务方式
  64. private void processRecordsWithTransaction(ConsumerRecords<String, String> records) {
  65.     // 开启数据库事务
  66.     Connection conn = null;
  67.     try {
  68.         conn = dataSource.getConnection();
  69.         conn.setAutoCommit(false);
  70.         
  71.         // 处理消息并执行数据库操作
  72.         for (ConsumerRecord<String, String> record : records) {
  73.             processRecordInTransaction(conn, record);
  74.         }
  75.         
  76.         // 提交数据库事务
  77.         conn.commit();
  78.         
  79.         // 提交Kafka偏移量
  80.         consumer.commitSync();
  81.     } catch (Exception e) {
  82.         // 回滚数据库事务
  83.         if (conn != null) {
  84.             try {
  85.                 conn.rollback();
  86.             } catch (SQLException ex) {
  87.                 ex.printStackTrace();
  88.             }
  89.         }
  90.         throw new RuntimeException("Failed to process records", e);
  91.     } finally {
  92.         // 关闭连接
  93.         if (conn != null) {
  94.             try {
  95.                 conn.close();
  96.             } catch (SQLException e) {
  97.                 e.printStackTrace();
  98.             }
  99.         }
  100.     }
  101. }
复制代码
关键配置与计谋

  • 关闭主动提交偏移量(enable.auto.commit=false):手动控制偏移量提交
  • 先处理消息再提交偏移量:确保消息处理乐成后再提交
  • 按分区提交偏移量:更精致的控制,进步并发处理本领
  • 使用变乱处理:将数据库操作和偏移量提交放在同一变乱中
最佳实践


  • 保证消息处理的幂等性,处理潜在的重复消费
  • 实现消费失败重试机制,例如把失败消息放入"死信队列"
  • 根据业务需求,思量将重要消息长期化到本地
  • 监控消费延迟,实时发现消费异常
四、深入明白Kafka可靠性机制

1. 生产者可靠性原理


生产者工作原理

  • 消息累加器(RecordAccumulator):生产者并不是每条消息都立即发送,而是会缓存到一个消息累加器中,比及达一定条件后批量发送
  • 消息累加器触发条件

    • batch.size:当累积的数据量达到设定值
    • linger.ms:当数据累积时间达到设定值

  • Sender线程:单独的线程负责将消息批次从累加器中取出并发送到对应的Broker
  • 确认机制(acks)

    • acks=0:不等待确认
    • acks=1:只等待Leader副本确认
    • acks=all:等待全部ISR副本确认

2. Broker数据可靠性原理


Broker副本机制

  • ISR(In-Sync Replicas):与Leader保持同步的副本集合,只有ISR中的副本才能被选为新Leader
  • HW(High Watermark):消费者能看到的最高偏移量,ISR中全部副本都复制了的位置

    • HW之前的消息对消费者可见
    • 只有写入HW之前的消息才被认为是"已提交"的消息

  • LEO(Log End Offset):每个副本的日志末端位置

    • Leader副本LEO > HW:表现有新消息写入但未同步完成
    • Follower副本LEO < Leader LEO:表现副本正在追赶Leader

  • 数据同步流程

    • Leader吸收消息后追加到本地日志
    • Follower通过fetch请求从Leader拉取消息
    • Leader更新HW(取全部ISR副本LEO的最小值)
    • 当acks=all时,只有消息被全部ISR副本同步后才返回乐成

3. 消费者可靠性原理


消费者偏移量管理

  • 偏移量(Offset)概念

    • 每条消息在分区中的唯一标识
    • 消费者通过记录消费偏移量来追踪消费进度

  • 偏移量存储

    • 0.9版本之前:存储在ZooKeeper中
    • 0.9版本之后:存储在名为__consumer_offsets的内部Topic中

  • 提交偏移量的方式

    • 主动提交:enable.auto.commit=true,按固定时间隔断主动提交
    • 同步手动提交:commitSync(),壅闭直到提交乐成
    • 异步手动提交:commitAsync(),不壅闭,通过回调获取效果

  • 消息处理与偏移量提交顺序

    • 先消费后提交:大概重复消费,但不会丢失
    • 先提交后消费:大概丢失消息,但不会重复

  • 消费者再均衡

    • 消费者加入/离开群组时触发再均衡
    • 再均衡过程中,分区全部权会转移,需要精确处理偏移量提交
    • 通过ConsumerRebalanceListener接口可以处理再均衡变乱

五、扩展话题,Kafka消息保证语义

1. 消息传递语义


Kafka支持三种消息传递语义

  • At Most Once (最多一次)

    • 消息大概丢失,但绝不重复
    • 性能最好,可靠性最低
    • 适用场景:日志收集、指标监控等允许少量数据丢失的场景

  • At Least Once (至少一次)

    • 消息不会丢失,但大概重复
    • 性能较好,可靠性较高
    • 适用场景:大多数业务场景,需要确保消费端实现幂等性

  • Exactly Once (恰恰一次)

    • 消息既不丢失也不重复
    • 需要Kafka 0.11+版本支持
    • 通过生产者幂等性和变乱特性实现
    • 适用场景:金融生意业务、计费系统等对数据精确性要求高的场景

2. 实现Exactly Once语义

Kafka 0.11版本后引入了变乱支持,可以实现真正的Exactly Once语义:
  1. // 1. 生产者端实现Exactly Once (使用事务API)
  2. Properties props = new Properties();
  3. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092");
  4. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  5. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  6. // 启用幂等性
  7. props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
  8. // 设置事务ID (必须唯一)
  9. props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
  10. // 设置事务超时时间
  11. props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000); // 60秒
  12. KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  13. // 初始化事务
  14. producer.initTransactions();
  15. try {
  16.     // 开始事务
  17.     producer.beginTransaction();
  18.    
  19.     // 在事务中发送消息
  20.     for (int i = 0; i < 10; i++) {
  21.         ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);
  22.         producer.send(record);
  23.     }
  24.    
  25.     // 提交事务
  26.     producer.commitTransaction();
  27. } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
  28.     // 这些异常无法恢复,需要关闭生产者
  29.     producer.close();
  30. } catch (KafkaException e) {
  31.     // 其他异常可以中止事务
  32.     producer.abortTransaction();
  33. } finally {
  34.     producer.close();
  35. }
  36. // 2. 消费者端实现Exactly Once
  37. Properties consumerProps = new Properties();
  38. consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092");
  39. consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
  40. consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  41. consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  42. // 设置隔离级别,只读取已提交的消息
  43. consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
  44. // 关闭自动提交
  45. consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  46. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
  47. consumer.subscribe(Arrays.asList("my-topic"));
  48. // 3. 消费-处理-生产模式 (消费者端读取+生产者端写入,事务性处理)
  49. Properties processorProps = new Properties();
  50. // ... 设置基础配置同上 ...
  51. // 设置事务ID
  52. processorProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-processor-transactional-id");
  53. // 启用幂等性
  54. processorProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
  55. KafkaProducer<String, String> processorProducer = new KafkaProducer<>(processorProps);
  56. processorProducer.initTransactions();
  57. try {
  58.     while (true) {
  59.         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  60.         if (!records.isEmpty()) {
  61.             // 存储当前消费的偏移量
  62.             Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
  63.             for (ConsumerRecord<String, String> record : records) {
  64.                 TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
  65.                 OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);
  66.                 offsets.put(topicPartition, offsetAndMetadata);
  67.             }
  68.             
  69.             // 开始事务
  70.             processorProducer.beginTransaction();
  71.             
  72.             try {
  73.                 // 处理消息并产生新消息
  74.                 for (ConsumerRecord<String, String> record : records) {
  75.                     // 业务处理
  76.                     String processedValue = processRecord(record.value());
  77.                     // 发送处理后的消息
  78.                     ProducerRecord<String, String> outputRecord =
  79.                         new ProducerRecord<>("output-topic", record.key(), processedValue);
  80.                     processorProducer.send(outputRecord);
  81.                 }
  82.                
  83.                 // 在同一事务中提交消费者偏移量
  84.                 processorProducer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
  85.                
  86.                 // 提交事务
  87.                 processorProducer.commitTransaction();
  88.             } catch (Exception e) {
  89.                 // 发生异常,中止事务
  90.                 processorProducer.abortTransaction();
  91.                 throw e;
  92.             }
  93.         }
  94.     }
  95. } finally {
  96.     processorProducer.close();
  97.     consumer.close();
  98. }
  99. // 简单的消息处理方法
  100. private static String processRecord(String input) {
  101.     // 实际的业务处理逻辑
  102.     return "processed-" + input;
  103. }
复制代码
变乱API关键点

  • Transactional ID

    • 每个生产者需要指定一个唯一的变乱ID
    • 变乱ID可以跨会话保持变乱状态,实现故障恢复

  • 变乱的基本操作

    • initTransactions():初始化变乱情况
    • beginTransaction():开始一个新变乱
    • commitTransaction():提交变乱
    • abortTransaction():中止变乱

  • 消费-处理-生产场景

    • 在同一个变乱中可以将消费者偏移量与生产者发送的消息绑定
    • 通过sendOffsetsToTransaction()方法实现

  • 消费者隔离级别

    • 通常设置为read_committed,只读取已提交的变乱消息
    • 避免读取到大概会被中止的变乱消息

六、Kafka消息丢失的监控与告警

为了实时发现消息丢失题目,创建健全的监控系统至关重要。
1. 关键监控指标


  • 生产者端

    • errors-per-second:错误率
    • record-error-rate:消息错误率
    • record-retry-rate:消息重试率

  • Broker端

    • UnderReplicatedPartitions:副本同步不足的分区数
    • OfflinePartitionsCount:离线分区数
    • ISRShrink/ISRExpand:ISR紧缩/扩大变乱

  • 消费者端

    • records-lag:消费延迟(表现有多少消息未消费)
    • records-lag-max:最大消费延迟
    • records-lag-avg:均匀消费延迟

2. 监控系统搭建

常用监控组合:


  • Kafka内置JMX监控 + Prometheus + Grafana
  • Kafka Manager/CMAK + 自界说告警
  • Confluent Control Center(商业版)
七、口试热点题目

1. Kafka如何保证消息不丢失?

生产者端


  • 使用acks=all确保全部副本吸收消息
  • 配置足够的重试次数
  • 实现异步发送的错误回调处理
  • 必要时使用变乱API
Broker端


  • 配置公道的副本因子(至少3)
  • 设置min.insync.replicas >= 2
  • 禁用unclean.leader.election
  • 公道配置数据长期化计谋
消费者端


  • 关闭主动提交或增长主动提交隔断
  • 使用手动提交,并在处理乐成后提交
  • 实现消息处理的幂等性
2. Kafka的消息传递语义有哪些?各适用于什么场景?



  • At Most Once:适用于允许少量数据丢失的场景,如日志收集
  • At Least Once:适用于大多数业务场景,要求实现消费端幂等性
  • Exactly Once:适用于金融、计费等对数据精确性要求高的场景
3. Kafka如何实现Exactly Once语义?

实现Exactly Once需要从两方面保证:


  • 生产者到Broker:使用幂等性生产者(enable.idempotence=true)和变乱API
  • Broker到消费者:设置消费者隔离级别为read_committed,只读取已提交的变乱消息
  • 端到端:通过变乱机制将消费者偏移量提交和生产者发送消息绑定在同一变乱中
关键实现技术:

  • 生产者PID(Producer ID)和序列号实现幂等性
  • 变乱和谐者(Transaction Coordinator)管理变乱状态
  • 控制消息(Control Messages)标记变乱边界
4. 如何处理Kafka的消息积存题目?


  • 定位原因:确定是生产速率过快还是消费速率过慢
  • 进步消费本领

    • 增长消费者数量(需要增长分区数支持)
    • 优化消费者处理逻辑,减少单条消息处理时间
    • 进步消费者批量处理本领(max.poll.records)

  • 临时措施

    • 启动独立消费者组举行"追赶"处理
    • 对非关键消息实现"抛弃计谋"
    • 消息隔断采样消费

5. Kafka的LEO和HW是什么?与消息丢失有什么关系?

LEO (Log End Offset):分区中最后一条消息的偏移量+1,表现下一条待写入消息的偏移量。
HW (High Watermark):全部ISR副本都已复制的最高偏移量,只有HW之前的消息对消费者可见。
与消息丢失的关系:


  • 只有HW之前的消息才被认为已提交,对消费者可见
  • Leader宕机时,新推举的Leader将截断日志到HW位置,HW之后的消息会丢失
  • min.insync.replicas设置过小大概导致消息在HW提交前丢失
6. 在Kafka系统中,消息重复和消息丢失哪个更轻易处理?为什么?

消息重复通常比消息丢失更轻易处理:

  • 重复处理:可以通过设计幂等性操作解决重复题目

    • 使用唯一ID去重
    • 依赖数据库唯一束缚
    • 使用业务状态机

  • 消息丢失:一旦丢失,数据大概无法恢复

    • 需要从源头重新生成数据
    • 大概需要举行数据修复和业务赔偿
    • 有时无法确定哪些数据丢失了

所以在设计系统时,通常宁可接受重复也不接受丢失。
总结

Kafka消息丢失是生产情况中常见的题目,但通过精确的配置和设计可以有效避免。
对于差别的业务场景,我们需要根据数据可靠性要求和性能需求,选择符合的解决方案。


  • 对于一样平常业务,At Least Once语义加上消费端幂等设计是最佳选择
  • 对于金融、支付等焦点业务,应思量使用Exactly Once语义
  • 对于日志、监控等非关键业务,可以使用At Most Once语义以获得更好的性能
最后,创建完善的监控系统,实时发现并处理潜在的消息丢失题目,是保障Kafka系统稳固运行的重要保障。
Kafka不丢失消息的配置就像我们一样平常生活中的"安全带+安全气囊+ABS"组合,层层保护,共同确保消息传输的安全可靠。只有明白了原理,我们才能更加胸有成竹地应对各种寻衅。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

冬雨财经

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表