Kafka Producer之幂等性

打印 上一主题 下一主题

主题 1010|帖子 1010|积分 3030

幂等性通过消耗时间和性能的方式,办理乱序和重复问题。
但是只能包管同一生产者在一个分区中的幂等性。
1. 启用幂等性

  1.         //创建producer
  2.         HashMap<String, Object> config = new HashMap<>();
  3.         config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
  4.         config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  5.         config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  6.         //配置acks等级
  7.         config.put(ProducerConfig.ACKS_CONFIG, "-1");
  8.         //启用幂等性
  9.         config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
  10.         // 消息失败重试次数
  11.         config.put(ProducerConfig.RETRIES_CONFIG, 5);
  12.         config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
  13.         KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);
复制代码
幂等性操作要求:

  • ACKS = -1
  • 开启重试机制
  • 在途哀求缓冲区不能大于5
2. 底层变化

消息会被标志,包含生产者ID和消息序列号。
( 如果生产者重启,那么ID会变化,这会使得下图记录无效,幂等性短暂失效。)
并且broker中的ProducerState会记录每个分区的生产者状态,包括最新5个消息的序列号。

3. 数据不重复

消息来到broker分区,经过ProducerState的数据进行对比,


  • 重复则抛弃消息,返回ack。
  • 否则Broker存储消息并返回ack。
4. 数据有序

消息来到broker分区,经过ProducerState的数据进行对比,


  • 如果新消息的序列号是连续的,Broker会接受并存储该消息,然后更新最新序列号。
  • 如果新消息的序列号不连续,Broker会认为这是重复消息或乱序消息,根据配置,它可能会抛弃或拒绝该消息。
  • 无论消息被接受还是抛弃,Broker都会返回一个ack给生产者。
不连续时可能拒绝多个消息,那么这些消息都会返回生产者重新发送,直到按顺序下一个消息到来,才存储并更新。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

河曲智叟

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表