ToB企服应用市场:ToB评测及商务社交产业平台

标题: 「Kafka」消费者篇 [打印本页]

作者: 天空闲话    时间: 2024-7-28 08:42
标题: 「Kafka」消费者篇
「Kafka」消费者篇

Kafka 消费方式


Kafka 消费者工作流程

消费者总体工作流程


新版本(0.9之后)的 offset 生存在 kafka 的 Topic 里,持久化到磁盘,可靠性有保障。
老版本(0.9之前)的 offset 生存在 Zookeeper 的 consumers 节点路径下。
为什么转移了呢?假如所有的消费者都把 offset 维护在 Zookeeper 中,那么所有的消费者都必要跟 Zookeeper 进行大量的交互,就会导致网络数据传输非常频繁,压力较大。所以存储在主题里更易于维护管理。
消费者组原理

消费者组



消费者组初始化流程


消费者组详细消费流程



消费者紧张参数



消费者 API

独立消费者案例(订阅主题)


独立消费者案例(订阅分区)


消费者组案例


一个分区的数据只由消费者组中的一个消费者消费。
生产经验—分区的分配以及再均衡


Consumer Leader 就是根据分区分配计谋,制定消费方案。


Range 以及再均衡


Range 分区分配计谋案例
Range 分区分配再均衡案例

RoundRobin 以及再均衡


RoundRobin 分区分配计谋案例
RoundRobin 分区分配再均衡案例

Sticky 以及再均衡

**粘性分区定义:**可以理解为分配的结果带有“粘性的”。即在实行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变更,可以节流大量的开销。
粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配计谋,起首会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时间,会尽量保持原有分配的分区不变化

Sticky 分区分配再均衡案例

CooperativeSticky以及再均衡

上述三种分区分配计谋均是基于 eager 协议,Kafka2.4.0开始引入 CooperativeSticky 计谋——在不绝止消费的情况下进行增量再均衡。
CooperativeSticky 与之前的 Sticky 虽然都是维持原来的分区分配方案,最大的区别是:Sticky仍旧是基于 eager 协议,分区重分配时间,都必要 consumers 先放弃当前持有的分区,重新加入consumer group;而 CooperativeSticky 基于 cooperative 协议,该协议将原来的一次全局分区重均衡,改成多次小规模分区重均衡。
比方:一个Topic(T0,三个分区),两个 consumers(consumer1、consumer2) 均订阅 Topic(T0)。
假如consumers订阅信息为:
consumer1T0P0、T0P2consumer2T0P1 此时,新的 consumer3 加入消费者组,那么基于 eager 协议的分区重分配计谋流程:

而基于 cooperative 协议的分区分配计谋的流程:

   参考:Kafka消费者分区分配计谋详解
  该文把这 4 个计谋写的都非常全面。
  offset 位移

offset 的默认维护位置


__consumer_offsets 主题内里采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset的值。每隔一段时间,kafka 内部会对这个 topic 进行 compact,也就是每个 group.id+topic+分区号 只保留最新数据。
消费 offset 案例


自动提交 offset



消费者自动提交 offset
  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.apache.kafka.clients.consumer.ConsumerRecords;
  4. import org.apache.kafka.clients.consumer.KafkaConsumer;
  5. import java.time.Duration;
  6. import java.util.Arrays;
  7. import java.util.Properties;
  8. public class CustomConsumerAutoOffset {
  9.     public static void main(String[] args) {
  10.         // 1. 创建 kafka 消费者配置类
  11.         Properties properties = new Properties();
  12.         // 2. 添加配置参数
  13.         // 添加连接
  14.         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  15.         // 配置序列化 必须
  16.         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  17.         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  18.         // 配置消费者组
  19.         properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
  20.         // 是否自动提交 offset,默认为true
  21.         properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  22.         // 提交 offset 的时间周期 1000ms,默认 5s
  23.         properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
  24.         // 3. 创建 kafka 消费者
  25.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  26.         // 4. 设置消费主题 形参是列表
  27.         consumer.subscribe(Arrays.asList("first"));
  28.         // 5. 消费数据
  29.         while (true) {
  30.             // 读取消息
  31.             ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
  32.             // 输出消息
  33.             for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  34.                 System.out.println(consumerRecord.value());
  35.             }
  36.         }
  37.     }
  38. }
复制代码
手动提交 offset


同步提交 offset
由于同步提交 offset 有失败重试机制,故更加可靠,但是由于不停等待提交结果,提交的服从比较低。
以下为同步提交 offset 的示例:
  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.apache.kafka.clients.consumer.ConsumerRecords;
  4. import org.apache.kafka.clients.consumer.KafkaConsumer;
  5. import java.time.Duration;
  6. import java.util.Arrays;
  7. import java.util.Properties;
  8. public class CustomConsumerByHandSync {
  9.     public static void main(String[] args) {
  10.         // 1. 创建 kafka 消费者配置类
  11.         Properties properties = new Properties();
  12.         // 2. 添加配置参数
  13.         // 添加连接
  14.         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  15.         // 配置序列化 必须
  16.         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  17.         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  18.         // 配置消费者组
  19.         properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
  20.         // 是否自动提交 offset
  21.         properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  22.         // 3. 创建 kafka 消费者
  23.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  24.         // 4. 设置消费主题 形参是列表
  25.         consumer.subscribe(Arrays.asList("first"));
  26.         // 5. 消费数据
  27.         while (true) {
  28.             // 读取消息
  29.             ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
  30.             // 输出消息
  31.             for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
  32.                 System.out.println(consumerRecord.value());
  33.             }
  34.             // 同步提交 offset
  35.             consumer.commitSync();
  36.         }
  37.     }
  38. }
复制代码
异步提交 offset
虽然同步提交 offset 更可靠一些,但是由于其会壅闭当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
以下为异步提交 offset 的示例,更换 KafkaConsumer 调用的 API 即可:
  1. // 异步提交 offset
  2. consumer.commitAsync();
复制代码
指定 offset 消费

auto.offset.reset = earliest | latest | none,默认是 latest。
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(比方该数据已被删除),该怎么办?
指定时间消费


重复消费和漏消费



生产经验—消费者事务


生产经验—数据积压(消费者如何进步吞吐量)



生产者进步吞吐量:

消费者进步吞吐量:

在生产环境中合理调整这几个参数,达到最大化吞吐量。

   条记整理自b站尚硅谷视频教程:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4