Kafka-生产者消息幂等性和消息事件机制

打印 上一主题 下一主题

主题 1958|帖子 1958|积分 5874

1、生产者消息幂等性

1.1、Kafka幂等性标题

当Producer的acks=1或-1时,Producer每次发送消息都需要获取Broker端返回的RecordMetadata。这个过程需要两次跨网络请求。

如果要包管消息安全,对于每个消息,这两次网络请求就必须要求是幂等的。但是,网络是不可靠的,特别是在高并发场景下,通常没办法包管这两个请求是幂等的。
Producer发送消息的过程中,如果第一步请求成功了,第二步大概没有返回。这时,Producer就会认为消息发送失败了。Producer会发起重试。重试次数由参数ProducerConfig.RETRIES_CONFIG,默认值Integer.MAX。
Producer会重复发送多条消息到Broker中。Kafka如何包管无论Producer向Broker发送多少次重复的数据,Broker端都只保存一条消息,而不会重复保存多条消息呢?这就是Kafka消息生产者的幂等性标题。
Kafka中对于幂等性属性的三个参数:


  • ENABLE_IDEMPOTENCE_CONFIG
  • MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE
  • MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
   在配置类:ProducerConfig.java
  1.2、Kafka幂等性计划

要理解幂等性计划,需要先理解分布式数据通报过程中的三个数据语义:


  • at-least-once:至少一次,可以包管数据不丢失,但不能包管数据不重复。
  • at-most-once:最多一次,包管数据不重复,但不能包管数据不丢失。
  • exactly-once:精确一次,要求数据既不重复也不丢失。
Kafka 的幂等性计划通过以下两个核心组件实现:


  • Producer ID(PID)

    • 每个生产者实例在初始化时会被分配一个唯一的 PID。
    • PID 用于标识消息的泉源生产者。

  • Sequence Number(SN)

    • 每个 PID 对应一个单调递增的序列号(Sequence Number)。
    • 每条消息都会附带一个 SN,用于标识消息的次序。

通过 PID 和 SN,Kafka Broker 可以检测并抛弃重复的消息,从而实现幂等性。
1.3、幂等性工作原理


  • 生产者初始化

    • 生产者启动时,向 Broker 请求一个唯一的 PID。
    • 生产者为每条消息分配一个单调递增的 SN。

  • 消息发送

    • 生产者发送消息时,附带 PID 和 SN。
    • Broker 收到消息后,检查 PID 和 SN 是否合法:

      • 如果 SN 比 Broker 记载的下一个预期 SN 小,说明是重复消息,抛弃。
      • 如果 SN 等于预期 SN,担当消息并更新预期 SN。
      • 如果 SN 大于预期 SN,说明消息丢失或乱序,拒绝消息并触发错误。


  • Broker 处理

    • Broker 为每个 PID 维护一个 lastSequenceNumber,记载末了担当的 SN。
    • 通过比力 lastSequenceNumber 和消息的 SN,判断消息是否重复。

1.4、配置幂等性

在 Kafka 生产者客户端中,可以通过以下配置启用幂等性:
  1. // 启用幂等性
  2. props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
复制代码
启用后,Kafka 会自动分配 PID 并管理 SN。
2、生产者消息事件

Kafka 的 生产者消息事件 是一种机制,用于确保生产者发送的消息在多个分区之间具有原子性(要么全部成功,要么全部失败)。这对于需要强一致性和精确一次语义(exactly-once)的场景非常紧张。
2.1. 事件的核心概念



  • 事件性生产者(Transactional Producer)

    • 生产者可以开启一个事件,在事件内发送多条消息。
    • 事件提交后,消息对全部消费者可见;事件回滚后,消息被抛弃。

  • 事件性消费者(Transactional Consumer)

    • 消费者可以配置为只读取已提交的事件消息(默认行为)。

  • 事件和谐器(Transaction Coordinator)

    • Kafka Broker 中的一个组件,负责管理事件的状态(如事件的开始、提交和回滚)。

2.2、事件的工作原理


  • 初始化事件

    • 生产者向事件和谐器注册,获取一个唯一的事件 ID(Transactional ID)。
    • 事件和谐器为每个事件 ID 维护一个 PID(Producer ID)和 Epoch(用于防止僵尸生产者)。

  • 开始事件

    • 生产者调用 beginTransaction() 开始一个新事件。

  • 发送消息

    • 生产者在事件内发送消息,消息会附带事件 ID 和序列号(Sequence Number)。
    • 这些消息在事件提交前对其他消费者不可见。

  • 提交事件

    • 生产者调用 commitTransaction() 提交事件。
    • 事件和谐器将事件标记为已提交,消息对全部消费者可见。

  • 回滚事件

    • 生产者调用 abortTransaction() 回滚事件。
    • 事件和谐器将事件标记为已回滚,消息被抛弃。

2.3、事件的注意事项



  • 事件的配置

      1. // 事物ID,每个生产者实例的事务 ID 必须唯一
      2. props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "自定义");
      复制代码

  • 事件 ID 的唯一性

    • 每个生产者实例的事件 ID 必须唯一,否则会导致事件冲突。

  • 事件的隔离性

    • 默认环境下,消费者只能读取已提交的事件消息。
    • 可以通过配置 isolation.level 控制消费者的隔离级别:

      • read_committed:只读取已提交的消息(默认)。
      • read_uncommitted:读取全部消息(包罗未提交的)。


  • 事件的性能开销

    • 事件会增加一定的性能开销,适用于需要强一致性的场景。

2.4、事件的应用场景



  • 跨分区原子写入
    比方,将订单信息和库存更新写入差别的 Topic,确保两者要么同时成功,要么同时失败。
  • 精确一次处理(Exactly-Once Semantics)
    通过事件和幂等性,确保消息不会重复处理。
2.5、代码实验

测试流程:先启动一个消费者,再启动这个生产者,进行试验。
分两个场景验证:
非常事件:生产者事件回滚没有提交,消费者端没有消费记载。

正常事件:生产者事件提交,消费者端有消费记载。

   非常事件:TransactionErrorDemo.java
  正常事件:TransactionProducer.java
  测试代码,文末有获取方式。
  3、我的公众号&资料获取

敬请关注我的公众号:大象只为你,连续更新技术知识…
相关资料获取:
如需SpringKafka项目Demo,请后台回复:【springkafka】,代码有更新。
我的代码是基于SpringBoot3.x上写的,IDEA 2024版

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

自由的羽毛

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表