kafka相干标题

[复制链接]
发表于 2025-12-8 10:21:54 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

×
Kafka 通过事件机制与幂等性功能相团结,实现了跨会话的幂等性。以下是具体表明:
知识点:kafka生产者发送消息,是一个批次数据返回一个ack,而不是一个消息返回一个ack。

精准一次消耗的典范场景:金融生意业务体系

场景形貌
在金融生意业务体系中,用户发起一笔转账哀求时,体系必要确保该生意业务被处理惩罚且仅被处理惩罚一次。比方:

  • 用户A向用户B转账100元。
  • Kafka生产者将转账哀求作为消息发送到Kafka Topic。
  • 消耗者(生意业务处理惩罚服务)从Topic拉取消息,实验扣减A账户、增长B账户的操纵。
假如消息被重复消耗(至少一次语义),会导致用户A被扣款多次;假如消息丢失(最多一次语义),会导致生意业务未实验。因此,必须包管精准一次消耗(Exactly Once Semantics)。

怎样包管Kafka精准一次消耗?

Kafka通过以下机制实现精准一次语义(从生产者到消耗者的端到端包管):
1. 生产者端设置

幂等性(Idempotence)
设置参数:enable.idempotence=true
原理
生产者会为每条消息分配一个唯一PID(Producer ID)和递增的序列号(Sequence Number)。Broker会缓存每个PID的最新序列号,假如收到重复的序列号(比方因生产者重试),Broker会直接抛弃重复消息。
事件(Transactions)
设置参数:transactional.id=unique_id
原理
生产者通过事件机制将消息发送和偏移量提交绑定为一个原子操纵。事件基于两阶段提交(2PC):

  • Prepare阶段:生产者发送消息到Broker,Broker将消息写入日志日志但标记为“未提交”。
  • Commit阶段:生产者发送事件提交哀求,Broker将消息标记为“已提交”。
2. Broker端设置

副本机制(Replication)
Topic设置replication.factor>=3,min.insync.replicas=2,确保消息写入多个副本后才返回ACK,克制数据丢失。
3. 消耗者端设置

隔离级别(Isolation Level)
设置参数:isolation.level=read_committed
原理
消耗者只会读取已提交的事件消息,克制读取到未提交的中央状态数据。
事件型消耗
消耗者通过事件机制将消息处理惩罚与偏移量提交绑定:

  • 消耗者处理惩罚消息(比方更新数据库)。
  • 将处理惩罚效果和偏移量提交到事件中(原子性包管)。

焦点原理详解

1. 幂等性实现

PID与序列号
生产者启动时向Broker申请一个全局唯一的PID,每条消息附带PID + 分区号 + 序列号。Broker通过缓存验证序列号的连续性,拒绝重复消息。
2. 事件实现

事件调和器(Transaction Coordinator)
Kafka集群中有一个特别脚色(Transaction Coordinator),负责管理事件生命周期。
• 生产者向调和器注册transactional.id,获取PID和Epoch(防止僵尸生产者)。
• 调和器纪录事件状态(Begin、Prepare、Commit、Abort)。
两阶段提交(2PC)
  1. 生产者                     Broker(协调器)              消费者
  2. |                             |                             |
  3. | 1. Begin Transaction        |                             |
  4. |---------------------------->|                             |
  5. |                             |                             |
  6. | 2. Send Messages (Prepare) |                             |
  7. |---------------------------->|                             |
  8. |                             |                             |
  9. | 3. Commit Transaction       |                             |
  10. |---------------------------->|                             |
  11. |                             | 4. Mark Messages as Committed |
  12. |                             |                             |
  13. |                             | 5. Consumer Reads Committed Msg
  14. |                             |----------------------------->|
复制代码
3. 消耗者偏移量管理

原子性提交
消耗者将偏移量(Offset)和业务处理惩罚效果(比方数据库更新)放在同一个事件中。比方:
• 使用关系型数据库时,将Offset存储在业务表中,通过数据库事件包管原子性。
• 使用Kafka事件API时,通过producer.sendOffsetsToTransaction()提交偏移量。

完备流程示例(金融转账场景)


  • 生产者发送消息
    • 开启事件,发送转账消息到Topic。
    • 提交事件(消息标记为已提交)。
  • 消耗者处理惩罚消息
    • 以read_committed隔离级别读取消息。
    • 开启事件,实验扣款和入账操纵。
    • 提交事件(包罗业务操纵和偏移量提交)。
  • 故障规复
    • 假如生产者在Commit前瓦解,调和器会回滚事件,消息不会被消耗。
    • 假如消耗者在提交前瓦解,偏移量未更新,下次重启后重新消耗。

总结

精准一次消耗的焦点:通过幂等性、事件、原子性操纵实现端到端划一性。
实用场景:金融生意业务、订单处理惩罚、计费体系等对数据划一性要求极高的场景。
性能权衡:事件和幂等性会带来额外开销(如网络往返、日志日志写入),需根据业务需求选择是否启用。


max.in.flight.requests.per.connection 在 Kafka 中的寄义和设置

max.in.flight.requests.per.connection 是 Apache Kafka 中的一个紧张设置参数,它控制着生产者客户端的举动。
寄义

这个参数界说了:
• 单个网络毗连上允许的未确认哀求的最大数量
• 即生产者可以在收到服务器相应之前发送多少个消息批次
默认值通常是5,这意味着生产者可以在等候确认的同时最多发送5个消息批次到同一个broker。
作用

这个参数影响:

  • 吞吐量:值越大,生产者可以发送更多未确认的消息,大概进步吞吐量
  • 消息序次:当值大于1且启用了重试(retries > 0)时,大概导致消息序次杂乱
  • 内存使用:更高的值必要更多的内存来缓冲未确认的消息
怎样设置

生产者设置

在Kafka生产者设置中设置:
  1. // Java示例
  2. Properties props = new Properties();
  3. props.put("max.in.flight.requests.per.connection", "5");
  4. // 其他配置...
  5. KafkaProducer<String, String> producer = new KafkaProducer<>(props);
复制代码
大概在生产者设置文件中:
  1. max.in.flight.requests.per.connection=5
复制代码
保举值


  • 必要严格序次:设置为1(确保消息序次)
  • 高吞吐量优先:可以设置为5或更高(但要注意序次标题)
  • 平衡思量:通常3-5是一个公道的折中值
注意事项

• 当enable.idempotence=true(启用幂等生产者)时,此参数不能大于5
• 与acks和retries参数有交互影响
• 在Kafka 0.11及以上版本中,使用幂等生产者可以办理序次标题
这个参数必要根据您的具体需求(吞吐量 vs 序次包管)和Kafka版原来举行调优。

kafka是怎么通过事物包管跨会话的幂等性?

1. 幂等性与跨会话幂等性


  • 幂等性:指雷同的操纵被实验多次,其效果是一样的。在 Kafka 中,紧张是指生产者发送雷同的消息不会导致重复。
  • 跨会话幂等性:在生产者会话关闭并重启后,Kafka 仍能包管发送的消息不会被重复处理惩罚。

2. Kafka 的幂等性原理

Kafka 的幂等性紧张通过 Producer ID(PID)Sequence Number(序列号) 来实现:

  • Producer ID (PID)

    • Kafka 为每个生产者分配一个唯一的 PID。
    • PID 在生产者实例重启时会发生厘革。

  • Sequence Number

    • 每个分区内的每条消息都会有一个严格递增的序列号。
    • Broker 通过比力当前吸收到的消息序列号和之前存储的序列号,判定是否是重复消息。

幂等性只在单个会话内收效,由于生产者重启后,PID 会发生厘革,导致之前的 Sequence Number 信息无法继续使用。

3. 跨会话幂等性的挑衅

生产者在重启后,怎样克制之前发送的消息与新会话的消息辩论?这就必要事件的支持。

4. 事件怎样包管跨会话幂等性

Kafka 的事件机制通过以下步调实现跨会话的幂等性:
1) 事件 ID (Transactional ID)


  • 事件 ID 是幂等性和事件的关键点

    • 与 Producer ID 差别,事件 ID 是应用层界说的,用于标识逻辑上的生产者实例。
    • Kafka 通过事件 ID 追踪生产者的状态,纵然生产者实例重启,Kafka 仍然知道之前的事件纪录。

2) 事件管理


  • 事件调和器 (Transaction Coordinator)

    • Kafka 集群中的一个组件,负责管理事件状态和日志日志
    • 通过事件 ID 映射到特定的事件调和器。

  • 事件状态存储

    • 事件的状态信息(如最新的 PID 和对应的事件状态)存储在 Kafka 的内部主题 __transaction_state 中。
    • 当生产者重启时,Kafka 根据事件 ID 规复生产者的 PID 和事件上下文。

3) 幂等性与事件的团结


  • 事件性生产者

    • 当生产者开启事件功能时,Kafka 天生一个新的 PID,同时确保事件 ID 和 PID 的关联。
    • 纵然生产者重启,Kafka 仍能通过事件 ID 规复会话,包管幂等性。

  • 幂等性校验

    • 事件管分析查抄消息的 Sequence Number 是否符合逻辑序次。
    • 重复的消息会被直接抛弃。


5. 示例流程


  • 生产者启动

    • 指定 transactional.id,启动一个事件性生产者。
    • Kafka 分配一个新的 PID。

  • 消息生产与提交

    • 消息被写入 Kafka 的分区,同时标记为事件性消息。
    • 提交时,Kafka 将更新事件状态为 COMMITTED。

  • 生产者重启

    • Kafka 根据事件 ID 规复 PID 和未完成的事件状态。
    • 未完成的事件会被标记为 ABORTED,抛弃其未提交的消息。

  • 新的消息发送

    • 新的会话中继续使用规复的上下文,包管消息的幂等性和事件划一性。


6. 注意事项


  • 开启事件功能必要设置:
    1. enable.idempotence=true
    2. transactional.id=<事务ID>
    复制代码
  • 跨会话的幂等性依赖于事件 ID,因此事件 ID 应该是全局唯一的且与生产者实例绑定。

通过上述机制,Kafka 可以或许在跨会话场景下团结事件与幂等性,确保消息处理惩罚的正确性和划一性。
kafka包管消息有序消耗

必要有序消耗的场景

一个典范的有序消耗场景是订单处理惩罚体系。比方:

  • 用户在电商平台下单,包罗创建订单、付出订单、取消订单等操纵。
  • 体系必须按照用户的操纵序次处理惩罚事故:订单先被创建,后被付出,大概再被取消。
  • 假如事故处理惩罚序次乱了,好比先处理惩罚“付出”再处理惩罚“创建”,业务逻辑将会堕落。
在这种场景下,必要包管事故按照其产生的序次被消耗和处理惩罚。

Kafka 怎样包管有序消耗

Kafka 的操持通过 分区(Partition)生产者-消耗者机制 来实现有序消耗,具体如下:
1. 分区内序次包管

Kafka 在单个分区(Partition)中包管消息的序次。消息是按写入序次(Append-only)存储在日志中,每条消息都有一个递增的偏移量(Offset)。消耗者从分区中读取消息时,Kafka按偏移量序次返回消息,因此消耗者读取到的消息序次与生产者写入的序次划一。
关键点:

  • 单个分区内的序次是严格包管的。
  • 差别分区之间的消息序次无法包管。
2. 生产者怎样指定分区

为了使用分区内的有序特性,生产者必要确保雷同范例的消息始终写入同一个分区。Kafka 提供两种机制来控制分区选择:

  • Key-based Partitioning: 生产者在发送消息时可以指定一个 Key,Kafka 会使用 Key 的哈希值决定消息所属的分区。
  • Custom Partitioning: 生产者可以实现自界说的分区战略,将消息路由到特定的分区。
比方,对于订单处理惩罚,可以使用订单 ID 作为消息的 Key,如许同一订单的全部事故会被写入同一个分区,从而包管序次。
3. 消耗者组消耗分区

Kafka 的消耗者组模子使得多个消耗者可以协作消耗消息:

  • 每个分区只能由一个消耗者实例消耗,确保同一分区的消息不会被多个消耗者并发处理惩罚,从而维护序次。
  • 假如消耗者实例增长或淘汰,Kafka 会重新分配分区到消耗者实例,但单个分区的序次仍然被维护。
4. 消息乱序的大概性与应对

在某些环境下,大概出现乱序标题,好比:

  • 一个分区包罗多个差别范例的消息,处理惩罚速率差别。
  • 消息写入差别分区。
办理办法:

  • 操持消息模子,确保同一逻辑处理惩罚单元的消息归属于一个分区。
  • 在消耗者端实现缓冲机制,将乱序的消息重新排序后再处理惩罚。

Kafka 的其他相干特性

幂等性生产者

Kafka 提供了幂等性生产者(Idempotent Producer),防止因重试导致的重复消息写入,从而进一步资助维护消息的序次。
事件

Kafka 支持事件,使得生产者可以包管一组消息的原子性写入。事件在分布式环境中包管了多分区的消息划一性,但不会跨分区维护消息序次。

总结

Kafka 可以或许通太过区内序次、Key-based 路由和消耗分配战略实现严格的有序消耗。要在实际场景中包管有序消耗,开发者必要:

  • 公道计分别区战略。
  • 使用 Key 将相干消息路由到同一分区。
  • 确保消耗者组的操持可以或许维护分区的独占性。
消耗者组的偏移量是怎么生存的

假设我们有一个 Kafka 主题 topic1,它有 6 个分区(partition 0 到 partition 5),而且有一个消耗者组 group1,这个消耗者组包罗 3 个消耗者(consumer1, consumer2, consumer3)。下面我通过一个例子来具体表明在这种环境下,Kafka 是怎样生存偏移量的。
1. 消耗者组和分配

Kafka 会将消耗者组 group1 内的消耗者分配到差别的分区上。假设 Kafka 接纳轮询或其他战略来平衡消耗者与分区之间的关系,那么在这个例子中,大概会出现以下分配环境:

  • consumer1 负责 partition 0 和 partition 1
  • consumer2 负责 partition 2 和 partition 3
  • consumer3 负责 partition 4 和 partition 5
这种分配确保每个分区都只有一个消耗者在消耗,克制了多个消耗者竞争消耗同一个分区的消息。
2. 消耗者消耗消息并更新偏移量

每个消耗者会从本身负责的分区中消耗消息,并跟踪它消耗的进度。Kafka 会通过消耗者组内的偏移量来纪录这些进度。下面我们假设每个分区中有 10 条消息(编号为 0 到 9),消耗者开始消耗消息。
consumer1(负责 partition 0 和 partition 1):


  • 假设 consumer1 已经消耗了 partition 0 中的前 4 条消息(0-3),并消耗了 partition 1 中的前 6 条消息(0-5)。
  • Kafka 会在内部的 __consumer_offsets 主题中为 consumer1 生存如下偏移量:

    • partition 0 的偏移量:4(表现 consumer1 已消耗至第 5 条消息)
    • partition 1 的偏移量:6(表现 consumer1 已消耗至第 7 条消息)

consumer2(负责 partition 2 和 partition 3):


  • 假设 consumer2 已消耗了 partition 2 中的前 3 条消息(0-2),并消耗了 partition 3 中的前 7 条消息(0-6)。
  • Kafka 会为 consumer2 生存以下偏移量:

    • partition 2 的偏移量:3
    • partition 3 的偏移量:7

consumer3(负责 partition 4 和 partition 5):


  • 假设 consumer3 已消耗了 partition 4 中的前 5 条消息(0-4),并消耗了 partition 5 中的前 8 条消息(0-7)。
  • Kafka 会为 consumer3 生存以下偏移量:

    • partition 4 的偏移量:5
    • partition 5 的偏移量:8

3. Kafka 怎样生存这些偏移量?

偏移量是生存在 Kafka 内部的 __consumer_offsets 主题中的。这个主题会纪录每个消耗者组、每个分区的偏移量信息。Kafka 会为每个消耗者组(比方 group1)的每个分区(比方 partition 0、partition 1 等)生存一条偏移量纪录。
因此,在上述的例子中,__consumer_offsets 主题中的数据大概是如许的:
Consumer GroupPartitionOffsetgroup1partition 04group1partition 16group1partition 23group1partition 37group1partition 45group1partition 58这意味着:

  • consumer1 在 partition 0 上的消耗进度是 4(即它已经消耗了 partition 0 中的前 4 条消息)。
  • consumer1 在 partition 1 上的消耗进度是 6(即它已经消耗了 partition 1 中的前 6 条消息)。
  • consumer2 在 partition 2 上的消耗进度是 3,依此类推。
4. 偏移量的更新与提交

偏移量的更新由消耗者来决定。Kafka 提供了两种偏移量提交方式:

  • 主动提交偏移量(Auto Commit)
    假如启用了主动提交,消耗者在消耗消息后会主动提交偏移量。通常,这个操纵会在肯定时间隔断后完成(好比每隔 5 秒)。
  • 手动提交偏移量(Manual Commit)
    假如启用了手动提交,消耗者可以显式地控制什么时间提交偏移量。比方,消耗者大概在处理惩罚完一批消息后才提交偏移量,大概在确认消息已精确处理惩罚之后才提交偏移量。
岂论哪种方式,偏移量终极会生存到 __consumer_offsets 主题中。每个消耗者组的偏移量是独立的,消耗者组间的消耗进度互不影响。
5. 重点总结


  • 偏移量是 消耗者组 维护的,而不是单个消耗者。
  • Kafka 为每个消耗者组纪录每个分区的偏移量,存储在 __consumer_offsets 主题中。
  • 每个消耗者组的偏移量更新是独立的,消耗者组之间的消耗进度互不干扰。
  • 偏移量是由消耗者控制和提交的,可以选择主动提交大概手动提交。
这个机制确保了 Kafka 中的消息消耗是高效且可扩展的,同时允许消耗者组独立地跟踪本身的消耗进度。
往broker写入数据的流程

Kafka 是一个高吞吐量的分布式消息队列体系,其数据写入和恒久化操持精良,包管了性能和可靠性。以下是 Kafka 写入消息到 Broker 时的具体过程,包罗恒久化和索引的原理及流程。

Kafka 写入数据的过程

1. 生产者发送消息

生产者将消息发送到 Kafka 的特定主题(Topic)。每个主题分为多个分区(Partition),生产者根据分区战略选择将消息写入哪一个分区。
2. Broker 吸收消息

每个分区由一个 Kafka Broker 管理。当生产者发送消息到 Broker 时,Broker 会:

  • 验证消息的合法性(比方主题是否存在)。
  • 吸收消息并将其写入分区的日志文件。
3. 日志存储

Kafka 的日志存储是分区的焦点,其构造方式如下:

  • 分段存储(Segmented Storage):

    • 每个分区的日志被分为多个固定巨细的段(Segment),每个段是一个日志文件。
    • 日志文件以追加方式(Append-only)写入,文件名是起始偏移量。

  • 追加写入(Write-Ahead Logging):

    • 消息按照写入序次追加到当前生动的日志段中。
    • 每条消息都包罗元数据,比方偏移量(Offset)、时间戳等。

4. 索引文件

Kafka 为每个分段创建索引文件,用于快速定位消息:

  • 时间索引(TimeIndex):纪录消息时间戳与偏移量的映射。
  • 偏移量索引(OffsetIndex):纪录偏移量与消息的物理位置(字节偏移)的映射。
这些索引文件被定期刷盘,存储在与日志文件雷同的目次中。

Kafka 的恒久化机制

1. 消息恒久化的机会

Kafka 使用 PageCache(操纵体系的文件体系缓存)来进步性能,并通过以下机制控制恒久化:

  • **及时写入:**消息起首写入文件体系缓存(内存)。
  • 刷盘机会(Flush):

    • 定时刷盘:根据设置参数 log.flush.interval.messages 或 log.flush.interval.ms,定期将数据从内存刷到磁盘。
    • 欺凌刷盘:当生产者设置 acks=all 时,全部副本完成写入后,Kafka 会欺凌刷盘。

2. 恒久化的方式

Kafka 将日志文件和索引文件恒久化到磁盘。它使用高效的 I/O 模子:

  • 使用序次写入来淘汰磁盘寻址开销。
  • 文件段和索引文件被存储在 Kafka Broker 的日志目次(log.dirs)中。
3. 恒久化的位置

Kafka 日志和索引文件的恒久化位置可以通过设置 log.dirs 参数指定,支持多路径存储来进步数据冗余和性能。

举例:写入数据的完备流程

场景:用户下单事故写入 Kafka


  • 生产者发送消息

    • 消息:{"orderId": 12345, "status": "created", "timestamp": 1697037600}
    • 主题:orders
    • 分区战略:使用订单 ID (12345) 的哈希值选择分区,比方分区 0。

  • Broker 吸收消息

    • Broker A 管理 orders 的分区 0。
    • 消息被追加到分区 0 当前生动段的日志文件 000000000000.log 中。

  • 索引文件更新

    • 偏移量为 42 的消息被写入日志文件。
    • 索引文件更新:

      • 偏移量索引纪录:偏移量 42 -> 物理位置(字节偏移量)。
      • 时间索引纪录:时间戳 1697037600 -> 偏移量 42。


  • 消息恒久化

    • 消息起首写入操纵体系缓存(PageCache)。
    • 当满足刷盘条件(比方日志段到达肯定巨细或超时)时,数据被刷到磁盘上的日志目次 /var/lib/kafka/logs/orders-0/。

  • 消息消耗者消耗

    • 消耗者从分区 0 的偏移量 42 开始拉取消息。
    • 消耗者通过偏移量索引定位消息在日志文件中的具体位置,从而快速读取消息。


总结

Kafka 的写入和恒久化机制通过高效的日志布局、索引文件和刷盘战略实现了高性能和可靠性。整个流程如下:

  • 消息写入分区日志文件并更新索引。
  • 使用 PageCache 进步性能,满足条件时刷盘。
  • 日志文件和索引文件被存储在指定的目次中,实现恒久化和快速定位。
这种操持使 Kafka 可以或许在包管可靠性的同时,提供极高的吞吐量,非常适当大规模及时数据流处理惩罚的场景。
消耗者读取文件的流程

Kafka 的索引文件和日志文件是细密对应的,索引文件的作用是快速定位日志文件中的消息,克制逐条遍历日志文件查找。以下是它们的对应关系和快速定位原理的具体分析。

日志文件和索引文件的关系

日志文件


  • 每个分区的日志由多个固定巨细的段(Segment)构成,每个段由一个日志文件和多个索引文件构成。
  • 日志文件存储实际的消息,文件名为段的起始偏移量,比方 00000000000000000000.log 表现该段的起始偏移量为 0。
索引文件


  • 偏移量索引文件(OffsetIndex):纪录逻辑偏移量与物理位置的映射,文件名雷同于 00000000000000000000.index。
  • 时间戳索引文件(TimeIndex):纪录时间戳与逻辑偏移量的映射,文件名雷同于 00000000000000000000.timeindex。
每对日志段和索引文件通过雷同的起始偏移量关联。比方:

  • 00000000000000000000.log 对应 00000000000000000000.index 和 00000000000000000000.timeindex。

快速定位消息的过程

Kafka 使用二分查找和序次读取的组合来快速定位消息。
查找步调


  • 确定日志段

    • 消耗者哀求读取偏移量(比方,偏移量为 42)的消息。
    • Kafka 根据段的起始偏移量范围(比方,[0, 100),[100, 200))快速确定偏移量所属的日志段。
    • 假如偏移量为 42,则定位到 00000000000000000000.log。

  • 通过偏移量索引快速定位物理位置

    • 打开对应的索引文件 00000000000000000000.index。
    • 在索引文件中通过二分查找找到目标偏移量(42)对应的物理位置。
    • 索引文件存储偏移量到日志文件物理位置的映射,比方:
      1. Offset: 40 -> Position: 1024
      2. Offset: 50 -> Position: 2048
      复制代码

      • 偏移量 42 在偏移量 40 和 50 之间,因此物理位置在 1024 ~ 2048 之间。


  • 序次读取日志文件

    • 根据索引文件提供的物理位置范围,Kafka 从日志文件的 1024 字节位置开始序次读取,直到找到偏移量 42 的消息。

时间戳查找(按时间定位)

假如消耗者哀求按照时间戳查找消息,Kafka 使用时间戳索引文件 00000000000000000000.timeindex:

  • 在时间戳索引文件中通过二分查找找到目标时间戳(比方 1697037600)对应的偏移量。
  • 按偏移量查找步调获取对应的日志位置。

举例分析

场景

分区 0 的日志文件和索引文件如下:

  • 00000000000000000000.log: 存储消息偏移量为 [0, 99]。
  • 00000000000000000000.index: 偏移量索引文件,部分内容如下:
    1. Offset: 0  -> Position: 0
    2. Offset: 50 -> Position: 1024
    3. Offset: 100 -> Position: 2048
    复制代码
消耗者哀求获取偏移量为 72 的消息。
查找流程


  • 确定日志段

    • 偏移量 72 位于 [0, 99] 范围内,因此使用 00000000000000000000.log。

  • 使用偏移量索引快速定位

    • 在 00000000000000000000.index 中二分查找:

      • 偏移量 72 在偏移量 50 和 100 之间。
      • 物理位置范围为 [1024, 2048)。


  • 读取日志文件

    • 从日志文件 00000000000000000000.log 的位置 1024 开始序次读取。
    • 跳过偏移量 50 到 71 的消息,找到偏移量为 72 的目标消息。


总结

Kafka 索引文件和日志文件通过段的起始偏移量关联,共同使用以下机制快速定位消息:

  • 根据哀求的偏移量或时间戳,通过段的范围快速定位日志段。
  • 使用偏移量索引文件举行二分查找,确定日志文件中的物理位置范围。
  • 团结序次读取,从日志文件中高效提取目标消息。
这种操持使得 Kafka 在包管高吞吐量的同时,仍能快速处理惩罚消息定位需求,非常适当大规模数据流的及时处理惩罚场景。
kafka幂等性

Kafka 的幂等性(Idempotence)旨在办理因网络故障或其他非常导致生产者重复发送消息的标题,确保无论生产者怎样重试,同一条消息只会被恒久化到 Kafka 一次。以下是 Kafka 幂等性的底层实现原理的具体讲授。

幂等性的关键机制

Kafka 的幂等性依赖以下几个焦点组件和机制:
1. Producer ID(PID)


  • 每个 Kafka 生产者在初始化时,Kafka 会为其分配一个唯一的 Producer ID
  • PID 是生产者的全局标识,用于区分差别的生产者实例。
2. 序列号(Sequence Number)


  • 每个生产者针对每个分区维持一个递增的 序列号
  • 序列号纪录了生产者发送到分区的每条消息的序次。
3. Log End Offset(LEO)


  • Kafka Broker 为每个分区维护一个 Log End Offset(LEO),表现该分区中当前最新消息的偏移量。
  • 团结序列号和 LEO,可以确保消息不会被重复写入。
4. 幂等性控制表(Producer State Table)


  • Kafka Broker 为每个分区维护一个 Producer State Table,纪录生产者和该分区的状态,包罗:

    • Producer ID: 生产者的唯一标识。
    • Last Sequence Number: 近来一次乐成写入该分区的序列号。


幂等性实现流程

生产者发送消息


  • 生产者为每条消息天生一个递增的序列号,并在消息中附带 PID 和序列号。
  • 消息发送到 Kafka Broker,目标为某个特定分区。
Broker 校验幂等性


  • 查找 Producer State Table:

    • Broker 查抄分区对应的 Producer State Table 是否有该生产者(通过 PID 标识)。
    • 假如 PID 存在,读取其最新序列号。
    • 假如 PID 不存在,则初始化状态纪录,并继续消息。

  • 校验序列号:

    • 假如消息的序列号即是 Last Sequence Number + 1,分析消息按序到达,Broker 吸收并写入。
    • 假如序列号小于或即是 Last Sequence Number,分析消息已被写入,Broker 忽略消息。
    • 假如序列号大于 Last Sequence Number + 1,分析存在消息丢失或乱序,Broker 抛堕落误。

更新 Producer State Table


  • 假如消息被乐成写入日志,Broker 更新 Producer State Table 中该生产者的最新序列号。
相应生产者


  • Broker 向生产者返回 ACK,确认消息写入乐成或被忽略。

示例:幂等性保障过程

场景


  • 生产者发送三条消息到分区 P0,序列号依次为 0、1、2。
  • 由于网络标题,生产者未收到消息序列号为 1 的 ACK,触发重试。
过程


  • 发送消息 0:

    • 序列号为 0,PID 为 12345。
    • Broker 的 Producer State Table 初始为空。
    • Broker 继续消息,更新 Last Sequence Number = 0。
    • 消息写入分区日志,返回 ACK。

  • 发送消息 1:

    • 序列号为 1,PID 为 12345。
    • Broker 查抄 Producer State Table,序列号精确。
    • 消息写入分区日志,更新 Last Sequence Number = 1。
    • 返回 ACK,但生产者未收到。

  • 重试消息 1:

    • 生产者再次发送序列号为 1 的消息。
    • Broker 查抄 Producer State Table,发现序列号即是 Last Sequence Number。
    • 消息已写入,Broker 忽略消息,返回 ACK。

  • 发送消息 2:

    • 序列号为 2,PID 为 12345。
    • Broker 查抄 Producer State Table,序列号精确。
    • 消息写入分区日志,更新 Last Sequence Number = 2。
    • 返回 ACK。


幂等性的限定和扩展

幂等性的限定


  • 单分区保障: 幂等性只包管生产者对每个分区的消息不重复,但不能跨分区。
  • 有限重试窗口: 由于 Broker 的 Producer State Table 存储有限,Kafka 幂等性无法无穷期跟踪汗青纪录。
事件的扩展

为了跨分区的原子性和划一性,Kafka 引入事件机制(Transaction),团结幂等性提供更强的保障:

  • 生产者在事件内发送多条消息,Kafka 确保这些消息要么全部写入,要么全部失败。
  • 事件依赖幂等性和事件调和器(Transaction Coordinator)共同实现。

总结

Kafka 的幂等性通过以下关键步调实现:

  • Producer ID 唯一标识生产者实例。
  • 序列号 确保生产者向分区发送的消息按序次到达。
  • Producer State Table 纪录生产者最新状态,校验消息的重复性和精确性。
这种操持使 Kafka 能在分布式体系中高效保障消息的不重复写入,同时通过事件机制进一步扩展幂等性的实用范围,为用户提供可靠的数据划一性保障。

Kafka 中 **leader 副本** 和 **follower 副本** 的同步机制

下面我会从根本原理、同步过程、关键概念、失败处理惩罚机制几个方面,带你全面明确 Kafka 副本同步的全过程:

🧠 一、Kafka 副本架构根本概念

Kafka 中每个 分区(partition) 都有:

  • 一个 Leader 副本:负责处理惩罚生产者写入、消耗者读取哀求
  • 多少个 Follower 副本:从 leader 同步数据,作为备份,一旦 leader 失效可更换它
这些副本可以分布在差别的 broker 上。

🔁 二、消息同步的过程(Leader → Follower)

同步机制是 Kafka 内部的 “pull 模式”,也就是:
Follower 主动去 Leader 拉取数据
同步流程如下:


  • 生产者写入消息 → Leader 副本写入日志
  • Follower 副本通过 ReplicaFetcherThread 线程向 leader 发起同步哀求
  • Leader 根据 Follower 的偏移量(offset)返回新消息数据(雷同消耗者 poll())
  • Follower 将数据写入当地磁盘,更新自身 offset
  • 乐成同步的 Follower 被参加 ISR 聚集(下文表明)
📌 注意:Follower 副本不会向客户端袒露数据,只是内部备份;消耗和写入只通过 Leader。

📦 三、什么是 ISR(In-Sync Replicas)

ISR 是指与 Leader 保持同步的副本聚集
特点:


  • 包罗 Leader 本身和全部延长不高出指定阈值的 Follower 副本
  • 默认延长容忍时间是 replica.lag.time.max.ms(好比 10s)
  • ISR 会影响消息确认机制、leader 推选等焦点流程
比方:
副本当前状态是否在 ISRBroker 1(Leader)最新✅Broker 2延长 1 秒✅Broker 3延长 12 秒❌(踢出 ISR)
✍️ 四、同步方式:Kafka 的 ACK 设置控制同步战略

生产者发送消息时,有一个关键参数:
  1. acks = all
复制代码
Kafka 支持三种消息确认战略:

acks 值寄义安全性延长是否依赖 ISR0不等候确认最差最低❌1Leader 写入乐成即确认一样平常低❌all/-1Leader + 全部 ISR 成员都写入乐成最强稍高✅
🚨 五、假如同步失败怎么办?

1. Follower 掉队太多(高出 replica.lag.time.max.ms)

→ 会被踢出 ISR
2. Leader 瓦解

→ Kafka 通过 Controller 推选新的 leader
→ 新 leader 必须在原来的 ISR 中选出(确保数据划一性)
3. 全部 Follower 都挂了?

→ 假如 min.insync.replicas 设置了,Kafka 会制止写入,包管“写不丢”

🔧 六、相干设置参数(焦点):

设置项分析replica.lag.time.max.msfollower 掉队多久会被踢出 ISRreplica.lag.max.messages允许 follower 掉队多少条消息min.insync.replicasISR 最小副本数,共同 acks=all 可确保高可用acks控制生产者发送时简直认品级(见上)
📌 七、图解:同步流程简化图
  1. Producer --> [Leader副本] <--pull-- [Follower副本1]
  2.                        ^
  3.                        |
  4.                      pull
  5.                        |
  6.                [Follower副本2]
复制代码

  • Follower 主动拉取
  • 拉乐成了 → 写入磁盘 → 向 Leader 陈诉 offset
  • Leader 纪录各 follower offset,更新 ISR

✅ 八、总结

点内容同步方式Follower pull 模式 向 Leader 拉取数据写入战略acks=all 可包管写入被 ISR 成员全部同步ISR保持同步的副本聚集,影响可靠性和 leader 推选选主限定只能从 ISR 中选 Leader,确保数据不丢控制参数replica.lag.time.max.ms、min.insync.replicas 等
kafka生产者消息写入战略

你这个标题非常深入,涉及 Kafka Producer 的关键机制,包罗主线程和 Sender 线程分工缓冲区的数据生命周期网络非常的应对机制,我会逐一具体拆解。

✅ 一、放入缓冲区是主线程做的吗?

是的,放入缓冲区是主线程做的,这个动作发生在你调用:
  1. kafkaTemplate.send(topic, key, value);
复制代码
背后本质上会调用:
  1. producer.send(ProducerRecord)
复制代码
具体流程如下:
步调线程分析序列化 Key 和 Value主线程调用 Serializer.serialize()选择分区(Partitioner)主线程Hash 或轮询方式决定分区将消息放入 RecordAccumulator主线程找到对应的 Deque<RecordBatch>,放入内存缓冲区返回一个 Future主线程发送效果通过 Future 异步关照
📦 二、Sender 发送后缓冲区里的消息会立马删除吗?

不是立即删除,而是确认乐成发送后才会从缓冲区中移除
流程如下:

  • Sender 线程从缓冲区中读取可发送的批次(满足时间或巨细条件)
  • 将这个 batch 打包发送到对应 broker
  • 等候 broker 返回 ACK
  • 假如收到乐成的 ACK:

    • 将这个 batch 从 RecordAccumulator 中移除
    • 关照回调乐成

  • 假如发送失败:

    • 将 batch 留在缓冲区中,等候下次重试

✅ Kafka 是 ACK 机制驱动的:只有乐成确认,才移除数据。

❌ 三、假如 Sender 网络非常发送失败,会发生什么?

Kafka Producer 有 重试机制
设置项默认值分析retries0最大重试次数,发起设置如 3 或 Integer.MAX_VALUEretry.backoff.ms100每次重试之间的隔断max.in.flight.requests.per.connection5控制是否大概出现乱序(注意序次性标题)⚠️ 注意:

假如你开启了 acks=all 且重试次数 > 0,就可以较好克制数据丢失。但也有如下环境要注意:

💥 四、失败场景 + 原理分析 + 办理方案

失败场景缘故原由Kafka 举动办理发起Broker 暂时不可达网络非常保存消息在内存中,等候重试增大 retries、buffer.memory、request.timeout.ms发送失败高出重试次数网络连续失败抛出非常、回调失败在回调中纪录日志/报警,团结业务处理惩罚缓冲区满(buffer.memory)消息堆积主线程壅闭(默认),或抛出非常设置符合的 max.block.ms,优化吞吐ack 等候超时broker 无相应触发超时、重试调解 request.timeout.ms、linger.ms 以顺应网络环境
🔁 五、保举的鲁棒性设置
  1. spring:
  2.   kafka:
  3.     producer:
  4.       acks: all
  5.       retries: 5
  6.       retry-backoff-ms: 200
  7.       max-in-flight-requests-per-connection: 1 # 保证顺序发送
  8.       buffer-memory: 33554432
  9.       max-block-ms: 60000  # 如果缓冲区满,最多阻塞 60s
  10.       request-timeout-ms: 30000
复制代码

  • 设置 acks=all:确保消息被 leader 和 ISR 都写入才确认
  • 设置公道的 retries 和 backoff
  • 设置 max-in-flight=1 包管序次(关键业务场景)

🧪 六、真实生产案例举例

某付出体系使用 Kafka 发送用户扣款日志,假如发送失败,体系通过如下方式包管消息终极送达:

  • Producer 开启幂等性(enable.idempotence=true)+ acks=all + retries
  • 回调中监控监控失败日志,并恒久化失败事故
  • 另起线程读取失败日志做赔偿重发
  • 接纳死信机制,将无法重发的事故发到 dead-letter-topic

✅ 总结关键点:

标题答案放入缓冲区是主线程做的吗?是Sender 发完消息缓冲区立即扫除吗?不是,只有收到 ACK 才扫除网络非常发送失败怎么办?Kafka 会主动重试,超出次数会回调失败怎样包管发送可靠性?开启幂等性、设置重试、监控监控失败回调、容灾重发

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表