Kafka系列教程 - Kafka 生产者 -2

打印 上一主题 下一主题

主题 946|帖子 946|积分 2838

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
1. 生产者简介

不管是把 Kafka 作为消息队列系统、还是数据存储平台,总是须要一个可以向 Kafka 写入数据的生产者和一个可以从 Kafka 读取数据的消费者,大概是一个兼具两种脚色的应用步伐。
使用 Kafka 的场景很多,诉求也各有不同,主要有:是否答应丢失消息?是否接受重复消息?是否有严格的延迟和吞吐量要求?
不同的场景对于 Kafka 生产者 API 的使用和配置会有直接的影响。
1.1. 生产者传输实体

Kafka Producer 发送的数据对象叫做 ProducerRecord ,它有 4 个关键参数:


  • Topic - 主题
  • Partition - 分区(非必填)
  • timestamp: 消息的时间戳。(非必填)
  • Key - 键(非必填)
  • Value - 值
  • headers: 消息头(键值对列表)。(非必填)
核心字段


  • Topic:

    • Kafka 的主题,用于分类消息。
    • 范例: String
    • 示例: "my-topic"

  • Partition:

    • 消息的目标分区。
    • 范例: Integer(可为 null,表示让分区器选择分区)。
    • 示例: 0(指定分区)或 null(主动分配)。

  • Key:

    • 消息的键,通常用于确定分区。
    • 范例: 泛型 K,如 String。
    • 示例: "user123"

  • Value:

    • 消息的内容。
    • 范例: 泛型 V,如 String 或 byte[]。
    • 示例: "Hello, Kafka!"

  • Timestamp:

    • 消息的时间戳,用于记录消息的生成时间。
    • 范例: Long(毫秒时间戳,可为 null)。
    • 示例: System.currentTimeMillis()

  • Headers:

    • 可选的消息头,包罗额外的元数据。
    • 范例: Iterable<Header>,键值对情势。
    • 示例: new RecordHeaders().add("header-key", "header-value".getBytes())


注意事项


  • 序列化:

    • 键和值必须可序列化。
    • 在创建 KafkaProducer 时,须要配置键和值的序列化器(如 StringSerializer 或自界说序列化器)。

  • 时间戳:

    • 默认使用 Kafka 服务器时间戳(日志时间)。
    • 假如提供时间戳,Kafka 会以该时间戳为准。

  • 消息头:

    • 消息头常用于携带附加信息,如跟踪 ID、元数据等。

1.2. 生产者发送流程

Kafka 生产者发送消息流程:
   ( 0 ) 构建 ProducerRecord
生产者构建一条消息,封装成一个 ProducerRecord 对象,包括以下主要内容:


  • 主题(Topic)
  • 分区(Partition,可选)
  • 消息键(Key,可选)
  • 消息值(Value)
  • 时间戳(Timestamp,可选)
  • 消息头(Headers,可选)
(1)序列化 - 发送前,生产者要先把键和值序列化。生产者通过配置的 键序列化器值序列化器 完成该步调。


  • 常用序列化器:StringSerializer、ByteArraySerializer 或自界说序列化器。
(2)分区 - 数据被传给分区器。假如在 ProducerRecord 中已经指定了分区,那么分区器什么也不会做;否则,分区器会根据 ProducerRecord 的键来选择一个分区。选定分区后,生产者就知道该把消息发送给哪个主题的哪个分区。
使用以下逻辑确定消息的目标分区:

  • 直接指定分区: 假如 ProducerRecord 指定了分区,消息将直接发送到该分区。
  • 通过键计算分区:

    • 假如未指定分区但提供了键,Kafka 的默认分区器(DefaultPartitioner)会根据键的哈希值和主题的分区数计算分区。

  • 随机选择分区: 假如既未指定分区也未提供键,Kafka 将随机选择一个分区。
(3)批次传输 - 接着,这条记录会被添加到一个记录批次中。这个批次中的所有消息都会被发送到相同的主题和分区上。有一个独立的线程负责将这些记录批次发送到相应 Broker 上。


  • 批次,就是一组消息,这些消息属于同一个主题和分区
  • 发送时,会把消息分成批次传输,假如每次只发送一个消息,会占用大量的网路开销。


  • Kafka 生产者使用一个内部的缓冲区(RecordAccumulator)来批量存储即将发送的消息。
  • 同一主题和分区的消息会被聚合成批次(Batch)。

    • 批量发送可以显著提高网络传输效率和吞吐量。

  (4)消息压缩


  • 假如启用了消息压缩(通过配置 compression.type),Kafka 会在批次级别对消息举行压缩。

    • 支持的压缩算法:gzip、snappy、lz4、zstd。

  • 压缩后的消息占用更少的网络带宽和存储空间。
   (5) 发送消息到 Kafka Broker


  • 消息被发送到 Kafka 集群的对应 Broker 节点。
  • Kafka 生产者使用异步 IO,通过 Selector 组件管理网络连接,并将消息发送到 Broker。
  • Broker 收到消息后,将其存储到对应分区的日志中。
   (6) Broker 确认(ACK)处理
生产者等待 Broker 简直认(ACK),根据配置的 acks 参数决定消息确认的级别:

  • acks=0: 不等待确认,最快,但可能导致消息丢失。
  • acks=1: 等待分区的 Leader 确认,可能导致少量数据丢失。
  • acks=all(或 -1): 等待所有副本节点确认,最高可靠性。
  (7) 重试机制


  • 假如消息发送失败(如网络超时或 Leader 不可用),生产者会根据 retries 参数配置重试次数。
  • 重试间隔由 retry.backoff.ms 控制。
  (8) 回调(Callback)机制


  • 假如生产者在发送消息时指定了回调函数,会在消息发送乐成或失败后触发。
  • 回调函数可用于记录日志、处理非常或更新统计数据。
  (9) 非常处理


  • 假如消息发送失败且重试耗尽,Kafka 生产者会抛出非常,开发者须要在代码中捕捉并处理这些非常。
  • 常见非常:

    • TimeoutException: 消息发送超时。
    • SerializationException: 键或值的序列化失败。
    • RetriableException: 临时性错误,可重试。


生产者向 Broker 发送消息时是怎么确定向哪一个 Broker 发送消息?
Kafka 生产者在发送消息时,须要确定消息将发送到哪个 Broker 和哪个分区。这个过程依赖于主题的元数据信息和分区选择逻辑。以下是具体的流程:

1. 获取主题的元数据信息



  • 元数据包罗的信息

    • 主题的分区数量。
    • 每个分区的 Leader Broker(负责吸收写入哀求)。
    • 每个分区的副本信息。

  • 元数据泉源

    • 当生产者启动时,会向 Kafka 集群的恣意一个 Broker 哀求元数据信息。
    • 元数据会定期更新(由配置 metadata.max.age.ms 决定),或在发生非常(如 Leader 变更)时重新获取。


2. 分区选择逻辑

Kafka 使用分区选择器(默认是 DefaultPartitioner)来决定消息发送到哪个分区。其逻辑如下:
(1)假如指定了分区



  • 直接发送到指定的分区,无需计算。
  • new ProducerRecord<>("my-topic", 0, "key", "value");
    上述代码将消息直接发送到 my-topic 的分区 0。
(2)假如未指定分区但提供了键



  • 使用键的哈希值来计算目标分区: partition = hash(key) % partitionCount

    • hash 函数:Kafka 默认使用 Java 的 hashCode 方法。
    • 这种方式确保同一个键的消息始终发送到同一个分区,方便有序消费。

(3)假如既未指定分区也未提供键



  • 生产者会随机选择一个分区,确保消息负载均衡:

    • 通过循环的方式从可用分区中依次选择(轮询算法)。


3. 确定目标 Broker



  • 每个分区都有一个 Leader Broker 负责处理写入哀求。
  • 生产者根据主题元数据确定目标分区的 Leader Broker,直接与其创建连接并发送消息。

4. 处理分区不可用环境



  • 假如目标分区的 Leader Broker 不可用:

    • 生产者会尝试刷新元数据以获取新的分区 Leader 信息。
    • 假如刷新失败或凌驾配置的重试次数(retries),消息发送失败。

流程总结

1. 获取主题元数据(包括分区及其 Leader Broker)。
2. 使用分区选择逻辑确定目标分区: - 指定分区 → 直接使用。 - 提供键 → 哈希计算。 - 无键无分区 → 轮询分区。
3. 确定目标分区的 Leader Broker。
4. 向 Leader Broker 发送消息。
5. 假如发送失败,尝试重试或刷新元数据。




生产者配置相关参数

参数名称作用默认值metadata.max.age.ms元数据缓存时间,凌驾该时间会刷新元数据300000(5分钟)partitioner.class自界说分区器类,用于分区选择逻辑默认分区器retries发送失败时的重试次数0(不重试)retry.backoff.ms重试前的等待时间100max.in.flight.requests最大答应的未确认哀求数5 性能优化建议


  • 批量发送: 增大 batch.size 和 linger.ms 提高吞吐量。
  • 压缩: 开启合适的消息压缩(compression.type)。
  • 异步发送: 使用回调处理结果,避免壅闭主线程。
  • 分区策略: 根据业务需求优化分区分配。
2. 生产者 API

Kafka 的 Java 生产者 API 主要的对象就是 KafkaProducer。通常我们开发一个生产者的步调有 4 步。

  • 构造生产者对象所需的参数对象。
  • 利用第 1 步的参数对象,创建 KafkaProducer 对象实例。
  • 使用 KafkaProducer 的 send 方法发送消息。
  • 调用 KafkaProducer 的 close 方法关闭生产者并开释各种系统资源。
2.1. 创建生产者

Kafka 生产者核心配置:
1. 基础配置
配置项作用默认值bootstrap.serversKafka 集群的地址列表,格式为 host1:port,host2:port必须配置key.serializer键(Key)的序列化器,负责将键序列化为字节数组必须配置value.serializer值(Value)的序列化器,负责将值序列化为字节数组必须配置
2. 消息确认和可靠性配置

配置项作用默认值acks确认机制:1- 0: 不等待 Broker 确认,消息可能丢失,但性能高。- 1: 仅等待 Leader 确认,性能和可靠性折中。- all(或 -1): 等待所有副本确认,最高可靠性。retries消息发送失败时的重试次数。0retry.backoff.ms每次重试的时间间隔(毫秒)。100enable.idempotence是否启用幂等性,确保消息不重复发送(须要 acks=all 和 max.in.flight.requests.per.connection <= 5)。false
3. 性能优化配置

配置项作用默认值batch.size每个分区的批次巨细(字节),消息会被聚合成批次发送以提高吞吐量。16384(16 KB)linger.ms生产者等待更多消息加入批次的时间,增长延迟可以提升批量发送效率。0compression.type消息压缩算法,可选值:none、gzip、snappy、lz4、zstd。nonebuffer.memory生产者内存缓冲区的巨细(字节),用于暂存消息。33554432(32 MB)max.in.flight.requests.per.connection单个连接上答应未确认的哀求数量。降低此值可避免乱序(特殊是在启用幂等性的环境下)。5
4. 分区和消息路由配置

配置项作用默认值partitioner.class自界说分区器类名,控制消息发送到哪个分区。默认分区器max.block.ms生产者在缓冲区满或元数据不可用时壅闭的最大时间(毫秒)。60000(1分钟)
5. 超时和网络配置

配置项作用默认值request.timeout.ms等待 Broker 响应的超时时间(毫秒)。30000(30秒)delivery.timeout.ms整个消息发送的超时时间,包括重试时间。120000(2分钟)connections.max.idle.ms生产者连接空闲多久后关闭(毫秒)。540000(9分钟)socket.send.buffer.bytes生产者发送套接字缓冲区的巨细。131072(128 KB)socket.receive.buffer.bytes生产者吸收套接字缓冲区的巨细。32768(32 KB)
6. 幂等性和事务配置

配置项作用默认值enable.idempotence启用幂等性,防止消息重复(须要与 acks=all 配合)。falsetransactional.id设置事务 ID,启用事务性生产者,支持精确一次(EOS)。无transaction.timeout.ms事务的超时时间(毫秒)。60000
示例配置

基础配置

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  4. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
复制代码
高可靠性配置

  1. props.put("acks", "all");
  2. props.put("retries", 3);
  3. props.put("enable.idempotence", true);
  4. props.put("max.in.flight.requests.per.connection", 5);
复制代码
高性能配置

props.put("batch.size", 32768); // 增大批次巨细
props.put("linger.ms", 10); // 等待更多消息
props.put("compression.type", "gzip"); // 开启压缩
事务性生产者配置

props.put("transactional.id", "my-transactional-id");

常用场景的配置保举

场景保举配置高吞吐batch.size=32768、linger.ms=10、compression.type=gzip高可靠性acks=all、retries=3、enable.idempotence=true、max.in.flight.requests.per.connection=5事务支持enable.idempotence=true、transactional.id=my-transactional-id 通过调整这些核心配置,Kafka 生产者可以根据不同的应用场景在性能、可靠性和复杂性之间举行权衡。 


2.2. 异步发送

直接发送消息,不关心消息是否到达。
这种方式吞吐量最高,但有小概率会丢失消息。
【示例】异步发送
  1. producer.send(new ProducerRecord<>("HelloWorld", "key", msg), new Callback() {
  2.                 @Override
  3.     public void onCompletion(RecordMetadata metadata, Exception exception) {
  4.         if (exception != null) {
  5.             exception.printStackTrace();
  6.         } else {
  7.             System.out.println("Sent out :" + msg);
  8.         }
  9.     }
  10. });
复制代码
2.3. 同步发送

返回一个 Future 对象,调用 get() 方法,会不停壅闭等待 Broker 返回结果。
这是一种可靠传输方式,但吞吐量最差。
【示例】同步发送
  1. ProducerRecord<String, String> record =
  2. new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
  3. try {
  4.     producer.send(record).get();
  5. } catch (Exception e) {
  6.     e.printStackTrace();
  7. }
复制代码
2.4. 异步响应发送

代码如下,异步方式相对于“发送并忽略返回”的方式的不同在于:在异步返回时可以执行一些操作,如:抛出非常、记录错误日志。
这是一个折中的方案,即兼顾吞吐量,也保证消息不丢失。
【示例】异步响应发送
起首,界说一个 callback:
  1. private class DemoProducerCallback implements Callback {
  2.       @Override
  3.         public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  4.            if (e != null) {
  5.                e.printStackTrace();
  6.              }
  7.         }
  8. }
复制代码
然后,使用这个 callback:
  1. ProducerRecord<String, String> record =
  2.             new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
  3. producer.send(record, new DemoProducerCallback());
复制代码
2.5. 关闭连接

调用 producer.close() 方法可以关闭 Kafka 生产者连接。
  1. Producer<String, String> producer = new KafkaProducer<>(properties);
  2. try {
  3.    producer.send(new ProducerRecord<>(topic, msg));
  4. } catch (Exception e) {
  5.     e.printStackTrace();
  6. } finally {
  7.     // 关闭连接
  8.     producer.close();
  9. }
复制代码
3. 生产者的连接

Apache Kafka 的所有通讯都是基于 TCP 的。无论是生产者、消费者,还是 Broker 之间的通讯都是云云。
选用 TCP 连接是由于 TCP 本身提供的一些高级功能,如多路复用哀求以及同时轮询多个连接的能力。
3.1. 何时创建 TCP 连接

Kafka 生产者创建连接有三个时机:
(1)在创建 KafkaProducer 实例时,生产者应用会在背景创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时,起首会创建与 bootstrap.servers 中所有 Broker 的 TCP 连接。
(2)当 Producer 更新集群的元数据信息之后,假如发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。


  • 场景一:当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 哀求给 Kafka 集群,去尝试获取最新的元数据信息。
  • 场景二:Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。该参数的默认值是 300000,即 5 分钟,也就是说不管集群那边是否有变革,Producer 每 5 分钟都会逼迫刷新一次元数据以保证它是最实时的数据。
(3)当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,会创建一个 TCP 连接。
3.2. 何时关闭 TCP 连接

Producer 端关闭 TCP 连接的方式有两种:一种是用户主动关闭;一种是 Kafka 主动关闭
主动关闭是指调用 producer.close() 方法来关闭生产者连接;乃至包括用户调用 kill -9 主动“杀掉”Producer 应用。
假如设置 Producer 端 connections.max.idle.ms 参数大于 0(默认为 9 分钟),意味着,在 connections.max.idle.ms 指定时间内,假如没有任何哀求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。假如设置该参数为 -1,TCP 连接将成为永世长连接。
值得注意的是,在第二种方式中,TCP 连接是在 Broker 端被关闭的,但实在这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景,即 passive close。被动关闭的结果就是会产生大量的 CLOSE_WAIT 连接,因此 Producer 端或 Client 端没有机会显式地观测到此连接已被停止。
4. 序列化

Kafka 内置了常用 Java 基础范例的序列化器,如:StringSerializer、IntegerSerializer、DoubleSerializer 等。
但假如要传输较为复杂的对象,保举使用序列化性能更高的工具,如:Avro、Thrift、Protobuf 等。
使用方式是通过实现 org.apache.kafka.common.serialization.Serializer 接口来引入自界说的序列化器。
Kafka 内置的序列化器

以下是 Kafka 内置的序列化器及其对应的类:
数据范例序列化器类名说明字符串(String)org.apache.kafka.common.serialization.StringSerializer将字符串编码为 UTF-8 字节数组。字节数组(Bytes)org.apache.kafka.common.serialization.ByteArraySerializer直接发送字节数组,无需额外转换。整数(Integer)org.apache.kafka.common.serialization.IntegerSerializer将整数编码为 4 字节的字节数组(大端序)。长整型(Long)org.apache.kafka.common.serialization.LongSerializer将长整型编码为 8 字节的字节数组(大端序)。双精度(Double)org.apache.kafka.common.serialization.DoubleSerializer将双精度浮点数编码为 8 字节字节数组。空值(Null)无需序列化类(支持键或值为 null)。不发送任何字节。
序列化器的配置

在 Kafka 生产者中,序列化器通过以下配置指定:


  • Key 序列化器:key.serializer
  • Value 序列化器:value.serializer
示例配置:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
上述配置中:


  • 键和值都被序列化为 UTF-8 格式的字符串。

序列化器的使用场景


  • 字符串序列化

    • 常用于键或值是文本数据的场景。
    • 例如,发送 JSON 格式的消息时,值通常会序列化为字符串。

  • 字节数组序列化

    • 假如数据已经是二进制格式(如图像、文件等),直接使用 ByteArraySerializer 发送。
    • 避免重复序列化,提高性能。

  • 整数或长整型序列化

    • 得当键是简朴数字的场景(如用户 ID、订单编号)。
    • 在分区计算时,整数键的哈希值更稳固。

  • 双精度序列化

    • 适用于传递科学计算或金融范畴的精确浮点数值。


自界说序列化器

假如内置的序列化器无法满意需求(例如,自界说对象),可以实现 Kafka 提供的序列化接口:org.apache.kafka.common.serialization.Serializer。
自界说序列化器实现:

  1. import org.apache.kafka.common.serialization.Serializer;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. public class CustomObjectSerializer<T> implements Serializer<T> {
  4.     private final ObjectMapper objectMapper = new ObjectMapper();
  5.     @Override
  6.     public byte[] serialize(String topic, T data) {
  7.         try {
  8.             return objectMapper.writeValueAsBytes(data); // 使用 Jackson 序列化对象
  9.         } catch (Exception e) {
  10.             throw new RuntimeException("Failed to serialize object", e);
  11.         }
  12.     }
  13. }
复制代码

使用自界说序列化器:

props.put("value.serializer", "com.example.CustomObjectSerializer");

总结

Kafka 提供的内置序列化器已经涵盖了大多数常见的数据范例:


  • 文本数据保举使用 StringSerializer。
  • 二进制数据直接使用 ByteArraySerializer。
  • 数值范例使用 IntegerSerializer 或 LongSerializer。
对于复杂的对象或自界说需求,可以自行实现序列化逻辑。选择合适的序列化器是确保消息高效传输和处理的关键。
5. 分区

5.1. 什么是分区

Kafka 的数据结构采用三级结构,即:主题(Topic)、分区(Partition)、消息(Record)。
在 Kafka 中,恣意一个 Topic 维护了一组 Partition 日志,如下所示:

每个 Partition 都是一个单调递增的、不可变的日志记录,以不断追加的方式写入数据。Partition 中的每条记录会被分配一个单调递增的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的每条记录。 
5.2. 为什么要分区

为什么 Kafka 的数据结构采用三级结构?
分区的作用就是提供负载均衡的能力,以实现系统的高伸缩性(Scalability)。
不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而举行的,这样每个节点的机器都能独立地执行各自分区的读写哀求处理。并且,我们还可以通过添加新的机器节点来增长团体系统的吞吐量。

以下是 Kafka 使用分区的几个主要缘故原由:
1. 提高并发性和吞吐量



  • 负载均衡:Kafka 将主题拆分为多个分区,使得生产者和消费者可以并行操作不同的分区。每个分区可以独立地吸收和处理消息,因此可以在集群中分布负载,提升团体吞吐量。
  • 并行写入和读取:生产者可以同时将消息写入不同的分区,消费者也可以并行地从多个分区读取数据,这样可以有效地提高系统的吞吐量。
2. 程度扩展性



  • 扩展集群:Kafka 的分区机制使得它能够方便地横向扩展。当数据量增大时,可以通过增长更多的分区和 Broker 来处理更多的负载。每个分区可以分配给不同的 Broker,从而实现数据的分布式存储和处理。
  • 分布式存储:每个分区的数据可以分布在集群的多个 Broker 上,不同 Broker 负责不同的分区。随着集群扩展,分区数量可以增长,进而提升集群的容量和性能。
3. 容错性和高可用性



  • 副本机制:Kafka 通过将分区数据复制到多个 Broker 来实现高可用性。每个分区有一个 Leader 和多个 Follower 副本,Leader 负责处理读写哀求,Follower 副本则负责数据同步。在发生故障时,副本可以作为备份,确保数据的持久性和高可用性。
  • 分区级别的容错:即使某个 Broker 出现故障,只要该 Broker 上的分区有副本存在,数据依然可以在集群中的其他 Broker 上访问。Kafka 会主动举行副本的切换,保证消息不丢失。
4. 分区带来的顺序性



  • 保证分区内顺序性:Kafka 保证同一分区内的消息是有顺序的。生产者向分区写入的消息会按照发送顺序存储,消费者读取分区中的消息时也会按顺序消费。这对于须要保证数据顺序的应用(例如,事件日志、事务处理等)非常紧张。
  • 跨分区无顺序性保证:Kafka 不保证跨分区的消息顺序,因此,假如对全局顺序有严格要求,须要通过键(key)来确保同一键的消息发送到同一分区,从而保持顺序。
5. 机动的分区策略



  • 分区键控制数据分布:Kafka 通过分区键(通常是消息的 key)来决定消息发送到哪个分区。例如,同一范例的数据或属于同一用户的数据可以通过相同的键发送到同一个分区,从而保证这些消息的顺序性。
  • 分区的数量可配置:Kafka 答应在创建主题时指定分区的数量。根据业务需求,可以机动地调整分区数,确保系统在不同负载下的性能需求。
6. 实现更高效的消费



  • 并行消费:分区使得 Kafka 支持多个消费者并行消费消息。当消费者组中的多个消费者分配到不同的分区时,每个消费者可以独立地消费各自分配的分区,进一步提高消费的效率。
  • 消费者组:Kafka 支持消费者组(consumer groups)来实现多历程或多线程消费。每个消费者组中的每个消费者负责消费不同的分区,保证同一分区的消息只被一个消费者消费。

分区的工作原理


  • 生产者将消息发送到分区

    • 生产者根据配置或分区键决定将消息发送到哪个分区。
    • 假如消息没有指定分区,则生产者可以选择使用轮询或其他策略选择目标分区。
    • 分区选择通常通过生产者的分区器(Partitioner)来完成。

  • Broker 存储消息

    • 每个分区都由一个或多个 Broker 负责存储。每个分区在 Kafka 集群中的一个 Broker 上有一个主副本(Leader),其他副本(Follower)存储相同的数据。
    • 消息写入时由 Leader 负责,Follower 副本举行同步。

  • 消费者消费消息

    • 消费者根据消费组和分区分配策略,从不同分区读取消息。
    • Kafka 保证每个消费者组内每个分区只有一个消费者举行消费,避免重复消费。


总结

Kafka 分区机制的核心上风包括:


  • 提供高并发的消息生产和消费能力,提升系统吞吐量。
  • 支持程度扩展,能够处理大规模数据。
  • 提供数据副本和故障规复机制,确保数据可靠性。
  • 保证分区内消息顺序,但不保证跨分区的顺序。
因此,分区是 Kafka 实现高可用、高性能和高扩展性的关键因素。

5.3. 分区策略

1. 默认分区策略(轮询)

假如生产者没有显式指定分区,Kafka 默认使用 轮询(Round-robin)策略将消息均匀地分配到可用的分区。即使生产者发送的消息没有指定 key,Kafka 也会按顺序将消息轮流写入各个分区。


  • 场景:当没有特殊的需求来保证消息顺序时,轮询策略能够确保消息负载均衡,得当须要高吞吐量的场景。
示例

ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "message value");
// 没有指定分区,Kafka 使用默认的轮询分区策略
2. 基于键的分区策略

生产者可以使用 消息的 Key 来决定将消息发送到哪个分区。Kafka 默认使用该键的哈希值来举行分区计算。具体来说,Kafka 会计算 key.hashCode(),然后使用这个哈希值除以分区数,得到一个目标分区的编号。


  • 场景:这种策略适用于须要保证相同 key 的消息顺序的场景。比如,同一用户的所有操作须要保证顺序,大概所有有关某个会话的消息必须按顺序消费。
计算公式

partition = key.hashCode() % numberOfPartitions


  • 优点

    • 保证同一 key 的消息始终发送到同一个分区,保持顺序。
    • 分区分配更加稳固,避免了不须要的负载均衡和数据重新分布。

示例

ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "user123", "message value");
// 这里的 "user123" 是 key,Kafka 会根据其哈希值选择目标分区
3. 自界说分区策略

Kafka 答应生产者自界说分区器。通过实现 org.apache.kafka.clients.producer.Partitioner 接口,可以根据业务需求来决定消息的分区。自界说分区器可以根据更复杂的规则来确定目标分区,如按时间、消息内容等自界说逻辑举行分区。


  • 场景:假如内置的分区策略不能满意需求(比如须要根据业务逻辑或消息的内容来决定分区),则可以实现自界说分区策略。
自界说分区器示例

  1. import org.apache.kafka.clients.producer.Partitioner;
  2. import org.apache.kafka.common.serialization.Serializer;
  3. import java.util.Map;
  4. public class CustomPartitioner implements Partitioner {
  5.     @Override
  6.     public void configure(Map<String, ?> configs) {
  7.         // 可选:初始化分区器,获取配置等
  8.     }
  9.     @Override
  10.     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  11.         // 这里可以根据自定义逻辑来决定分区
  12.         if (key instanceof String && ((String) key).equals("special_key")) {
  13.             return 0; // 将 "special_key" 的消息定向到分区 0
  14.         }
  15.         // 默认情况下按照 key 的 hash 计算分区
  16.         return Math.abs(keyBytes.hashCode()) % cluster.partitionCountForTopic(topic);
  17.     }
  18.     @Override
  19.     public void close() {
  20.         // 可选:关闭资源
  21.     }
  22. }
复制代码
在生产者配置中,使用自界说分区器:
props.put("partitioner.class", "com.example.CustomPartitioner");
4. 选择分区的策略(Producer API)

Kafka 的生产者客户端通过以下几种方式来选择分区:


  • 未指定键时的分区选择

    • 当生产者没有提供消息的 key,Kafka 默认使用 轮询 策略(Round-robin),按顺序选择一个分区来发送消息。

  • 指定键时的分区选择

    • 当生产者提供了消息的 key 时,Kafka 会使用哈希算法根据该 key 来选择分区,这保证了相同 key 的消息发送到同一个分区。

5. 分区分配(Consumer Group)

消费者组(Consumer Group)中的每个消费者被分配到一个或多个分区来举行并行消费,确保每个分区在消费者组内只有一个消费者。Kafka 的分区分配是动态的,分区可以在消费者数量变革时重新分配。


  • 默认分配策略

    • Range 分配器:将分区按顺序分配给消费者,每个消费者尽可能地均匀分配到多个连续的分区。
    • Round-robin 分配器:按消费者的数量轮流分配分区,使得每个消费者分配到相对均等的分区数。

消费者在启动时可以配置分配策略:
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
总结:Kafka 分区策略


  • 轮询(Round-robin)策略

    • 默认策略,适用于负载均衡但不要求消息顺序的环境。

  • 键控制(Hashing)策略

    • 适用于须要保证相同 key 的消息发送到同一个分区,并保持顺序的场景。

  • 自界说分区器

    • 当内置策略无法满意需求时,可以通过实现 Partitioner 接口来界说更复杂的分区策略。

  • 消费者分区分配策略

    • 消费者通过分区分配策略决定如何从多个分区中分配任务,支持并行消费。

通过合理选择和定制分区策略,Kafka 可以高效地处理大量数据,保持高吞吐量、低延迟,并能在不同的消费场景下提供机动的数据分发。
6. 压缩

6.1. Kafka 的消息格式

Kafka 的消息格式是高度优化和简化的,以便在分布式环境中高效传输、存储和检索。Kafka 的消息结构由几个关键的元素构成,包括消息头、消息体以及相关的元数据。了解 Kafka 的消息格式对于开发者在处理消息时非常紧张,特殊是在对消息的生产、消费以及存储管理等方面。
Kafka 消息的核心结构

Kafka 的消息由以下几个部门构成:

  • 消息头(Message Header)
  • 消息体(Message Body)
  • 消息元数据(Metadata)
1. 消息结构概览

Kafka 中的每条消息是由以下几个主要部门构成:


  • Key(可选):消息的键,用于分区。用于确定消息写入哪个分区。
  • Value(必填):消息的实际内容或负载,生产者发送的数据。
  • Timestamp:消息的时间戳,表示消息生产的时间。
  • Headers(可选):附加的键值对,答应开发者携带自界说的元数据。
  • Message Metadata:包罗分区和偏移量等其他紧张信息。
2. Kafka 消息格式的具体说明

a. Key 和 Value



  • Key:消息的键,通常用于控制消息的分区。当生产者向 Kafka 发送消息时,假如指定了 key,Kafka 会基于 key 的哈希值来确定该消息属于哪个分区。假如没有指定 key,Kafka 会使用默认的分区策略(例如轮询)来决定消息的分区。
  • Value:消息的实际内容,即生产者发送的消息数据。通常是字节数组,可以是任何范例的数据,如文本、JSON、二进制数据等。
b. 消息体(Payload)

Kafka 的每条消息都是字节数组(byte array)。key 和 value 都是字节数组情势,因此消息体的数据可以是恣意格式,取决于应用步伐的需求。Kafka 本身并不对消息内容做任何限制或约束,数据格式完全由生产者和消费者的应用步伐决定。
c. Timestamp

每条消息在 Kafka 中都会包罗一个时间戳,通常是消息生产的时间。时间戳有两种范例:


  • Create Time:消息创建的时间戳,表示消息被生产者发送的时间。这个时间戳通常由生产者在发送消息时设置。
  • Log Append Time:消息被 Kafka 服务器写入日志的时间戳。这个时间戳由 Kafka 集群在写入时主动生成。
Kafka 可以根据配置来使用这两种时间戳。
d. Headers

Kafka 消息可以包罗恣意数量的键值对情势的 Headers,这是 Kafka 2.0 引入的新特性。消息头可以用于携带元数据,开发者可以自由界说这些头信息。这些头信息在 Kafka 消息中是可选的。


  • 用途:Headers 可以用来传递一些额外的元数据,例如跟踪标识符、消息范例等,而不必将它们包罗在 key 或 value 中。
e. 消息元数据(Metadata)



  • Partition:Kafka 中的每条消息都包罗目标分区的编号。在消息被生产者发送时,Kafka 会根据分区策略确定目标分区,消息被写入该分区。
  • Offset:每条消息都有一个唯一的 偏移量,这是 Kafka 中每个分区内消息的唯一标识符。Kafka 会根据偏移量来确保消费者能准确地读取消息。
  • LeaderReplica:Kafka 在每个分区中维护一个 Leader 副本,其他副本为 Replica。消费者从 Leader 分区读取数据。
3. 消息结构示意

以下是 Kafka 消息的简化结构:
| Key (Optional) | Value (Required) | Timestamp | Headers (Optional) | Metadata (Partition, Offset) |
|-----------------|------------------|-----------|--------------------|-----------------------------|
| User ID | User Message | 1629384 | { 'header1': 'value1'} | Partition: 0, Offset: 1234 | | Order ID | Order Details | 1629385 | { 'header2': 'value2'} | Partition: 1, Offset: 1235 |
4. 消息的物理存储结构

在 Kafka 中,消息是按分区存储的,每个分区都是一个 日志。消息存储在磁盘上,格式如下:


  • Kafka 将每个分区的数据存储为多个 日志段文件(Log Segments),每个日志段包罗一组连续的消息。
  • 每条消息在存储时会附加一个 偏移量(Offset)。偏移量是分区内的唯一标识符,资助消费者跟踪已消费的消息。
5. 消息的传输和存储流程

Kafka 消息的传输和存储大抵流程如下:

  • 生产者发送消息:生产者使用 Kafka 提供的 Producer API 发送消息,并根据分区策略将消息写入到目标分区。
  • 消息存储:Kafka 将消息存储到对应分区的日志中。每个分区有多个 日志文件,消息会附带一个偏移量。
  • 消息消费:消费者通过 Consumer API 消费消息,通过偏移量举行跟踪。
6. 例子:发送消息的结构

假设我们要发送一条消息,消息包罗:


  • Key: user123
  • Value: User message content
  • Timestamp: 当前时间
  • Headers: {'header1': 'value1'}
使用 Kafka 的生产者发送这条消息时,Kafka 会将消息的 KeyValue 转换为字节数组,然后存储和传输。Kafka 会计算 key.hashCode() 并决定该消息应写入哪个分区。消息的时间戳和其他元数据(如分区、偏移量)将与消息一起存储。

总结

Kafka 的消息格式主要由 KeyValueTimestampHeadersMetadata 构成。消息本身是一个字节数组,Kafka 通过分区和偏移量来管理消息的顺序和存储。Kafka 提供了机动的消息格式,可以根据须要传输任何范例的数据,并且在分布式系统中保持高效的读写性能。
6.2. Kafka 的压缩流程

Kafka 的压缩流程,一言以概之——Producer 端压缩、Broker 端保持、Consumer 端解压缩。
压缩过程

在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。
生产者步伐中配置 compression.type 参数即表示启用指定范例的压缩算法。
【示例】开启 GZIP 的 Producer 对象
  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("acks", "all");
  4. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  5. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  6. // 开启 GZIP 压缩
  7. props.put("compression.type", "gzip");
  8. Producer<String, String> producer = new KafkaProducer<>(props);
复制代码
通常,Broker 从 Producer 端吸收到消息后,不做任何处理。以下两种环境除外:


  • 环境一:Broker 端指定了和 Producer 端不同的压缩算法。显然,应该尽量避免这种环境。
  • 环境二:Broker 端发生了消息格式转换。所谓的消息格式转换,主要是为了兼容老版本的消费者步伐。在一个生产环境中,Kafka 集群中同时生存多种版本的消息格式非常常见。为了兼容老版本的格式,Broker 端会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。一般环境下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外,它还让 Kafka 丧失了引以为豪的 Zero Copy 特性。
所谓零拷贝,说的是当数据在磁盘和网络举行传输时避免昂贵的内核态数据拷贝,从而实现快速的数据传输。因此假如 Kafka 享受不到这个特性的话,性能必然有所损失,以是尽量保证消息格式的统一吧,这样不仅可以避免不须要的解压缩 / 重新压缩,对提升其他方面的性能也大有裨益。
解压缩的过程

通常来说解压缩发生在消费者步伐中,也就是说 Producer 发送压缩消息到 Broker 后,Broker 照单全收并原样生存起来。当 Consumer 步伐哀求这部门消息时,Broker 依然原样发送出去,当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息。
那么如今题目来了,Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?实在答案就在消息中。Kafka 会将启用了哪种压缩算法封装进消息聚集中,这样当 Consumer 读取到消息聚集时,它天然就知道了这些消息使用的是哪种压缩算法。
压缩算法

在 Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:GZIP、Snappy 和 LZ4。从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写为 zstd)。
在实际使用中,GZIP、Snappy、LZ4 乃至是 zstd 的表现各有千秋。但对于 Kafka 而言,它们的性能测试结果却出奇得一致,即在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在压缩比方面,zstd > LZ4 > GZIP > Snappy。
假如客户端机器 CPU 资源有很多富余,强烈建议开启 zstd 压缩,这样能极大地节省网络资源消耗

Kafka 生产者压缩流程

当生产者向 Kafka 发送消息时,生产者会根据配置的压缩范例举行压缩。具体流程如下:

  • 消息批量化

    • Kafka 的生产者客户端会将多条消息聚集成一个消息批次(batch)。Kafka 的消息通常是批量发送的,而不是一条一条发送的。通过批量发送,Kafka 可以淘汰网络开销,提高吞吐量。

  • 压缩消息批次

    • 一旦消息被打包成一个批次,生产者会根据配置的压缩方式对整个批次举行压缩。压缩是在消息批次级别举行的,而不是单条消息。
    • 假如 compression.type 配置为 gzip,则生产者会使用 gzip 算法对该消息批次举行压缩。压缩后的批次将以压缩格式存储。

  • 发送压缩批次

    • 压缩后的消息批次被发送到 Kafka 的 Broker。生产者会根据分区策略选择目标分区并将消息发送到对应的分区。

Kafka Broker 存储压缩消息

在 Kafka 的 Broker 端,消息存储在日志文件中。当消息批次被传输到 Broker 时,压缩后的批次会被直接存储在日志文件中。


  • 磁盘存储:Kafka 会将压缩后的批次直接写入磁盘。由于压缩淘汰了数据的存储空间,Kafka 可以在磁盘上存储更多的消息。
  • 副本同步:Kafka 会将压缩后的消息副本同步到其他 Broker 中的副本。压缩后的消息会以相同的压缩格式存储在各个副本中,保持数据一致性。
Kafka 消费者解压缩流程

消费者从 Kafka 中读取消息时,Kafka 会执行解压缩操作。具体流程如下:

  • 拉取压缩消息

    • 消费者通过 KafkaConsumer 拉取消息。假如消息批次是压缩格式的,消费者会发现批次数据是压缩的。

  • 解压缩消息

    • Kafka 会在消费者端主动解压缩这些压缩的消息批次。解压缩是按批次举行的,消费者会根据压缩范例(gzip、snappy、lz4 或 zstd)主动选择相应的解压缩算法。
    • 解压后的消息会被传递给消费者应用步伐,消费者可以按正常的方式处理这些消息。

  • 消费未压缩消息

    • 假如消息没有经过压缩,消费者将直接读取和消费未压缩的消息。

压缩的上风和应用场景

a. 降低存储空间



  • 压缩大大淘汰了 Kafka 中消息的存储空间,尤其是在存储大量数据时,这可以显著降低磁盘空间的使用。
b. 提高吞吐量



  • 通过压缩消息,Kafka 淘汰了网络传输中的数据量,从而提高了吞吐量。在高流量环境下,压缩能够淘汰网络延迟和带宽使用。
c. 降低网络带宽使用



  • 在消息通过网络传输时,压缩可以显著淘汰须要传输的数据量,降低网络带宽的消耗,特殊是在高负载的分布式环境中。
d. 数据存储和传输优化



  • Kafka 的压缩能够优化存储和传输过程,尤其是当消息内容重复或有一定的模式时,压缩效果尤为显著。
选择压缩算法的考量



  • gzip:得当须要更高压缩比,但对延迟敏感度较低的场景。由于其压缩比高,但速度相对较慢,因此得当批量处理或存储密集型任务。
  • snappy:得当对延迟敏感且数据量较大的场景。它提供了较快的压缩和解压速度,但压缩比略低。
  • lz4:适用于实时性要求较高的系统,提供快速的压缩和解压速度,压缩比适中。
  • zstd:提供高压缩比和较快的压缩速度,适用于大规模数据处理任务。

6.3. 何时启用压缩

何时启用压缩是比较合适的时机呢?
压缩是在 Producer 端完成的工作,那么启用压缩的一个条件就是 Producer 步伐运行机器上的 CPU 资源要很富足。假如 Producer 运行机器本身 CPU 已经消耗殆尽了,那么启用消息压缩无疑是雪上加霜,只会拔苗助长。
假如环境中带宽资源有限,那么也建议开启压缩。
7. 幂等性

7.1. 什么是幂等性

幂等性(Idempotence) 是指在多次执行同一操作时,操作的结果与执行的次数无关,只有第一次执行会产见效果,后续相同的操作不会对系统的状态造成任何额外的影响。在编程和分布式系统中,幂等性通常用于确保在网络题目或重复哀求的环境下,操作的结果不会重复或产生不一致。
幂等性的根本特点:



  • 多次执行,结果不变:即使同一个操作被多次执行,结果依然是相同的,系统的状态不会发生变革。
  • 系统容错性:在分布式系统中,网络停止或超时可能导致哀求被重复发送。幂等性确保即使操作被多次执行,也不会导致错误的状态。
7.2. Kafka Producer 的幂等性

在 Kafka 中,Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。它实在是 0.11.0.0 版本引入的新功能。在此之前,Kafka 向分区发送数据时,可能会出现同一条消息被发送了多次,导致消息重复的环境。在 0.11 之后,指定 Producer 幂等性的方法很简朴,仅须要设置一个参数即可,即 props.put(“enable.idempotence”, ture),
或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。
enable.idempotence 被设置成 true 后,Producer 主动升级成幂等性 Producer,其他所有的代码逻辑都不须要改变。Kafka 主动帮你做消息的去重。底层具体的原理很简朴,就是经典的用空间去换时间的优化思路,即在 Broker 端多生存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够主动知晓这些消息已经重复了,于是可以在背景默默地把它们“丢弃”掉。固然,实际的实现原理并没有这么简朴,但你大抵可以这么理解。
我们必须要了解幂等性 Producer 的作用范围:


  • 起首,enable.idempotence 只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
  • 其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 历程的一次运行。当你重启了 Producer 历程之后,这种幂等性保证就丧失了。
假如想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)大概依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别!

Kafka Producer 的幂等性实现原理

Kafka 生产者的幂等性是通过以下机制实现的:

  • 生产者配置 enable.idempotence=true: Kafka 生产者通过设置 enable.idempotence=true 来启用幂等性功能。在启用此功能时,生产者在发送消息时,会主动为每个哀求分配一个 消息序列号,Kafka 会根据该序列号来判断消息是否是重复的。
  • 消息序列号(Producer Sequence Number): Kafka 在生产者发送每条消息时,会生成一个 消息序列号(即每个 Producer 实例的消息计数器)。每个消息会携带一个序列号,在每个分区的消息队列中,消息会按照其序列号举行排序。Kafka Broker 会根据这些序列号来判断消息是否重复。假如一个消息被发送多次,Kafka 会根据序列号辨认并丢弃重复的消息。
  • 事务性(Transaction): 在 Kafka 中,幂等性和 事务 是密切相关的。Kafka 通过对每个生产者分配一个 Producer ID事务 ID 来确保幂等性。每次发送消息时,生产者会带上事务 ID。假如该事务的消息由于某些缘故原由(如网络失败)没有乐成发送并发生重试,Kafka 会根据事务 ID 确保重复消息不会被写入。即使生产者重试发送同样的消息,Kafka 会确保这些重复的消息不会再次被存储。
  • 分区级别的去重: Kafka 生产者的幂等性是分区级别的。即生产者为每个分区维护一个 单独的序列号。当生产者发送消息时,Kafka 使用分区级别的序列号来判断该消息是否已经被写入分区。假如已经写入,则该消息会被丢弃。
  • Kafka 的哀求和响应机制: Kafka 在举行消息写入时,使用一个 确认机制(acknowledgement),它保证了消息的乐成写入。在幂等性启用的环境下,即使由于网络题目或超时,生产者会主动重试发送消息,但只会将消息写入 Kafka 一次。
Kafka Producer 的幂等性工作流程


  • 生产者发送消息: 当生产者发送消息时,它会为每个消息分配一个序列号,并使用这个序列号来追踪消息。
  • 消息到达 Broker: 消息到达 Broker 后,Broker 会查抄消息的序列号和 Producer ID。假如 Broker 已经吸收到该消息,大概序列号不符合要求(例如,序列号过小),它会忽略该消息。
  • 消息写入分区: Broker 会将消息写入分区,并确保消息的顺序和唯一性。假如消息已经存在(例如,重复的消息),Kafka 会主动丢弃这些消息,而不会重复写入。
  • 确认消息写入: 假如消息乐成写入分区,Kafka 会向生产者返回乐成的响应。生产者收到确认后才会认为消息被乐成写入。
  • 重试机制: 假如由于网络题目等缘故原由,生产者没有收到消息写入乐成简直认,它会重试发送相同的消息。由于启用了幂等性,Kafka 会确保即使发生重试,消息也不会被重复写入。
Kafka 的幂等性如何保证精确一次交付

启用幂等性后,Kafka 生产者能够保证消息的精确一次交付(Exactly Once Semantics,EOS)。这意味着:


  • 重复消息不会写入:即使网络延迟或其他题目导致消息重试,Kafka 会通过序列号和事务 ID 来确保消息不会重复写入。
  • 保证顺序:在单个分区内,消息会按照生产者发送的顺序严格写入,并且不会出现乱序题目。
  • 容错性:即使在高网络延迟或 Kafka Broker 宕机的环境下,Kafka 也能保证消息的幂等性和精确一次交付。
幂等性与事务性



  • 幂等性 主要保证单条消息的可靠性,确保在生产者重试时不会重复发送消息。
  • 事务性 进一步提供了跨多个消息或多个分区的精确一次交付语义。Kafka 的 事务性 确保一组消息(跨多个分区)要么全部乐成,要么全部失败。
通过启用 Kafka 事务(acks=all, transactional.id),生产者可以确保整个事务内的消息要么全部被乐成提交,要么全部被回滚。事务性提供了更强的保障,尤其适用于须要跨多个分区发送消息的应用场景。

7.3. PID 和 Sequence Number

为了实现 Producer 的幂等性,Kafka 引入了 Producer ID(即 PID)和 Sequence Number。


  • PID。每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个 PID 对用户是不可见的。
  • 作用:Producer ID 用于区分不同的生产者,并且资助 Kafka Broker 确定来自同一个生产者实例的消息是否已经被处理。每个生产者在启动时,Kafka 会为其分配一个唯一的 Producer ID。这个 ID 保证了即使在网络或其他故障的环境下,生产者的消息能够正确地被辨认并去重。
  • 分配方式:Kafka 生成 Producer ID 时,使用了一个全局唯一的标识符,并在 Kafka Broker 中为每个生产者维护一个 Producer ID。每个生产者实例只有一个 Producer ID,它在整个生命周期内都不会改变。
  • Sequence Numbler。对于每个 PID,该 Producer 发送数据的每个 <Topic, Partition> 都对应一个从 0 开始单调递增的 Sequence Number。
  • 作用:Sequence Number 主要用于保证消息的顺序性和幂等性。Kafka 通过查抄消息的 Sequence Number 来确定是否为重复消息。假如生产者发送的消息序列号与已经吸收到的消息序列号相同,Kafka 会丢弃重复的消息,避免重复写入。
  • 工作原理:每个生产者(通过 Producer ID 标识)都维护一个递增的序列号,每发送一条消息,序列号就加 1。这样,Kafka 可以通过该序列号判断一条消息是否是重复的。对于同一个生产者 ID 和分区,Kafka 会严格按照序列号顺序处理消息。
Broker 端在缓存中生存了这 seq number,对于吸收的每条消息,假如其序号比 Broker 缓存中序号大于 1 则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。但是,只能保证单个 Producer 对于同一个 <Topic, Partition> 的 Exactly Once 语义。不能保证同一个 Producer 一个 topic 不同的 partion 幂等。
如何通过 PID 和 Sequence Number 实现幂等性



  • Producer ID 和 Sequence Number 的结合:当 Kafka 生产者发送一条消息时,消息携带着生产者的 Producer ID 和消息的 Sequence Number。这样,Kafka 就能够知道这条消息来自哪个生产者,并且可以查抄该生产者发送的消息是否已经处理过。通过对比 Producer IDSequence Number,Kafka 可以判断消息是否重复。
  • 重复消息的判断:假如生产者发送的消息与之前已经存储在 Kafka 中的消息具有相同的 Producer ID 和 Sequence Number,Kafka 会认为这条消息是重复的并丢弃它。因此,即使在网络超时或其他故障导致生产者重试发送同一条消息的环境下,Kafka 也能通过 PID 和 Sequence Number 确保该消息只会被写入一次。
Kafka Producer 的幂等性工作流程


  • 生产者发送消息:当生产者向 Kafka 发送消息时,Kafka 为每个生产者分配一个唯一的 Producer ID。每次生产者发送消息时,都会附带一个递增的 Sequence Number。这个消息就包罗了 Producer IDSequence Number
  • Kafka Broker 吸收到消息:Kafka Broker 会通过 Producer IDSequence Number 来判断消息是否是重复的。

    • 假如同一个生产者(由 Producer ID 标识)在短时间内发送了多次相同的消息(例如,由于网络题目导致生产者重试),Kafka 会通过 Sequence Number 来判断消息是否已经存在。假如消息的 Sequence Number 已经处理过,那么 Kafka 会丢弃这条消息。

  • 消息写入:假如消息是新的(即其 Sequence Number 是递增的),Kafka 会将消息写入分区。Kafka 确保每个生产者的消息按照其 Sequence Number 顺序写入。
  • 生产者重试:在生产者发送消息时,假如没有收到确认(例如,由于网络题目),生产者会重试发送相同的消息。因为每条消息都包罗 Producer IDSequence Number,即使生产者重试发送相同的消息,Kafka 也会根据消息的序列号来确认消息是否已经处理,避免重复插入。
  • 消息确认:一旦消息乐成写入 Kafka,Broker 会返回一个乐成简直认响应给生产者。假如生产者没有收到确认,会举行重试;否则,消息就被认为已经乐成写入。
Kafka 生产者的幂等性配置

为了启用生产者的幂等性功能,必须在生产者的配置中启用 enable.idempotence=true。同时,生产者的以下配置参数也与幂等性密切相关:


  • acks=all:确保所有副本都收到消息。这是启用幂等性的关键配置之一,确保生产者发送的消息能在所有副本上确认并最终一致。
  • retries=Integer.MAX_VALUE:配置生产者的重试次数为无穷次,这样在网络颠簸或 Broker 故障的环境下,生产者会不断尝试重新发送消息,直到消息乐成写入。
  • linger.ms 和 batch.size:这些配置资助生产者将更多消息合并成一个批次,提升吞吐量。
配置示例:

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  4. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  5. // 启用幂等性props.put("acks", "all"); // 确保所有副本确认收到消息props.put("retries", Integer.MAX_VALUE); // 设置重试次数为无穷props.put("linger.ms", 1); // 最小延迟,更多消息打包在一起发送props.put("batch.size", 16384); // 批量巨细// 启用幂等性props.put("enable.idempotence", "true");KafkaProducer<String, String> producer = new KafkaProducer<>(props);
复制代码
7.4. 幂等性的应用实例

(1)配置属性
须要设置:


  • enable.idempotence,须要设置为 ture,此时就会默认把 acks 设置为 all,以是不须要再设置 acks 属性了。
  1. // 指定生产者的配置
  2. final Properties properties = new Properties();
  3. properties.put("bootstrap.servers", "localhost:9092");
  4. // 设置 key 的序列化器
  5. properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  6. // 设置 value 的序列化器
  7. properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  8. // 开启幂等性
  9. properties.put("enable.idempotence", true);
  10. // 设置重试次数
  11. properties.put("retries", 3);
  12. //Reduce the no of requests less than 0
  13. properties.put("linger.ms", 1);
  14. // buffer.memory 控制生产者可用于缓冲的内存总量
  15. properties.put("buffer.memory", 33554432);
  16. // 使用配置初始化 Kafka 生产者
  17. producer = new KafkaProducer<>(properties);
复制代码
(2)发送消息
跟一般生产者一样,如下
  1. public void produceIdempotMessage(String topic, String message) {
  2.     // 创建Producer
  3.     Producer producer = buildIdempotProducer();
  4.     // 发送消息
  5.     producer.send(new ProducerRecord<String, String>(topic, message));
  6.     producer.flush();
  7. }
复制代码
此时,因为我们并没有配置 transaction.id 属性,以是不能使用事务相关 API,如下
  1. producer.initTransactions();
复制代码
否则会出现如下错误:
  1. Exception in thread “main” java.lang.IllegalStateException: Transactional method invoked on a non-transactional producer.
  2.     at org.apache.kafka.clients.producer.internals.TransactionManager.ensureTransactional(TransactionManager.java:777)
  3.     at org.apache.kafka.clients.producer.internals.TransactionManager.initializeTransactions(TransactionManager.java:202)
  4.     at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:544)
复制代码
8. Kafka 事务

Kafka 的事务是为了提供 精确一次交付(Exactly Once Semantics, EOS) 保证而设计的,确保消息的 原子性一致性。通过 Kafka 的事务机制,生产者可以确保消息在多分区和多主题的环境下要么完全乐成提交,要么完全回滚,从而避免了数据丢失和重复消息的环境。
8.1. 事务

Kafka 事务的关键概念


  • 事务 ID(Transactional ID): 每个 Kafka 生产者在启用事务时都会指定一个 事务 ID,它是一个唯一的标识符,用来追踪该生产者发出的所有事务消息。这个 ID 答应 Kafka 区分不同的生产者,并确保同一生产者的消息按顺序提交或回滚。
  • 事务隔离级别: Kafka 提供两种事务隔离级别:

    • read_committed:消费者只会读取已提交的消息,不会读取事务中的未提交消息。默认的隔离级别是 read_committed。
    • read_uncommitted:消费者可以读取未提交的消息(事务内的消息)。这种模式下,消费者可能会读取到部门提交的事务数据。

  • 事务日志: Kafka 使用事务日志来管理事务的状态。消息起首写入事务日志,在事务提交时才会正式写入日志文件。假如事务回滚,则所有相关消息会被丢弃。
  • 生产者 API: Kafka 为生产者提供了事务控制的 API,答应生产者开始事务、提交事务或回滚事务。这些操作保证了事务内所有消息的原子性。

    • beginTransaction():开始一个事务。
    • commitTransaction():提交事务,表示所有消息乐成写入。
    • abortTransaction():回滚事务,表示所有消息无效并丢弃。

Kafka 事务的工作流程

Kafka 事务的工作流程可以简化为以下几个步调:

  • 生产者启动事务: 当 Kafka 生产者启用事务时,Kafka 会为其分配一个唯一的 事务 ID,生产者可以通过调用 beginTransaction() 开始事务。
  • 发送消息: 在事务开始后,生产者可以向 Kafka Broker 发送消息。消息会被标志为事务消息,并且先写入事务日志,而不是立即提交到日志文件。
  • 提交事务: 假如生产者确认事务内的所有消息都乐成发送,可以调用 commitTransaction() 来提交事务。这时,所有消息会正式写入 Kafka 中。
  • 回滚事务: 假如生产者在事务处理中遇到错误,大概想要取消事务,可以调用 abortTransaction() 往返滚事务。回滚时,事务内的所有消息会被丢弃。
  • 事务日志与消息提交: Kafka 会将事务消息写入事务日志,直到事务被提交或回滚。在提交时,消息会正式写入 Kafka 的日志文件;假如回滚,事务中的消息会被丢弃。
  • 消费者读取消息: 消费者默认只读取已提交的消息(read_committed 隔离级别)。因此,消费者不会读取到正在举行中的事务消息,保证了数据的一致性。
Kafka 事务的使用场景

Kafka 事务非常得当以下场景:

  • 跨多个分区或主题的消息发送: Kafka 事务能够保证在跨多个分区或多个主题发送消息时,所有消息要么都乐成,要么都失败。
  • 精确一次交付: Kafka 的事务功能能够确保消息的精确一次交付,避免消息丢失或重复消费,适用于须要严格一致性保证的场景。
  • 防止重复消费: Kafka 事务可以防止在消费者端读取到重复的消息,确保消费者不会因为生产者的重试机制而多次处理同一条消息。
Kafka 事务的限制


  • 性能开销: 启用事务会导致性能开销,尤其是在写入和提交事务时,Kafka 须要处理事务日志和事务的提交状态。
  • 事务提交延迟: 由于事务须要等待所有副本确认并写入事务日志,因此事务提交的延迟可能较高,尤其是在网络延迟较大的环境下。
  • 事务性消费的复杂性: 在消费端,假如须要处理跨事务的消息,可能须要一些额外的处理逻辑,例如处理事务失败和重新消费的场景。
总结

Kafka 事务提供了 精确一次交付跨分区、跨主题的原子操作,确保消息的处理既可靠又一致。通过 transactional.id,Kafka 能够确保生产者的消息处理具有一致性,消费者也能够根据事务隔离级别读取正确的消息。尽管事务机制引入了一些性能开销,但它对于须要严格一致性保证的应用场景,尤其是分布式系统中,提供了强盛的支持。
8.2. 事务型 Producer

生产者端配置


  • transactional.id:指定生产者的事务 ID,生产者将通过这个 ID 来标识它的事务。
  • acks=all:确保所有副本都确认消息,保证消息的持久性和一致性。
  • retries=Integer.MAX_VALUE:设置生产者的重试次数为无穷次,以确保即使发生网络颠簸,消息也能最终提交。
配置示例(生产者端)

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  4. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  5. // 启用事务props.put("acks", "all"); // 确保所有副本都收到消息props.put("retries", Integer.MAX_VALUE); // 重试次数设为无穷次props.put("transactional.id", "my-transaction-id"); // 事务 ID,用于标识事务KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 开始事务producer.beginTransaction();try {    // 发送消息    producer.send(new ProducerRecord<>(topic, key, value));    // 提交事务    producer.commitTransaction();} catch (ProducerFencedException | OutOfMemoryError | KafkaException e) {    // 事务失败,回滚    producer.abortTransaction();} finally {    producer.close();}
复制代码
消费者端配置

消费者端的配置决定了是否能够读取未提交的消息。默认环境下,消费者读取已提交的消息。消费者可以通过 isolation.level 配置来指定事务的隔离级别:


  • read_committed:只读取已提交的消息。
  • read_uncommitted:读取所有消息,包括未提交的事务消息。
配置示例(消费者端)

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("group.id", "my-consumer-group");
  4. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  5. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  6. // 设置事务隔离级别为 read_committed,表示只读取已提交的消息
  7. props.put("isolation.level", "read_committed");
  8. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  9. consumer.subscribe(Collections.singletonList("my-topic"));
  10. while (true) {
  11.     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  12.     for (ConsumerRecord<String, String> record : records) {
  13.         // 处理消息
  14.     }
  15. }
复制代码

8.3. 事务操作的 API

Producer 提供了 initTransactions, beginTransaction, sendOffsets, commitTransaction, abortTransaction 五个事务方法。
  1. /**
  2.      * 初始化事务。需要注意的有:
  3.      * 1、前提
  4.      * 需要保证transation.id属性被配置。
  5.      * 2、这个方法执行逻辑是:
  6.      *   (1)Ensures any transactions initiated by previous instances of the producer with the same
  7.      *      transactional.id are completed. If the previous instance had failed with a transaction in
  8.      *      progress, it will be aborted. If the last transaction had begun completion,
  9.      *      but not yet finished, this method awaits its completion.
  10.      *    (2)Gets the internal producer id and epoch, used in all future transactional
  11.      *      messages issued by the producer.
  12.      *
  13.      */
  14.     public void initTransactions();
  15.     /**
  16.      * 开启事务
  17.      */
  18.     public void beginTransaction() throws ProducerFencedException ;
  19.     /**
  20.      * 为消费者提供的在事务内提交偏移量的操作
  21.      */
  22.     public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
  23.                                          String consumerGroupId) throws ProducerFencedException ;
  24.     /**
  25.      * 提交事务
  26.      */
  27.     public void commitTransaction() throws ProducerFencedException;
  28.     /**
  29.      * 放弃事务,类似回滚事务的操作
  30.      */
  31.     public void abortTransaction() throws ProducerFencedException ;
复制代码
8.4. Kafka 事务相关配置

Kafka 事务相关配置


  • transactional.id

    • 描述:设置 Kafka 生产者的事务 ID,这是每个事务的唯一标识符。它用于标识属于同一生产者实例的所有事务操作。
    • 范例:String
    • 默认值:无(必须手动设置)
    • 配置示例: props.put("transactional.id", "my-transaction-id");
    • 作用:确保 Kafka 能够在一个事务内追踪所有的消息,并且通过事务 ID 来处理事务的提交或回滚。

  • acks

    • 描述:控制消息写入简直认级别,确保副本同步并写入磁盘。
    • 范例:String
    • 可选值

      • 0:生产者不等待确认。
      • 1:生产者等待向导者确认。
      • all:生产者等待所有副本确认,确保数据的高可靠性。

    • 默认值:1
    • 配置示例: props.put("acks", "all"); // 确保所有副本都确认
    • 作用:在启用事务时,acks 设置为 all 是保举配置,它确保消息在所有副本都被确认后才算提交,从而保证数据一致性。

  • retries

    • 描述:指定在发送失败时的重试次数,通常须要将其设置为较大的值,以确保消息最终被乐成发送。
    • 范例:int
    • 默认值:0
    • 配置示例: props.put("retries", Integer.MAX_VALUE); // 设置为最大值,确保尽可能多的重试
    • 作用:为了支持事务,retries 配置应设置为 Integer.MAX_VALUE,确保 Kafka 生产者在发生暂时的网络或其他故障时能够举行重试。

  • acks 和 retries 配置的配合使用 在生产者配置中,acks 和 retries 配置是密切配合的。为了确保事务的可靠性,通常建议:

    • acks 设置为 all,以确保消息在所有副本都被确认后才算完成。
    • retries 设置为较大的值(如 Integer.MAX_VALUE),以保证消息发送的可靠性。

  • max.in.flight.requests.per.connection

    • 描述:控制单个连接上最多可以有多少个未完成的哀求(哀求指消息的发送)。
    • 范例:int
    • 默认值:5
    • 配置示例: props.put("max.in.flight.requests.per.connection", "1");
    • 作用:为了确保事务的幂等性,max.in.flight.requests.per.connection 应该设置为 1。这可以防止在发生重试时出现乱序题目。

  • transaction.timeout.ms

    • 描述:指定事务超时时间。假如事务在该时间内没有被提交或回滚,Kafka 会主动回滚事务。
    • 范例:long
    • 默认值:60000(60秒)
    • 配置示例: props.put("transaction.timeout.ms", "300000"); // 设置超时时间为 5 分钟
    • 作用:防止事务长时间未提交,设置一个合理的超时时间可以避免系统因未完成的事务而壅闭。

  • isolation.level

    • 描述:控制消费者读取事务消息的隔离级别。可以选择:

      • read_committed:只读取已提交的消息(默认设置)。
      • read_uncommitted:读取所有消息,包括未提交的消息。

    • 范例:String
    • 默认值:read_committed
    • 配置示例: props.put("isolation.level", "read_committed"); // 只读取已提交的消息
    • 作用:确保消费者读取的是已经完全提交的事务消息。设置为 read_committed 使得消费者不会读取到正在举行中的事务消息。

使用 kafka 的事务 api 时的一些注意事项:


  • 须要消费者的主动模式设置为 false,并且不能子再手动的举行执行 consumer#commitSync 大概 consumer#commitAsyc
  • 设置 Producer 端参数 transctional.id。最好为其设置一个故意义的名字。
  • 和幂等性 Producer 一样,开启 enable.idempotence = true。假如配置了 transaction.id,则此时 enable.idempotence 会被设置为 true
  • 消费者须要配置事务隔离级别 isolation.level。在 consume-trnasform-produce 模式下使用事务时,必须设置为 READ_COMMITTED。

    • read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,假如你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
    • read_committed:表明 Consumer 只会读取事务型 Producer 乐成提交事务写入的消息。固然了,它也能看到非事务型 Producer 写入的所有消息。

在使用 Kafka 事务 API 时,有几个关键的注意事项:

  • 事务 ID 唯一性

    • 确保 transactional.id 对每个生产者是唯一的。多个生产者不能使用相同的事务 ID,否则会造成冲突。
    • Kafka 使用事务 ID 来追踪每个生产者的事务,因此每个生产者实例应该具有唯一的事务 ID,尤其是在分布式环境中。

  • 处理事务失败

    • 事务操作可能会失败,尤其是在生产者发送消息时遇到网络题目或 Kafka Broker 制止服务的环境下。必须通过适当的非常捕捉来处理这些失败,例如 ProducerFencedException、OutOfMemoryError、KafkaException 等。
    • 在发生错误时,要调用 abortTransaction() 往返滚事务,确保不将部门消息提交。

  • 事务提交与回滚

    • 在正常的消息发送流程中,一旦所有消息都乐成发送,就应该调用 commitTransaction() 来提交事务。假如发送消息时遇到任何题目,应调用 abortTransaction() 往返滚事务。
    • 一定要确保事务的提交和回滚操作正确,否则可能会导致数据的不一致。

  • 事务超时

    • 事务超时(transaction.timeout.ms)是一个紧张的配置项,应该根据业务需求举行适当配置。假如一个事务在规定时间内未完成,Kafka 会主动回滚该事务。
    • 注意,事务超时的设置应该与 Kafka Broker 的设置保持一致。

  • 消息的幂等性与事务的结合

    • Kafka 在启用事务时,主动开启了生产者的幂等性机制。这意味着即使发生消息重试,也不会造成重复消息。因此,为了确保事务的可靠性,必须同时启用幂等性 (acks=all 和 retries=Integer.MAX_VALUE)。

  • 高吞吐量与事务性能开销

    • 事务性操作会增长一定的性能开销,因为每次消息的发送都须要记录事务日志,并且 Kafka 须要处理消息的提交和回滚。高吞吐量的场景下,可能会导致延迟增长,因此在设计系统时须要权衡事务的一致性保证与性能之间的均衡。

  • 事务的重试与消息顺序

    • Kafka 在发送事务消息时提供了重试机制,但假如事务提交期间出现网络故障或 Kafka Broker 故障,消息可能会发生重试。在启用事务时,必须确保 max.in.flight.requests.per.connection 配置为 1,以防止消息在重新发送时乱序。

  • 事务日志和消息提交

    • Kafka 会将事务内的所有消息写入事务日志,确保消息的原子性。只有在调用 commitTransaction() 时,消息才会被正式提交到分区。假如发生回滚,事务内的消息会被丢弃,确保不会将不一致的数据提交到消费者。

8.5. Kafka 事务应用示例

Kafka 事务功能能够确保跨多个分区和多个主题的消息处理具有 原子性,保证消息要么完全提交,要么完全回滚,从而避免了数据丢失和重复的环境。以下是一个 Kafka 事务应用的示例,展示如何使用 Kafka 的事务功能来保证数据一致性。
Kafka 事务应用场景

假设我们有一个订单系统,该系统须要将订单消息发送到多个主题(例如 order-topic 和 inventory-topic),在这种环境下,确保两个主题的数据一致性是非常紧张的。假如订单创建消息已经乐成发送到 order-topic,但由于某种缘故原由,库存更新失败并没有乐成发送到 inventory-topic,就会导致数据不一致。因此,使用 Kafka 事务可以确保两个消息要么同时乐成,要么都失败。
Kafka 生产者事务应用示例

假设我们正在创建一个订单,并须要同时更新 order-topic 和 inventory-topic。假如所有消息都乐成,我们将提交事务;假如出现任何错误,我们将回滚事务。
1. 配置 Kafka 生产者

起首,我们须要配置 Kafka 生产者的事务参数:
  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import java.util.Properties;
  5. public class KafkaTransactionProducer {
  6.     public static void main(String[] args) {
  7.         // 配置生产者
  8.         Properties props = new Properties();
  9.         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  10.         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  11.         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  12.         
  13.         // 启用事务
  14.         props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-inventory-transaction");  // 事务 ID
  15.         props.put(ProducerConfig.ACKS_CONFIG, "all");  // 确保所有副本都确认
  16.         props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);  // 设置重试次数为无限
  17.         
  18.         // 创建 Kafka 生产者
  19.         KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  20.         
  21.         try {
  22.             // 开始事务
  23.             producer.beginTransaction();
  24.             
  25.             // 发送订单创建消息
  26.             ProducerRecord<String, String> orderMessage = new ProducerRecord<>("order-topic", "order1", "Order created: order1");
  27.             producer.send(orderMessage);
  28.             
  29.             // 发送库存更新消息
  30.             ProducerRecord<String, String> inventoryMessage = new ProducerRecord<>("inventory-topic", "order1", "Inventory updated for order1");
  31.             producer.send(inventoryMessage);
  32.             
  33.             // 如果两条消息都成功,提交事务
  34.             producer.commitTransaction();
  35.             System.out.println("Transaction committed successfully.");
  36.         } catch (Exception e) {
  37.             // 如果发生任何异常,回滚事务
  38.             producer.abortTransaction();
  39.             System.out.println("Transaction aborted due to an error.");
  40.         } finally {
  41.             // 关闭生产者
  42.             producer.close();
  43.         }
  44.     }
  45. }
复制代码
2. 消费者配置和读取消息

在这个示例中,我们有两个主题 order-topic 和 inventory-topic,消费者须要确保只能读取已提交的消息。
消费者配置示例

消费者会在 order-topic 和 inventory-topic 上订阅消息,并根据事务隔离级别读取已提交的消息:
  1. import org.apache.kafka.clients.consumer.KafkaConsumer;
  2. import org.apache.kafka.common.serialization.StringDeserializer;
  3. import java.util.Collections;
  4. import java.util.Properties;
  5. public class KafkaTransactionConsumer {
  6.     public static void main(String[] args) {
  7.         // 配置消费者
  8.         Properties props = new Properties();
  9.         props.put("bootstrap.servers", "localhost:9092");
  10.         props.put("group.id", "order-consumer-group");
  11.         props.put("key.deserializer", StringDeserializer.class.getName());
  12.         props.put("value.deserializer", StringDeserializer.class.getName());
  13.         
  14.         // 设置事务隔离级别,确保只读取已提交的消息
  15.         props.put("isolation.level", "read_committed");
  16.         
  17.         // 创建 Kafka 消费者
  18.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  19.         
  20.         // 订阅主题
  21.         consumer.subscribe(Collections.singletonList("order-topic"));
  22.         consumer.subscribe(Collections.singletonList("inventory-topic"));
  23.         
  24.         while (true) {
  25.             // 拉取消息
  26.             consumer.poll(1000).forEach(record -> {
  27.                 System.out.println("Consumed record: " + record.value());
  28.             });
  29.         }
  30.     }
  31. }
复制代码

Kafka 事务机制工作原理

生产者端事务流程:


  • 开始事务: 使用 beginTransaction() 开始事务。
  • 发送消息: 发送消息到多个分区和主题。在事务内,消息不会立即被写入到 Kafka 日志。
  • 提交事务: 假如所有消息都乐成发送并且没有出现错误,调用 commitTransaction() 来提交事务。
  • 回滚事务: 假如在发送消息过程中发生任何错误,调用 abortTransaction() 往返滚事务,所有消息都会丢失。
消费者端事务隔离:



  • 消费者使用 isolation.level=read_committed 配置来确保它们只会读取已经提交的消息。消费者会忽略正在举行中的事务消息,避免读取到部门提交的消息。
4. Kafka 事务的非常处理

事务在 Kafka 中可能会遇到一些非常,尤其是在生产者端。以下是一些可能的非常和对应的处理方法:


  • ProducerFencedException:当生产者尝试举行操作时,发现它已经不再拥有事务 ID 时,Kafka 会抛出此非常。此时应该终止当前生产者并重新启动。
  • OutOfMemoryError:假如生产者遇到内存题目,事务会被回滚,并且可能须要重试。
  • KafkaException:Kafka 通用非常,表示发生了未知错误。假如捕捉到此非常,应回滚事务。
5. 生产者和消费者的性能考量



  • 事务的性能开销:启用事务会增长一定的性能开销,特殊是当涉及到跨多个分区和主题时。事务的提交须要等待所有副本简直认,这可能导致较长的延迟。
  • 消费者的性能影响:在启用 read_committed 时,消费者须要等待 Kafka 中的事务完成才气读取数据,这也可能会影响消费的延迟。
9. 生产者的配置

   更详尽的生产者配置可以参考:Kafka 生产者官方配置说明(opens new window)
  以下为生产者主要配置参数清单:


  • acks:指定了必须有多少个分区副本收到消息,生产者才会认为消息写入是乐成的。默认为 acks=1

    • acks=0 假如设置为 0,则 Producer 不会等待服务器的反馈。该消息会被立即添加到 socket buffer 中并认为已经发送完成。在这种环境下,服务器是否收到哀求是没法保证的,并且参数retries也不访问效(因为客户端无法得到失败信息)。每个记录返回的 offset 总是被设置为-1。
    • acks=1 假如设置为 1,leader 节点会将记录写入当地日志,并且在所有 follower 节点反馈之前就先确认乐成。在这种环境下,假如 leader 节点在吸收记录之后,并且在 follower 节点复制数据完成之前产生错误,则这条记录会丢失。
    • acks=all 假如设置为 all,这就意味着 leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。只要至少有一个同步副本存在,记录就不会丢失。这种方式是对哀求传递的最有效保证。acks=-1 与 acks=all 是等效的。

  • buffer.memory:用来设置 Producer 缓冲区巨细。
  • compression.type:Producer 生成数据时可使用的压缩范例。默认值是 none(即不压缩)。可配置的压缩范例包括:none、gzip、snappy 、lz4 或 zstd。压缩是针对批处理的所有数据,以是批处理的效果也会影响压缩比(更多的批处理意味着更好的压缩)。
  • retries:用来设置发送失败的重试次数。
  • batch.size:用来设置一个批次可占用的内存巨细。
  • linger.ms:用来设置 Producer 在发送批次前的等待时间。
  • client.id:Kafka 服务器用它来辨认消息源,可以是恣意字符串。
  • max.in.flight.requests.per.connection:用来设置 Producer 在收到服务器响应前可以发送多少个消息。
  • timeout.ms:用来设置 Broker 等待同步副本返回消息确认的时间,与 acks 的配置相匹配。
  • request.timeout.ms:Producer 在发送数据时等待服务器返回响应的时间。
  • metadata.fetch.timeout.ms:Producer 在获取元数据时(如:分区的 Leader 是谁)等待服务器返回响应的时间。
  • max.block.ms:该配置控制 KafkaProducer.send() 和KafkaProducer.partitionsFor() 答应被壅闭的时长。这些方法可能因为缓冲区满了大概元数据不可用而被壅闭。用户提供的序列化步伐或分区步伐的壅闭将不会被计算到这个超时。
  • max.request.size:哀求的最大字节数。
  • receieve.buffer.bytes:TCP 吸收缓冲区的巨细。
  • send.buffer.bytes:TCP 发送缓冲区的巨细。
在 Kafka 中确保消息的顺序性,可以采用以下方案,具体选择取决于你的业务需求和场景:

10. 利用 Kafka 的分区特性

Kafka 保证每个分区内的消息是严格按照发送顺序存储和消费的。因此,可以通过以下方式确保消息的先后顺序:
方法1:使用相同的 Key



  • 将具有顺序要求的消息发送到同一个分区。
  • 生产者在发送消息时使用相同的 Key(例如,订单 ID、用户 ID)。
  • Kafka 根据 Key 的哈希值决定消息路由到哪个分区。
实现步调:


  • 确保 Kafka 的 Topic 设置为多分区(可以有多个分区,但顺序要求的 Key 的消息始终发往同一分区)。
  • 在生产者端,发送消息时设置 Key: ProducerRecord<String, String> record = new ProducerRecord<>("topicName", key, message);
  • producer.send(record);
  • 消费端按分区消费消息,顺序处理。

方法2: 单分区(Single Partition)设计

假如全局消息都须要严格顺序,可以使用单分区的 Topic。
优点:



  • 全局顺序性完全保证。
缺点:



  • 单分区限制了吞吐量,所有消息的生产和消费都会受限于单分区的处理速度。
实现:


  • 创建 Topic 时,指定 partitions=1: kafka-topics.sh --create --topic topicName --partitions 1 --replication-factor 1 --zookeeper localhost:2181
  • 生产者直接发送消息,不须要指定 Key。

方法3. 严格控制消费者的并发度

假如使用多分区,但某些分区的消息须要顺序处理,可以控制消费者的并发度:
方法:消费者每次只消费一个分区


  • 确保每个分区只有一个消费者。
  • 消费者按顺序处理分区内的消息。
Kafka Consumer Group 配置:



  • 每个 Consumer Group 内的消费者数量不凌驾分区数量。
  • 消费逻辑中保证单线程处理。

方法4: 使用事务(Kafka Transactional API)

假如须要同时保证顺序性和多操作的原子性,可以利用 Kafka 的事务 API:


  • Kafka 支持生产者在事务中发送消息,确保消息的顺序性和一致性。
  • 在消费端也可以启用事务模式,确保处理完一批消息后再提交偏移量。
实现:


  • 生产者开启事务: producer.initTransactions();

  • producer.beginTransaction();
  • producer.send(record);
  • producer.commitTransaction();
  • 消费者使用事务逻辑处理批量消息。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

小小小幸运

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表