Kafka概述
定义
Kafka传统定义: Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),重要应用于大数据实时处理范畴。
发布/订阅:消息的发布者不会将消息直接发布给特定的订阅者,而是将发布的消息分为差别的种别,订阅者只接收感兴趣的消息。
Kafka最新定义:Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能的数据管道、流分析、数据集成和关键使命应用。
消息队列
目前企业中比较常见的消息队列产物重要有Kafka、ActiveMQ、RabbitMQ、RocketMQ等。
在大数据场景重要采用Kafka作为消息队列。在JavaEE开发中重要采用ActiveMQ、RabbitMQ、RocketMQ。
目录结构分析
- bin:Kafka的全部实行脚本都在这里。例如:启动Kafka服务器、创建Topic、生产者、消费者程序等等
- config:Kafka的全部设置文件
- libs: 运行Kafka所需要的全部JAR包
- logs: Kafka的全部日志文件,假如Kafka出现一些标题,需要到该目录中去查看异常信息
- site-docs: Kafka的网站资助文件
传统消息队列的应用场景
传统的消息队列的重要应用场景包括**:缓存/消峰、解耦和异步通讯**。
缓冲/消峰: 有助于控制和优化数据流颠末体系的速率,解决生产消息和消费消息的处理速率差别等的情况。
解耦:答应你独立的扩展或修改两边的处理过程,只要确保它们服从同样的接口束缚。
异步通讯:答应用户把一个消息放入队列,但并不立刻处理它,然后再需要的时间再行止理它们。
消息队列的两种模式
点对点模式
消费者主动拉去数据,消息收到后扫除消息
发布/订阅模式
- 可以有多个topic主题(浏览,点赞,收藏,批评等)
- 消费者消费数据之后,不删除数据
- 每个消费者互相独立,都可以消费到数据
Kafka基础架构
1、为方便扩展,并提高吞吐量,一个topic分为多个partition
2、配合分区的设计,提出消费者组的概念,组内每个消费者并行消费
3、为提高可用性,为每个partition增加多少副本,类似NameNode HA
4、ZK中记录谁是leader,Kafka2.8.0 以后也可以设置不采用ZK.
- Producer:消息生产者,就是向Kafka broker 发消息的客户端。
- Consumer:消息消费者,向Kafka broker 取消息的客户端。
- Consumer Group(CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费差别分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。全部的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
- Broker:一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
- Topic: 可以明白为一个队列,生产者和消费者面向的都是一个topic。
- Partition: 为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。
- Replica:副本。一个topic的每个分区都有多少个副本,一个Leader和多少个Follower。
- Leader:每个分区多个副本的 “主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
- Follower:每个分区多个副本中的 “从”,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个Follower会成为新的 Leader。
Kafka快速入门
安装部署
集群规划
Hadoop102Hadoop103Hadoop104zkzkzkkafkakafkakafka 集群部署
- docker部署zk集群:参考《zk全解》
- 进入到/usr/local/kafka目录,修改设置文件
- vim server.properties
- #broker 的全局唯一编号,不能重复,只能是数字。
- broker.id=0
- #处理网络请求的线程数量
- num.network.threads=3
- #用来处理磁盘 IO 的线程数量
- num.io.threads=8
- #发送套接字的缓冲区大小
- socket.send.buffer.bytes=102400
- #接收套接字的缓冲区大小
- socket.receive.buffer.bytes=102400
- #请求套接字的缓冲区大小
- socket.request.max.bytes=104857600
- #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以
- # 配置多个磁盘路径,路径与路径之间可以用","分隔
- log.dirs=/opt/module/kafka/datas
- # 监听所有网卡地址,允许外部端口连接
- listeners=PLAINTEXT://0.0.0.0:9092
- #topic 在当前 broker 上的分区个数
- num.partitions=1
- #用来恢复和清理 data 下数据的线程数量
- num.recovery.threads.per.data.dir=1
- # 每个 topic 创建时的副本数,默认时 1 个副本
- offsets.topic.replication.factor=1
- #segment 文件保留的最长时间,超时将被删除
- log.retention.hours=168
- #每个 segment 文件的大小,默认最大 1G
- log.segment.bytes=1073741824
- # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
- log.retention.check.interval.ms=300000
- #配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
- zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
复制代码 可以提前在hosts文件中设置master,slave1,slave2的ip,之前在学习k8s的时间我已经设置过了,可以直接拿来用。
listeners=PLAINTEXT://0.0.0.0:9092 ,默认情况下,advertised.listeners不设置的话,则默认使用listeners的属性,然而advertised.listeners是不支持0.0.0.0的,所以需要指定暴露的监听器,如下
- listeners=PLAINTEXT://0.0.0.0:9092
- advertised.listeners=PLAINTEXT://虚拟机ip:9092
复制代码 - 将安装包拷贝到其他服务器
- 分别在hadoop103和hadoop104 上修改设置文件/opt/module/kafka/config/server.properties中的 broker.id=1、broker.id=2
- 设置环境变量
- 在/etc/profile.d/my_env.sh 文件中增加 kafka 环境变量设置
- sudo vim /etc/profile.d/my_env.sh
- 增加如下内容:
- #KAFKA_HOME
- export KAFKA_HOME=/opt/module/kafka
- export PATH=$PATH:$KAFKA_HOME/bin
复制代码 这里我将kafka直接放在了根目录下的一个文件夹,更加方便:
- sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.shsource /etc/profile
- source /etc/profile
复制代码
- 分别启动kafka:
- bin/kafka-server-start.sh -daemon config/server.properties
复制代码
- 假如遇到cluser_id不符合的标题,直接将日志文件删除重新启动即可。
集群启停脚本
- #! /bin/bash
- case $1 in
- "start"){
- for i in hadoop102 hadoop103 hadoop104
- do
- echo " --------启动 $i Kafka-------"
- ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -
- daemon /opt/module/kafka/config/server.properties"
- done
- };;
- "stop"){
- for i in hadoop102 hadoop103 hadoop104
- do
- echo " --------停止 $i Kafka-------"
- ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
- done
- };;
- esac
复制代码 Kafka下令行操纵
Kafka基础架构
主题下令行操纵
- 查看操纵主题下令参数
- 查看当前服务器中的全部topic
- ./bin/kafka-topics.sh
- --bootstrap-server localhost:9092 --list
复制代码 - 创建 first topic
- ./bin/kafka-topics.sh
- --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic first
复制代码 选项说明:
- –topic 定义 topic 名
- –replication-factor 定义副本数
- –partitions 定义分区数
- 查看 first 主题的详情
- ./bin/kafka-topics.sh
- --bootstrap-server localhost:9092 --topic first --describe
复制代码 - 修改分区数(留意:分区数只能增加,不能淘汰)
- ./bin/kafka-topics.sh
- --bootstrap-server localhost:9092 --alter --topic first --partitions 3
复制代码 - 查看结果:
- ./bin/kafka-topics.sh
- --bootstrap-server localhost:9092 --topic first --describe Topic: first TopicId: _Pjhmn1NTr6ufGufcnsw5A PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: first Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: first Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: first Partition: 2 Leader: 0 Replicas: 0 Isr: 0
复制代码 - 删除 topic
- ./bin/kafka-topics.sh
- --bootstrap-server localhost:9092 --delete --topic first
复制代码 生产者下令行操纵
- 查看操纵者下令参数
- ./bin/kafka-console-producer.sh
复制代码
- 发送消息
- ./bin/kafka-console-producer.sh
- --bootstrap-server localhost:9092 --topic first>hello world>yooome yooome
复制代码 消费者下令行操纵
- 查看操纵消费者下令参数
- ./bin/kafka-console-consumer.sh
复制代码
- 消费消息
- ./bin/kafka-console-consumer.sh
- --bootstrap-server localhost:9092 --topic first
复制代码 - ./bin/kafka-console-consumer.sh
- --bootstrap-server localhost:9092 --from-beginning --topic first
复制代码
kafka可视化工具
官网:https://www.kafkatool.com/download.html
Kafka紧张概念
broker
- 一个Kafka的集群通常由多个broker组成,如许才气实现负载均衡、以及容错
- broker是无状态(Sateless)的,它们是通过ZooKeeper来维护集群状态
- 一个Kafka的broker每秒可以处理数十万次读写,每个broker都可以处理TB消息而不影响性能
zookeeper
- ZK用来管理和协调broker,而且存储了Kafka的元数据(例如:有多少topic、partition、consumer)
- ZK服务重要用于关照生产者和消费者Kafka集群中有新的broker加入、大概Kafka集群中出现故障的broker。
- Kafka正在渐渐想办法将ZooKeeper剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据
producer(生产者)
生产者负责将数据推送给broker的topic
consumer(消费者)
消费者负责从broker的topic中拉取数据,并自己举行处理
consumer group(消费者组)
- consumer group是kafka提供的可扩展且具有容错性的消费者机制
- 一个消费者组可以包含多个消费者
- 一个消费者组有一个唯一的ID(group Id)
- 组内的消费者一起消费主题的全部分区数据
分区(Partitions)
在Kafka集群中,主题被分为多个分区
副本(Replicas)
副本可以确保某个服务器出现故障时,确保数据依然可用,在Kafka中,一样平常都会设计副本的个数>1,
主题(Topic)
- 主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据
- Kafka中的主题必须要有标识符,而且是唯一的,Kafka中可以有任意数目的主题,没有数目上的限制
- 在主题中的消息是有结构的,一样平常一个主题包含某一类消息
- 一旦生产者发送消息到主题中,这些消息就不能被更新(更改)
偏移量(offset)
- offset记录着下一条将要发送给Consumer的消息的序号
- 默认Kafka将offset存储在ZooKeeper中
- 在一个分区中,消息是有序次的方式存储着,每个在分区的消费都是有一个递增的id。这个就是偏移量offset
- 偏移量在分区中才是有意义的。在分区之间,offset是没有任何意义的
消费者组
- Kafka支持有多个消费者同时消费一个主题中的数据。
- 同时运行两个消费者,我们发现,只有一个消费者程序能够拉取到消息。想要让两个消费者同时消费消息,必须要给test主题,添加一个分区。
- 设置 test topic为2个分区bin/kafka-topics.sh --zookeeper 192.168.88.100:2181 -alter --partitions 2 --topic test
Kafka生产者
生产者消息发送流程
发送原理
在消息发送的过程中,涉及到了两个线程 — main 线程和Sender线程。在main线程中创建了一个双端队列 RecordAccumulator。main线程将消息发送给ResordAccumlator,Sender线程不绝从 RecordAccumulator 中拉去消息发送到 Kafka Broker。
生产者紧张参数列表
异步发送API
平凡异步发送
- 需求:创建Kafka生产者,采用异步的方式发送到Kafka Broker。
2、代码编程go get github.com/Shopify/sarama
- func main() {
- config := sarama.NewConfig()
- config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
- config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
- config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
- // 构造一个消息
- msg := &sarama.ProducerMessage{}
- msg.Topic = "first"
- msg.Value = sarama.StringEncoder("this is a test log")
- // 连接kafka
- client, err := sarama.NewSyncProducer([]string{
- "192.168.71.128:9092", "192.168.71.129:9092", "192.168.71.130:9092",
- }, config)
- if err != nil {
- fmt.Println("producer closed, err:", err)
- return
- } else {
- fmt.Println(client)
- }
- defer client.Close()
- // 发送消息
- pid, offset, err := client.SendMessage(msg)
- if err != nil {
- fmt.Println("send msg failed, err:", err)
- return
- }
- fmt.Printf("pid:%v offset:%v\n", pid, offset)
- }
复制代码 带回调函数的异步发送
【留意:】消息发送失败会主动重试,不需要我们在回调函数中手动重试。
同步发送API
生产者分区
分区和副本机制
生产者写入消息到topic,Kafka将依据差别的策略将数据分配到差别的分区中
- 轮询分区策略
- 随机分区策略
- 按key分区分配策略
- 自定义分区策略
分区利益
- 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的使命,可以实现负载均衡的效果。
- 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位举行 消费数据。
轮询策略
- 默认的策略,也是使用最多的策略,可以最大限度保证全部消息平均分配到一个分区
- 假如在生产消息时,key为null,则使用轮询算法均衡地分配分区
随机策略(不消)
随机策略,每次都随机地将消息分配到每个分区。在较早的版本,默认的分区策略就是随机策略,也是为了将消息均衡地写入到每个分区。但后续轮询策略表现更佳,所以根本上很少会使用随机策略。
按key分配策略
按key分配策略,有可能会出现「数据倾斜」,例如:某个key包含了大量的数据,由于key值一样,全部全部的数据将都分配到一个分区中,造成该分区的消息数目远大于其他的分区。
乱序标题
轮询策略、随机策略都会导致一个标题,生产到Kafka中的数据是乱序存储的。而按key分区可以一定程度上实现数据有序存储——也就是局部有序,但这又可能会导致数据倾斜,所以在实际生产环境中要结合实际情况来做取舍。
副本机制
副本的目的就是冗余备份,当某个Broker上的分区数据丢失时,依然可以保障数据可用。由于在其他的Broker上的副本是可用的。
producer的ACKs参数
对副本关系较大的就是,producer设置的acks参数了,acks参数表现当生产者生产消息的时间,写入到副本的要求严酷程度。它决定了生产者如何在性能和可靠性之间做取舍。
acks设置为0
acks设置为1
当生产者的ACK设置为1时,生产者会等候leader副本确认接收后,才会发送下一条数据,性能中等。
acks设置为-1大概all
Kafka生产者幂等性与事件
幂等性
拿http举例来说,一次或多次哀求,得到地响应是同等的(网络超时等标题除外),换句话说,就是实行多次操纵与实行一次操纵的影响是一样的。
假如,某个体系是不具备幂等性的,假如用户重复提交了某个表格,就可能会造成不良影响。例如:用户在浏览器上点击了多次提交订单按钮,会在背景天生多个一模一样的订单。
Kafka生产者幂等性
在生产者生产消息时,假如出现retry时,有可能会一条消息被发送了多次,假如Kafka不具备幂等性的,就有可能会在partition中保存多条一模一样的消息。
幂等性原理
为了实现生产者的幂等性,Kafka引入了 Producer ID(PID)和 Sequence Number的概念。
- PID:每个Producer在初始化时,都会分配一个唯一的PID,这个PID对用户来说,是透明的。
- Sequence Number:针对每个生产者(对应PID)发送到指定主题分区的消息都对应一个从0开始递增的Sequence Number。
- 幂等性只能保证的是在单分区单会话内不重复
Kafka事件
- Kafka事件是2017年Kafka 0.11.0.0引入的新特性。类似于数据库的事件。Kafka事件指的是生产者生产消息以及消费者提交offset的操纵可以在一个原子操纵中,要么都乐成,要么都失败。尤其是在生产者、消费者并存时,事件的保障尤其紧张。(consumer-transform-producer模式)
- 开启事件,必须开启幂等性
事件操纵API
Producer接口中定义了以下5个事件相干方法:
- initTransactions(初始化事件):要使用Kafka事件,必须先举行初始化操纵
- beginTransaction(开始事件):启动一个Kafka事件
- sendOffsetsToTransaction(提交偏移量):批量地将分区对应的offset发送到事件中,方便后续一块提交
- commitTransaction(提交事件):提交事件
- abortTransaction(放弃事件):取消事件
数据有序和数据乱序
Kafka Broker
Zookeeper存储的Kafka信息
- [zk: localhost:2181(CONNECTING) 0] ls /
- [admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
复制代码
Kafka Broker总体工作流程
Broker紧张参数
Kafka副本
副本根本信息
- Kafka副本作用:提高数据可靠性。
- Kafka默认副本1个,生产环境一样平常设置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
- Kafka中副本为:Leader和Follower。Kafka生产者只会把数据发往 Leader,然后Follower 找 Leader 举行同步数据。
- Kafka 分区中的全部副本统称为 AR(Assigned Repllicas)。
AR = ISR + OSR
ISR:表现 Leader 保持同步的 Follower 集合。假如 Follower 长时间未 向 Leader 发送通讯哀求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s 。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。
OSR:表现 Follower 与 Leader 副本同步时,耽误过多的副本。
Leader 选举流程
Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader ,负责管理集群 broker 的上下线,全部 topic 的分区副本分配 和 Leader 选举等工作。
- [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu1 --partitions 4 --replication-factor 4
- Created topic atguigu1.
复制代码- [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
- --topic atguigu1
- Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
- Configs: segment.bytes=1073741824
- Topic: atguigu1 Partition: 0 Leader: 3 Replicas: 3,0,2,1 Isr: 3,0,2,1
- Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
- Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,1,2
- Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0,3
复制代码
- 制止掉 hadoop105 的 kafka 历程,并查看 Leader 分区情况
- [atguigu@hadoop105 kafka]$ bin/kafka-server-stop.sh
- [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
- --topic atguigu1
- Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
- Configs: segment.bytes=1073741824
- Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,2,1
- Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,0
- Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,2
- Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0
复制代码
- 制止掉 hadoop104 的 kafka 历程,并查看 Leader 分区情况
- [atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh
- [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
- --topic atguigu1
- Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
- Configs: segment.bytes=1073741824
- Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1
- Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0
- Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1
- Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0
复制代码
- 启动 hadoop105 的 kafka 历程,并查看 Leader 分区情况
- [atguigu@hadoop105 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
- [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic atguigu1Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4Configs: segment.bytes=1073741824Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3
复制代码
- 启动 hadoop104 的 kafka 历程,并查看 Leader 分区情况
- [atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
- [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic atguigu1Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4Configs: segment.bytes=1073741824Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3,2Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3,2Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3,2Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3,2
复制代码
- 制止掉 hadoop103 的 kafka 历程,并查看 Leader 分区情况
- [atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh
- [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
- --topic atguigu1
- Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
- Configs: segment.bytes=1073741824
- Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,3,2
- Topic: atguigu1 Partition: 1 Leader: 2 Replicas: 1,2,3,0 Isr: 0,3,2
- Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,2
- Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 0,3,2
复制代码 Leader 和 Follower 故障处理细节
LEO(Log End Offset): 每个副本的末了一个offset,LEO其实就是最新的 offset + 1。
HW(High Watermark):全部副本中最小的LEO。
LEO(Log End Offset):每个副本的末了一个offset,LEO其实就是最新的offset + 1
HW(High Watermark):全部副本中最小的LEO
活动调整分区副本存储
在生产环境中,每台服务器的设置和性能差别等,但是kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。全部需要手动调整分区副本的存储。
需求:创建一个新的 topic ,4个分区,两个副本,名称为three 。将该 topic 的全部副本都存储到 broker0 和 broker1 两台服务器上。
手动调整分区副本存储的步骤如下:
- [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
- hadoop102:9092 --create --partitions 4 --replication-factor 2 --
- topic three
复制代码- [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
- hadoop102:9092 --describe --topic three
复制代码
- 创建副本存储筹划(全部副本都指定存储在 broker0、broker1 中)。
- [atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
复制代码 输入如下内容:
- {
- "version":1,
- "partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
- {"topic":"three","partition":1,"replicas":[0,1]},
- {"topic":"three","partition":2,"replicas":[1,0]},
- {"topic":"three","partition":3,"replicas":[1,0]}]
- }
复制代码- [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --
- bootstrap-server hadoop102:9092 --reassignment-json-file
- increase-replication-factor.json --execute
复制代码- [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --
- bootstrap-server hadoop102:9092 --reassignment-json-file
- increase-replication-factor.json --verify
复制代码- [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
- hadoop102:9092 --describe --topic three
复制代码 Leader Partition 负载平衡
正常情况下,Kafka自己会主动把Leader Partition均匀分散在各个呆板上,来保证每台呆板的读写吞吐量都是均匀的。但是假如某 些broker宕机,会导致Leader Partition过于会合在其他少部分几台broker上,这会导致少数几台broker的读写哀求压力过高,其他宕机的broker重启之后都是follower partition,读写哀求很低,造成集群负载不均衡。
参数名称描述auto.leader.rebalance.enable默认是 true。 主动 Leader Partition 平衡。生产环
境中,leader 重选举的代价比较大,可能会带来
性能影响,发起设置为 false 关闭。leader.imbalance.per.broker.percentage默认是 10%。每个 broker 答应的不平衡的 leader
的比率。假如每个 broker 凌驾了这个值,控制器
会触发 leader 的平衡。leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔
时间。 文件存储
Topic 数据的存储机制
查看 hadoop102(大概 hadoop103、hadoop104)的/opt/module/kafka/datas/first-1 (first-0、first-2)路径上的文件
- [atguigu@hadoop104 first-1]$ ls
- 00000000000000000092.index
- 00000000000000000092.log
- 00000000000000000092.snapshot
- 00000000000000000092.timeindex
- leader-epoch-checkpoint
- partition.metadata
复制代码 直接查看 log 日志,发现是乱码。
通过工具查看 index 和 log 信息。
- [atguigu@hadoop104 first-1]$ kafka-run-class.sh kafka.tools.DumpLogSegments
- --files ./00000000000000000000.index
- Dumping ./00000000000000000000.index
- offset: 3 position: 152
复制代码
日志存储参数设置
参数描述log.segment.bytesKafka 中 log 日志是分成一块块存储的,此设置是指 log 日志划分
成块的大小,默认值 1G。log.index.interval.bytes默认 4kb,kafka 内里每当写入了 4kb 大小的日志(.log),
然后就往 index 文件内里记录一个索引。 稀疏索引。 文件整理策略
Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
- Log.retention.hours,最低优先级小时,默认7天。
- log.retention.minutes,分钟。
- log.retention.ms,最高优先级毫秒。
- log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。
那么日志一旦凌驾了设置的时间,怎么处理呢?
Kafka 中提供的日志整理策略有 delete 和 compact 两种。
- log.cleanup.policy = delete 全部数据启用阐述策略
(1) 基于时间:默认打开。以 segment 中全部记录中的最大时间戳作为该文件时间戳。
(2) 基于大小:默认关闭。凌驾设置的全部日志总大小,阐述最早的 segment 。
log.retention.bytes,默认等于-1,表现无穷大。
compact日志压缩:对于雷同 key 的差别 value 值,值保存末了一个版本。
- log.cleanup.policy = compact全部数据启动压缩策略
压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个 offset 大的 offset 对应的消息,实际上会拿到 offset 为 7 的消息,并从这个位置开始消费。
这种策略只得当特别场景,比如消息的 key 是用户 ID,value 是用户的资料,通过这种压缩策略,整个消息集里就保存了全部用户最新的资料。
Kafka 消费者
Kafka 消费方式
- pull(拉)模式:consumer 采用从 broker 中主动拉去数据。Kafka 采用这种方式。
- push(推)模式:Kafka没有采用这种方式,由于由 broker 决定消息发送速率,很难适应全部消费者的消费速率。例如推送的速率是 50m/s,Consumer1,Consumer2就来不及处理消息。
pull 模式不足之处是,假如Kafka 没有数据,消费者可能会陷入循环中,不绝返回空数据。
Kafka 消费者工作流程
消费者组原理
Consumer Group (CG):消费者组,由多个consumer组成。形成一个消费者组的条件是全部消费者的 groupid 雷同。
- 消费者组内每个消费者负责消费差别分区的数据,一个分区只能由一个组内消费者消费。
- 消费者组之间互不影响。全部的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
消费者紧张参数
参数名称描述bootstrap.servers向 Kafka 集群创建初始连接用到的 host/port 列表。key.deserializer 和value.deserializer指定接收消息的 key 和 value 的反序列化范例。一定要写全类名。group.id标记消费者所属的消费者组。enable.auto.commit默认值为 true,消费者会主动周期性地向服务器提交偏移量。auto.commit.interval.ms假如设置了 enable.auto.commit 的值为 true, 则该值定义了
消费者偏移量向 Kafka 提交的频率,默认 5s。auto.offset.reset当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在
(如,数据被删除了),该如何处理? earliest:主动重置偏
移量到最早的偏移量。 latest:默认,主动重置偏移量为最
新的偏移量。 none:假如消费组原来的(previous)偏移量
不存在,则向消费者抛异常。 anything:向消费者抛异常。offsets.topic.num.partitions__consumer_offsets 的分区数,默认是 50 个分区。heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间,默认 3s。
该条目的值必须小于 session.timeout.ms ,也不应该高于
session.timeout.ms 的 1/3。session.timeout.msKafka 消费者和 coordinator 之间连接超时时间,默认 45s。
凌驾该值,该消费者被移除,消费者组实行再平衡。max.poll.interval.ms消费者处理消息的最大时长,默认是 5 分钟。凌驾该值,该
消费者被移除,消费者组实行再平衡。fetch.min.bytes默认 1 个字节。消费者获取服务器端一批消息最小的字节数。fetch.max.wait.ms默认 500ms。假如没有从服务器端获取到一批数据的最小字
节数。该时间到,仍旧会返回数据。fetch.max.bytes默认 Default: 52428800(50 m)。消费者获取服务器端一批
消息最大的字节数。假如服务器端一批次的数据大于该值
(50m)仍旧可以拉取返来这批数据,因此,这不是一个绝
对最大值。一批次的大小受 message.max.bytes (broker
config)or max.message.bytes (topic config)影响。max.poll.records一次 poll 拉取数据返回消息的最大条数,默认是 500 条。 offset 位移
offset 的默认维护位置
主动提交offset
为了使我们能够专注于自己的业务逻辑,Kafka提供了主动提交offset的功能。
主动提交offset的相干参数:
- enable.auto.commit:是否开启主动提交offset功能,默认是true
- auto.commit.interval.ms:主动提交offset的时间间隔,默认是5s
参数名称描述enable.auto.commit默认值为 true,消费者会主动周期性地向服务器提交偏移量。auto.commit.interval.ms假如设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。 手动提交offset
虽然主动提交offset非常简单比那边,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。一次 Kafka 还提供了手动提交 offset 的API。
手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和commitAsync(异步提交)。两者的雷同点是,都会将本次提交的一批数据最高的偏移量提交;差别点是,同步提交阻塞当前线程,不绝到提交乐成,而且会主动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
- commitSync(同步提交):必须等候offset提交完毕,再去消费下一批数据。
- commitAsync(异步提交) :发送完提交offset哀求后,就开始消费下一批数据了。
指定Offset消费
auto.offset.reset = earliest | latest | none 默认是 latest。
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量
时(例如该数据已被删除),该怎么办?
(1)earliest:主动将偏移量重置为最早的偏移量,–from-beginning。
(2)latest(默认值):主动将偏移量重置为最新偏移量。
(3)none:假如未找到消费者组的先前偏移量,则向消费者抛出异常。
Kafka-Kraft模式
Kafka-Kraft架构
左图为 Kafka 现有架构,元数据在 zookeeper 中,运行时动态选举 controller,由controller 举行 Kafka 集群管理。右图为 kraft 模式架构(实行性),不再依赖 zookeeper 集群,而是用三台 controller 节点代替 zookeeper,元数据保存在 controller 中,由 controller 直接举行 Kafka 集群管理。
如许做的利益有以下几个:
- Kafka 不再依赖外部框架,而是能够独立运行;
- controller 管理集群时,不再需要从 zookeeper 中先读取数据,集群性能上升;
- 由于不依赖 zookeeper,集群扩展时不再受到 zookeeper 读写本领限制;
- controller 不再动态选举,而是由设置文件规定。如许我们可以有针对性的加强controller 节点的设置,而不是像以前一样对随机 controller 节点的高负载束手无策。
Go kafka
Kafka简介
- Kafka是分布式的:其全部的构件borker(服务端集群)、producer(消息生产)、consumer(消息消费者)都可以是分布式的。
- 可以举行分区:每一个分区都是一个序次的、不可变的消息队列, 而且可以持续的添加。
- 高吞吐量。
Kafka的结构
Producer
Producer即生产者,消息的产生者,是消息的⼊口。
kafka cluster
kafka集群,一台或多台服务器组成
Broker
Broker是指部署了Kafka实例的服务器节点。
Topic
消息的主题,可以明白为消息的分类,kafka的数据就保存在topic。在每个broker上 都可以创建多个topic。实际应用中通常是一个业务线建一个topic。
Partition
Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的⽂件夹!
Replication
每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时间会选择一个备胎(Follower)上位,成为Leader。
在kafka中默认副本的最大数量是10 个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的呆板,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Consumer
消费者,即消息的消费方,是消息的出口。
Consumer Group
我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个 topic的差别分区的数据,这也是为了提高kafka的吞吐量!
Kafka⼯作流程
- ⽣产者从Kafka集群获取分区leader信息
- ⽣产者将消息发送给leader
- leader将消息写入本地磁盘
- follower从leader拉取消息数据
- follower将消息写入本地磁盘后向leader发送ACK
- leader收到全部的follower的ACK之后向生产者发送ACK
选择partition的原则(面试重点)
某个topic有多个partition,producer⼜怎么知道该将数据发往哪个partition?
- 直接指定:写入的时间可以指定需要写入的partition,假如有指定,则写入对应的partition。
- hash:假如没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
- 轮询:假如既没指定partition,又没有设置key,则会采用轮询⽅式,即每次取一小段时间的数据写入某个partition,下一小段的时间写入下一个partition。
ACK应答机制(面试重点)
producer在向kafka写入消息的时间,可以设置参数来确定是否确认kafka接收到数据,这个参数可设置 的值为 0,1,all
- 0:代表producer往集群发送数据不需要比及集群的返回,不确保消息发送乐成。安全性最低但是效 率最高。
- 1:代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送乐成。
- all:代表producer往集群发送数据需要全部的follower都完成从leader的同步才会发送下一条,确保 leader发送乐成和全部的副本都完成备份。安全性最⾼高,但是效率最低。
假如往不存在的topic写数据,kafka会⾃动创建topic,partition和replication的数目 默认设置都是1。
Topic和数据⽇志
topic 是同⼀种别的消息记录(record)的集合。在Kafka中,⼀个主题通常有多个订阅者。对于每个
主题,Kafka集群维护了⼀个分区数据⽇志⽂件结构如下:
- 每个partition都是⼀个有序而且不可变的消息记录集合。
- 当新的数据写⼊时,就被追加到partition的末尾。
- 在每个partition中,每条消息都会被分配⼀个序次的唯⼀标识,这个标识被称为offset,即偏移 量。Kafka只保证在同⼀个partition内部消息是有序的,在差别partition之间,并不能保证消息有序。
Kafka可以设置⼀个保存期限,⽤来标识⽇志会在Kafka集群内保存多⻓时间。Kafka集群会保存在保存期限内全部被发布的消息,不管这些消息是否被消费过。
⽐如保存期限设置为两天,那么数据被发布到 Kafka集群的两天以内,全部的这些数据都可以被消费。当凌驾两天,这些数据将会被清空,以便为后 续的数据腾出空间。
由于Kafka会将数据进⾏持久化存储(即写⼊到硬盘上),所以保存的数据⼤⼩可 以设置为⼀个⽐较⼤的值。
Partition结构
Partition在服务器上的表现形式就是⼀个⼀个的⽂件夹,每个partition的⽂件夹下⾯会有多组segment ⽂件,每组segment⽂件⼜包含 .index ⽂件、 .log ⽂件、 .timeindex ⽂件三个⽂件,此中 .log ⽂件就是实际存储message的地⽅,⽽ .index 和 .timeindex ⽂件为索引⽂件,⽤于检索消息。
消费数据
- 多个消费者实例可以组成⼀个消费者组,并⽤⼀个标签来标识这个消费者组。⼀个消费者组中的差别消费者实例可以运⾏在差别的历程甚⾄差别的服务器上。
- 假如全部的消费者实例都在差别的消费者组,那么每⼀条消息记录会被⼴播到每⼀个消费者实例。
- 在同⼀个消费者组中,每个消费者实例可以消费多个分区,但是每个分区最多只能被消费者组中的⼀个实例消费。
kafka环境搭建
kafka环境基于zookeeper,zookeeper环境基于JAVA-JDK。
!!!新版本的kafka自带zookeeper,可以不手动安装。
java环境变量
https://www.oracle.com/technetwork/java/javase/downloads/jdk12-downloads-5295953.html
安装kafka
http://kafka.apache.org/downloads
- 1.打开config目录下的server.properties文件
- 2.修改log.dirs=F:/tmp/kafka-logs //日志存放
- 3.打开config目录下的zookeeper.properties文件
- 4.修改dataDir=F:/tmp/zookeeper //数据存放
- 启动:
- 先执行:bin\windows\zookeeper-server-start.bat config\zookeeper.properties
- 再执行:bin\windows\kafka-server-start.bat config\server.properties
复制代码 zookeeper:
kafka:
GO操纵Kafka
sarama操纵kafka
依赖安装
go get github.com/Shopify/sarama
windows: mod文件中手动加 require github.com/shopify/sarama v1.19.0
Go语言中连接kafka使用第三方库:github.com/IBM/sarama。
go get github.com/IBM/sarama这个库已经由Shopify转给了IBM。
sarama v1.20之后的版本加入了zstd压缩算法,需要用到cgo,在Windows平台编译时会提示类似如下错误:
- # github.com/DataDog/zstd
- exec: "gcc":executable file not found in %PATH%
复制代码 所以在Windows平台请使用v1.19版本的sarama。
连接kafka发送消息
- package main
- import (
- "fmt"
- "github.com/IBM/sarama"
- )
- // 基于sarama第三方库开发的kafka client
- func main() {
- config := sarama.NewConfig()
- config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
- config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
- config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
- // 构造一个消息
- msg := &sarama.ProducerMessage{}
- msg.Topic = "web_log"
- msg.Value = sarama.StringEncoder("this is a test log")
- // 连接kafka
- client, err := sarama.NewSyncProducer([]string{"192.168.1.7:9092"}, config)
- if err != nil {
- fmt.Println("producer closed, err:", err)
- return
- }
- defer client.Close()
- // 发送消息
- pid, offset, err := client.SendMessage(msg)
- if err != nil {
- fmt.Println("send msg failed, err:", err)
- return
- }
- fmt.Printf("pid:%v offset:%v\n", pid, offset)
- }
复制代码 连接kafka消费消息
- package main
- import (
- "fmt"
- "github.com/IBM/sarama"
- )
- // kafka consumer
- func main() {
- consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
- if err != nil {
- fmt.Printf("fail to start consumer, err:%v\n", err)
- return
- }
- partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
- if err != nil {
- fmt.Printf("fail to get list of partition:err%v\n", err)
- return
- }
- fmt.Println(partitionList)
- for partition := range partitionList { // 遍历所有的分区
- // 针对每个分区创建一个对应的分区消费者
- pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
- if err != nil {
- fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
- return
- }
- defer pc.AsyncClose()
- // 异步从每个分区消费信息
- go func(sarama.PartitionConsumer) {
- for msg := range pc.Messages() {
- fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
- }
- }(pc)
- }
- }
复制代码 kafka-go操纵kafka
- 相较于sarama, kafka-go 更简单、更易用。
- segmentio/kafka-go 是纯Go实现,提供了与kafka交互的低级别和高级别两套API,同时也支持Context。
- 此外社区中另一个比较常用的confluentinc/confluent-kafka-go,它是一个基于cgo的librdkafka包装,在项目中使用它会引入对C库的依赖。
准备Kafka环境
以下docker-compose.yml文件用来搭建一套单节点zookeeper和单节点kafka环境,而且在8080端口提供kafka-ui管理界面。
- version: '2.1'
- services:
- zoo1:
- image: confluentinc/cp-zookeeper:7.3.2
- hostname: zoo1
- container_name: zoo1
- ports:
- - "2181:2181"
- environment:
- ZOOKEEPER_CLIENT_PORT: 2181
- ZOOKEEPER_SERVER_ID: 1
- ZOOKEEPER_SERVERS: zoo1:2888:3888
- kafka1:
- image: confluentinc/cp-kafka:7.3.2
- hostname: kafka1
- container_name: kafka1
- ports:
- - "9092:9092"
- - "29092:29092"
- - "9999:9999"
- environment:
- KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
- KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
- KAFKA_BROKER_ID: 1
- KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
- KAFKA_JMX_PORT: 9999
- KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
- KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
- KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
- depends_on:
- - zoo1
- kafka-ui:
- container_name: kafka-ui
- image: provectuslabs/kafka-ui:latest
- ports:
- - 8080:8080
- depends_on:
- - kafka1
- environment:
- DYNAMIC_CONFIG_ENABLED: "TRUE"
复制代码 将上述docker-compose.yml文件在本地保存,在同一目录下实行以下下令启动容器。
容器启动后,使用浏览器打开127.0.0.1:8080 即可看到如下kafka-ui界面。
安装kafka-go
实行以下下令下载 kafka-go依赖。
- go get github.com/segmentio/kafka-go
复制代码 留意:kafka-go 需要 Go 1.15或更高版本。
kafka-go 提供了两套与Kafka交互的API。
低级别( low-level):基于与 Kafka 服务器的原始网络连接实现。
高级别(high-level):对于常用读写操纵封装了一套更易用的API。
通常发起直接使用高级别的交互API。
Connection
Conn 范例是 kafka-go 包的核心。它代表与 Kafka broker之间的连接。基于它实现了一套与Kafka交互的低级别 API。
发送消息
下面是连接至Kafka之后,使用Conn发送消息的代码示例。
- // writeByConn 基于Conn发送消息
- func writeByConn() {
- topic := "my-topic"
- partition := 0
- // 连接至Kafka集群的Leader节点
- conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
- if err != nil {
- log.Fatal("failed to dial leader:", err)
- }
- // 设置发送消息的超时时间
- conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
- // 发送消息
- _, err = conn.WriteMessages(
- kafka.Message{Value: []byte("one!")},
- kafka.Message{Value: []byte("two!")},
- kafka.Message{Value: []byte("three!")},
- )
- if err != nil {
- log.Fatal("failed to write messages:", err)
- }
- // 关闭连接
- if err := conn.Close(); err != nil {
- log.Fatal("failed to close writer:", err)
- }
- }
复制代码 消费消息
- // readByConn 连接至kafka后接收消息
- func readByConn() {
- // 指定要连接的topic和partition
- topic := "my-topic"
- partition := 0
- // 连接至Kafka的leader节点
- conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
- if err != nil {
- log.Fatal("failed to dial leader:", err)
- }
- // 设置读取超时时间
- conn.SetReadDeadline(time.Now().Add(10 * time.Second))
- // 读取一批消息,得到的batch是一系列消息的迭代器
- batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
- // 遍历读取消息
- b := make([]byte, 10e3) // 10KB max per message
- for {
- n, err := batch.Read(b)
- if err != nil {
- break
- }
- fmt.Println(string(b[:n]))
- }
- // 关闭batch
- if err := batch.Close(); err != nil {
- log.Fatal("failed to close batch:", err)
- }
- // 关闭连接
- if err := conn.Close(); err != nil {
- log.Fatal("failed to close connection:", err)
- }
- }
复制代码 使用batch.Read更高效一些,但是需要根据消息长度选择合适的buffer(上述代码中的b),假如传入的buffer太小(消息装不下)就会返回io.ErrShortBuffer错误。
假如不思量内存分配的效率标题,也可以按以下代码使用batch.ReadMessage读取消息。
- for {
- msg, err := batch.ReadMessage()
- if err != nil {
- break
- }
- fmt.Println(string(msg.Value))
- }
复制代码 创建topic
当Kafka关闭主动创建topic的设置时,可按如下方式创建topic。
- // createTopicByConn 创建topic
- func createTopicByConn() {
- // 指定要创建的topic名称
- topic := "my-topic"
- // 连接至任意kafka节点
- conn, err := kafka.Dial("tcp", "localhost:9092")
- if err != nil {
- panic(err.Error())
- }
- defer conn.Close()
- // 获取当前控制节点信息
- controller, err := conn.Controller()
- if err != nil {
- panic(err.Error())
- }
- var controllerConn *kafka.Conn
- // 连接至leader节点
- controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
- if err != nil {
- panic(err.Error())
- }
- defer controllerConn.Close()
- topicConfigs := []kafka.TopicConfig{
- {
- Topic: topic,
- NumPartitions: 1,
- ReplicationFactor: 1,
- },
- }
- // 创建topic
- err = controllerConn.CreateTopics(topicConfigs...)
- if err != nil {
- panic(err.Error())
- }
- }
复制代码 通过非leader节点连接leader节点
下面的示例代码演示了如何通过已有的非leader节点的Conn,连接至 leader节点。
- conn, err := kafka.Dial("tcp", "localhost:9092")
- if err != nil {
- panic(err.Error())
- }
- defer conn.Close()
- // 获取当前控制节点信息
- controller, err := conn.Controller()
- if err != nil {
- panic(err.Error())
- }
- var connLeader *kafka.Conn
- connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
- if err != nil {
- panic(err.Error())
- }
- defer connLeader.Close()
复制代码 获取topic列表
- conn, err := kafka.Dial("tcp", "localhost:9092")
- if err != nil {
- panic(err.Error())
- }
- defer conn.Close()
- partitions, err := conn.ReadPartitions()
- if err != nil {
- panic(err.Error())
- }
- m := map[string]struct{}{}
- // 遍历所有分区取topic
- for _, p := range partitions {
- m[p.Topic] = struct{}{}
- }
- for k := range m {
- fmt.Println(k)
- }
复制代码 Reader
Reader是由 kafka-go 包提供的另一个概念,对于从单个主题-分区(topic-partition)消费消息这种典范场景,使用它能够简化代码。Reader 还实现了主动重连和偏移量管理,并支持使用 Context 支持异步取消和超时的 API。
留意: 当历程退出时,必须在 Reader 上调用 Close() 。Kafka服务器需要一个优雅的断开连接来阻止它继承实验向已连接的客户端发送消息。假如历程使用 SIGINT (shell 中的 Ctrl-C)或 SIGTERM (如 docker stop 或 kubernetes start)终止,那么下面给出的示例不会调用 Close()。当同一topic上有新Reader连接时,可能导致耽误(例如,新历程启动或新容器运行)。在这种场景下应使用signal.Notify处理程序在历程关闭时关闭Reader。
消费消息
下面的代码演示了如何使用Reader连接至Kafka消费消息。
- // readByReader 通过Reader接收消息
- func readByReader() {
- // 创建Reader
- r := kafka.NewReader(kafka.ReaderConfig{
- Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
- Topic: "topic-A",
- Partition: 0,
- MaxBytes: 10e6, // 10MB
- })
- r.SetOffset(42) // 设置Offset
- // 接收消息
- for {
- m, err := r.ReadMessage(context.Background())
- if err != nil {
- break
- }
- fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
- }
- // 程序退出前关闭Reader
- if err := r.Close(); err != nil {
- log.Fatal("failed to close reader:", err)
- }
- }
复制代码 消费者组
kafka-go支持消费者组,包括broker管理的offset。要启用消费者组,只需在 ReaderConfig 中指定 GroupID。
使用消费者组时,ReadMessage 会主动提交偏移量。
- // 创建一个reader,指定GroupID,从 topic-A 消费消息
- r := kafka.NewReader(kafka.ReaderConfig{
- Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
- GroupID: "consumer-group-id", // 指定消费者组id
- Topic: "topic-A",
- MaxBytes: 10e6, // 10MB
- })
- // 接收消息
- for {
- m, err := r.ReadMessage(context.Background())
- if err != nil {
- break
- }
- fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
- }
- // 程序退出前关闭Reader
- if err := r.Close(); err != nil {
- log.Fatal("failed to close reader:", err)
- }
复制代码 在使用消费者组时会有以下限制:
- (*Reader).SetOffset 当设置了GroupID时会返回错误
- (*Reader).Offset 当设置了GroupID时会永远返回 -1
- (*Reader).Lag 当设置了GroupID时会永远返回 -1
- (*Reader).ReadLag 当设置了GroupID时会返回错误
- (*Reader).Stats 当设置了GroupID时会返回一个-1的分区
显式提交
kafka-go 也支持显式提交。当需要显式提交时不要调用 ReadMessage,而是调用 FetchMessage获取消息,然后调用 CommitMessages 显式提交。
- ctx := context.Background()
- for {
- // 获取消息
- m, err := r.FetchMessage(ctx)
- if err != nil {
- break
- }
- // 处理消息
- fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
- // 显式提交
- if err := r.CommitMessages(ctx, m); err != nil {
- log.Fatal("failed to commit messages:", err)
- }
- }
复制代码 在消费者组中提交消息时,具有给定主题/分区的最大偏移量的消息确定该分区的提交偏移量的值。例如,假如通过调用 FetchMessage 获取了单个分区的偏移量为 1、2 和 3 的消息,则使用偏移量为3的消息调用 CommitMessages 也将导致该分区的偏移量为 1 和 2 的消息被提交。
管理提交间隔
默认情况下,调用CommitMessages将同步向Kafka提交偏移量。为了提高性能,可以在ReaderConfig中设置CommitInterval来定期向Kafka提交偏移。
- // 创建一个reader从 topic-A 消费消息
- r := kafka.NewReader(kafka.ReaderConfig{
- Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
- GroupID: "consumer-group-id",
- Topic: "topic-A",
- MaxBytes: 10e6, // 10MB
- CommitInterval: time.Second, // 每秒刷新一次提交给 Kafka
- })
复制代码 Writer
向Kafka发送消息,除了使用基于Conn的低级API,kafka-go包还提供了更高级别的 Writer 范例。大多数情况下使用Writer即可满意条件,它支持以下特性。
- 对错误举行主动重试和重新连接。
- 在可用分区之间可设置的消息分布。
- 向Kafka同步或异步写入消息。
- 使用Context的异步取消。
- 关闭时扫除挂起的消息以支持正常关闭。
- 在发布消息之前主动创建不存在的topic。
发送消息
- // 创建一个writer 向topic-A发送消息
- w := &kafka.Writer{
- Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
- Topic: "topic-A",
- Balancer: &kafka.LeastBytes{}, // 指定分区的balancer模式为最小字节分布
- RequiredAcks: kafka.RequireAll, // ack模式
- Async: true, // 异步
- }
- err := w.WriteMessages(context.Background(),
- kafka.Message{
- Key: []byte("Key-A"),
- Value: []byte("Hello World!"),
- },
- kafka.Message{
- Key: []byte("Key-B"),
- Value: []byte("One!"),
- },
- kafka.Message{
- Key: []byte("Key-C"),
- Value: []byte("Two!"),
- },
- )
- if err != nil {
- log.Fatal("failed to write messages:", err)
- }
- if err := w.Close(); err != nil {
- log.Fatal("failed to close writer:", err)
- }
复制代码 创建不存在的topic
假如给Writer设置了AllowAutoTopicCreation:true,那么当发送消息至某个不存在的topic时,则会主动创建topic。
- w := &Writer{
- Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
- Topic: "topic-A",
- AllowAutoTopicCreation: true, // 自动创建topic
- }
- messages := []kafka.Message{
- {
- Key: []byte("Key-A"),
- Value: []byte("Hello World!"),
- },
- {
- Key: []byte("Key-B"),
- Value: []byte("One!"),
- },
- {
- Key: []byte("Key-C"),
- Value: []byte("Two!"),
- },
- }
- var err error
- const retries = 3
- // 重试3次
- for i := 0; i < retries; i++ {
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
-
- err = w.WriteMessages(ctx, messages...)
- if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
- time.Sleep(time.Millisecond * 250)
- continue
- }
- if err != nil {
- log.Fatalf("unexpected error %v", err)
- }
- break
- }
- // 关闭Writer
- if err := w.Close(); err != nil {
- log.Fatal("failed to close writer:", err)
- }
复制代码 写入多个topic
通常,WriterConfig.Topic用于初始化单个topic的Writer。通已往掉WriterConfig中的Topic设置,分别设置每条消息的message.topic,可以实现将消息发送至多个topic。
- w := &kafka.Writer{
- Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
- // 注意: 当此处不设置Topic时,后续的每条消息都需要指定Topic
- Balancer: &kafka.LeastBytes{},
- }
- err := w.WriteMessages(context.Background(),
- // 注意: 每条消息都需要指定一个 Topic, 否则就会报错
- kafka.Message{
- Topic: "topic-A",
- Key: []byte("Key-A"),
- Value: []byte("Hello World!"),
- },
- kafka.Message{
- Topic: "topic-B",
- Key: []byte("Key-B"),
- Value: []byte("One!"),
- },
- kafka.Message{
- Topic: "topic-C",
- Key: []byte("Key-C"),
- Value: []byte("Two!"),
- },
- )
- if err != nil {
- log.Fatal("failed to write messages:", err)
- }
- if err := w.Close(); err != nil {
- log.Fatal("failed to close writer:", err)
- }
复制代码 留意:Writer中的Topic和Message中的Topic是互斥的,同一时刻有且只能设置一处。
其他设置
TLS
对于根本的 Conn 范例或在 Reader/Writer 设置中,可以在Dialer中设置TLS选项。假如 TLS 字段为空,则它将不启用TLS 连接。
留意:不在Conn/Reder/Writer上设置TLS,连接到启用TLS的Kafka集群,可能会出现io.ErrUnexpectedEOF错误。
Connection
- dialer := &kafka.Dialer{
- Timeout: 10 * time.Second,
- DualStack: true,
- TLS: &tls.Config{...tls config...}, // 指定TLS配置
- }
- conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
复制代码 Reader
- dialer := &kafka.Dialer{
- Timeout: 10 * time.Second,
- DualStack: true,
- TLS: &tls.Config{...tls config...}, // 指定TLS配置
- }
- r := kafka.NewReader(kafka.ReaderConfig{
- Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
- GroupID: "consumer-group-id",
- Topic: "topic-A",
- Dialer: dialer,
- })
复制代码 Writer
创建Writer时可以按如下方式指定TLS设置。
- w := kafka.Writer{
- Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
- Topic: "topic-A",
- Balancer: &kafka.Hash{},
- Transport: &kafka.Transport{
- TLS: &tls.Config{}, // 指定TLS配置
- },
- }
复制代码 SASL
可以在Dialer上指定一个选项以使用SASL身份验证。Dialer可以直接用来打开一个 Conn,也可以通过它们各自的设置传递给一个 Reader 或 Writer。假如 SASLMechanism字段为 nil,则不会使用 SASL 举行身份验证。
SASL 身份验证范例
明文
- mechanism := plain.Mechanism{
- Username: "username",
- Password: "password",
- }
复制代码 SCRAM
- mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
- if err != nil {
- panic(err)
- }
复制代码 Connection
- mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
- if err != nil {
- panic(err)
- }
- dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, SASLMechanism: mechanism,}conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
复制代码 Reader
- mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
- if err != nil {
- panic(err)
- }
- dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, SASLMechanism: mechanism,}r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092","localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", Dialer: dialer,})
复制代码 Writer
- mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
- if err != nil {
- panic(err)
- }
- // Transport 负责管理连接池和其他资源,// 通常最好的使用方式是创建后在应用程序中共享使用它们。sharedTransport := &kafka.Transport{ SASL: mechanism,}w := kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: &kafka.Hash{}, Transport: sharedTransport,}
复制代码 Client
- mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
- if err != nil {
- panic(err)
- }
- // Transport 负责管理连接池和其他资源,// 通常最好的使用方式是创建后在应用程序中共享使用它们。sharedTransport := &kafka.Transport{ SASL: mechanism,}client := &kafka.Client{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Timeout: 10 * time.Second, Transport: sharedTransport,}
复制代码 Balancer
kafka-go实现了多种负载均衡策略。特别是当你从其他Kafka库迁徙过来时,你可以按如下说明选择合适的Balancer实现。
Sarama
假如从 sarama 切换过来,而且需要/盼望使用雷同的算法举行消息分区,则可以使用kafka.Hash或kafka.ReferenceHash。
- kafka.Hash = sarama.NewHashPartitioner
- kafka.ReferenceHash = sarama.NewReferenceHashPartitioner
- w := &kafka.Writer{
- Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
- Topic: "topic-A",
- Balancer: &kafka.Hash{},
- }
复制代码 librdkafka和confluent-kafka-go:kafka.CRC32Balancer与librdkafka默认的consistent_random策略表现同等。
- w := &kafka.Writer{
- Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
- Topic: "topic-A",
- Balancer: kafka.CRC32Balancer{},
- }
复制代码 Java:使用kafka.Murmur2Balancer可以得到与默认Java客户端雷同的策略。
- w := &kafka.Writer{
- Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
- Topic: "topic-A",
- Balancer: kafka.Murmur2Balancer{},
- }
复制代码 Compression
可以通过设置Compression字段在Writer上启用压缩:
- w := &kafka.Writer{
- Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
- Topic: "topic-A",
- Compression: kafka.Snappy,
- }
复制代码 Reader 将通过检查消息属性来确定消费的消息是否被压缩。
Logging
想要记录Reader/Writer范例的操纵,可以在创建时设置日志记录器。
kafka-go中的Logger是一个接口范例。
- type Logger interface {
- Printf(string, ...interface{})
- }
复制代码 而且提供了一个LoggerFunc范例,帮我们实现了Logger接口。
- type LoggerFunc func(string, ...interface{})
- func (f LoggerFunc) Printf(msg string, args ...interface{}) { f(msg, args...) }
复制代码 Reader:借助kafka.LoggerFunc我们可以自定义一个Logger。
- // 自定义一个Logger
- func logf(msg string, a ...interface{}) {
- fmt.Printf(msg, a...)
- fmt.Println()
- }
- r := kafka.NewReader(kafka.ReaderConfig{
- Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
- Topic: "q1mi-topic",
- Partition: 0,
- Logger: kafka.LoggerFunc(logf),
- ErrorLogger: kafka.LoggerFunc(logf),
- })
复制代码 Writer:也可以直接使用第三方日志库,例如下面示例代码中使用了zap日志库。
- w := &kafka.Writer{
- Addr: kafka.TCP("localhost:9092"),
- Topic: "q1mi-topic",
- Logger: kafka.LoggerFunc(zap.NewExample().Sugar().Infof),
- ErrorLogger: kafka.LoggerFunc(zap.NewExample().Sugar().Errorf),
- }
复制代码 FAQ
Kafka中的消费者组(Consumer Group)的明白
Kafka中的消费者组(Consumer Group)是一个非常紧张的概念,涉及到消息如何被消费和分发。让我们来具体表明一下。
消费者组的概念
- 消费者组 是Kafka中一组协作消费同一个Topic的消费者。组内的消费者共享同一个Group ID。
- Partition 是Kafka中Topic的根本单位,每个Topic可以有一个或多个分区(Partition)。
- 消费者组的工作方式:
- Kafka确保每条消息在一个消费者组内只会被消费一次。
- 消费者组中的差别消费者可以消费Topic的差别分区(Partition)。
消费者组内的消息分配
假设你有一个Topic names,它有多个分区,而且你创建了一个消费者组group1,内里有3个消费者:C1、C2 和 C3。消息的消费方式将取决于Topic中的分区数目。
- 分区数目少于消费者数目:假如names Topic只有2个分区,而你有3个消费者,Kafka会将每个分区分配给一个消费者,剩余的消费者则不会接收到消息。比如C1消费分区1的消息,C2消费分区2的消息,而C3则不会分配到任何分区。
- 分区数目等于消费者数目:假如names Topic有3个分区,每个消费者会分到一个分区,每个分区的消息只会被分配给一个消费者。例如,C1消费分区1,C2消费分区2,C3消费分区3。
- 分区数目多于消费者数目:比如,names Topic有6个分区,而你有3个消费者,这时每个消费者会被分配到多个分区。例如,C1消费分区1和分区4,C2消费分区2和分区5,C3消费分区3和分区6。
消息分发规则
- 每个分区中的消息 会被分配到对应的消费者,分区内的消息序次是被保证的。
- 每条消息 只会被消费者组中的一个消费者消费(在同一个组内)。
这意味着,在同一个消费者组内,每条消息只会被一个消费者处理,消费后消息即从队列中移除。
- 消费者组内的每个消费者通太过区分摊消息。
- 一个Topic中的每个分区会被消费者组中的一个消费者消费。
- 组内的全部消费者共同处理全部分区的消息。
如许设计的利益是,你可以根据消费者组的数目灵活地调整并行处理的本领,同时还能保证在同一个消费者组内消息只被消费一次。
让我们通过一个具体的例子来深入明白Kafka消费者组的概念及其工作方式。
实际例子
假设有一个Topic叫做 names,它有6个分区(Partition),用来存储用户的名字。现在,你有一个消费者组 group1,而且在这个组中创建了3个消费者 C1、C2 和 C3。
场景1:消费者组内的消费者数目小于分区数目
- Topic: names
- 分区数目: 6
- 消费者组: group1
- 消费者数目: 3 (C1, C2, C3)
在这种情况下,Kafka会将6个分区分配给这3个消费者。假设分配方式如下:
- C1 消费分区 P1 和 P4
- C2 消费分区 P2 和 P5
- C3 消费分区 P3 和 P6
因此,分区 P1 中的全部消息只会被消费者 C1 消费,P2 中的消息只会被消费者 C2 消费,依此类推。每条消息在消费者组 group1 中只会被一个消费者消费。
场景2:增加消费者以提高处理本领
假设随着业务发展,你需要更快地处理这些消息。你可以在group1中再增加3个消费者,使总消费者数目达到6个。
- 消费者数目: 6 (C1, C2, C3, C4, C5, C6)
Kafka会重新分配分区,现在每个消费者消费一个分区:
- C1 消费 P1
- C2 消费 P2
- C3 消费 P3
- C4 消费 P4
- C5 消费 P5
- C6 消费 P6
如许一来,消息处理本领就提升了,由于现在有6个消费者并行处理这6个分区中的消息。
场景3:消费者数目多于分区数目
假如你增加到8个消费者,而 names 这个Topic仍旧只有6个分区,Kafka会把这6个分区分配给前6个消费者,而剩余的两个消费者 C7 和 C8 不会接收到任何消息,处于空闲状态。
- “根据消费者组的数目灵活地调整并行处理的本领”:
- 假如需要提高消息处理速率,可以增加消费者组中的消费者数目,让更多消费者并行处理消息。
- 但是消费者数目不应该超太过区数目,否则有些消费者会闲置。
- “一个Topic中的每个分区会被消费者组中的一个消费者消费”:
- 在同一个消费者组内,每个分区只能被一个消费者消费,如许保证了分区内消息的序次性和唯一性。
- 差别消费者组可以同时消费同一个Topic的消息,但每个组内的消息是独立消费的。
这个设计提供了灵活性,答应你根据需求调整消费者数目以达到所需的吞吐量和并行处理本领。
固然可以。下面是一个简单的Go示例代码,展示如何使用 confluent-kafka-go 库(这是一个流行的Kafka客户端库)来创建Kafka消费者,而且如何增加消费者来提升处理本领。
示例代码
起首,你需要安装 confluent-kafka-go 库。你可以使用以下下令安装它:
- go get github.com/confluentinc/confluent-kafka-go/kafka
复制代码 以下是一个简单的Go程序,演示了如何创建一个Kafka消费者,而且增加消费者来处理消息。
1. 单个消费者
Kafka中的消费者组是通过消费者设置中的 group.id 参数来定义的。消费者组的概念是Kafka客户端的设置的一部分,而不是需要显式创建的实体。Kafka会主动管理消费者组的协调和分配。
这是一个单消费者的简单示例:
- package main
- import (
- "log"
- "github.com/confluentinc/confluent-kafka-go/kafka"
- )
- func main() {
- // 创建Kafka消费者配置
- config := &kafka.ConfigMap{
- "bootstrap.servers": "localhost:9092",
- "group.id": "example-group",
- "auto.offset.reset": "earliest",
- }
- // 创建Kafka消费者
- consumer, err := kafka.NewConsumer(config)
- if err != nil {
- log.Fatalf("Failed to create consumer: %s", err)
- }
- // 订阅Topic
- consumer.Subscribe("names", nil)
- // 消费消息
- for {
- msg, err := consumer.ReadMessage(-1)
- if err != nil {
- log.Printf("Consumer error: %v", err)
- continue
- }
- log.Printf("Message: %s", string(msg.Value))
- }
- }
复制代码 2. 增加消费者
要增加消费者,你可以启动多个实例的消费者程序,每个实例都使用雷同的 group.id。Kafka会主动负载均衡各个消费者对Topic分区的消费。
以下是启动多个消费者的示例:
假设你已经将上述代码保存为 consumer.go。你可以在差别的终端中运行多个消费者实例,模拟增加消费者:
你可以在差别的终端窗口中运行这个下令多次。例如:
- # 终端 1go run consumer.go
- # 终端 2go run consumer.go
- # 终端 3go run consumer.go
复制代码 每个运行的消费者实例都会加入到 example-group 消费者组中,Kafka会将Topic的分区分配给这些消费者,确保负载均衡。
留意事项
- 确保你已经正确设置了Kafka集群和Topic,而且它们正在运行。
- 增加消费者的实际效果取决于Topic的分区数目。消费者的数目不应超太过区的数目,否则会有消费者闲置。
- 每个消费者实例会从Kafka中读取消息,处理完后Kafka会将消息标记为已消费,并从Topic中移除。
如许,你可以通过启动多个消费者实例来提高消息处理的并发本领。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |