自由的羽毛 发表于 6 天前

Kafka-生产者消息幂等性和消息事件机制

1、生产者消息幂等性

1.1、Kafka幂等性标题

当Producer的acks=1或-1时,Producer每次发送消息都需要获取Broker端返回的RecordMetadata。这个过程需要两次跨网络请求。
https://i-blog.csdnimg.cn/direct/d5daa023177941b8b0c2676f9a733143.jpeg#pic_center
如果要包管消息安全,对于每个消息,这两次网络请求就必须要求是幂等的。但是,网络是不可靠的,特别是在高并发场景下,通常没办法包管这两个请求是幂等的。
Producer发送消息的过程中,如果第一步请求成功了,第二步大概没有返回。这时,Producer就会认为消息发送失败了。Producer会发起重试。重试次数由参数ProducerConfig.RETRIES_CONFIG,默认值Integer.MAX。
Producer会重复发送多条消息到Broker中。Kafka如何包管无论Producer向Broker发送多少次重复的数据,Broker端都只保存一条消息,而不会重复保存多条消息呢?这就是Kafka消息生产者的幂等性标题。
Kafka中对于幂等性属性的三个参数:


[*]ENABLE_IDEMPOTENCE_CONFIG
[*]MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE
[*]MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
   在配置类:ProducerConfig.java
1.2、Kafka幂等性计划

要理解幂等性计划,需要先理解分布式数据通报过程中的三个数据语义:


[*]at-least-once:至少一次,可以包管数据不丢失,但不能包管数据不重复。
[*]at-most-once:最多一次,包管数据不重复,但不能包管数据不丢失。
[*]exactly-once:精确一次,要求数据既不重复也不丢失。
Kafka 的幂等性计划通过以下两个核心组件实现:


[*]Producer ID(PID):

[*]每个生产者实例在初始化时会被分配一个唯一的 PID。
[*]PID 用于标识消息的泉源生产者。

[*]Sequence Number(SN):

[*]每个 PID 对应一个单调递增的序列号(Sequence Number)。
[*]每条消息都会附带一个 SN,用于标识消息的次序。

通过 PID 和 SN,Kafka Broker 可以检测并抛弃重复的消息,从而实现幂等性。
1.3、幂等性工作原理


[*]生产者初始化:

[*]生产者启动时,向 Broker 请求一个唯一的 PID。
[*]生产者为每条消息分配一个单调递增的 SN。

[*]消息发送:

[*]生产者发送消息时,附带 PID 和 SN。
[*]Broker 收到消息后,检查 PID 和 SN 是否合法:

[*]如果 SN 比 Broker 记载的下一个预期 SN 小,说明是重复消息,抛弃。
[*]如果 SN 等于预期 SN,担当消息并更新预期 SN。
[*]如果 SN 大于预期 SN,说明消息丢失或乱序,拒绝消息并触发错误。


[*]Broker 处理:

[*]Broker 为每个 PID 维护一个 lastSequenceNumber,记载末了担当的 SN。
[*]通过比力 lastSequenceNumber 和消息的 SN,判断消息是否重复。

1.4、配置幂等性

在 Kafka 生产者客户端中,可以通过以下配置启用幂等性:
// 启用幂等性
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
启用后,Kafka 会自动分配 PID 并管理 SN。
2、生产者消息事件

Kafka 的 生产者消息事件 是一种机制,用于确保生产者发送的消息在多个分区之间具有原子性(要么全部成功,要么全部失败)。这对于需要强一致性和精确一次语义(exactly-once)的场景非常紧张。
2.1. 事件的核心概念



[*]事件性生产者(Transactional Producer):

[*]生产者可以开启一个事件,在事件内发送多条消息。
[*]事件提交后,消息对全部消费者可见;事件回滚后,消息被抛弃。

[*]事件性消费者(Transactional Consumer):

[*]消费者可以配置为只读取已提交的事件消息(默认行为)。

[*]事件和谐器(Transaction Coordinator):

[*]Kafka Broker 中的一个组件,负责管理事件的状态(如事件的开始、提交和回滚)。

2.2、事件的工作原理


[*]初始化事件:

[*]生产者向事件和谐器注册,获取一个唯一的事件 ID(Transactional ID)。
[*]事件和谐器为每个事件 ID 维护一个 PID(Producer ID)和 Epoch(用于防止僵尸生产者)。

[*]开始事件:

[*]生产者调用 beginTransaction() 开始一个新事件。

[*]发送消息:

[*]生产者在事件内发送消息,消息会附带事件 ID 和序列号(Sequence Number)。
[*]这些消息在事件提交前对其他消费者不可见。

[*]提交事件:

[*]生产者调用 commitTransaction() 提交事件。
[*]事件和谐器将事件标记为已提交,消息对全部消费者可见。

[*]回滚事件:

[*]生产者调用 abortTransaction() 回滚事件。
[*]事件和谐器将事件标记为已回滚,消息被抛弃。

2.3、事件的注意事项



[*] 事件的配置:

[*] // 事物ID,每个生产者实例的事务 ID 必须唯一
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "自定义");


[*] 事件 ID 的唯一性:

[*]每个生产者实例的事件 ID 必须唯一,否则会导致事件冲突。

[*] 事件的隔离性:

[*]默认环境下,消费者只能读取已提交的事件消息。
[*]可以通过配置 isolation.level 控制消费者的隔离级别:

[*]read_committed:只读取已提交的消息(默认)。
[*]read_uncommitted:读取全部消息(包罗未提交的)。


[*] 事件的性能开销:

[*]事件会增加一定的性能开销,适用于需要强一致性的场景。

2.4、事件的应用场景



[*]跨分区原子写入:
比方,将订单信息和库存更新写入差别的 Topic,确保两者要么同时成功,要么同时失败。
[*]精确一次处理(Exactly-Once Semantics):
通过事件和幂等性,确保消息不会重复处理。
2.5、代码实验

测试流程:先启动一个消费者,再启动这个生产者,进行试验。
分两个场景验证:
非常事件:生产者事件回滚没有提交,消费者端没有消费记载。
https://i-blog.csdnimg.cn/direct/9d8a9ae353fa45b8a26d9c2759989788.png#pic_center
正常事件:生产者事件提交,消费者端有消费记载。
https://i-blog.csdnimg.cn/direct/d21f53cb2fcc4b089f585e3b5de1fe58.png#pic_center
   非常事件:TransactionErrorDemo.java
正常事件:TransactionProducer.java
测试代码,文末有获取方式。
3、我的公众号&资料获取

敬请关注我的公众号:大象只为你,连续更新技术知识…
相关资料获取:
如需SpringKafka项目Demo,请后台回复:【springkafka】,代码有更新。
我的代码是基于SpringBoot3.x上写的,IDEA 2024版

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Kafka-生产者消息幂等性和消息事件机制