kafka消费者详细先容(超级详细)

打印 上一主题 下一主题

主题 877|帖子 877|积分 2631

一、Kafka 消费者与消费者组

Kafka 中的消费者(Consumer)和消费者组(Consumer Group)是 Kafka 架构中的焦点概念。它们对消息的消费模式、扩展性、可靠性以及性能有着直接影响。理解消费者和消费者组的工作原理,可以帮助我们在构建高效和可扩展的消息消费系统时做出公道的设计选择。
1.1 Kafka 消费者(Consumer)概述

Kafka 消费者是一个从 Kafka 主题(Topic)中读取消息的客户端应用程序。消费者可以使用 Kafka 提供的 API 来消费分布式系统中的消息。Kafka 支持不同的消费模型,包括单消费者和消费者组。
消费者的基本操作包括:


  • 订阅主题:消费者可以订阅一个或多个主题。
  • 拉取消息:消费者通过拉取方式(poll() 方法)从 Kafka 署理(Broker)获取消息。
  • 处理消息:消费者接收到消息后,可以对其进行处理,如业务逻辑操作。
  • 提交偏移量:消费者在处理消息后,提交当前消息的偏移量,表示已经处理过该消息。
1.1.1 消费者工作流程


  • 消费者向 Kafka 署理发送拉取请求。
  • 署理返回符合消费者订阅条件的消息。
  • 消费者处理消息并提交偏移量。
  • 消费者定期发送心跳,以维持其在消费者组中的活跃状态。
1.1.2 消费者的关键配置



  • group.id:指定消费者所属的消费者组。当多个消费者属于同一组时,它们会共享消息的消费。
  • auto.offset.reset:当消费者没有偏移量或偏移量超出范围时,定义从哪里开始消费消息。可以设置为 earliest(从最早的消息开始消费)或 latest(从最新的消息开始消费)。
  • enable.auto.commit:控制是否主动提交消息的偏移量。设置为 false 可以让开发者手动提交偏移量,以控制消息消费的精度。
  • fetch.min.bytes:消费者拉取消息时的最小字节数,确保消息拉取的服从。
  • max.poll.records:每次调用 poll() 方法时,消费者最多拉取的消息数。
示例配置
  1. group.id=my-consumer-group
  2. auto.offset.reset=latest
  3. enable.auto.commit=false
  4. fetch.min.bytes=50000
  5. max.poll.records=10
  6. 0
  7. 0
复制代码
1.2 Kafka 消费者组(Consumer Group)概述

Kafka 中的消费者组是多个消费者共同组成的一个逻辑实体。消费者组的作用是将多个消费者组织在一起,共同消费 Kafka 主题的消息。消费者组的焦点思想是 消息的分区消费,即每个分区内的消息只能由一个消费者处理。
1.2.1 消费者组的工作原理



  • 分区分配:每个消费者组内的每个消费者负责消费一个或多个主题分区。Kafka 使用消费者组的机制来确保每个分区的消息只有一个消费者进行消费。如许,多个消费者可以并行地消费不同分区的消息,从而提高系统的吞吐量。
  • 再均衡:当消费者组中的消费者发生变化(如到场、离开或失败)时,Kafka 会主动进行再均衡,将分区重新分配给现有消费者。
1.2.2 消费者组的优点


  • 扩展性:通过增加消费者,可以横向扩展消费能力,支持高吞吐量的消息消费。
  • 负载均衡:多个消费者之间按分区分配负载,制止了某个消费者过载。
  • 容错性:假如某个消费者挂掉,Kafka 会触发再均衡,其他消费者会接受该消费者的任务,保证消息消费的高可用性。
1.2.3 消费者组的再均衡

当消费者组成员发生变化时(如消费者到场或退出),Kafka 会主动进行再均衡。在再均衡过程中,分区会被重新分配给消费者,这会导致消费延迟的增加。
再均衡的触发条件


  • 新消费者到场消费者组。
  • 消费者退出消费者组(正常或非常退出)。
  • 分区数目变化(如增加或减少分区)。
  • 负载不均衡,导致重新分配分区。
1.2.4 消费者组的关键配置



  • group.id:指定消费者组的 ID。Kafka 使用消费者组 ID 来标识一个消费者组。
  • partition.assignment.strategy:分区分配策略,Kafka 支持两种策略:Range 和 RoundRobin。

    • Range:根据分区的次序分配,适用于消费者数和分区数相称的情况。
    • RoundRobin:将分区平均分配给消费者,适用于消费者数少于分区数的情况。

示例配置
  1. group.id=my-consumer-group
  2. partition.assignment.strategy=roundrobin
复制代码
1.3 消费者与消费者组的关系

1.3.1 单消费者与消费者组



  • 单消费者:当只有一个消费者时,它会消费所有分区的消息。此时,消费者不需要到场消费者组,因为只有一个消费者会消费所有消息。
  • 消费者组:消费者组答应多个消费者共享分区的消费工作。每个消费者组内的每个消费者会处理不同的分区,制止重复消费。
1.3.2 消费者组的偏移量管理



  • 每个消费者组都有独立的 偏移量,Kafka 会为每个消费者组存储偏移量信息。
  • 消费者每次拉取消息后,都会更新自己的消费偏移量。偏移量生存在 Kafka 的内部主题 __consumer_offsets 中。
偏移量存储与恢复


  • 默认情况下,Kafka 主动管理偏移量的存储和恢复。消费者组的偏移量会在每次消费完成后主动提交,或者开发者可以手动提交偏移量。
  • 手动提交偏移量:可以通过 commitSync() 或 commitAsync() 方法来手动提交偏移量。
1.3.3 消费者组的再均衡与负载均衡



  • 当消费者组内的消费者数目发生变化时(如某个消费者失效或新的消费者到场),Kafka 会触发再均衡,将分区重新分配给其他消费者。
  • 通过公道设置消费者组的巨细,可以实现负载均衡,确保每个消费者的工作量相对均衡。
1.4 消费者组与分区的关系



  • Kafka 中的每个分区只能被一个消费者组中的一个消费者消费。这意味着假如你有多个消费者,它们将会共享分区的消费工作,制止了重复消费。
  • 假如消费者组内的消费者数目少于主题的分区数,某些消费者将会消费多个分区。反之,假如消费者组内的消费者数目多于分区数,某些消费者将会闲置,不参与消息的消费。
二、kafka消费者客户端开发

Kafka 消费者客户端是用于从 Kafka 集群中的 Topic 消费消息的应用程序。消费者从 Topic 或者指定的分区拉取消息,处理消息后,可以选择提交偏移量,纪录它消费到的位置。Kafka 消费者客户端开发通常使用 Kafka 提供的 Java 客户端 API。以下将详细先容如何开发 Kafka 消费者客户端,包括基本的配置、消费模式、偏移量管理、性能调优等方面。
2.1 Kafka 消费者客户端开发基础

Kafka 消费者客户端需要具备以下功能:


  • 毗连 Kafka 集群:配置 Kafka 服务器和消费者组。
  • 订阅 Topic 或 分区:消费者可以订阅一个或多个 Topic,或者指定某个分区进行消费。
  • 拉取消息:使用 poll() 方法从 Kafka 中拉取消息。
  • 消息处理:处理消费到的消息。
  • 提交偏移量:控制消息消费的进度,确保消息的可靠处理。
  • 容错性和负载均衡:处理消费者崩溃和分区再平衡。
Kafka 消费者客户端开发的焦点 API:



  • KafkaConsumer:用于毗连 Kafka 集群并消费消息。
  • ConsumerConfig:配置消费者的各种属性。
  • Poll():拉取消息的重要方法。
  • commitSync() / commitAsync():提交偏移量的操作方法。
2.2 Kafka 消费者客户端配置

开发消费者时,需要为消费者设置一系列配置参数。重要配置项包括 Kafka 集群的地点、消费者组 ID、反序列化方式、主动提交偏移量策略等。
示例:Kafka 消费者客户端配置

  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.clients.consumer.KafkaConsumer;
  3. import org.apache.kafka.common.serialization.StringDeserializer;
  4. import java.util.Properties;
  5. public class KafkaConsumerExample {
  6.     public static void main(String[] args) {
  7.         // 创建消费者配置
  8.         Properties properties = new Properties();
  9.         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka 集群地址
  10.         properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 消费者组 ID
  11.         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息 key 反序列化
  12.         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息 value 反序列化
  13.         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 如果没有偏移量,则从最早的消息开始消费
  14.         properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交偏移量
  15.         // 创建消费者实例
  16.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  17.         
  18.         // 订阅 Topic
  19.         consumer.subscribe(List.of("test-topic"));
  20.         // 拉取和消费消息
  21.         while (true) {
  22.             var records = consumer.poll(1000); // 每次拉取 1000 毫秒
  23.             for (var record : records) {
  24.                 System.out.println("Consumed: " + record.value());
  25.             }
  26.         }
  27.     }
  28. }
复制代码
常用消费者配置项:


  • bootstrap.servers:指定 Kafka 集群的地点,消费者需要与 Kafka Broker 建立毗连。
  • group.id:消费者组的 ID,一个消费者组内的多个消费者会共同消费同一个 Topic。
  • auto.offset.reset:指定消费者从哪里开始消费。当消费者第一次订阅 Topic 或者偏移量不存在时,Kafka 会根据该参数决定从哪里开始消费,可以设置为:

    • earliest:从最早的消息开始消费。
    • latest:从最新的消息开始消费。

  • enable.auto.commit:是否主动提交偏移量。默认值为 true,表示消费者主动提交偏移量。
三、Kafka消费者关键参数

Kafka 消费者的配置参数影响其举动、性能和容错能力。理解这些关键参数对于高效且稳定的消费者操作至关告急。以下是 Kafka 消费者的一些关键配置参数的详细先容及其作用。
3.1 bootstrap.servers



  • 作用:指定 Kafka 集群的地点列表(主机名和端口),用于初次毗连到 Kafka 集群。消费者会通过这些地点发现集群中的所有署理(Broker)。
  • 类型:String(逗号分隔的多个 broker 地点)
  • 默认值:无
  • 示例
  1. bootstrap.servers=localhost:9092
复制代码
3.2 group.id



  • 作用:消费者所属的消费组 ID。Kafka 将消费者分配到消费组中,每个消费组负责消费 Kafka 中的分区。多个消费者共享一个消费组 ID 时,Kafka 会将这些消费者匀称地分配到不同的分区。一个分区只能由同一消费组中的一个消费者处理。
  • 类型:String
  • 默认值:无(必须指定)
  • 示例
  1. group.id=test-group
复制代码
3.3 key.deserializer 和 value.deserializer



  • 作用:指定如何反序列化消费者接收到的消息的键和值。key.deserializer 负责将消息的键反序列化为指定类型,value.deserializer 则负责将消息的值反序列化为指定类型。Kafka 提供了多个内置的反序列化器,如 StringDeserializer、IntegerDeserializer、ByteArrayDeserializer 等。
  • 类型:String(类名)
  • 默认值:无(必须指定)
  • 示例
  1. key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
  2. value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
复制代码
3.4 enable.auto.commit



  • 作用:控制消费者是否启用主动提交偏移量。当 enable.auto.commit 设置为 true 时,消费者会定期主动提交它的偏移量;假如设置为 false,消费者则需要手动提交偏移量。
  • 类型:Boolean
  • 默认值:true
  • 示例
  1. enable.auto.commit=false
复制代码


  • 留意:假如设置为 false,需要手动调用 commitSync() 或 commitAsync() 来提交偏移量。
3.5 auto.commit.interval.ms



  • 作用:假如 enable.auto.commit 设置为 true,该参数指定主动提交偏移量的时间隔断(毫秒)。消费者每隔这个时间隔断主动提交一次偏移量。
  • 类型:Long
  • 默认值:5000(5秒)
  • 示例
  1. auto.commit.interval.ms=1000
复制代码
3.6 auto.offset.reset



  • 作用:指定消费者在没有初始偏移量或者偏移量超出范围时,如那边理。

    • earliest:从最早的消息开始消费。
    • latest:从最新的消息开始消费(默认值)。
    • none:假如没有初始偏移量或超出范围,会抛出非常。

  • 类型:String
  • 默认值:latest
  • 示例
  1. auto.offset.reset=earliest
复制代码


  • 留意:假如消费者第一次启动而且没有已提交的偏移量,Kafka 会使用这个参数来决定从哪个偏移量开始消费。
3.7 max.poll.records



  • 作用:消费者每次调用 poll() 时,最多可以拉取的消息数目。此参数控制一次 poll() 操作返回的最大消息数。假如消息队列中有更多消息,消费者将会多次调用 poll()。
  • 类型:Integer
  • 默认值:500
  • 示例
  1. max.poll.records=10
  2. 0
复制代码


  • 留意:减少 max.poll.records 可以减少每次 poll() 拉取的消息数目,从而降低单次处理的压力,适用于处理时间较长的情况。
3.8 session.timeout.ms



  • 作用:消费者与 Kafka 署理之间的会话超时时间。假如在此时间内,消费者没有发送心跳,Kafka 会以为消费者失效,并将其从消费组中移除。消费者需要定期发送心跳来维持毗连。
  • 类型:Long
  • 默认值:10000(10秒)
  • 示例
  1. session.timeout.ms=10000
复制代码


  • 留意:假如 session.timeout.ms 设置得太短,消费者可能会因为处理消息较慢而被误判为失效,导致频仍的消费者重新均衡。
3.9 heartbeat.interval.ms



  • 作用:消费者发送心跳的时间隔断。假如 heartbeat.interval.ms 设置得过短,可能会导致过多的网络请求;假如设置得过长,可能会导致消费者失效检测不实时。
  • 类型:Long
  • 默认值:3000(3秒)
  • 示例
  1. heartbeat.interval.ms=3000
复制代码


  • 留意:应确保 heartbeat.interval.ms 小于 session.timeout.ms,否则消费者可能无法实时响应心跳。
3.10 fetch.min.bytes



  • 作用:指定消费者从服务器拉取消息时,最小返回的数据量。假如 fetch.min.bytes 设置得比力大,消费者会等候,直到 Kafka 服务器返回至少 fetch.min.bytes 字节的数据,制止频仍拉取小的数据包。
  • 类型:Long
  • 默认值:1
  • 示例
  1. fetch.min.bytes=50000
复制代码


  • 留意:此参数的配置可以减少网络请求的次数,提高吞吐量,但也可能增加延迟。
3.11 fetch.max.wait.ms



  • 作用:指定消费者拉取数据时的最大等候时间。假如服务器在此时间内没有足够的数据返回,消费者会返回空数据。这通常与 fetch.min.bytes 共同使用。
  • 类型:Long
  • 默认值:500
  • 示例
  1. fetch.max.wait.ms=1000
复制代码


  • 留意:在延迟敏感的场景下,设置较低的 fetch.max.wait.ms 有助于减少等候时间。
3.12 client.id



  • 作用:指定客户端的标识符。Kafka 用此 ID 来识别不同的消费者实例。客户端 ID 用于日记纪录、监控等操作。
  • 类型:String
  • 默认值:无(可以指定)
  • 示例
  1. client.id=consumer-client-1
复制代码


  • 留意:假如在多个消费者应用中使用雷同的 client.id,它们将共享雷同的标识符。
3.13 max.poll.interval.ms



  • 作用:指定消费者在两次调用 poll() 之间可以答应的最大隔断时间。若超时,消费者会被以为已死,消费者组会重新分配该消费者负责的分区。
  • 类型:Long
  • 默认值:300000(5分钟)
  • 示例
  1. max.poll.interval.ms=600000
复制代码


  • 留意:该参数与 max.poll.records 共同使用,限定了每次消费的时间,防止消费者处理消息过慢。
3.14 partition.assignment.strategy



  • 作用:指定消费者如何分配分区给消费者实例。可选的策略包括:

    • org.apache.kafka.clients.consumer.RangeAssignor:将分区按次序分配给消费者。
    • org.apache.kafka.clients.consumer.RoundRobinAssignor:轮询方式分配分区给消费者。

  • 类型:String
  • 默认值:RangeAssignor
  • 示例
  1. partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
复制代码
3.15 isolation.level



  • 作用:指定消费者读取消息时的隔离级别。

    • read_committed:只读取已提交的消息(适用于事件消息)。
    • read_uncommitted:可以读取未提交的消息(默认值)。

  • 类型:String
  • 默认值:read_uncommitted
  • 示例
  1. isolation.level=read_committed
复制代码


  • 留意:在启用 Kafka 事件时,使用 read_committed 可以确保消费者只消费提交的消息。
参数作用示例bootstrap.servers指定 Kafka 集群地点localhost:9092group.id消费者组 IDtest-groupkey.deserializer消息键的反序列化器StringDeserializervalue.deserializer消息值的反序列化器StringDeserializerenable.auto.commit是否启用主动提交偏移量falseauto.offset.reset无偏移量时的偏移量重置策略earliestmax.poll.records每次拉取的最大纪录数100session.timeout.ms消费者会话超时值10000heartbeat.interval.ms消费者心跳发送隔断3000fetch.min.bytes拉取消息时的最小数据量50000fetch.max.wait.ms最大等候时间1000client.id消费者客户端 IDconsumer-client-1max.poll.interval.ms两次 poll() 之间的最大时间隔断600000partition.assignment.strategy分区分配策略RoundRobinAssignorisolation.level消费者读取消息的隔离级别read_committed 这些参数控制了消费者的各种举动,适当调解这些参数,可以帮助你根据实际场景优化 Kafka 消费者的性能、可靠性和容错能力。
四、Kafka 反序列化

Kafka 的反序列化是将消息从字节数组(byte[])转回为原始对象的过程。Kafka 消息是以字节数组的形式存储的,因此消费者在接收到消息时,需要将字节数组转化为相应的对象,才能进行进一步处理。Kafka 提供了多种反序列化器,可以根据消息格式选择合适的反序列化器。
4.1 反序列化的基本概念

反序列化是将存储在 Kafka 中的字节数据恢复为原始数据结构的过程。每条消息由两部门组成:keyvalue,两者都需要被反序列化。
Kafka 的反序列化器 (Deserializer) 是负责这个转换的类,它将从 Kafka 中获取的字节数组转换为 Java 对象。
4.2 Kafka 反序列化器接口

Kafka 提供了 org.apache.kafka.common.serialization.Deserializer 接口,该接口定义了反序列化的焦点方法:
  1. public interface Deserializer<T> {
  2.     T deserialize(String topic, byte[] data);
  3. }
复制代码
deserialize 方法的参数:


  • topic:主题名称,用于标识数据的泉源(固然该参数在反序列化过程中不常用,但偶然可以通过它做一些特殊的处理)。
  • data:从 Kafka 中读取的字节数组,需要反序列化为目标对象。
Kafka 提供了几个常用的反序列化器来将字节数组转换为常见数据类型:


  • StringDeserializer:将字节数组反序列化为字符串。
  • IntegerDeserializer:将字节数组反序列化为整数。
  • LongDeserializer:将字节数组反序列化为长整型。
  • ByteArrayDeserializer:将字节数组反序列化为字节数组。
  • Kafka Avro Deserializer:将字节数组反序列化为 Avro 格式的数据。
4.3 常用的 Kafka 反序列化器

4.3.1 StringDeserializer

StringDeserializer 将字节数组转换为字符串。


  • 用法示例
  1. import org.apache.kafka.common.serialization.StringDeserializer;
  2. Properties properties = new Properties();
  3. properties.put("bootstrap.servers", "localhost:9092");
  4. properties.put("group.id", "test-group");
  5. properties.put("key.deserializer", StringDeserializer.class.getName());
  6. properties.put("value.deserializer", StringDeserializer.class.getName());
  7. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  8. consumer.subscribe(Collections.singletonList("test-topic"));
  9. while (true) {
  10.     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  11.     for (ConsumerRecord<String, String> record : records) {
  12.         System.out.println("Key: " + record.key() + ", Value: " + record.value());
  13.     }
  14. }
复制代码
在上述代码中,StringDeserializer 将 key 和 value 从字节数组反序列化为字符串。
4.3.2 IntegerDeserializer

IntegerDeserializer 将字节数组转换为整数。


  • 用法示例
  1. import org.apache.kafka.common.serialization.IntegerDeserializer;
  2. Properties properties = new Properties();
  3. properties.put("bootstrap.servers", "localhost:9092");
  4. properties.put("group.id", "test-group");
  5. properties.put("key.deserializer", IntegerDeserializer.class.getName());
  6. properties.put("value.deserializer", IntegerDeserializer.class.getName());
  7. KafkaConsumer<Integer, Integer> consumer = new KafkaConsumer<>(properties);
  8. consumer.subscribe(Collections.singletonList("test-topic"));
  9. while (true) {
  10.     ConsumerRecords<Integer, Integer> records = consumer.poll(Duration.ofMillis(1000));
  11.     for (ConsumerRecord<Integer, Integer> record : records) {
  12.         System.out.println("Key: " + record.key() + ", Value: " + record.value());
  13.     }
  14. }
复制代码
IntegerDeserializer 将 key 和 value 从字节数组反序列化为整数。
4.3.3 ByteArrayDeserializer

ByteArrayDeserializer 将字节数组转换为字节数组。这个反序列化器通常用于处理原始的字节数据或二进制消息。


  • 用法示例
  1. import org.apache.kafka.common.serialization.ByteArrayDeserializer;
  2. Properties properties = new Properties();
  3. properties.put("bootstrap.servers", "localhost:9092");
  4. properties.put("group.id", "test-group");
  5. properties.put("key.deserializer", ByteArrayDeserializer.class.getName());
  6. properties.put("value.deserializer", ByteArrayDeserializer.class.getName());
  7. KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);
  8. consumer.subscribe(Collections.singletonList("test-topic"));
  9. while (true) {
  10.     ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
  11.     for (ConsumerRecord<byte[], byte[]> record : records) {
  12.         System.out.println("Key: " + Arrays.toString(record.key()) + ", Value: " + Arrays.toString(record.value()));
  13.     }
  14. }
复制代码
ByteArrayDeserializer 将 key 和 value 作为字节数组处理。
4.3.4 Avro 反序列化

Avro 是一种常见的序列化格式,尤其在使用 Schema Registry 时。使用 Avro 反序列化时,我们通常使用 Kafka Avro Deserializer。
起首,需要添加 Kafka Avro 相干的依赖:
  1. <dependency>
  2.     <groupId>io.confluent</groupId>
  3.     <artifactId>kafka-avro-serializer</artifactId>
  4.     <version>7.0.1</version>
  5. </dependency>
复制代码
接着,可以使用 Avro 反序列化器:


  • 用法示例
  1. import io.confluent.kafka.serializers.KafkaAvroDeserializer;
  2. import org.apache.kafka.common.serialization.StringDeserializer;
  3. Properties properties = new Properties();
  4. properties.put("bootstrap.servers", "localhost:9092");
  5. properties.put("group.id", "test-group");
  6. properties.put("key.deserializer", StringDeserializer.class.getName());
  7. properties.put("value.deserializer", KafkaAvroDeserializer.class.getName());
  8. properties.put("schema.registry.url", "http://localhost:8081");
  9. KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(properties);
  10. consumer.subscribe(Collections.singletonList("avro-topic"));
  11. while (true) {
  12.     ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
  13.     for (ConsumerRecord<String, GenericRecord> record : records) {
  14.         System.out.println("Key: " + record.key() + ", Value: " + record.value());
  15.     }
  16. }
复制代码
在这个例子中,消费者将 value 从 Avro 格式反序列化为 GenericRecord 类型的对象,key 仍旧是字符串类型。
4.4 自定义反序列化器

在实际应用中,可能需要处理自定义的数据格式。在这种情况下,可以自定义反序列化器。自定义反序列化器需要实现 Deserializer 接口。
自定义反序列化器示例:将字节数组转换为自定义对象
假设我们有一个简单的类 Person,包罗 name 和 age 字段:
  1. public class Person {
  2.     private String name;
  3.     private int age;
  4.     // Getters, setters, and constructor
  5. }
复制代码
可以编写一个自定义的反序列化器:
  1. import org.apache.kafka.common.serialization.Deserializer;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. public class PersonDeserializer implements Deserializer<Person> {
  4.     private ObjectMapper objectMapper = new ObjectMapper();
  5.     @Override
  6.     public Person deserialize(String topic, byte[] data) {
  7.         try {
  8.             return objectMapper.readValue(data, Person.class);
  9.         } catch (Exception e) {
  10.             throw new RuntimeException("Error deserializing Person", e);
  11.         }
  12.     }
  13. }
复制代码
使用 PersonDeserializer 的 Kafka 消费者示例:
  1. Properties properties = new Properties();
  2. properties.put("bootstrap.servers", "localhost:9092");
  3. properties.put("group.id", "test-group");
  4. properties.put("key.deserializer", StringDeserializer.class.getName());
  5. properties.put("value.deserializer", PersonDeserializer.class.getName());
  6. KafkaConsumer<String, Person> consumer = new KafkaConsumer<>(properties);
  7. consumer.subscribe(Collections.singletonList("person-topic"));
  8. while (true) {
  9.     ConsumerRecords<String, Person> records = consumer.poll(Duration.ofMillis(1000));
  10.     for (ConsumerRecord<String, Person> record : records) {
  11.         System.out.println("Key: " + record.key() + ", Value: " + record.value().getName());
  12.     }
  13. }
复制代码
在这个例子中,我们自定义了一个 PersonDeserializer,将字节数组反序列化为 Person 对象。
五、Kafka 消费者的消费模式

5.1 拉取模式(Pull Model)

Kafka 消费者的拉取模式是消费者通过调用 poll() 方法主动从 Kafka 集群拉取消息。这种模式是 Kafka 消费者的重要工作方式。在拉取模式下,消费者会根据自己的需求定期向 Kafka 请求消息。消费者每次调用 poll() 方法时,Kafka 集群会返回符合条件的消息,或者假如没有新消息,消费者会等候或返回空结果。
拉取模式的关键点:


  • 主动拉取:消费者主动拉取消息,而不是 Kafka 推送消息。
  • 批量拉取:消费者可以批量拉取消息,提高消费服从。
  • 壅闭和超时:poll() 方法会壅闭直到有新消息或者超时。
5.1 拉取模式的基本流程

在 Kafka 中,消费者使用拉取模式来获取消息。消费者向 Kafka 请求消息后,会返回一个 ConsumerRecords 对象,其中包罗了从指定主题和分区拉取的消息。消费者可以通过 poll() 方法指定等候消息的时间,通常会设置一个最大等候时间。
拉取模式流程概述

  • 消费者配置:消费者需要设置 bootstrap.servers、group.id 等基本配置。
  • 订阅主题:消费者通过 subscribe() 方法订阅一个或多个主题。
  • 拉取消息:通过 poll() 方法拉取消息,消费者处理这些消息。
  • 提交偏移量:消息消费完成后,消费者提交已消费的消息的偏移量。
5.2 poll() 方法的举动

poll() 方法是 Kafka 消费者的焦点方法,负责从 Kafka 拉取消息。它的基本署名如下:
  1. ConsumerRecords<K, V> poll(Duration timeout)
复制代码


  • timeout:指定最多等候的时间。假如在这段时间内没有消息,poll() 会返回空的 ConsumerRecords。假如有消息,则会尽可能多地返回消息,直到最大拉取限定为止。
  • 返回值:ConsumerRecords 是一个包罗多个消息纪录的集合,每个纪录都有 key、value、offset 等信息。
poll() 方法的举动非常告急,它决定了消费者从 Kafka 拉取消息的频率和策略。你可以控制 poll() 的最大等候时间,以顺应不同的消费需求。
5.3 拉取模式示例

下面的示例展示了一个典型的 Kafka 消费者使用拉取模式消费消息的流程。
示例:基本的 Kafka 消费者拉取模式
  1. import org.apache.kafka.clients.consumer.KafkaConsumer;
  2. import org.apache.kafka.common.serialization.StringDeserializer;
  3. import java.util.Collections;
  4. import java.util.Properties;
  5. public class SimpleConsumer {
  6.     public static void main(String[] args) {
  7.         // 配置消费者
  8.         Properties properties = new Properties();
  9.         properties.put("bootstrap.servers", "localhost:9092");
  10.         properties.put("group.id", "test-group");
  11.         properties.put("key.deserializer", StringDeserializer.class.getName());
  12.         properties.put("value.deserializer", StringDeserializer.class.getName());
  13.         // 创建 KafkaConsumer 实例
  14.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  15.         
  16.         // 订阅主题
  17.         consumer.subscribe(Collections.singletonList("test-topic"));
  18.         // 持续拉取消息
  19.         while (true) {
  20.             // 调用 poll 方法,设置最大等待时间为 1000 毫秒
  21.             var records = consumer.poll(java.time.Duration.ofMillis(1000));
  22.             
  23.             // 处理拉取到的消息
  24.             records.forEach(record -> {
  25.                 System.out.println("Consumed message: Key = " + record.key() + ", Value = " + record.value());
  26.             });
  27.         }
  28.     }
  29. }
复制代码
说明:


  • subscribe():消费者订阅 test-topic 主题。
  • poll():消费者每次调用 poll() 方法,最多等候 1000 毫秒。假如有新消息,poll() 会返回消息。假如没有消息,poll() 会返回一个空的 ConsumerRecords 对象。
  • forEach:遍历并处理每一条拉取到的消息。
5.4. 高级配置选项

5.4.1 max.poll.records



  • 作用:max.poll.records 控制每次 poll() 调用返回的最大纪录数。默认情况下,Kafka 消费者每次 poll() 调用返回的消息数目是没有限定的。
  • 设置示例
  1. Properties properties = new Properties();
  2. properties.put("bootstrap.servers", "localhost:9092");
  3. properties.put("group.id", "test-group");
  4. properties.put("key.deserializer", StringDeserializer.class.getName());
  5. properties.put("value.deserializer", StringDeserializer.class.getName());
  6. properties.put("max.poll.records", "10");  // 每次拉取最多 10 条消息
  7. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  8. consumer.subscribe(Collections.singletonList("test-topic"));
  9. while (true) {
  10.     var records = consumer.poll(java.time.Duration.ofMillis(1000));
  11.     records.forEach(record -> {
  12.         System.out.println("Consumed message: " + record.value());
  13.     });
  14. }
复制代码
在此示例中,消费者每次拉取的最大消息数为 10 条。
5.4.2 fetch.min.bytes 和 fetch.max.bytes



  • fetch.min.bytes:指定 Kafka 消费者每次拉取时,返回的消息字节数的最小值。假如消息字节数不敷,消费者会等候更多消息直到满意该值。
  • fetch.max.bytes:指定 Kafka 消费者每次拉取时,返回的消息字节数的最大值。
  • 设置示例
  1. properties.put("fetch.min.bytes", "1000");  // 拉取至少 1000 字节的数据
  2. properties.put("fetch.max.bytes", "500000");  // 每次最多拉取 500KB 的数据
复制代码
5.6 拉取模式的优化

5.6.1 调解 poll() 超时



  • poll() 超时:poll() 方法的超时时间可以影响消息消费的服从。假如设置太短,可能会导致频仍的网络请求;设置过长,可能会增加消息消费的延迟。
  • 最佳实践:根据消息到达的频率,调解 poll() 的超时,使其在保证低延迟的同时,减少网络请求的频率。
5.6.2 消息批量处理

通过公道设置 max.poll.records,可以在每次拉取时处理更多消息,制止频仍的 poll() 调用,提高消费服从。
5.7 总结

Kafka 消费者的拉取模式是基于主动拉取的方式,通过 poll() 方法从 Kafka 集群拉取消息。消费者可以控制拉取的频率、返回的消息数目以及偏移量的提交方式。公道配置 poll() 方法、消息批量处理以及偏移量管理,有助于提升消息消费的性能和可靠性。
配置项说明默认值max.poll.records每次 poll() 返回的最大纪录数500fetch.min.bytes每次拉取时,返回的最小字节数1fetch.max.bytes每次拉取时,返回的最大字节数50MBenable.auto.commit是否启用主动提交偏移量trueauto.commit.interval.ms主动提交偏移量的隔断时间5000 通过公道的配置和优化,可以提高 Kafka 消费者的消息处理服从,确保高效且可靠的消息消费。
六、Kafka 消费者再均衡

Kafka 中的消费者再均衡(Rebalance)指的是在 Kafka 消费者组中,消费者的分区重新分配的过程。当消费者到场或离开消费者组,或者消费者组中的分区数发生变化时,Kafka 会触发再均衡操作。再均衡过程旨在确保每个消费者能够平衡地消费消息,而且每个分区只会被一个消费者消费。
再均衡的触发条件


  • 新消费者到场消费者组:当一个新的消费者到场消费者组时,Kafka 会重新分配分区,进行再均衡。
  • 消费者离开消费者组:当一个消费者从消费者组中离开(无论是正常退出照旧非常退出),Kafka 会重新分配分区。
  • 分区数目变化:当 Kafka 中的某个主题的分区数目发生变化(例如,添加或删除分区),也会触发再均衡。
6.1 再均衡过程

在 Kafka 中,消费者再均衡是由消费者和谐器(Consumer Coordinator)来管理的。再均衡的过程包括以下几个步调:

  • 消费者启动:消费者启动并到场消费者组。
  • 分区分配:消费者和谐器根据当前消费者组的消费者数目和分区数目,分配每个消费者需要消费的分区。
  • 消费者消费消息:消费者开始消费分配给它的分区中的消息。
  • 触发再均衡:在某些事件(如消费者到场或离开)发生时,消费者和谐器会触发再均衡。
  • 重新分配分区:消费者和谐器通知每个消费者,让它们重新分配分区。每个消费者停止消费而且重新接收分配的分区。
  • 恢复消费:消费者在分配到新的分区后开始消费。
再均衡的过程是由 Kafka 自己管理的,但假如不小心配置,可能会导致一些性能问题,比如频仍的再均衡,进而影响消费的稳定性。
6.2 再均衡的触发机制

6.2.1 消费者离开或到场消费者组

当消费者组中的消费者数目变化时,Kafka 会主动触发再均衡。例如:


  • 消费者到场:当新的消费者到场消费者组时,Kafka 会重新分配分区。
  • 消费者离开:当某个消费者离开消费者组时,Kafka 会重新分配该消费者原来消费的分区。
6.2.2 分区数目变化

当 Kafka 集群中的某个主题的分区数发生变化时,Kafka 也会触发再均衡。
6.2.3 负载均衡

Kafka 会尽量使得每个消费者分配到雷同数目的分区,但当分区数和消费者数不完全匹配时,某些消费者可能会被分配更多的分区。
6.3 再均衡过程中可能存在的问题

6.3.1 再均衡延迟

每次再均衡过程中,消费者会停止消费一段时间,直到新的分区分配完成,这可能会导致一些延迟。这个过程会影响消费的连续性。
6.3.2 分区再分配的次序

再均衡时,假如消费者在某个分区上已经消费了一部门消息(例如,提交了偏移量),那么分配给新消费者的分区可能会导致它从这个分区的某个位置开始消费,从而可能导致消息的重复消费或漏消费。
6.3.3 频仍的再均衡

频仍的消费者到场和离开,或者分区数目频仍变化,可能导致 Kafka 消费者组的再均衡操作频仍发生,进而影响消息消费的稳定性和性能。
6.4 再均衡的优化

为了减少再均衡的频率和延迟,Kafka 提供了一些优化选项。
6.4.1 session.timeout.ms

session.timeout.ms 设置了消费者与 Kafka 消费者和谐器之间的心跳超时时间。假如消费者在这个时间内没有发送心跳,Kafka 会以为该消费者已经失效,并触发再均衡。


  • 默认值是 10 秒。
  • 减小该值会更快地检测消费者失效,但可能增加由于网络延迟等缘故原由触发误判的风险。
6.4.2 max.poll.interval.ms

max.poll.interval.ms 控制消费者处理消息的最大时间。假如消费者在此时间内没有调用 poll() 方法,Kafka 会以为消费者失效,并触发再均衡。


  • 默认值是 5 分钟。
  • 假如消费者处理每条消息的时间过长,可能会触发再均衡。
6.4.3 rebalance.listener

Kafka 答应你通过实现 ConsumerRebalanceListener 接口来控制消费者再均衡的举动。例如,你可以在再均衡前后实行一些操作,如提交偏移量、清算缓存等。


  • onPartitionsRevoked():在分区重新分配之前调用,可以在此进行一些清算工作。
  • onPartitionsAssigned():在分区分配之后调用,可以在此进行初始化工作。
示例:使用 ConsumerRebalanceListener 控制再均衡
  1. import org.apache.kafka.clients.consumer.KafkaConsumer;
  2. import org.apache.kafka.common.serialization.StringDeserializer;
  3. import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
  4. import org.apache.kafka.common.TopicPartition;
  5. import java.util.Collections;
  6. import java.util.Properties;
  7. public class ConsumerWithRebalanceListener {
  8.     public static void main(String[] args) {
  9.         Properties properties = new Properties();
  10.         properties.put("bootstrap.servers", "localhost:9092");
  11.         properties.put("group.id", "test-group");
  12.         properties.put("key.deserializer", StringDeserializer.class.getName());
  13.         properties.put("value.deserializer", StringDeserializer.class.getName());
  14.         properties.put("enable.auto.commit", "false");  // 手动提交偏移量
  15.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  16.         consumer.subscribe(Collections.singletonList("test-topic"), new ConsumerRebalanceListener() {
  17.             // 分区被撤销时调用
  18.             @Override
  19.             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  20.                 System.out.println("Partitions revoked: " + partitions);
  21.                 // 提交偏移量,防止消息丢失
  22.                 consumer.commitSync();
  23.             }
  24.             // 分区被分配时调用
  25.             @Override
  26.             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
  27.                 System.out.println("Partitions assigned: " + partitions);
  28.                 // 可以在此初始化处理逻辑
  29.             }
  30.         });
  31.         while (true) {
  32.             var records = consumer.poll(java.time.Duration.ofMillis(1000));
  33.             records.forEach(record -> {
  34.                 System.out.println("Consumed message: " + record.value());
  35.             });
  36.             consumer.commitSync();
  37.         }
  38.     }
  39. }
复制代码
说明:


  • onPartitionsRevoked():消费者在分区被撤销前调用,可以在这里提交偏移量,确保数据不会丢失。
  • onPartitionsAssigned():消费者在分区被重新分配后调用,在此可以初始化处理逻辑。
6.5 总结

Kafka 消费者的再均衡是为了确保消费者组中的每个消费者都能公平地消费分区。在消费者到场、离开或分区数目变化时,Kafka 会触发再均衡,重新分配分区。尽管再均衡是 Kafka 消费者组的正常举动,但过于频仍的再均衡可能影响消费性能。
配置项说明默认值session.timeout.ms消费者与 Kafka 的心跳超时时间10,000 msmax.poll.interval.ms消费者最大处理时间300,000 msenable.auto.commit是否主动提交偏移量truerebalance.listener自定义再均衡监听器无 通过公道配置和优化消费者的再均衡策略,能够减少不必要的延迟和资源浪费,提高消息消费的稳定性和性能。
七、Kafka消费者订阅主题和分区

在 Kafka 中,消费者通过订阅主题来获取消息。消费者订阅主题时,Kafka 会将该主题的一个或多个分区分配给消费者。Kafka 提供了两种重要的订阅方式:基于主题的订阅基于分区的订阅
7.1 订阅主题(subscribe())

消费者通过调用 subscribe() 方法来订阅一个或多个主题。当消费者订阅了一个主题时,Kafka 会主动将该主题的所有分区分配给消费者。消费者不需要指定具体的分区,Kafka 会在消费者组中均衡分配分区。
7.1.1 subscribe() 方法

subscribe() 方法用于订阅一个或多个主题。消费者根据该方法订阅的主题,会主动分配相应的分区。
  1. consumer.subscribe(Arrays.asList("topic1", "topic2"));
复制代码


  • 多个主题订阅:传入一个主题列表,消费者会订阅多个主题。
  • 主动分区分配:Kafka 会根据消费者组的巨细和每个主题的分区数主动为消费者分配分区。
7.1.2 subscribe() 共同 ConsumerRebalanceListener

可以在订阅时提供一个 ConsumerRebalanceListener 监听器,用于处理再均衡事件,如分区的分配和撤销。
  1. consumer.subscribe(Arrays.asList("topic1"), new ConsumerRebalanceListener() {
  2.     @Override
  3.     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  4.         System.out.println("Partitions revoked: " + partitions);
  5.     }
  6.     @Override
  7.     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
  8.         System.out.println("Partitions assigned: " + partitions);
  9.     }
  10. });
复制代码


  • onPartitionsRevoked():在分区撤销之前调用,通常用于提交偏移量,制止数据丢失。
  • onPartitionsAssigned():在分区分配之后调用,消费者可以在此初始化相干的资源。
7.1.3 assign() 与 subscribe() 的对比



  • subscribe():消费者订阅主题,Kafka 主动将分区分配给消费者。适用于大多数常见场景。
  • assign():消费者直接指定分区进行消费,不需要 Kafka 进行主动分配。适用于某些特殊的消费需求,比如需要手动控制分配的情况。
7.2 订阅分区(assign())

偶然,消费者可能需要手动指定具体的分区进行消费,而不是让 Kafka 主动分配。这可以通过 assign() 方法实现。与 subscribe() 方法不同,assign() 方法直接指定分区,不会触发消费者组的再均衡操作。
7.2.1 assign() 方法

消费者通过 assign() 方法指定一个或多个分区进行消费。此时,消费者会直接从指定的分区拉取消息。
  1. List<TopicPartition> partitions = Arrays.asList(
  2.     new TopicPartition("topic1", 0),
  3.     new TopicPartition("topic2", 1)
  4. );
  5. consumer.assign(partitions);
复制代码


  • 手动指定分区:消费者指定了特定的分区,不会由 Kafka 进行主动分配。
  • 没有再均衡:使用 assign() 时,消费者不会参与消费者组的再均衡,消费者手动管理分区。
7.2.2 assign() 与 subscribe() 的区别



  • subscribe():Kafka 会主动根据消费者组的巨细、分区数等条件分配分区,而且会触发再均衡。
  • assign():消费者手动指定具体的分区,Kafka 不会进行分配,而且不会触发再均衡。
assign() 适用于某些特殊场景,例如,消费者需要处理特定分区的消息或在某些情况下制止分区的动态调解。
7.3 消费者订阅主题与分区的工作流程

7.3.1 基于 subscribe() 的工作流程


  • 消费者订阅主题:消费者调用 subscribe() 方法,指定需要消费的主题。
  • 分区分配:Kafka 会为每个主题的每个分区分配一个消费者。消费者可以消费多个主题的多个分区。
  • 处理消息:消费者开始从分配给它的分区中拉取消息进行处理。
  • 分区再分配(再均衡):当消费者到场、离开或分区数发生变化时,Kafka 会触发再均衡,重新分配分区。
7.3.2 基于 assign() 的工作流程


  • 消费者手动指定分区:消费者通过 assign() 方法明确指定要消费的分区。
  • 消息消费:消费者从指定的分区开始消费消息。消费者不参与消费者组的再均衡,因此需要手动管理每个分区的消费。
  • 无需再均衡:没有消费者到场或离开消费者组,也不会触发再均衡。
7.4 示例:订阅主题与分区

7.4.1 基于 subscribe() 的示例

  1. import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Arrays;import java.util.Properties;public class SubscribeExample {    public static void main(String[] args) {        // 配置消费者        Properties properties = new Properties();        properties.put("bootstrap.servers", "localhost:9092");        properties.put("group.id", "test-group");        properties.put("key.deserializer", StringDeserializer.class.getName());        properties.put("value.deserializer", StringDeserializer.class.getName());        // 创建 KafkaConsumer 实例        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);                // 订阅多个主题        consumer.subscribe(Arrays.asList("topic1", "topic2"));
  2.         // 拉取消息        while (true) {            var records = consumer.poll(java.time.Duration.ofMillis(1000));            records.forEach(record -> {                System.out.println("Consumed message: " + record.value());            });        }    }}
复制代码


  • subscribe(Arrays.asList("topic1", "topic2")):消费者订阅了 topic1 和 topic2 两个主题,Kafka 会主动为这两个主题分配分区。
  • 消费者从分配给它的分区拉取消息并进行消费。
7.4.2 基于 assign() 的示例

  1. import org.apache.kafka.clients.consumer.KafkaConsumer;
  2. import org.apache.kafka.common.serialization.StringDeserializer;
  3. import org.apache.kafka.common.TopicPartition;
  4. import java.util.Arrays;
  5. import java.util.List;
  6. import java.util.Properties;
  7. public class AssignExample {
  8.     public static void main(String[] args) {
  9.         // 配置消费者
  10.         Properties properties = new Properties();
  11.         properties.put("bootstrap.servers", "localhost:9092");
  12.         properties.put("group.id", "test-group");
  13.         properties.put("key.deserializer", StringDeserializer.class.getName());
  14.         properties.put("value.deserializer", StringDeserializer.class.getName());
  15.         // 创建 KafkaConsumer 实例
  16.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  17.         // 手动指定消费的分区
  18.         List<TopicPartition> partitions = Arrays.asList(
  19.             new TopicPartition("topic1", 0),
  20.             new TopicPartition("topic2", 1)
  21.         );
  22.         
  23.         consumer.assign(partitions);  // 手动指定分区
  24.         // 拉取消息
  25.         while (true) {
  26.             var records = consumer.poll(java.time.Duration.ofMillis(1000));
  27.             records.forEach(record -> {
  28.                 System.out.println("Consumed message: " + record.value());
  29.             });
  30.         }
  31.     }
  32. }
复制代码


  • assign(partitions):消费者明确指定了要消费的 topic1 分区 0 和 topic2 分区 1。
  • 该消费者将只从指定的分区拉取消息,而不会主动获取其他分区的消息。
7.5 主题订阅和分区订阅对比

订阅方式说明优势使用场景subscribe()通过主题订阅,Kafka 主动分配分区主动分配分区,顺应性强一样平常情况下的主题消费assign()直接指定分区,消费者手动管理分配精致控制分区分配特殊场景,如需要手动控制分区消费

  • subscribe() 适用于大多数常见的消费场景,Kafka 会主动管理分区分配。
  • assign() 适用于特殊场景,如需要精准控制消费者消费哪些分区。
八、Kafka消费者偏移量

在 Kafka 中,偏移量(Offset)是指消费者在某个分区中消费消息的位置。每条消息在 Kafka 中都有一个唯一的偏移量,消费者使用偏移量来纪录它已经消费到哪个位置。当消费者再次启动时,可以从前次提交的偏移量继承消费。
偏移量对于 Kafka 消费者来说至关告急,它答应消费者在消息流中保持同步或恢复消费。Kafka 默认会将每个分区的偏移量存储在 Kafka 集群中的一个特殊的内部主题(__consumer_offsets)中。
8.1 偏移量管理方式

Kafka 提供了两种重要的偏移量管理方式:

  • 主动提交偏移量(默认方式)
  • 手动提交偏移量
消费者可以选择得当自己需求的偏移量管理方式,确保消息处理的可靠性和可追溯性。
8.2 主动提交偏移量(enable.auto.commit)

默认情况下,Kafka 消费者会主动提交偏移量,即每消费一条消息,消费者会主动提交当前偏移量到 Kafka 集群。
8.2.1 配置项



  • enable.auto.commit:是否启用主动提交偏移量。默认值为 true。
  • auto.commit.interval.ms:主动提交偏移量的时间隔断。默认值为 5000ms。
8.2.2 主动提交的工作流程


  • 消费者拉取消息并处理。
  • 每消费完一条消息,消费者会在后台主动提交偏移量,表示已处理的最新消息位置。
  • 假如消费者发生故障或重新启动,它将从前次提交的偏移量继承消费。
8.2.3 主动提交的优缺点



  • 优点

    • 简单易用,不需要额外的配置。
    • 对于不要求高精度消费控制的场景,适用。

  • 缺点

    • 消息丢失:在消费者处理消息时发生故障,可能导致消息未被成功处理却已经提交偏移量,造成丢失。
    • 消息重复消费:假如在提交偏移量之前消费者崩溃,消息会被重新消费,导致重复消费。

8.2.4 主动提交示例

  1. import org.apache.kafka.clients.consumer.KafkaConsumer;
  2. import org.apache.kafka.common.serialization.StringDeserializer;
  3. import java.util.Arrays;
  4. import java.util.Properties;
  5. public class AutoCommitExample {
  6.     public static void main(String[] args) {
  7.         // 配置消费者
  8.         Properties properties = new Properties();
  9.         properties.put("bootstrap.servers", "localhost:9092");
  10.         properties.put("group.id", "test-group");
  11.         properties.put("key.deserializer", StringDeserializer.class.getName());
  12.         properties.put("value.deserializer", StringDeserializer.class.getName());
  13.         properties.put("enable.auto.commit", "true");  // 启用自动提交
  14.         properties.put("auto.commit.interval.ms", "1000");  // 提交间隔
  15.         // 创建 KafkaConsumer 实例
  16.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  17.         // 订阅主题
  18.         consumer.subscribe(Arrays.asList("topic1"));
  19.         // 拉取并处理消息
  20.         while (true) {
  21.             var records = consumer.poll(java.time.Duration.ofMillis(1000));
  22.             records.forEach(record -> {
  23.                 System.out.println("Consumed message: " + record.value());
  24.             });
  25.         }
  26.     }
  27. }
复制代码
8.3 手动提交偏移量(enable.auto.commit = false)

手动提交偏移量意味着消费者在处理完一条或多条消息后,明确调用 commitSync() 或 commitAsync() 方法来提交偏移量。
8.3.1 配置项



  • enable.auto.commit:设置为 false,禁用主动提交偏移量。
  • auto.commit.interval.ms:此配置无效,当 enable.auto.commit 为 false 时,消费者需要手动控制提交偏移量。
8.3.2 手动提交的工作流程


  • 消费者拉取消息并处理。
  • 消费者在处理完消息后调用 commitSync() 或 commitAsync() 提交当前偏移量。
  • commitSync() 是同步提交,直到提交成功才会返回,保证偏移量提交成功后才继承消费。
  • commitAsync() 是异步提交,消费者不会等候提交结果,适用于性能要求较高的场景,但需要处理提交失败的非常。
8.3.3 手动提交的优缺点



  • 优点

    • 精确控制消费进度,制止消息丢失或重复消费。
    • 可以确保每条消息都被处理后才提交偏移量,增加了可靠性。

  • 缺点

    • 开发者需要管理偏移量提交的机会,增加了复杂性。

8.3.4 手动提交示例

  1. import org.apache.kafka.clients.consumer.KafkaConsumer;
  2. import org.apache.kafka.common.serialization.StringDeserializer;
  3. import java.util.Arrays;
  4. import java.util.Properties;
  5. public class ManualCommitExample {
  6.     public static void main(String[] args) {
  7.         // 配置消费者
  8.         Properties properties = new Properties();
  9.         properties.put("bootstrap.servers", "localhost:9092");
  10.         properties.put("group.id", "test-group");
  11.         properties.put("key.deserializer", StringDeserializer.class.getName());
  12.         properties.put("value.deserializer", StringDeserializer.class.getName());
  13.         properties.put("enable.auto.commit", "false");  // 禁用自动提交
  14.         // 创建 KafkaConsumer 实例
  15.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  16.         // 订阅主题
  17.         consumer.subscribe(Arrays.asList("topic1"));
  18.         // 拉取并处理消息
  19.         while (true) {
  20.             var records = consumer.poll(java.time.Duration.ofMillis(1000));
  21.             records.forEach(record -> {
  22.                 System.out.println("Consumed message: " + record.value());
  23.             });
  24.             // 手动提交偏移量
  25.             consumer.commitSync();  // 同步提交
  26.             // consumer.commitAsync(); // 异步提交
  27.         }
  28.     }
  29. }
复制代码


  • commitSync():同步提交,直到偏移量成功提交后才会返回。
  • commitAsync():异步提交,提交结果由回调处理,不会壅闭消费。
8.3.5 commitSync() 与 commitAsync() 的对比

方法特点优势缺点commitSync()同步提交偏移量提交后确认成功,可以保证准确性可能导致消费壅闭commitAsync()异步提交偏移量不会壅闭消费,性能更高提交失败时需要回调处理 8.4 偏移量的存储与恢复

在 Kafka 中,消费者的偏移量(Offset)纪录了消费者在某个分区上消费的最后一条消息的位置。Kafka 默认将消费者的偏移量存储在一个特殊的内部主题 __consumer_offsets 中。每当消费者提交偏移量时,Kafka 会将该偏移量写入该主题,确保消费者在重新启动时能够恢复消费进度。
偏移量的存储和恢复机制是 Kafka 消费者的告急特性之一,能够保证消费者的高可用性和数据的精确消费。
8.4.1. 偏移量的存储

Kafka 将每个消费者组(由 group.id 标识)在每个分区的偏移量存储在名为 __consumer_offsets 的内部 Kafka 主题中。每个分区对应一个纪录,存储了该分区的最新偏移量、消费者组的元数据、偏移量提交的时间等信息。


  • 每个消费者组在 Kafka 中都有唯一的标识 group.id,Kafka 会为每个消费者组分配一个分区。
  • 每个消费者组的偏移量按分区存储,答应多个消费者组独立地消费雷同的消息。
8.4.1.1 偏移量存储的结构

__consumer_offsets 主题的结构大致如下:


  • group:消费者组的 ID(即 group.id)。
  • topic:主题名称。
  • partition:分区号。
  • offset:消费者在该分区中的偏移量(即消费的最后一条消息的位置信息)。
  • metadata:提交偏移量的元数据,通常为空。
  • timestamp:偏移量提交的时间戳。
Kafka 会定期将消费者的偏移量写入到该主题。
8.4.1.2 偏移量提交的方式



  • 主动提交偏移量(默认方式):消费者主动提交偏移量,提交的频率和时间隔断由 enable.auto.commit 和 auto.commit.interval.ms 配置项控制。
  • 手动提交偏移量:消费者显式调用 commitSync() 或 commitAsync() 方法提交偏移量。手动提交偏移量通常在消息处理成功后进行,确保偏移量的提交与消息消费的成功状态保持一致。
8.4.2 偏移量的恢复

当消费者重启或在某些情况下发生故障时,它需要恢复消费进度,这时就需要使用之前存储在 __consumer_offsets 主题中的偏移量。
8.4.2.1 主动恢复(主动提交偏移量)

假如消费者启用了主动提交(enable.auto.commit = true),Kafka 会在消费者重新启动时主动使用最后一次提交的偏移量恢复消费进度。Kafka 会主动查抄 __consumer_offsets 主题并将消费者恢复到前次提交的偏移量位置。
8.4.2.2 手动恢复(手动提交偏移量)

假如消费者使用手动提交偏移量(enable.auto.commit = false),则消费者可以在每次消费完成后调用 commitSync() 或 commitAsync() 提交偏移量。消费者重新启动时,它会读取 __consumer_offsets 主题,恢复到前次成功提交的偏移量。


  • commitSync():同步提交偏移量,消费者会等候 Kafka 完成偏移量的提交。假如提交失败,消费者会抛出非常并需要处理。
  • commitAsync():异步提交偏移量,消费者不会等候提交结果,提交失败时会通过回调函数处理。
消费者每次拉取消息时,都会查抄 __consumer_offsets 中纪录的偏移量,从而恢复消费进度。
8.4.2.3 偏移量的恢复过程


  • 重启消费者:消费者历程停止并重新启动。
  • 读取偏移量:消费者通过其 group.id 和每个分区号从 __consumer_offsets 主题读取偏移量。
  • 恢复消费进度:消费者根据读取的偏移量恢复消费。具体的偏移量取决于消费者最后提交的状态。
  • 继承消费:消费者从恢复的偏移量开始继承拉取消息。
8.4.2.4 处理偏移量恢复的界限情况

在恢复消费进度时,可能会碰到以下几种情况:


  • 偏移量已过期:假如偏移量过期(即超过 Kafka 集群的 log.retention.ms 配置的时间限定),消费者会发现该偏移量对应的消息已经被删除。此时,消费者通常会回退到最新的有用偏移量或使用 auto.offset.reset 配置项定义的举动(如从最早或最新消息开始消费)。
  • auto.offset.reset 配置项

    • latest:假如没有找到偏移量或偏移量无效,消费者将从最新的消息开始消费。
    • earliest:假如没有找到偏移量或偏移量无效,消费者将从最早的消息开始消费。
    • none:假如没有找到偏移量,消费者会抛出非常。

8.4.2.5 检察偏移量

Kafka 提供了一个命令行工具 kafka-consumer-groups.sh,可以检察消费者组的偏移量信息。这对于调试和监控非常有用。
检察消费者组的偏移量:
  1. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
复制代码
该命令会显示指定消费者组(test-group)的每个分区的当前偏移量、已提交的偏移量以及 lag(延迟)信息。
示例输出:
  1. Group           Topic              Partition  Current Offset  Log End Offset  Lag
  2. test-group      topic1             0          15              20              5
  3. test-group      topic1             1          5               20              15
复制代码


  • Current Offset:消费者当前的偏移量。
  • Log End Offset:该分区的最新消息的偏移量。
  • Lag:消费者的延迟,表示消费者距离最新消息另有多少条消息。
8.4.3 偏移量管理的最佳实践


  • 提交偏移量机会

    • 对于关键数据,推荐在消费完数据并确认处理无误后再提交偏移量。
    • 假如使用手动提交,通常会在消息处理成功之后调用 commitSync() 或 commitAsync() 来提交偏移量。
    • 假如使用主动提交,思量调解 auto.commit.interval.ms 的值来控制提交频率。

  • 偏移量的管理方式

    • 对于高可靠性场景,使用手动提交偏移量,确保只有消息处理成功后才提交偏移量。
    • 对于简单场景,主动提交偏移量可以简化开发,但可能会导致消息丢失或重复消费。

  • 偏移量的重置

    • 假如碰到无法恢复的错误或偏移量丢失,可以使用 auto.offset.reset 配置项来控制偏移量的恢复方式。
    • 在开发中,可以在调试时通过命令行工具检察消费者的偏移量状态,确保正确恢复消费进度。

九、Kafka 消费者多线程场景

在 Kafka 中,消费者通常是单线程工作的,一个消费者实例只能处理一个线程的消息消费任务。然而,在某些场景下,可能需要使用多个线程来并行处理消息以提高消费服从。这时需要特殊留意消息的次序性、线程的管理以及偏移量的管理等问题。
9.1 为什么使用多线程消费 Kafka 消息



  • 提高性能:当 Kafka 集群中有大量消息时,单线程消费可能会成为瓶颈。使用多线程可以提高处理速度。
  • 并行处理:某些处理逻辑可能非常复杂,多个线程可以同时处理消息,缩短总体处理时间。
  • 分区级别并行:Kafka 本身是基于分区的,消费者可以并行消费不同分区的数据,因此可以使用多线程消费不同分区的消息。
9.2 Kafka 消费者多线程的基本原则


  • 每个线程消费独立分区:Kafka 的分区是并行消费的基本单位。一个线程消费一个或多个分区的消息,多个线程消费不同的分区。消费者不应该跨多个线程共享分区的消费。
  • 消费者组(Consumer Group):每个消费者组中的消费者负责消费不同分区的消息。同一个消费者组内的不同消费者可以分配到不同的分区,因此,接纳多线程时,可以在不同线程中创建多个消费者实例,来实现对不同分区的并行消费。
  • 消息次序性:Kafka 确保同一分区内的消息是有次序的,但不同分区之间的消息次序是不可保证的。在多线程消费时,要留意保证每个分区内消息的次序性。
  • 偏移量管理:每个消费者都会跟踪自己消费的偏移量。使用多线程时,通常会为每个线程创建独立的消费者实例,确保每个线程的偏移量管理是独立的。
9.3 Kafka 消费者多线程模式

有两种常见的多线程模式:

  • 每个线程创建独立消费者:为每个线程创建独立的消费者实例,每个消费者消费一个或多个分区。
  • 共享消费者实例:一个线程创建多个消费者实例,共享消息队列,适用于需要控制消费次序的场景。
9.3.1 每个线程创建独立消费者

这种模式下,每个线程创建一个 Kafka 消费者实例,并独立消费某些分区的消息。此时每个消费者负责消费分配给它的分区,确保了每个线程能够并行处理消息。
优点


  • 简单高效,能够使用 Kafka 分区进行并行消费。
  • 每个线程负责独立的消费任务,相互之间不干扰。
缺点


  • 消费者实例的创建和烧毁需要管理,可能增加复杂度。
示例代码:每个线程独立消费者
  1. import org.apache.kafka.clients.consumer.KafkaConsumer;
  2. import org.apache.kafka.common.serialization.StringDeserializer;
  3. import java.util.Arrays;
  4. import java.util.Properties;
  5. public class MultiThreadConsumer {
  6.     public static void main(String[] args) {
  7.         // 创建多个消费者线程
  8.         int numThreads = 3;  // 假设有 3 个线程
  9.         for (int i = 0; i < numThreads; i++) {
  10.             new Thread(new ConsumerRunnable(i)).start();
  11.         }
  12.     }
  13.     public static class ConsumerRunnable implements Runnable {
  14.         private final int threadId;
  15.         public ConsumerRunnable(int threadId) {
  16.             this.threadId = threadId;
  17.         }
  18.         @Override
  19.         public void run() {
  20.             Properties properties = new Properties();
  21.             properties.put("bootstrap.servers", "localhost:9092");
  22.             properties.put("group.id", "test-group");
  23.             properties.put("key.deserializer", StringDeserializer.class.getName());
  24.             properties.put("value.deserializer", StringDeserializer.class.getName());
  25.             properties.put("enable.auto.commit", "false"); // 禁用自动提交
  26.             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  27.             consumer.subscribe(Arrays.asList("topic1"));
  28.             while (true) {
  29.                 var records = consumer.poll(java.time.Duration.ofMillis(1000));
  30.                 records.forEach(record -> {
  31.                     System.out.println("Thread " + threadId + " consumed message: " + record.value());
  32.                 });
  33.                 // 手动提交偏移量
  34.                 consumer.commitSync();
  35.             }
  36.         }
  37.     }
  38. }
复制代码


  • 这里使用了 3 个线程,每个线程独立创建一个消费者实例,消费雷同的主题 topic1。
  • 每个线程的消费都是独立的,Kafka 会根据消费者组的负载均衡策略主动分配分区给各个消费者。
9.3.2 共享消费者实例(消息队列)

在共享消费者实例模式下,可以将一个消费者实例的消息拉取到一个共享队列中,然后多个线程从这个队列中获取消息进行并行处理。这种方式需要精确控制消息的分配和消费次序。
优点


  • 控制消费的次序性。
  • 可以共享一个消费者实例,减少 Kafka 消费者实例的数目。
缺点


  • 线程之间需要和谐,可能带来额外的复杂性。
  • 消费者实例的负载均衡不如独立消费者那样灵活。
示例代码:共享消费者实例
  1. import org.apache.kafka.clients.consumer.KafkaConsumer;
  2. import org.apache.kafka.common.serialization.StringDeserializer;
  3. import java.util.Arrays;
  4. import java.util.Properties;
  5. import java.util.concurrent.BlockingQueue;
  6. import java.util.concurrent.LinkedBlockingQueue;
  7. public class SharedConsumerQueue {
  8.     public static void main(String[] args) {
  9.         BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  10.         KafkaConsumer<String, String> consumer = createConsumer();
  11.         // 创建线程池,每个线程从共享队列中获取消息进行处理
  12.         for (int i = 0; i < 3; i++) {
  13.             new Thread(new ConsumerWorker(queue, i)).start();
  14.         }
  15.         // 消费者拉取消息并放入共享队列
  16.         new Thread(() -> {
  17.             while (true) {
  18.                 var records = consumer.poll(java.time.Duration.ofMillis(1000));
  19.                 records.forEach(record -> {
  20.                     try {
  21.                         queue.put(record.value());
  22.                     } catch (InterruptedException e) {
  23.                         e.printStackTrace();
  24.                     }
  25.                 });
  26.                 consumer.commitSync();
  27.             }
  28.         }).start();
  29.     }
  30.     public static class ConsumerWorker implements Runnable {
  31.         private final BlockingQueue<String> queue;
  32.         private final int threadId;
  33.         public ConsumerWorker(BlockingQueue<String> queue, int threadId) {
  34.             this.queue = queue;
  35.             this.threadId = threadId;
  36.         }
  37.         @Override
  38.         public void run() {
  39.             try {
  40.                 while (true) {
  41.                     String message = queue.take();
  42.                     System.out.println("Thread " + threadId + " processing message: " + message);
  43.                     // 在这里处理消息
  44.                 }
  45.             } catch (InterruptedException e) {
  46.                 e.printStackTrace();
  47.             }
  48.         }
  49.     }
  50.     private static KafkaConsumer<String, String> createConsumer() {
  51.         Properties properties = new Properties();
  52.         properties.put("bootstrap.servers", "localhost:9092");
  53.         properties.put("group.id", "test-group");
  54.         properties.put("key.deserializer", StringDeserializer.class.getName());
  55.         properties.put("value.deserializer", StringDeserializer.class.getName());
  56.         properties.put("enable.auto.commit", "false");
  57.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  58.         consumer.subscribe(Arrays.asList("topic1"));
  59.         return consumer;
  60.     }
  61. }
复制代码


  • 这里创建了一个共享的 BlockingQueue,消费者将拉取到的消息放入队列,多个线程从队列中取消息进行并行处理。
  • 每个线程从队列中取出消息并实行处理操作,可以根据需要进行异步或同步处理。
9.4 消费者多线程时的留意事项


  • 偏移量管理

    • 独立消费者实例:每个线程应该有独立的消费者实例,如许偏移量管理就不会出现冲突。
    • 共享消费者实例:假如多个线程共享一个消费者实例,需要特殊留意线程安全和偏移量提交的同步问题。

  • 负载均衡与分区分配

    • Kafka 会根据消费者组中的消费者数目主动分配分区给消费者。若使用多个线程,可以通过启动多个消费者来并行消费不同的分区。
    • 假如在一个线程内共享消费者实例进行消费,那么可以使用消费者的分区分配策略来确保每个分区都被消费。

  • 消息次序性

    • Kafka 保证同一分区内消息的次序性,但不同分区之间的消息次序不保证。在多线程消费时,多个线程处理的分区之间的消息次序可能会打乱,必须在设计中思量这一点。

  • 线程池和非常处理

    • 使用线程池来管理线程,并为每个线程添加非常处理机制,以确保线程的稳定运行。
    • 线程池可以制止频仍创建和烧毁线程,提高性能和资源使用率。

9.5 每个线程独立消费者和共享消费者实例对比

多线程方式优点缺点适用场景每个线程独立消费者简单高效,使用 Kafka 分区进行并行消费需要管理多个消费者实例,代码较为复杂高并发消费,分区级别的并行处理共享消费者实例减少消费者实例数目,适用于有次序需求的场景线程间需要和谐,可能带来额外复杂性消费次序要求较高,资源有限的场景 在 Kafka 消费者多线程设计时,选择得当的策略可以有用提高消费服从,但需要特殊留意消费者实例的管理、偏移量提交和消息次序性等问题。
十、kafka消费者常见问题

Kafka 消费者在实际使用中可能会碰到多种问题。这些问题通常与消费者的配置、偏移量管理、性能优化等方面相干。以下是一些常见的 Kafka 消费者问题及其详细先容,包括具体的办理方案和案例。
10.1 问题:消费者组无法消费消息(消费滞后)

现象:消费者组无法消费新的消息,或者消费速度远远低于生产者的消息速率。
可能缘故原由


  • 消费者与生产者速率不匹配:假如生产者的消息速率高于消费者的消费速率,消费者会出现消费滞后的问题。
  • 分区分配不均:消费者组的消费者数目不敷,或者消费者组中的某些消费者没有被分配到分区,导致部门分区无法消费。
  • 消费处理能力不敷:消费者端的处理能力(例如消息处理时间过长)会导致消费速度降低。
办理方案

  • 增加消费者数目:可以增加消费者的数目,确保每个分区都能有一个消费者进行消费,达到负载均衡。
  • 调解消费者配置:如增加 fetch.min.bytes 和减少 fetch.max.wait.ms 以减少网络请求次数,提升吞吐量。
  • 查抄消费者处理能力:优化消费者端的消息处理逻辑,减少每条消息的处理时间。
示例
假如消息的处理时间过长,可以接纳多线程或异步处理来提高消费服从。例如,在每个消费者线程中异步处理消息:
  1. public class ConsumerWorker implements Runnable {
  2.     private KafkaConsumer<String, String> consumer;
  3.     public ConsumerWorker(KafkaConsumer<String, String> consumer) {
  4.         this.consumer = consumer;
  5.     }
  6.     @Override
  7.     public void run() {
  8.         while (true) {
  9.             ConsumerRecords<String, String> records = consumer.poll(100);
  10.             records.forEach(record -> {
  11.                 // 异步处理消息
  12.                 CompletableFuture.runAsync(() -> processMessage(record));
  13.             });
  14.         }
  15.     }
  16.     private void processMessage(ConsumerRecord<String, String> record) {
  17.         // 处理消息的逻辑
  18.     }
  19. }
复制代码
10.2 问题:消费者偏移量重复消费或丢失

现象:消费者在消费消息时可能会碰到偏移量重复消费或丢失的问题,导致消息的重复消费或者丢失。
可能缘故原由


  • 主动提交偏移量失效:假如消费者未正确配置偏移量主动提交或者手动提交时存在问题,可能导致消息的重复消费或丢失。
  • 消费者崩溃或重启:假如消费者崩溃或重启时未成功提交偏移量,可能会重复消费未提交的消息。
  • 消费者组切换:当消费者组发生变化(例如消费者到场或离开),Kafka 会重新分配分区并重新处理偏移量。
办理方案

  • 关闭主动提交并手动提交偏移量:通过 enable.auto.commit=false
    配置关闭主动提交,并使用 commitSync() 或 commitAsync() 手动提交偏移量,确保只有在消息处理完毕后才提交偏移量。
  • 使用精确的偏移量管理:假如需要精确控制消息的消费进度,可以接纳基于事件的消息处理方案。
示例:手动提交偏移量示例:
  1. consumer.poll(100).forEach(record -> {
  2.     // 处理消息
  3.     consumer.commitSync(); // 提交当前偏移量
  4. });
复制代码
10.3 问题:消费者因 Rebalance(再均衡)导致的消息丢失或重复消费

现象:当消费者组的成员变动时(例如新消费者到场或旧消费者离开),Kafka 会触发消费者组的 再均衡(Rebalancing)。在此期间,消费者可能会丢失正在消费的消息,或者会重复消费已经消费过的消息。
可能缘故原由


  • 消费者在再均衡期间提交偏移量:消费者在实行消息处理时,假如触发了再均衡,可能会导致未处理完的消息丢失。
  • 再均衡过程中的壅闭:再均衡时,假如消费者未能实时完成偏移量提交或消息处理,可能会导致在恢复后从错误的偏移量开始消费。
办理方案

  • 降低再均衡触发的频率:适当增加 session.timeout.ms 和 heartbeat.interval.ms 的时间,减少消费者失效和再均衡的频率。
  • 使用精确的消息处理:在处理消息时,使用手动提交偏移量,确保每条消息的处理完成后才提交偏移量。
  • 使用 Kafka 事件:为消费者启用事件,确保消费者可以在事件失败时回滚。
示例
制止在 poll() 和 commit() 之间做复杂的逻辑处理:
  1. consumer.poll(100).forEach(record -> {
  2.     try {
  3.         // 处理消息
  4.         consumer.commitSync(); // 提交偏移量
  5.     } catch (Exception e) {
  6.         // 处理异常,确保偏移量不丢失
  7.     }
  8. });
复制代码
10.4 问题:消费者读取不到数据(延迟高或空消费)

现象:消费者调用 poll() 时返回为空,或者消息的消费延迟较高。
可能缘故原由


  • auto.offset.reset 配置错误:假如 auto.offset.reset 设置为 latest,且消费者之前没有偏移量纪录,那么消费者将从最新的消息开始消费,导致丢失旧消息。
  • 消息未被生产:假如生产者发送的消息较少或没有消息到达,消费者可能会在短时间内读取不到数据。
  • 消费进度问题:假如消费者的偏移量已经超过了当前的消息,可能导致消费者读取不到消息。
办理方案

  • 查抄 auto.offset.reset 配置:确保 auto.offset.reset 设置为 earliest,以便消费者能够消费所有消息,包括未消费的旧消息。
  • 查抄生产者消息流:确保生产者连续发送消息,制止因生产者停止发送而导致消费者读取不到消息。
  • 调治 fetch.max.wait.ms 和 fetch.min.bytes 参数:通过调治这些参数来减少拉取消息时的延迟。
示例
  1. auto.offset.reset=earliest
复制代码
10.5 问题:消费者无法毗连到 Kafka 集群

现象:消费者启动时,无法毗连到 Kafka 集群,抛出毗连非常。
可能缘故原由


  • bootstrap.servers 配置错误:指定的 Kafka 集群地点或端口错误。
  • 网络问题:消费者地点的机器与 Kafka 署理之间存在网络毗连问题。
  • Kafka 署理不可用:Kafka 集群的某些署理(Broker)不可用,导致消费者无法建立毗连。
办理方案

  • 查抄 bootstrap.servers 配置:确保消费者配置了正确的 Kafka 集群地点,格式为 host:port。
  • 查抄网络毗连:确保消费者的机器与 Kafka 署理之间的网络是通畅的,没有防火墙或端口限定。
  • 查抄 Kafka 署理的状态:确认 Kafka 署理处于正常运行状态,可以使用 Kafka 提供的 kafka-broker-api-versions.sh 等工具查抄。
示例
  1. bootstrap.servers=localhost:9092
  2. ,localhost:9093
复制代码
10.6 问题:消费者消费消息时延迟过高

现象:消费者的消息处理延迟过高,消息消费的时间大大超过了预期。
可能缘故原由


  • 消费处理过程慢:消费者处理单条消息的时间过长,导致无法实时消费下一条消息。
  • 消费者配置不当:例如,max.poll.records 设置得太大,导致每次拉取的消息数目过多,增加了消息处理的延迟。
  • 资源瓶颈:消费者地点的机器 CPU、内存等资源不敷,影响消息消费的速度。
办理方案

  • 优化消费者的消息处理逻辑:减少每条消息的处理时间,可以通过多线程或异步处理等方式提高服从。
  • 调解 max.poll.records 参数:减少每次拉取的消息数目,确保每次 poll() 操作的时间不会过长。
  • 监控消费者的资源斲丧:查抄消费者地点的机器的资源使用情况,优化消费者的硬件配置。
示例
  1. max.poll.records=10
复制代码
十一、Kafka消费者性能调优

Kafka 消费者的性能调优是确保高效消费消息、减少延迟、提升吞吐量的关键步调。通过公道配置消费者的参数,调解处理逻辑和资源配置,可以大大提高 Kafka 消费者的性能。
11.1 消费者配置参数调优

11.1.1 fetch.min.bytes 和 fetch.max.wait.ms



  • fetch.min.bytes:指定消费者拉取数据时的最小字节数。消费者只有在获得至少该数目的数据时才会返回数据。适当增加此值可以减少网络请求的次数,但可能会增加拉取延迟。
  • fetch.max.wait.ms:指定消费者等候拉取数据的最大时间。假如未满意 fetch.min.bytes 条件,消费者将在此时间后返回,纵然数据量不敷。
调优发起


  • 增大 fetch.min.bytes:通过增大此值,可以减少请求次数,提高吞吐量,但会导致延迟增大。
  • 适当增加 fetch.max.wait.ms:制止频仍的拉取请求,提高网络使用率。
示例
  1. fetch.min.bytes=50000
  2.   # 增大每次拉取数据的最小字节数fetch.max.wait.ms=500  # 设置拉取超时时间
复制代码
11.1.2 max.poll.records



  • max.poll.records:该参数指定每次调用 poll() 方法时,消费者一次最多拉取的消息数目。增加此值可以一次性拉取更多的消息,减少请求次数,从而提高吞吐量,但同时也会增加每次消费的处理时间。
调优发起


  • 假如消费者处理速度较快,可以增大该值来提高吞吐量。
  • 假如消息处理逻辑复杂或处理时间较长,则应适当减小此值,制止每次拉取的消息太多,导致消费者壅闭。
示例
  1. max.poll.records=10
  2. 0
  3. 0  # 每次最多拉取 1000 条消息
复制代码
11.1.3 session.timeout.ms 和 heartbeat.interval.ms



  • session.timeout.ms:该参数设置消费者心跳的最大等候时间。假如消费者在此时间内未发送心跳,Kafka 会以为该消费者失效,并启动再均衡(rebalance)过程。过低的 session.timeout.ms 会增加再均衡频率,影响性能。
  • heartbeat.interval.ms:消费者发送心跳的频率。适当调解可以确保消费者的稳定性,并减少不必要的网络负载。
调优发起


  • 增大 session.timeout.ms:减少消费者在重新到场消费者组时的再均衡频率,得当高吞吐量的消费者。
  • 调解 heartbeat.interval.ms:确保心跳发送频率公道,制止不必要的网络开销。
示例
  1. session.timeout.ms=30000  # 增加消费者会话超时,减少再均衡频率
  2. heartbeat.interval.ms=10000  # 设置合理的心跳发送频率
复制代码
11.1.4 auto.offset.reset



  • auto.offset.reset:当消费者没有偏移量或偏移量超出范围时,该参数控制消费者的举动。auto.offset.reset 有两个选项:

    • earliest:从最早的消息开始消费。
    • latest:从最新的消息开始消费。

调优发起


  • earliest:适用于需要重新消费汗青消息的场景,但会增加初次消费时的延迟。
  • latest:适用于仅消费新消息的场景,减少延迟。
示例
  1. auto.offset.reset=earliest
  2.   # 从最早的消息开始消费
复制代码
11.2 消费模式和消息处理逻辑优化

11.2.1 批量处理

批量处理可以大大提高 Kafka 消费者的性能,特殊是在处理大量消息时。消费者可以将消息缓存到内存中,进行批量处理,从而减少处理次数和提升吞吐量。
调优发起


  • 增加每次拉取的消息数目:在处理完一批消息后,批量提交偏移量,减少提交次数。
  • 制止每条消息都单独提交偏移量:可以使用批量提交偏移量(例如每处理 100 条消息提交一次),减少提交的频率。
示例
  1. List<ConsumerRecord<String, String>> records = new ArrayList<>();
  2. while (true) {
  3.     ConsumerRecords<String, String> newRecords = consumer.poll(100);
  4.     for (ConsumerRecord<String, String> record : newRecords) {
  5.         records.add(record);
  6.     }
  7.     if (records.size() >= BATCH_SIZE) {
  8.         processBatch(records);  // 批量处理消息
  9.         consumer.commitSync();  // 提交偏移量
  10.         records.clear();  // 清空缓存
  11.     }
  12. }
复制代码
11.2.2 异步处理

为了提高消费性能,可以将消息处理和偏移量提交操作异步化,使消费者不需要等候每个消息的处理完成,从而提高整体吞吐量。
调优发起


  • 使用线程池或者异步框架来处理消息。
  • 将消息处理和提交操作分离,制止因一个消息的处理壅闭其他消息的消费。
示例
  1. ExecutorService executor = Executors.newFixedThreadPool(10);
  2. while (true) {
  3.     ConsumerRecords<String, String> records = consumer.poll(100);
  4.     for (ConsumerRecord<String, String> record : records) {
  5.         executor.submit(() -> processMessage(record));  // 异步处理消息
  6.     }
  7. }
复制代码
11.3 资源和硬件优化

11.3.1 内存和 CPU

对于高吞吐量的 Kafka 消费者,内存和 CPU 的性能非常关键。在大量消息的消费过程中,公道的内存和 CPU 配置可以显著提高消费者的性能。


  • 增加消费者并发性:使用多线程或多个消费者实例来提高 CPU 焦点的使用率。
  • 内存优化:确保消费者能够处理足够大的批量消息,制止因内存不敷导致的频仍 GC。
调优发起


  • 在多核机器上,可以启动多个消费者线程来并行处理消息。
  • 增加 JVM 堆内存,制止频仍的垃圾采取。
示例
  1. -Xms4g  # 设置 JVM 初始堆内存
  2. -Xmx8g  # 设置最大堆内存
复制代码
11.3.2 Kafka 集群优化

消费者的性能不光受客户端配置的影响,还与 Kafka 集群的配置有关。Kafka 集群的吞吐量、延迟和可用性直接影响消费者的性能。


  • 增加分区数目:在 Kafka 主题中增加分区数目,可以让多个消费者并行消费消息,从而提高吞吐量。
  • 负载均衡:确保 Kafka 集群的各个分区能够匀称分布到各个消费者实例中,制止某些消费者过载。
调优发起


  • 适当增加分区数,根据消费者的数目和吞吐量需求来设置主题的分区数。
  • 通过调解生产者的分区策略,使得消息能够匀称地分布到各个分区中。
11.4 监控和故障排查

11.4.1 消费者监控

通过监控 Kafka 消费者的性能,可以实时发现瓶颈,并进行调优。以下是一些关键的监控指标:


  • consumer-lag:消费滞后,表示消费者未处理的消息数目。较大的 consumer-lag 可能意味着消费者处理速度不敷,或系统负载过高。
  • records-consumed-rate:每秒消费的纪录数,反映消费者的吞吐量。
  • fetch-latency:拉取延迟,表示消费者从 Kafka 署理拉取数据的时间。较高的延迟可能表示消费者配置不当或 Kafka 集群负载过高。
调优发起


  • 监控 consumer-lag 和 fetch-latency 指标,实时调解消费者的配置和处理逻辑。
  • 使用工具如 Prometheus 或 JMX 获取 Kafka 消费者的性能数据。
示例
  1. # 启用 JMX 监控
  2. kafka.consumer.metrics.reporters=org.apache.kafka.common.metrics.JmxReporter
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

水军大提督

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表