ToB企服应用市场:ToB评测及商务社交产业平台

标题: kafka全解 [打印本页]

作者: 熊熊出没    时间: 2024-10-20 23:09
标题: kafka全解
Kafka概述

定义

Kafka传统定义: Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),重要应用于大数据实时处理范畴。
发布/订阅:消息的发布者不会将消息直接发布给特定的订阅者,而是将发布的消息分为差别的种别,订阅者只接收感兴趣的消息。
Kafka最新定义:Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能的数据管道、流分析、数据集成和关键使命应用。

消息队列

目前企业中比较常见的消息队列产物重要有Kafka、ActiveMQ、RabbitMQ、RocketMQ等。
在大数据场景重要采用Kafka作为消息队列。在JavaEE开发中重要采用ActiveMQ、RabbitMQ、RocketMQ。


目录结构分析

传统消息队列的应用场景

传统的消息队列的重要应用场景包括**:缓存/消峰、解耦异步通讯**。
缓冲/消峰: 有助于控制和优化数据流颠末体系的速率,解决生产消息和消费消息的处理速率差别等的情况。

解耦:答应你独立的扩展或修改两边的处理过程,只要确保它们服从同样的接口束缚。

异步通讯:答应用户把一个消息放入队列,但并不立刻处理它,然后再需要的时间再行止理它们。

消息队列的两种模式

点对点模式

消费者主动拉去数据,消息收到后扫除消息

发布/订阅模式



Kafka基础架构

1、为方便扩展,并提高吞吐量,一个topic分为多个partition
2、配合分区的设计,提出消费者组的概念,组内每个消费者并行消费
3、为提高可用性,为每个partition增加多少副本,类似NameNode HA
4、ZK中记录谁是leader,Kafka2.8.0 以后也可以设置不采用ZK.


Kafka快速入门

安装部署

集群规划

Hadoop102Hadoop103Hadoop104zkzkzkkafkakafkakafka 集群部署

  1. bin/kafka-server-start.sh -daemon config/server.properties
复制代码
集群启停脚本

  1. #! /bin/bash
  2. case $1 in
  3. "start"){
  4.         for i in hadoop102 hadoop103 hadoop104
  5.         do
  6.                 echo " --------启动 $i Kafka-------"
  7.                 ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -
  8.         daemon /opt/module/kafka/config/server.properties"
  9.         done
  10. };;
  11. "stop"){
  12.         for i in hadoop102 hadoop103 hadoop104
  13.         do
  14.                 echo " --------停止 $i Kafka-------"
  15.                 ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
  16.         done
  17. };;
  18. esac
复制代码
  1. chmod +x kf.sh
复制代码
  1. kf.sh start
复制代码
  1. kf.sh stop
复制代码
Kafka下令行操纵

Kafka基础架构


主题下令行操纵

生产者下令行操纵

消费者下令行操纵

kafka可视化工具

官网:https://www.kafkatool.com/download.html

Kafka紧张概念

broker


zookeeper


producer(生产者)

生产者负责将数据推送给broker的topic
consumer(消费者)

消费者负责从broker的topic中拉取数据,并自己举行处理
consumer group(消费者组)


分区(Partitions)


在Kafka集群中,主题被分为多个分区
副本(Replicas)


副本可以确保某个服务器出现故障时,确保数据依然可用,在Kafka中,一样平常都会设计副本的个数>1,
主题(Topic)


偏移量(offset)


消费者组

Kafka生产者

生产者消息发送流程

发送原理

在消息发送的过程中,涉及到了两个线程 — main 线程和Sender线程。在main线程中创建了一个双端队列 RecordAccumulator。main线程将消息发送给ResordAccumlator,Sender线程不绝从 RecordAccumulator 中拉去消息发送到 Kafka Broker。

生产者紧张参数列表


异步发送API

平凡异步发送


2、代码编程go get github.com/Shopify/sarama
  1. func main() {
  2.         config := sarama.NewConfig()
  3.         config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
  4.         config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
  5.         config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
  6.         // 构造一个消息
  7.         msg := &sarama.ProducerMessage{}
  8.         msg.Topic = "first"
  9.         msg.Value = sarama.StringEncoder("this is a test log")
  10.         // 连接kafka
  11.         client, err := sarama.NewSyncProducer([]string{
  12.                 "192.168.71.128:9092", "192.168.71.129:9092", "192.168.71.130:9092",
  13.         }, config)
  14.         if err != nil {
  15.                 fmt.Println("producer closed, err:", err)
  16.                 return
  17.         } else {
  18.                 fmt.Println(client)
  19.         }
  20.         defer client.Close()
  21.         // 发送消息
  22.         pid, offset, err := client.SendMessage(msg)
  23.         if err != nil {
  24.                 fmt.Println("send msg failed, err:", err)
  25.                 return
  26.         }
  27.         fmt.Printf("pid:%v offset:%v\n", pid, offset)
  28. }
复制代码
带回调函数的异步发送


【留意:】消息发送失败会主动重试,不需要我们在回调函数中手动重试。
同步发送API


生产者分区

分区和副本机制

生产者写入消息到topic,Kafka将依据差别的策略将数据分配到差别的分区中
分区利益


轮询策略


随机策略(不消)

随机策略,每次都随机地将消息分配到每个分区。在较早的版本,默认的分区策略就是随机策略,也是为了将消息均衡地写入到每个分区。但后续轮询策略表现更佳,所以根本上很少会使用随机策略。

按key分配策略


按key分配策略,有可能会出现「数据倾斜」,例如:某个key包含了大量的数据,由于key值一样,全部全部的数据将都分配到一个分区中,造成该分区的消息数目远大于其他的分区。
乱序标题

轮询策略、随机策略都会导致一个标题,生产到Kafka中的数据是乱序存储的。而按key分区可以一定程度上实现数据有序存储——也就是局部有序,但这又可能会导致数据倾斜,所以在实际生产环境中要结合实际情况来做取舍。
副本机制

副本的目的就是冗余备份,当某个Broker上的分区数据丢失时,依然可以保障数据可用。由于在其他的Broker上的副本是可用的。
producer的ACKs参数

对副本关系较大的就是,producer设置的acks参数了,acks参数表现当生产者生产消息的时间,写入到副本的要求严酷程度。它决定了生产者如何在性能和可靠性之间做取舍。
acks设置为0


acks设置为1


当生产者的ACK设置为1时,生产者会等候leader副本确认接收后,才会发送下一条数据,性能中等。
acks设置为-1大概all


Kafka生产者幂等性与事件

幂等性

拿http举例来说,一次或多次哀求,得到地响应是同等的(网络超时等标题除外),换句话说,就是实行多次操纵与实行一次操纵的影响是一样的。

假如,某个体系是不具备幂等性的,假如用户重复提交了某个表格,就可能会造成不良影响。例如:用户在浏览器上点击了多次提交订单按钮,会在背景天生多个一模一样的订单。
Kafka生产者幂等性


在生产者生产消息时,假如出现retry时,有可能会一条消息被发送了多次,假如Kafka不具备幂等性的,就有可能会在partition中保存多条一模一样的消息。
幂等性原理

为了实现生产者的幂等性,Kafka引入了 Producer ID(PID)和 Sequence Number的概念。

Kafka事件


事件操纵API

Producer接口中定义了以下5个事件相干方法:
数据有序和数据乱序


Kafka Broker

Zookeeper存储的Kafka信息

  1. [zk: localhost:2181(CONNECTING) 0] ls /
  2. [admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
复制代码

Kafka Broker总体工作流程


Broker紧张参数



Kafka副本

副本根本信息

AR = ISR + OSR
ISR:表现 Leader 保持同步的 Follower 集合。假如 Follower 长时间未 向 Leader 发送通讯哀求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s 。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。
OSR:表现 Follower 与 Leader 副本同步时,耽误过多的副本。
Leader 选举流程

Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader ,负责管理集群 broker 的上下线,全部 topic 的分区副本分配 和 Leader 选举等工作。

  1. [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu1 --partitions 4 --replication-factor 4
  2. Created topic atguigu1.
复制代码
  1. [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
  2. --topic atguigu1
  3. Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
  4. Configs: segment.bytes=1073741824
  5. Topic: atguigu1 Partition: 0 Leader: 3 Replicas: 3,0,2,1 Isr: 3,0,2,1
  6. Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
  7. Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,1,2
  8. Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0,3
复制代码
  1. [atguigu@hadoop105 kafka]$ bin/kafka-server-stop.sh
  2. [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
  3. --topic atguigu1
  4. Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
  5. Configs: segment.bytes=1073741824
  6. Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,2,1
  7. Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,0
  8. Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,2
  9. Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0
复制代码
  1. [atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh
  2. [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
  3. --topic atguigu1
  4. Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
  5. Configs: segment.bytes=1073741824
  6. Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1
  7. Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0
  8. Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1
  9. Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0
复制代码
  1. [atguigu@hadoop105 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
  2. [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic atguigu1Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4Configs: segment.bytes=1073741824Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3
复制代码
  1. [atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
  2. [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic atguigu1Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4Configs: segment.bytes=1073741824Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3,2Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3,2Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3,2Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3,2
复制代码
  1. [atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh
  2. [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
  3. --topic atguigu1
  4. Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
  5. Configs: segment.bytes=1073741824
  6. Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,3,2
  7. Topic: atguigu1 Partition: 1 Leader: 2 Replicas: 1,2,3,0 Isr: 0,3,2
  8. Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,2
  9. Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 0,3,2
复制代码
Leader 和 Follower 故障处理细节

LEO(Log End Offset): 每个副本的末了一个offset,LEO其实就是最新的 offset + 1。
HW(High Watermark):全部副本中最小的LEO。

LEOLog End Offset):每个副本的末了一个offset,LEO其实就是最新的offset + 1
HWHigh Watermark):全部副本中最小的LEO

活动调整分区副本存储

在生产环境中,每台服务器的设置和性能差别等,但是kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。全部需要手动调整分区副本的存储。
需求:创建一个新的 topic ,4个分区,两个副本,名称为three 。将该 topic 的全部副本都存储到 broker0 和 broker1 两台服务器上。

手动调整分区副本存储的步骤如下:
  1. [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
  2. hadoop102:9092 --create --partitions 4 --replication-factor 2 --
  3. topic three
复制代码
  1. [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
  2. hadoop102:9092 --describe --topic three
复制代码
  1. [atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
复制代码
输入如下内容:
  1. {
  2.   "version":1,
  3.   "partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
  4.   {"topic":"three","partition":1,"replicas":[0,1]},
  5.   {"topic":"three","partition":2,"replicas":[1,0]},
  6.   {"topic":"three","partition":3,"replicas":[1,0]}]
  7. }
复制代码
  1. [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --
  2. bootstrap-server hadoop102:9092 --reassignment-json-file
  3. increase-replication-factor.json --execute
复制代码
  1. [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --
  2. bootstrap-server hadoop102:9092 --reassignment-json-file
  3. increase-replication-factor.json --verify
复制代码
  1. [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
  2. hadoop102:9092 --describe --topic three
复制代码
Leader Partition 负载平衡

正常情况下,Kafka自己会主动把Leader Partition均匀分散在各个呆板上,来保证每台呆板的读写吞吐量都是均匀的。但是假如某 些broker宕机,会导致Leader Partition过于会合在其他少部分几台broker上,这会导致少数几台broker的读写哀求压力过高,其他宕机的broker重启之后都是follower partition,读写哀求很低,造成集群负载不均衡。

参数名称描述auto.leader.rebalance.enable默认是 true。 主动 Leader Partition 平衡。生产环
境中,leader 重选举的代价比较大,可能会带来
性能影响,发起设置为 false 关闭。leader.imbalance.per.broker.percentage默认是 10%。每个 broker 答应的不平衡的 leader
的比率。假如每个 broker 凌驾了这个值,控制器
会触发 leader 的平衡。leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔
时间。 文件存储

Topic 数据的存储机制


查看 hadoop102(大概 hadoop103、hadoop104)的/opt/module/kafka/datas/first-1 (first-0、first-2)路径上的文件
  1. [atguigu@hadoop104 first-1]$ ls
  2. 00000000000000000092.index
  3. 00000000000000000092.log
  4. 00000000000000000092.snapshot
  5. 00000000000000000092.timeindex
  6. leader-epoch-checkpoint
  7. partition.metadata
复制代码
直接查看 log 日志,发现是乱码。
通过工具查看 index 和 log 信息。
  1. [atguigu@hadoop104 first-1]$ kafka-run-class.sh kafka.tools.DumpLogSegments
  2. --files ./00000000000000000000.index
  3. Dumping ./00000000000000000000.index
  4. offset: 3 position: 152
复制代码

日志存储参数设置
参数描述log.segment.bytesKafka 中 log 日志是分成一块块存储的,此设置是指 log 日志划分
成块的大小,默认值 1G。log.index.interval.bytes默认 4kb,kafka 内里每当写入了 4kb 大小的日志(.log),
然后就往 index 文件内里记录一个索引。 稀疏索引。 文件整理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。

那么日志一旦凌驾了设置的时间,怎么处理呢?
Kafka 中提供的日志整理策略有 delete 和 compact 两种。

(1) 基于时间:默认打开。以 segment 中全部记录中的最大时间戳作为该文件时间戳。
(2) 基于大小:默认关闭。凌驾设置的全部日志总大小,阐述最早的 segment 。
log.retention.bytes,默认等于-1,表现无穷大。

compact日志压缩:对于雷同 key 的差别 value 值,值保存末了一个版本。


压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个 offset 大的 offset 对应的消息,实际上会拿到 offset 为 7 的消息,并从这个位置开始消费。
​ 这种策略只得当特别场景,比如消息的 key 是用户 ID,value 是用户的资料,通过这种压缩策略,整个消息集里就保存了全部用户最新的资料。
Kafka 消费者

Kafka 消费方式


pull 模式不足之处是,假如Kafka 没有数据,消费者可能会陷入循环中,不绝返回空数据。

Kafka 消费者工作流程


消费者组原理

Consumer Group (CG):消费者组,由多个consumer组成。形成一个消费者组的条件是全部消费者的 groupid 雷同。



消费者紧张参数

参数名称描述bootstrap.servers向 Kafka 集群创建初始连接用到的 host/port 列表。key.deserializer 和value.deserializer指定接收消息的 key 和 value 的反序列化范例。一定要写全类名。group.id标记消费者所属的消费者组。enable.auto.commit默认值为 true,消费者会主动周期性地向服务器提交偏移量。auto.commit.interval.ms假如设置了 enable.auto.commit 的值为 true, 则该值定义了
消费者偏移量向 Kafka 提交的频率,默认 5s。auto.offset.reset当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在
(如,数据被删除了),该如何处理? earliest:主动重置偏
移量到最早的偏移量。 latest:默认,主动重置偏移量为最
新的偏移量。 none:假如消费组原来的(previous)偏移量
不存在,则向消费者抛异常。 anything:向消费者抛异常。offsets.topic.num.partitions__consumer_offsets 的分区数,默认是 50 个分区。heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间,默认 3s。
该条目的值必须小于 session.timeout.ms ,也不应该高于
session.timeout.ms 的 1/3。session.timeout.msKafka 消费者和 coordinator 之间连接超时时间,默认 45s。
凌驾该值,该消费者被移除,消费者组实行再平衡。max.poll.interval.ms消费者处理消息的最大时长,默认是 5 分钟。凌驾该值,该
消费者被移除,消费者组实行再平衡。fetch.min.bytes默认 1 个字节。消费者获取服务器端一批消息最小的字节数。fetch.max.wait.ms默认 500ms。假如没有从服务器端获取到一批数据的最小字
节数。该时间到,仍旧会返回数据。fetch.max.bytes默认 Default: 52428800(50 m)。消费者获取服务器端一批
消息最大的字节数。假如服务器端一批次的数据大于该值
(50m)仍旧可以拉取返来这批数据,因此,这不是一个绝
对最大值。一批次的大小受 message.max.bytes (broker
config)or max.message.bytes (topic config)影响。max.poll.records一次 poll 拉取数据返回消息的最大条数,默认是 500 条。 offset 位移

offset 的默认维护位置


主动提交offset

为了使我们能够专注于自己的业务逻辑,Kafka提供了主动提交offset的功能。
主动提交offset的相干参数:


参数名称描述enable.auto.commit默认值为 true,消费者会主动周期性地向服务器提交偏移量。auto.commit.interval.ms假如设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。 手动提交offset

虽然主动提交offset非常简单比那边,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。一次 Kafka 还提供了手动提交 offset 的API。
手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和commitAsync(异步提交)。两者的雷同点是,都会将本次提交的一批数据最高的偏移量提交;差别点是,同步提交阻塞当前线程,不绝到提交乐成,而且会主动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。


指定Offset消费

auto.offset.reset = earliest | latest | none 默认是 latest。
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量
时(例如该数据已被删除),该怎么办?
(1)earliest:主动将偏移量重置为最早的偏移量,–from-beginning。
(2)latest(默认值):主动将偏移量重置为最新偏移量。
(3)none:假如未找到消费者组的先前偏移量,则向消费者抛出异常。

Kafka-Kraft模式

Kafka-Kraft架构


左图为 Kafka 现有架构,元数据在 zookeeper 中,运行时动态选举 controller,由controller 举行 Kafka 集群管理。右图为 kraft 模式架构(实行性),不再依赖 zookeeper 集群,而是用三台 controller 节点代替 zookeeper,元数据保存在 controller 中,由 controller 直接举行 Kafka 集群管理。
如许做的利益有以下几个:

Go kafka

Kafka简介

Kafka的结构


Producer

Producer即生产者,消息的产生者,是消息的⼊口。
kafka cluster

kafka集群,一台或多台服务器组成
Broker

Broker是指部署了Kafka实例的服务器节点。
Topic

消息的主题,可以明白为消息的分类,kafka的数据就保存在topic。在每个broker上 都可以创建多个topic。实际应用中通常是一个业务线建一个topic。
Partition

Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的⽂件夹!
Replication

每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时间会选择一个备胎(Follower)上位,成为Leader。
在kafka中默认副本的最大数量是10 个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的呆板,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Consumer

消费者,即消息的消费方,是消息的出口。
Consumer Group

我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个 topic的差别分区的数据,这也是为了提高kafka的吞吐量!
Kafka⼯作流程


选择partition的原则(面试重点)

某个topic有多个partition,producer⼜怎么知道该将数据发往哪个partition?
ACK应答机制(面试重点)

producer在向kafka写入消息的时间,可以设置参数来确定是否确认kafka接收到数据,这个参数可设置 的值为 0,1,all
假如往不存在的topic写数据,kafka会⾃动创建topic,partition和replication的数目 默认设置都是1。
Topic和数据⽇志

topic 是同⼀种别的消息记录(record)的集合。在Kafka中,⼀个主题通常有多个订阅者。对于每个
主题,Kafka集群维护了⼀个分区数据⽇志⽂件结构如下:

Kafka可以设置⼀个保存期限,⽤来标识⽇志会在Kafka集群内保存多⻓时间。Kafka集群会保存在保存期限内全部被发布的消息,不管这些消息是否被消费过。
⽐如保存期限设置为两天,那么数据被发布到 Kafka集群的两天以内,全部的这些数据都可以被消费。当凌驾两天,这些数据将会被清空,以便为后 续的数据腾出空间。
由于Kafka会将数据进⾏持久化存储(即写⼊到硬盘上),所以保存的数据⼤⼩可 以设置为⼀个⽐较⼤的值。
Partition结构

Partition在服务器上的表现形式就是⼀个⼀个的⽂件夹,每个partition的⽂件夹下⾯会有多组segment ⽂件,每组segment⽂件⼜包含 .index ⽂件、 .log ⽂件、 .timeindex ⽂件三个⽂件,此中 .log ⽂件就是实际存储message的地⽅,⽽ .index 和 .timeindex ⽂件为索引⽂件,⽤于检索消息。
消费数据

kafka环境搭建

kafka环境基于zookeeper,zookeeper环境基于JAVA-JDK。
!!!新版本的kafka自带zookeeper,可以不手动安装。
java环境变量

https://www.oracle.com/technetwork/java/javase/downloads/jdk12-downloads-5295953.html



安装kafka

http://kafka.apache.org/downloads
  1. 1.打开config目录下的server.properties文件
  2. 2.修改log.dirs=F:/tmp/kafka-logs  //日志存放
  3. 3.打开config目录下的zookeeper.properties文件
  4. 4.修改dataDir=F:/tmp/zookeeper //数据存放
  5. 启动:
  6. 先执行:bin\windows\zookeeper-server-start.bat config\zookeeper.properties
  7. 再执行:bin\windows\kafka-server-start.bat config\server.properties
复制代码
zookeeper:

kafka:

GO操纵Kafka

sarama操纵kafka

依赖安装

go get github.com/Shopify/sarama
windows: mod文件中手动加 require github.com/shopify/sarama v1.19.0
Go语言中连接kafka使用第三方库:github.com/IBM/sarama。
go get github.com/IBM/sarama这个库已经由Shopify转给了IBM。
sarama v1.20之后的版本加入了zstd压缩算法,需要用到cgo,在Windows平台编译时会提示类似如下错误:
  1. # github.com/DataDog/zstd
  2. exec: "gcc":executable file not found in %PATH%
复制代码
所以在Windows平台请使用v1.19版本的sarama。
连接kafka发送消息

  1. package main
  2. import (
  3.         "fmt"
  4.         "github.com/IBM/sarama"
  5. )
  6. // 基于sarama第三方库开发的kafka client
  7. func main() {
  8.         config := sarama.NewConfig()
  9.         config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
  10.         config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
  11.         config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
  12.         // 构造一个消息
  13.         msg := &sarama.ProducerMessage{}
  14.         msg.Topic = "web_log"
  15.         msg.Value = sarama.StringEncoder("this is a test log")
  16.         // 连接kafka
  17.         client, err := sarama.NewSyncProducer([]string{"192.168.1.7:9092"}, config)
  18.         if err != nil {
  19.                 fmt.Println("producer closed, err:", err)
  20.                 return
  21.         }
  22.         defer client.Close()
  23.         // 发送消息
  24.         pid, offset, err := client.SendMessage(msg)
  25.         if err != nil {
  26.                 fmt.Println("send msg failed, err:", err)
  27.                 return
  28.         }
  29.         fmt.Printf("pid:%v offset:%v\n", pid, offset)
  30. }
复制代码
连接kafka消费消息

  1. package main
  2. import (
  3.         "fmt"
  4.         "github.com/IBM/sarama"
  5. )
  6. // kafka consumer
  7. func main() {
  8.         consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
  9.         if err != nil {
  10.                 fmt.Printf("fail to start consumer, err:%v\n", err)
  11.                 return
  12.         }
  13.         partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
  14.         if err != nil {
  15.                 fmt.Printf("fail to get list of partition:err%v\n", err)
  16.                 return
  17.         }
  18.         fmt.Println(partitionList)
  19.         for partition := range partitionList { // 遍历所有的分区
  20.                 // 针对每个分区创建一个对应的分区消费者
  21.                 pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
  22.                 if err != nil {
  23.                         fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
  24.                         return
  25.                 }
  26.                 defer pc.AsyncClose()
  27.                 // 异步从每个分区消费信息
  28.                 go func(sarama.PartitionConsumer) {
  29.                         for msg := range pc.Messages() {
  30.                                 fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
  31.                         }
  32.                 }(pc)
  33.         }
  34. }
复制代码
kafka-go操纵kafka

准备Kafka环境

以下docker-compose.yml文件用来搭建一套单节点zookeeper和单节点kafka环境,而且在8080端口提供kafka-ui管理界面。
  1. version: '2.1'
  2. services:
  3.   zoo1:
  4.     image: confluentinc/cp-zookeeper:7.3.2
  5.     hostname: zoo1
  6.     container_name: zoo1
  7.     ports:
  8.       - "2181:2181"
  9.     environment:
  10.       ZOOKEEPER_CLIENT_PORT: 2181
  11.       ZOOKEEPER_SERVER_ID: 1
  12.       ZOOKEEPER_SERVERS: zoo1:2888:3888
  13.   kafka1:
  14.     image: confluentinc/cp-kafka:7.3.2
  15.     hostname: kafka1
  16.     container_name: kafka1
  17.     ports:
  18.       - "9092:9092"
  19.       - "29092:29092"
  20.       - "9999:9999"
  21.     environment:
  22.       KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
  23.       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
  24.       KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
  25.       KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
  26.       KAFKA_BROKER_ID: 1
  27.       KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
  28.       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  29.       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  30.       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
  31.       KAFKA_JMX_PORT: 9999
  32.       KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
  33.       KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
  34.       KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
  35.     depends_on:
  36.       - zoo1
  37.   kafka-ui:
  38.     container_name: kafka-ui
  39.     image: provectuslabs/kafka-ui:latest
  40.     ports:
  41.       - 8080:8080
  42.     depends_on:
  43.       - kafka1
  44.     environment:
  45.       DYNAMIC_CONFIG_ENABLED: "TRUE"
复制代码
将上述docker-compose.yml文件在本地保存,在同一目录下实行以下下令启动容器。
  1. docker-compose up -d
复制代码
容器启动后,使用浏览器打开127.0.0.1:8080 即可看到如下kafka-ui界面。

安装kafka-go

实行以下下令下载 kafka-go依赖。
  1. go get github.com/segmentio/kafka-go
复制代码
留意:kafka-go 需要 Go 1.15或更高版本。
kafka-go 提供了两套与Kafka交互的API。
低级别( low-level):基于与 Kafka 服务器的原始网络连接实现。
高级别(high-level):对于常用读写操纵封装了一套更易用的API。
通常发起直接使用高级别的交互API。
Connection

Conn 范例是 kafka-go 包的核心。它代表与 Kafka broker之间的连接。基于它实现了一套与Kafka交互的低级别 API。
发送消息

下面是连接至Kafka之后,使用Conn发送消息的代码示例。
  1. // writeByConn 基于Conn发送消息
  2. func writeByConn() {
  3.         topic := "my-topic"
  4.         partition := 0
  5.         // 连接至Kafka集群的Leader节点
  6.         conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
  7.         if err != nil {
  8.                 log.Fatal("failed to dial leader:", err)
  9.         }
  10.         // 设置发送消息的超时时间
  11.         conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
  12.         // 发送消息
  13.         _, err = conn.WriteMessages(
  14.                 kafka.Message{Value: []byte("one!")},
  15.                 kafka.Message{Value: []byte("two!")},
  16.                 kafka.Message{Value: []byte("three!")},
  17.         )
  18.         if err != nil {
  19.                 log.Fatal("failed to write messages:", err)
  20.         }
  21.         // 关闭连接
  22.         if err := conn.Close(); err != nil {
  23.                 log.Fatal("failed to close writer:", err)
  24.         }
  25. }
复制代码
消费消息

  1. // readByConn 连接至kafka后接收消息
  2. func readByConn() {
  3.         // 指定要连接的topic和partition
  4.         topic := "my-topic"
  5.         partition := 0
  6.         // 连接至Kafka的leader节点
  7.         conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
  8.         if err != nil {
  9.                 log.Fatal("failed to dial leader:", err)
  10.         }
  11.         // 设置读取超时时间
  12.         conn.SetReadDeadline(time.Now().Add(10 * time.Second))
  13.         // 读取一批消息,得到的batch是一系列消息的迭代器
  14.         batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
  15.         // 遍历读取消息
  16.         b := make([]byte, 10e3) // 10KB max per message
  17.         for {
  18.                 n, err := batch.Read(b)
  19.                 if err != nil {
  20.                         break
  21.                 }
  22.                 fmt.Println(string(b[:n]))
  23.         }
  24.         // 关闭batch
  25.         if err := batch.Close(); err != nil {
  26.                 log.Fatal("failed to close batch:", err)
  27.         }
  28.         // 关闭连接
  29.         if err := conn.Close(); err != nil {
  30.                 log.Fatal("failed to close connection:", err)
  31.         }
  32. }
复制代码
使用batch.Read更高效一些,但是需要根据消息长度选择合适的buffer(上述代码中的b),假如传入的buffer太小(消息装不下)就会返回io.ErrShortBuffer错误。
假如不思量内存分配的效率标题,也可以按以下代码使用batch.ReadMessage读取消息。
  1. for {
  2.   msg, err := batch.ReadMessage()
  3.   if err != nil {
  4.     break
  5.   }
  6.   fmt.Println(string(msg.Value))
  7. }
复制代码
创建topic

当Kafka关闭主动创建topic的设置时,可按如下方式创建topic。
  1. // createTopicByConn 创建topic
  2. func createTopicByConn() {
  3.         // 指定要创建的topic名称
  4.         topic := "my-topic"
  5.         // 连接至任意kafka节点
  6.         conn, err := kafka.Dial("tcp", "localhost:9092")
  7.         if err != nil {
  8.                 panic(err.Error())
  9.         }
  10.         defer conn.Close()
  11.         // 获取当前控制节点信息
  12.         controller, err := conn.Controller()
  13.         if err != nil {
  14.                 panic(err.Error())
  15.         }
  16.         var controllerConn *kafka.Conn
  17.         // 连接至leader节点
  18.         controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
  19.         if err != nil {
  20.                 panic(err.Error())
  21.         }
  22.         defer controllerConn.Close()
  23.         topicConfigs := []kafka.TopicConfig{
  24.                 {
  25.                         Topic:             topic,
  26.                         NumPartitions:     1,
  27.                         ReplicationFactor: 1,
  28.                 },
  29.         }
  30.         // 创建topic
  31.         err = controllerConn.CreateTopics(topicConfigs...)
  32.         if err != nil {
  33.                 panic(err.Error())
  34.         }
  35. }
复制代码
通过非leader节点连接leader节点

下面的示例代码演示了如何通过已有的非leader节点的Conn,连接至 leader节点。
  1. conn, err := kafka.Dial("tcp", "localhost:9092")
  2. if err != nil {
  3.     panic(err.Error())
  4. }
  5. defer conn.Close()
  6. // 获取当前控制节点信息
  7. controller, err := conn.Controller()
  8. if err != nil {
  9.     panic(err.Error())
  10. }
  11. var connLeader *kafka.Conn
  12. connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
  13. if err != nil {
  14.     panic(err.Error())
  15. }
  16. defer connLeader.Close()
复制代码
获取topic列表

  1. conn, err := kafka.Dial("tcp", "localhost:9092")
  2. if err != nil {
  3.     panic(err.Error())
  4. }
  5. defer conn.Close()
  6. partitions, err := conn.ReadPartitions()
  7. if err != nil {
  8.     panic(err.Error())
  9. }
  10. m := map[string]struct{}{}
  11. // 遍历所有分区取topic
  12. for _, p := range partitions {
  13.     m[p.Topic] = struct{}{}
  14. }
  15. for k := range m {
  16.     fmt.Println(k)
  17. }
复制代码
Reader

Reader是由 kafka-go 包提供的另一个概念,对于从单个主题-分区(topic-partition)消费消息这种典范场景,使用它能够简化代码。Reader 还实现了主动重连和偏移量管理,并支持使用 Context 支持异步取消和超时的 API。
留意: 当历程退出时,必须在 Reader 上调用 Close() 。Kafka服务器需要一个优雅的断开连接来阻止它继承实验向已连接的客户端发送消息。假如历程使用 SIGINT (shell 中的 Ctrl-C)或 SIGTERM (如 docker stop 或 kubernetes start)终止,那么下面给出的示例不会调用 Close()。当同一topic上有新Reader连接时,可能导致耽误(例如,新历程启动或新容器运行)。在这种场景下应使用signal.Notify处理程序在历程关闭时关闭Reader。
消费消息

下面的代码演示了如何使用Reader连接至Kafka消费消息。
  1. // readByReader 通过Reader接收消息
  2. func readByReader() {
  3.         // 创建Reader
  4.         r := kafka.NewReader(kafka.ReaderConfig{
  5.                 Brokers:   []string{"localhost:9092", "localhost:9093", "localhost:9094"},
  6.                 Topic:     "topic-A",
  7.                 Partition: 0,
  8.                 MaxBytes:  10e6, // 10MB
  9.         })
  10.         r.SetOffset(42) // 设置Offset
  11.         // 接收消息
  12.         for {
  13.                 m, err := r.ReadMessage(context.Background())
  14.                 if err != nil {
  15.                         break
  16.                 }
  17.                 fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
  18.         }
  19.         // 程序退出前关闭Reader
  20.         if err := r.Close(); err != nil {
  21.                 log.Fatal("failed to close reader:", err)
  22.         }
  23. }
复制代码
消费者组

kafka-go支持消费者组,包括broker管理的offset。要启用消费者组,只需在 ReaderConfig 中指定 GroupID。
使用消费者组时,ReadMessage 会主动提交偏移量。
  1. // 创建一个reader,指定GroupID,从 topic-A 消费消息
  2. r := kafka.NewReader(kafka.ReaderConfig{
  3.         Brokers:  []string{"localhost:9092", "localhost:9093", "localhost:9094"},
  4.         GroupID:  "consumer-group-id", // 指定消费者组id
  5.         Topic:    "topic-A",
  6.         MaxBytes: 10e6, // 10MB
  7. })
  8. // 接收消息
  9. for {
  10.         m, err := r.ReadMessage(context.Background())
  11.         if err != nil {
  12.                 break
  13.         }
  14.         fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
  15. }
  16. // 程序退出前关闭Reader
  17. if err := r.Close(); err != nil {
  18.         log.Fatal("failed to close reader:", err)
  19. }
复制代码
在使用消费者组时会有以下限制:
显式提交

kafka-go 也支持显式提交。当需要显式提交时不要调用 ReadMessage,而是调用 FetchMessage获取消息,然后调用 CommitMessages 显式提交。
  1. ctx := context.Background()
  2. for {
  3.     // 获取消息
  4.     m, err := r.FetchMessage(ctx)
  5.     if err != nil {
  6.         break
  7.     }
  8.     // 处理消息
  9.     fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
  10.     // 显式提交
  11.     if err := r.CommitMessages(ctx, m); err != nil {
  12.         log.Fatal("failed to commit messages:", err)
  13.     }
  14. }
复制代码
在消费者组中提交消息时,具有给定主题/分区的最大偏移量的消息确定该分区的提交偏移量的值。例如,假如通过调用 FetchMessage 获取了单个分区的偏移量为 1、2 和 3 的消息,则使用偏移量为3的消息调用 CommitMessages 也将导致该分区的偏移量为 1 和 2 的消息被提交。
管理提交间隔

默认情况下,调用CommitMessages将同步向Kafka提交偏移量。为了提高性能,可以在ReaderConfig中设置CommitInterval来定期向Kafka提交偏移。
  1. // 创建一个reader从 topic-A 消费消息
  2. r := kafka.NewReader(kafka.ReaderConfig{
  3.     Brokers:        []string{"localhost:9092", "localhost:9093", "localhost:9094"},
  4.     GroupID:        "consumer-group-id",
  5.     Topic:          "topic-A",
  6.     MaxBytes:       10e6, // 10MB
  7.     CommitInterval: time.Second, // 每秒刷新一次提交给 Kafka
  8. })
复制代码
Writer

向Kafka发送消息,除了使用基于Conn的低级API,kafka-go包还提供了更高级别的 Writer 范例。大多数情况下使用Writer即可满意条件,它支持以下特性。
发送消息

  1. // 创建一个writer 向topic-A发送消息
  2. w := &kafka.Writer{
  3.         Addr:         kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
  4.         Topic:        "topic-A",
  5.         Balancer:     &kafka.LeastBytes{}, // 指定分区的balancer模式为最小字节分布
  6.         RequiredAcks: kafka.RequireAll,    // ack模式
  7.         Async:        true,                // 异步
  8. }
  9. err := w.WriteMessages(context.Background(),
  10.         kafka.Message{
  11.                 Key:   []byte("Key-A"),
  12.                 Value: []byte("Hello World!"),
  13.         },
  14.         kafka.Message{
  15.                 Key:   []byte("Key-B"),
  16.                 Value: []byte("One!"),
  17.         },
  18.         kafka.Message{
  19.                 Key:   []byte("Key-C"),
  20.                 Value: []byte("Two!"),
  21.         },
  22. )
  23. if err != nil {
  24.     log.Fatal("failed to write messages:", err)
  25. }
  26. if err := w.Close(); err != nil {
  27.     log.Fatal("failed to close writer:", err)
  28. }
复制代码
创建不存在的topic

假如给Writer设置了AllowAutoTopicCreation:true,那么当发送消息至某个不存在的topic时,则会主动创建topic。
  1. w := &Writer{
  2.     Addr:                   kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
  3.     Topic:                  "topic-A",
  4.     AllowAutoTopicCreation: true,  // 自动创建topic
  5. }
  6. messages := []kafka.Message{
  7.     {
  8.         Key:   []byte("Key-A"),
  9.         Value: []byte("Hello World!"),
  10.     },
  11.     {
  12.         Key:   []byte("Key-B"),
  13.         Value: []byte("One!"),
  14.     },
  15.     {
  16.         Key:   []byte("Key-C"),
  17.         Value: []byte("Two!"),
  18.     },
  19. }
  20. var err error
  21. const retries = 3
  22. // 重试3次
  23. for i := 0; i < retries; i++ {
  24.     ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  25.     defer cancel()
  26.    
  27.     err = w.WriteMessages(ctx, messages...)
  28.     if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
  29.         time.Sleep(time.Millisecond * 250)
  30.         continue
  31.     }
  32.     if err != nil {
  33.         log.Fatalf("unexpected error %v", err)
  34.     }
  35.     break
  36. }
  37. // 关闭Writer
  38. if err := w.Close(); err != nil {
  39.     log.Fatal("failed to close writer:", err)
  40. }
复制代码
写入多个topic

通常,WriterConfig.Topic用于初始化单个topic的Writer。通已往掉WriterConfig中的Topic设置,分别设置每条消息的message.topic,可以实现将消息发送至多个topic。
  1. w := &kafka.Writer{
  2.         Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
  3.     // 注意: 当此处不设置Topic时,后续的每条消息都需要指定Topic
  4.         Balancer: &kafka.LeastBytes{},
  5. }
  6. err := w.WriteMessages(context.Background(),
  7.     // 注意: 每条消息都需要指定一个 Topic, 否则就会报错
  8.         kafka.Message{
  9.         Topic: "topic-A",
  10.                 Key:   []byte("Key-A"),
  11.                 Value: []byte("Hello World!"),
  12.         },
  13.         kafka.Message{
  14.         Topic: "topic-B",
  15.                 Key:   []byte("Key-B"),
  16.                 Value: []byte("One!"),
  17.         },
  18.         kafka.Message{
  19.         Topic: "topic-C",
  20.                 Key:   []byte("Key-C"),
  21.                 Value: []byte("Two!"),
  22.         },
  23. )
  24. if err != nil {
  25.     log.Fatal("failed to write messages:", err)
  26. }
  27. if err := w.Close(); err != nil {
  28.     log.Fatal("failed to close writer:", err)
  29. }
复制代码
留意:Writer中的Topic和Message中的Topic是互斥的,同一时刻有且只能设置一处。
其他设置

TLS

对于根本的 Conn 范例或在 Reader/Writer 设置中,可以在Dialer中设置TLS选项。假如 TLS 字段为空,则它将不启用TLS 连接。
留意:不在Conn/Reder/Writer上设置TLS,连接到启用TLS的Kafka集群,可能会出现io.ErrUnexpectedEOF错误。
Connection
  1. dialer := &kafka.Dialer{
  2.     Timeout:   10 * time.Second,
  3.     DualStack: true,
  4.     TLS:       &tls.Config{...tls config...},  // 指定TLS配置
  5. }
  6. conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
复制代码
Reader
  1. dialer := &kafka.Dialer{
  2.     Timeout:   10 * time.Second,
  3.     DualStack: true,
  4.     TLS:       &tls.Config{...tls config...},  // 指定TLS配置
  5. }
  6. r := kafka.NewReader(kafka.ReaderConfig{
  7.     Brokers:        []string{"localhost:9092", "localhost:9093", "localhost:9094"},
  8.     GroupID:        "consumer-group-id",
  9.     Topic:          "topic-A",
  10.     Dialer:         dialer,
  11. })
复制代码
Writer
创建Writer时可以按如下方式指定TLS设置。
  1. w := kafka.Writer{
  2.   Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
  3.    Topic:   "topic-A",
  4.    Balancer: &kafka.Hash{},
  5.    Transport: &kafka.Transport{
  6.        TLS: &tls.Config{},  // 指定TLS配置
  7.      },
  8.    }
复制代码
SASL

可以在Dialer上指定一个选项以使用SASL身份验证。Dialer可以直接用来打开一个 Conn,也可以通过它们各自的设置传递给一个 Reader 或 Writer。假如 SASLMechanism字段为 nil,则不会使用 SASL 举行身份验证。
SASL 身份验证范例
明文
  1. mechanism := plain.Mechanism{
  2.     Username: "username",
  3.     Password: "password",
  4. }
复制代码
SCRAM
  1. mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
  2. if err != nil {
  3.     panic(err)
  4. }
复制代码
Connection
  1. mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
  2. if err != nil {
  3.     panic(err)
  4. }
  5. dialer := &kafka.Dialer{    Timeout:       10 * time.Second,    DualStack:     true,    SASLMechanism: mechanism,}conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
复制代码
Reader
  1. mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
  2. if err != nil {
  3.     panic(err)
  4. }
  5. dialer := &kafka.Dialer{    Timeout:       10 * time.Second,    DualStack:     true,    SASLMechanism: mechanism,}r := kafka.NewReader(kafka.ReaderConfig{    Brokers:        []string{"localhost:9092","localhost:9093", "localhost:9094"},    GroupID:        "consumer-group-id",    Topic:          "topic-A",    Dialer:         dialer,})
复制代码
Writer
  1. mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
  2. if err != nil {
  3.     panic(err)
  4. }
  5. // Transport 负责管理连接池和其他资源,// 通常最好的使用方式是创建后在应用程序中共享使用它们。sharedTransport := &kafka.Transport{    SASL: mechanism,}w := kafka.Writer{        Addr:      kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),        Topic:     "topic-A",        Balancer:  &kafka.Hash{},        Transport: sharedTransport,}
复制代码
Client
  1. mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
  2. if err != nil {
  3.     panic(err)
  4. }
  5. // Transport 负责管理连接池和其他资源,// 通常最好的使用方式是创建后在应用程序中共享使用它们。sharedTransport := &kafka.Transport{    SASL: mechanism,}client := &kafka.Client{    Addr:      kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),    Timeout:   10 * time.Second,    Transport: sharedTransport,}
复制代码
Balancer

kafka-go实现了多种负载均衡策略。特别是当你从其他Kafka库迁徙过来时,你可以按如下说明选择合适的Balancer实现。
Sarama
假如从 sarama 切换过来,而且需要/盼望使用雷同的算法举行消息分区,则可以使用kafka.Hash或kafka.ReferenceHash。
  1. kafka.Hash = sarama.NewHashPartitioner
  2. kafka.ReferenceHash = sarama.NewReferenceHashPartitioner
  3. w := &kafka.Writer{
  4.         Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
  5.         Topic:    "topic-A",
  6.         Balancer: &kafka.Hash{},
  7. }
复制代码
librdkafka和confluent-kafka-go:kafka.CRC32Balancer与librdkafka默认的consistent_random策略表现同等。
  1. w := &kafka.Writer{
  2.         Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
  3.         Topic:    "topic-A",
  4.         Balancer: kafka.CRC32Balancer{},
  5. }
复制代码
Java:使用kafka.Murmur2Balancer可以得到与默认Java客户端雷同的策略。
  1. w := &kafka.Writer{
  2.         Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
  3.         Topic:    "topic-A",
  4.         Balancer: kafka.Murmur2Balancer{},
  5. }
复制代码
Compression

可以通过设置Compression字段在Writer上启用压缩:
  1. w := &kafka.Writer{
  2.         Addr:        kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
  3.         Topic:       "topic-A",
  4.         Compression: kafka.Snappy,
  5. }
复制代码
Reader 将通过检查消息属性来确定消费的消息是否被压缩。
Logging

想要记录Reader/Writer范例的操纵,可以在创建时设置日志记录器。
kafka-go中的Logger是一个接口范例。
  1. type Logger interface {
  2.         Printf(string, ...interface{})
  3. }
复制代码
而且提供了一个LoggerFunc范例,帮我们实现了Logger接口。
  1. type LoggerFunc func(string, ...interface{})
  2. func (f LoggerFunc) Printf(msg string, args ...interface{}) { f(msg, args...) }
复制代码
Reader:借助kafka.LoggerFunc我们可以自定义一个Logger。
  1. // 自定义一个Logger
  2. func logf(msg string, a ...interface{}) {
  3.         fmt.Printf(msg, a...)
  4.         fmt.Println()
  5. }
  6. r := kafka.NewReader(kafka.ReaderConfig{
  7.         Brokers:     []string{"localhost:9092", "localhost:9093", "localhost:9094"},
  8.         Topic:       "q1mi-topic",
  9.         Partition:   0,
  10.         Logger:      kafka.LoggerFunc(logf),
  11.         ErrorLogger: kafka.LoggerFunc(logf),
  12. })
复制代码
Writer:也可以直接使用第三方日志库,例如下面示例代码中使用了zap日志库。
  1. w := &kafka.Writer{
  2.         Addr:        kafka.TCP("localhost:9092"),
  3.         Topic:       "q1mi-topic",
  4.         Logger:      kafka.LoggerFunc(zap.NewExample().Sugar().Infof),
  5.         ErrorLogger: kafka.LoggerFunc(zap.NewExample().Sugar().Errorf),
  6. }
复制代码
FAQ

Kafka中的消费者组(Consumer Group)的明白

Kafka中的消费者组(Consumer Group)是一个非常紧张的概念,涉及到消息如何被消费和分发。让我们来具体表明一下。
消费者组的概念


消费者组内的消息分配

假设你有一个Topic names,它有多个分区,而且你创建了一个消费者组group1,内里有3个消费者:C1、C2 和 C3。消息的消费方式将取决于Topic中的分区数目。

消息分发规则


这意味着,在同一个消费者组内,每条消息只会被一个消费者处理,消费后消息即从队列中移除

如许设计的利益是,你可以根据消费者组的数目灵活地调整并行处理的本领,同时还能保证在同一个消费者组内消息只被消费一次。
让我们通过一个具体的例子来深入明白Kafka消费者组的概念及其工作方式。
实际例子

假设有一个Topic叫做 names,它有6个分区(Partition),用来存储用户的名字。现在,你有一个消费者组 group1,而且在这个组中创建了3个消费者 C1、C2 和 C3。
场景1:消费者组内的消费者数目小于分区数目


在这种情况下,Kafka会将6个分区分配给这3个消费者。假设分配方式如下:

因此,分区 P1 中的全部消息只会被消费者 C1 消费,P2 中的消息只会被消费者 C2 消费,依此类推。每条消息在消费者组 group1 中只会被一个消费者消费
场景2:增加消费者以提高处理本领

假设随着业务发展,你需要更快地处理这些消息。你可以在group1中再增加3个消费者,使总消费者数目达到6个。

Kafka会重新分配分区,现在每个消费者消费一个分区:

如许一来,消息处理本领就提升了,由于现在有6个消费者并行处理这6个分区中的消息。
场景3:消费者数目多于分区数目

假如你增加到8个消费者,而 names 这个Topic仍旧只有6个分区,Kafka会把这6个分区分配给前6个消费者,而剩余的两个消费者 C7 和 C8 不会接收到任何消息,处于空闲状态。
这个设计提供了灵活性,答应你根据需求调整消费者数目以达到所需的吞吐量和并行处理本领。
固然可以。下面是一个简单的Go示例代码,展示如何使用 confluent-kafka-go 库(这是一个流行的Kafka客户端库)来创建Kafka消费者,而且如何增加消费者来提升处理本领。
示例代码

起首,你需要安装 confluent-kafka-go 库。你可以使用以下下令安装它:
  1. go get github.com/confluentinc/confluent-kafka-go/kafka
复制代码
以下是一个简单的Go程序,演示了如何创建一个Kafka消费者,而且增加消费者来处理消息。
1. 单个消费者

Kafka中的消费者组是通过消费者设置中的 group.id 参数来定义的。消费者组的概念是Kafka客户端的设置的一部分,而不是需要显式创建的实体。Kafka会主动管理消费者组的协调和分配。
这是一个单消费者的简单示例:
  1. package main
  2. import (
  3.     "log"
  4.     "github.com/confluentinc/confluent-kafka-go/kafka"
  5. )
  6. func main() {
  7.     // 创建Kafka消费者配置
  8.     config := &kafka.ConfigMap{
  9.         "bootstrap.servers": "localhost:9092",
  10.         "group.id":          "example-group",
  11.         "auto.offset.reset": "earliest",
  12.     }
  13.     // 创建Kafka消费者
  14.     consumer, err := kafka.NewConsumer(config)
  15.     if err != nil {
  16.         log.Fatalf("Failed to create consumer: %s", err)
  17.     }
  18.     // 订阅Topic
  19.     consumer.Subscribe("names", nil)
  20.     // 消费消息
  21.     for {
  22.         msg, err := consumer.ReadMessage(-1)
  23.         if err != nil {
  24.             log.Printf("Consumer error: %v", err)
  25.             continue
  26.         }
  27.         log.Printf("Message: %s", string(msg.Value))
  28.     }
  29. }
复制代码
2. 增加消费者

要增加消费者,你可以启动多个实例的消费者程序,每个实例都使用雷同的 group.id。Kafka会主动负载均衡各个消费者对Topic分区的消费。
以下是启动多个消费者的示例:
假设你已经将上述代码保存为 consumer.go。你可以在差别的终端中运行多个消费者实例,模拟增加消费者:
  1. go run consumer.go
复制代码
你可以在差别的终端窗口中运行这个下令多次。例如:
  1. # 终端 1go run consumer.go
  2. # 终端 2go run consumer.go
  3. # 终端 3go run consumer.go
复制代码
每个运行的消费者实例都会加入到 example-group 消费者组中,Kafka会将Topic的分区分配给这些消费者,确保负载均衡。
留意事项


如许,你可以通过启动多个消费者实例来提高消息处理的并发本领。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4