农民 发表于 2024-8-6 06:14:30

Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程

https://img-blog.csdnimg.cn/direct/75a99c85beb34855a282bb0e0c6ac196.jpeg#pic_center
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
本文档参看的视频是:


[*]尚硅谷Kafka教程,2024新版kafka视频,零基础入门到实战
[*]黑马程序员Kafka视频教程,大数据企业级消息队列kafka入门到精通
[*]小朋友也可以懂的Kafka入门教程,还不快来学
本文档参看的文档是:


[*]尚硅谷官方文档,并在基础上修改 完善!非常感谢尚硅谷团队!!!!
参考:


[*] https://mp.weixin.qq.com/s/uI2zkf74KXsWaCOplX1Ing
[*] https://mp.weixin.qq.com/s/oxb2Ezn4K2jMPzubqFULuw
[*] https://mp.weixin.qq.com/s/tTKXb6On5bfJGjcvn9IpbA
[*] https://mp.weixin.qq.com/s/CCAP8n0mTCrUT-NzOAacCg
[*] https://blog.csdn.net/qq_43745578/article/details/135931980
在这之前大家可以看我以下几篇文章,循序渐进:
❤️Kafka 3.x.x 入门到精通(01)——对标尚硅谷Kafka教程
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2. Kafka基础

Kafka借鉴了JMS规范的头脑,但是却并没有完全遵循JMS规范,因此从设计原理上,Kafka的内部也会有许多用于数据传输的组件对象,这些组件对象之间会形成关联,组合在一起实现高效的数据传输。所以接下来,我们就按照数据流转的过程详细讲一讲Kafka中的基础概念以及核心组件。
https://img-blog.csdnimg.cn/direct/620dcd57d1644226b1b15d5fd32e19f6.png#pic_center
   大量的生产者,大量的消耗者,可能对于单一的Kafka造成 吞吐量过大,IO热点题目,那么单一的Kafka就有可能作为整个系统的性能瓶颈,降低可用性和稳固性,而且假如Down掉了,那就不是一个好的系统方案。
有效策略:
https://img-blog.csdnimg.cn/direct/8b04de60bf5c43c3a4ec0bec309ad296.png#pic_center
https://img-blog.csdnimg.cn/direct/6b6e1ac0ce2148c9b7be4a33e8502177.png#pic_center
https://img-blog.csdnimg.cn/direct/3babf5a38cdb4c1aa97875159dbe1df1.png#pic_center
https://img-blog.csdnimg.cn/direct/c40bc29a54ef491c922849067e6a8ed6.png#pic_center


[*]为了数据可靠性,可以将数据文件进行备份,但是Kafka没有备份的概念
[*]Kafka中称之为副本,多个副本同时只能有一个提供数据的读写操作
[*]具有读写本领的副本称之为Leader副本,作为备份的副本称之为Follower副本
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
https://img-blog.csdnimg.cn/direct/0d1a4bd688d34a8fb342c2273b180002.png#pic_center
假如管理者出现了题目,有2种解决方案:


[*]给管理者增长备份
https://img-blog.csdnimg.cn/direct/864707d8a4d84449b5c74fa8615b0622.png
[*]任何一个节点都可以做备份
https://img-blog.csdnimg.cn/direct/750a57a89c7142f39c4fad3ccbb3bf7c.png
那都是Standby,假如我的master down掉了,如何确定哪一个上位
谁来控制选举呢?ZooKeeper!!!
SO!!!!
https://img-blog.csdnimg.cn/direct/fc7385f4341e4660af5ad146412cabfe.png#pic_center
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.1 集群部署

生产环境都是采用linux系统搭建服务器集群,但是我们的重点是在于学习kafka的基础概念和核心组件,所以这里我们搭建一个简单易用的windows集群方便大家的学习和练习。Linux集群的搭建会在第3章给大家进行讲解。
2.1.1 解压文件

(1) 在磁盘根目录创建文件夹cluster,文件夹名称不要太长
https://img-blog.csdnimg.cn/direct/f4aa5cda4cdf4d938f24092a60b917c1.png#pic_center
(2) 将kafka安装包kafka-3.6.1-src.tgz解压缩到kafka文件夹
https://img-blog.csdnimg.cn/direct/fc3f03f620ba4ebf8e440369816cb02f.png#pic_center
2.1.2 安装ZooKeeper

(1) 修改文件夹名为kafka-zookeeper
因为kafka内置了ZooKeeper软件,所以此处将解压缩的文件作为ZooKeeper软件使用。
https://img-blog.csdnimg.cn/direct/f3937a1b5d1041b1bdc0302fa9b15e5b.png#pic_center
(2) 修改config/zookeeper.properties文件
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
# 此处注意,如果文件目录不存在,会自动创建
dataDir=E:/cluster/kafka-zookeeper/data
# the port at which the clients will connect
# ZooKeeper默认端口为2181
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
https://img-blog.csdnimg.cn/direct/074553201bbf442090f4673005ec39fa.png#pic_center
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.1.3 安装Kafka

(1) 将上面解压缩的文件复制一份,改名为kafka-node-1
https://img-blog.csdnimg.cn/direct/07fb27a8998a4b4eaff293308449e566.png#pic_center
(2) 修改config/server.properties设置文件
broker.id=1

############################# Socket Server Settings #############################
# 监听器 9091为本地端口,如果冲突,请重新指定
listeners=PLAINTEXT://:9091

num.network.threads=3

num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################
log.dirs=E:/cluster/kafka-node-1/data
num.partitions=1
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings#############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
log.retention.hours=168


log.segment.bytes=190
log.flush.interval.messages=2
log.index.interval.bytes=17

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# ZooKeeper软件连接地址,2181为默认的ZK端口号 /kafka 为ZK的管理节点
zookeeper.connect=localhost:2181/kafka
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
https://img-blog.csdnimg.cn/direct/19390ff7f8f04284a430f64e8185562e.png#pic_center
https://img-blog.csdnimg.cn/direct/30fc120b11844adda2d292c74bec036d.png#pic_center
https://img-blog.csdnimg.cn/direct/848b6034f5544631bf7726b2d32e0ed0.png#pic_center
(3) 将kafka-node-1文件夹复制两份,改名为kafka-node-2,kafka-node-3
https://img-blog.csdnimg.cn/direct/e627dde353e542beb792fa32a2467c4b.png#pic_center
(4) 分别修改kafka-node-2,kafka-node-3文件夹中的设置文件server.properties


[*]将文件内容中的broker.id=1分别改为broker.id=2,broker.id=3
[*]将文件内容中的9091分别改为9092,9093(假如端口冲突,请重新设置)
[*]将文件内容中的kafka-node-1分别改为kafka-node-2,kafka-node-3
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.1.4 封装启动脚本

因为Kafka启动前,必须先启动ZooKeeper,并且Kafka集群中有多个节点必要启动,所以启动过程比力繁琐,这里我们将启动的指令进行封装。
(1) 在kafka-zookeeper文件夹下创建zk.cmd批处理文件
https://img-blog.csdnimg.cn/direct/fd78e207558348dab597a893b5380c6e.png#pic_center
(2) 在zk.cmd文件中添加内容
# 添加启动命令
call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
(3) 在kafka-node-1,kafka-node-2,kafka-node-3文件夹下分别创建kfk.cmd批处理文件
https://img-blog.csdnimg.cn/direct/9b9fab80710e461bb93bda9377e44cdc.png#pic_center
(4) 在kfk.bat文件中添加内容
# 添加启动命令
call bin/windows/kafka-server-start.bat config/server.properties
(5) 在cluster文件夹下创建cluster.cmd批处理文件,用于启动kafka集群
https://img-blog.csdnimg.cn/direct/8242852ead674bdf92270e2003a58f38.png#pic_center
(6) 在cluster.cmd文件中添加内容
cd kafka-zookeeper
start zk.cmd
ping 127.0.0.1 -n 10 >nul
cd ../kafka-broker-1
start kfk.cmd
cd ../kafka-broker-2
start kfk.cmd
cd ../kafka-broker-3
start kfk.cmd
(7) 在cluster文件夹下创建cluster-clear.cmd批处理文件,用于清理和重置kafka数据
https://img-blog.csdnimg.cn/direct/b90ef66769dc45fb871f53efee79267c.png#pic_center
(8) 在cluster-clear.cmd文件中添加内容
cd kafka-zookeeper
rd /s /q data
cd ../kafka-broker-1
rd /s /q data
cd ../kafka-broker-2
rd /s /q data
cd ../kafka-broker-3
rd /s /q data
(9) 双击实行cluster.cmd文件,启动Kafka集群
集群启动命令后,会打开多个黑窗口,每一个窗口都是一个kafka服务,请不要关闭,一旦关闭,对应的kafka服务就停止了。假如启动过程报错,主要是因为zookeeper和kafka的同步题目,请先实行cluster-clear.cmd文件,再实行cluster.cmd文件即可。
https://img-blog.csdnimg.cn/direct/8d9f5a146a884949bbeb7a2e97ff773b.png#pic_center
https://img-blog.csdnimg.cn/direct/141b2925d191458f97831626d2dc8b10.png#pic_center
https://img-blog.csdnimg.cn/direct/48ee77743e544c8cbfa1f4d98f38e547.png#pic_center
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.2 集群启动

2.2.1 相关概念

2.2.1.1 署理:Broker

使用Kafka前,我们都会启动Kafka服务历程,这里的Kafka服务历程我们一样平常会称之为Kafka Broker或Kafka Server。因为Kafka是分布式消息系统,所以在实际的生产环境中,是必要多个服务历程形成集群提供消息服务的。所以每一个服务节点都是一个broker,而且在Kafka集群中,为了区分不同的服务节点,每一个broker都应该有一个不重复的全局ID,称之为broker.id,这个ID可以在kafka软件的设置文件server.properties中进行设置
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker
# 集群ID
broker.id=0
咱们的Kafka集群中每一个节点都有自己的ID,整数且唯一。
主机kafka-broker1kafka-broker2kafka-broker3broker.id123 2.2.1.2 控制器:Controller

Kafka是分布式消息传输系统,所以存在多个Broker服务节点,但是它的软件架构采用的是分布式系统中比力常见的主从(Master - Slave)架构,也就是说必要从多个Broker中找到一个用于管理整个Kafka集群的Master节点,这个节点,我们就称之为Controller。它是Apache Kafka的核心组件非常重要。它的主要作用是在Apache Zookeeper的资助下管理和和谐控制整个Kafka集群。
https://img-blog.csdnimg.cn/direct/c9b875bb3afc4141b4f33562c5d4d008.png#pic_center
假如在运行过程中,Controller节点出现了故障,那么Kafka会依托于ZooKeeper软件选举其他的节点作为新的Controller,让Kafka集群实现高可用。
https://img-blog.csdnimg.cn/direct/6f0a07ffd3c24b67b4570cdd8a0a6a7a.png#pic_center
Kafka集群中Controller的基本功能:


[*]Broker管理

[*]监听 /brokers/ids节点相关的变化:

[*]Broker数目增长或减少的变化
[*]Broker对应的数据变化


[*]Topic管理

[*]新增:监听 /brokers/topics节点相关的变化
[*]修改:监听 /brokers/topics节点相关的变化
[*]删除:监听 /admin/delete_topics节点相关的变化

[*]Partation管理

[*]监听 /admin/reassign_partitions节点相关的变化
[*]监听 /isr_change_notification节点相关的变化
[*]监听 /preferred_replica_election节点相关的变化

[*]数据服务
[*]启动分区状态机和副本状态机
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
ZooKeeper工具:
https://img-blog.csdnimg.cn/direct/3cf7aa46efb94dbcb806898ec40dea46.png#pic_center
https://img-blog.csdnimg.cn/direct/b1cd7a21616e44bc9d70baf1ba0a91d2.png#pic_center
https://img-blog.csdnimg.cn/direct/f6482673163b418a8d4036fa163d0aa3.png#pic_center
https://img-blog.csdnimg.cn/direct/01b49190c3c74d9aacf9a31022361978.png#pic_center
ZooKeeper客户端
https://img-blog.csdnimg.cn/direct/e730ddb13b3b4992a9bc80fd4d5277dd.png#pic_center
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.2.2 启动ZooKeeper

Kafka集群中含有多个服务节点,而分布式系统中经典的主从(Master - Slave)架构就要求从多个服务节点中找一个节点作为集群管理Master,Kafka集群中的这个Master,我们称之为集群控制器Controller
https://img-blog.csdnimg.cn/direct/fe2ed2c4b7c046f5b16632fb95f73a87.png#pic_center
假如此时Controller节点出现故障,它就不能再管理集群功能,那么其他的Slave节点该如何是好呢?
https://img-blog.csdnimg.cn/direct/ffce4de105b744589b78c35915be79ec.png#pic_center
假如从剩余的2个Slave节点中选一个节点出来作为新的集群控制器是不是一个不错的方案,我们将这个选择的过程称之为:选举(elect)。方案是不错,但是题目就在于选哪一个Slave节点呢?不同的软件实现类似的选举功能都会有一些选举算法,而Kafka是依赖于ZooKeeper软件实现Broker节点选举功能。
https://img-blog.csdnimg.cn/direct/3a39be636929490cb6b99b06be76ac76.png#pic_center
https://img-blog.csdnimg.cn/direct/aa5c64f18a1441a98bc22e601e7fd862.png#pic_center
https://img-blog.csdnimg.cn/direct/a4b579d5efc04b6fab6de7649abc10b9.png#pic_center
https://img-blog.csdnimg.cn/direct/68172bcedd5848ffb737509b7070a889.png#pic_center
ZooKeeper如何实现Kafka的节点选举呢?这就要说到我们用到ZooKeeper的3个功能:


[*] 一个是在ZooKeeper软件中创建节点Node,创建一个Node时,我们会设定这个节点是长期化创建,还是临时创建。所谓的长期化创建,就是Node一旦创建后会不绝存在,而临时创建,是根据当前的客户端连接创建的临时节点Node,一旦客户端连接断开,那么这个临时节点Node也会被主动删除,所以这样的节点称之为临时节点。
[*] https://img-blog.csdnimg.cn/direct/bf0d8236e6bb4256943caa73309a1cb6.png
https://img-blog.csdnimg.cn/direct/9bf3f73cb15b4c11af2480753706ae23.png
[*] ZooKeeper节点是不允许有重复的,所以多个客户端创建同一个节点,只能有一个创建乐成。
[*] 别的一个是客户端可以在ZooKeeper的节点上增长监听器,用于监听节点的状态变化,一旦监听的节点状态发生变化,那么监听器就会触发相应,实现特定监听功能。
[*] https://img-blog.csdnimg.cn/direct/1e4676fc4a94400291bb4326bb20ecb7.png
有了上面的三个知识点,我们这里就介绍一下Kafka是如何使用ZooKeeper实现Controller节点的选举的:

[*]第一次启动Kafka集群时,会同时启动多个Broker节点,每一个Broker节点就会连接ZooKeeper,并尝试创建一个临时节点 /controller
[*]因为ZooKeeper中一个节点不允许重复创建,所以多个Broker节点,终极只能有一个Broker节点可以创建乐成,那么这个创建乐成的Broker节点就会主动作为Kafka集群控制器节点,用于管理整个Kafka集群。
[*]没有选举乐成的其他Slave节点会创建Node监听器,用于监听 /controller节点的状态变化。
[*]一旦Controller节点出现故障或挂掉了,那么对应的ZooKeeper客户端连接就会停止。ZooKeeper中的 /controller 节点就会主动被删除,而其他的那些Slave节点因为增长了监听器,所以当监听到 /controller 节点被删除后,就会马上向ZooKeeper发出创建 /controller 节点的请求,一旦创建乐成,那么该Broker就变成了新的Controller节点了。
   如今我们能明白启动Kafka集群之前,为什么要先启动ZooKeeper集群了吧。就是因为ZooKeeper可以协助Kafka进行集群管理。
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.2.3 启动Kafka

ZooKeeper已经启动好了,那我们如今可以启动多个Kafka Broker节点构建Kafka集群了。构建的过程中,每一个Broker节点就是一个Java历程,而在这个历程中,有许多必要提前预备好,并进行初始化的内部组件对象。
2.2.3.1初始化ZooKeeper

Kafka Broker启动时,首先会先创建ZooKeeper客户端(KafkaZkClient),用于和ZooKeeper进行交互。客户端对象创建完成后,会通过该客户端对象向ZooKeeper发送创建Node的请求,留意,这里创建的Node都是长期化Node。
https://img-blog.csdnimg.cn/direct/93c8137a2b614daab3125c7c3692d642.png#pic_center
https://img-blog.csdnimg.cn/direct/48220565ab9d4476ac82e8b6c2721539.png#pic_center
节点类型说明/admin/delete_topics长期化节点设置必要删除的topic,因为删除过程中,可能broker下线,或实行失败,那么就必要在broker重新上线后,根据当前节点继续删除操作,一旦topic所有的分区数据全部删除,那么当前节点的数据才会进行清理/brokers/ids长期化节点服务节点ID标识,只要broker启动,那么就会在当前节点中增长子节点,brokerID不能重复/brokers/topics长期化节点服务节点中的主题详细信息,包罗分区,副本/brokers/seqid长期化节点seqid主要用于主动生产brokerId/config/changes长期化节点kafka的元数据发生变化时,会向该节点下创建子节点。并写入对应信息/config/clients长期化节点客户端设置,默认为空/config/brokers长期化节点服务节点相关设置,默认为空/config/ips长期化节点IP设置,默认为空/config/topics长期化节点主题设置,默认为空/config/users长期化节点用户设置,默认为空/consumers长期化节点消耗者节点,用于记录消耗者相关信息/isr_change_notification长期化节点ISR列表发生变更时候的通知,在kafka当中由于存在ISR列表变更的情况发生,为了保证ISR列表更新的及时性,界说了isr_change_notification这个节点,主要用于通知Controller来及时将ISR列表进行变更。/latest_producer_id_block长期化节点保存PID块,主要用于能够保证生产者的恣意写入请求都能够得到相应。/log_dir_event_notification长期化节点主要用于保存当broker当中某些数据路径出现非常时候,比方磁盘损坏,文件读写失败等非常时候,向ZooKeeper当中增长一个通知序号,Controller节点监听到这个节点的变化之后,就会做出对应的处理操作/cluster/id长期化节点主要用于保存kafka集群的唯一id信息,每个kafka集群都会给分配要给唯一id,以及对应的版本号 https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
第一个Broker启动的流程
https://img-blog.csdnimg.cn/direct/75df553d46b4417290501c86d26bdddb.png#pic_center
第二个Broker启动的流程
https://img-blog.csdnimg.cn/direct/71e0f889ceaa454fa80a9951280c28db.png#pic_center
第三个Broker启动的流程
https://img-blog.csdnimg.cn/direct/64a7d3b76b064f2bba6fcbcc1bdcb77f.png#pic_center
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
controller节点删除
https://img-blog.csdnimg.cn/direct/10b4fc7324b1436793486281a0e56ff8.png#pic_center
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.2.3.2初始化服务

Kafka Broker中有许多的服务对象,用于实现内部管理和外部通信操作。
https://img-blog.csdnimg.cn/direct/74409225a832427cb08b6d14cf233f79.png#pic_center
https://img-blog.csdnimg.cn/direct/178110a742a347f3b46c75f2b79bb4f0.png#pic_center
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.2.3.2.1 启动使命调度器

每一个Broker在启动时都会创建内部调度器(KafkaScheduler)并启动,用于完成节点内部的工作使命。底层就是Java中的定时使命线程池ScheduledThreadPoolExecutor
2.2.3.2.2 创建数据管理器

每一个Broker在启动时都会创建数据管理器(LogManager),用于吸取到消息后,完成后续的数据创建,查询,清理等处理。
2.2.3.2.3 创建远程数据管理器

每一个Broker在启动时都会创建远程数据管理器(RemoteLogManager),用于和其他Broker节点进行数据状态同步。
2.2.3.2.4 创建副本管理器

每一个Broker在启动时都会创建副本管理器(ReplicaManager),用于对主题的副本进行处理。
2.2.3.2.5 创建ZK元数据缓存

每一个Broker在启动时会将ZK的关于Kafka的元数据进行缓存,创建元数据对象(ZkMetadataCache)
2.2.3.2.6 创建Broker通信对象

每一个Broker在启动时会创建Broker之间的通道管理器对象(BrokerToControllerChannelManager),用于管理Broker和Controller之间的通信。
2.2.3.2.7 创建网络通信对象

每一个Broker在启动时会创建自己的网络通信对象(SocketServer),用于和其他Broker之间的进行通信,其中包含了Java用于NIO通信的Channel、Selector对象。
https://img-blog.csdnimg.cn/direct/f51fb13c0ed346e48147523e47184778.png#pic_center
2.2.3.2.8 注册Broker节点

Broker启动时,会通过ZK客户端对象向ZK注册当前的Broker 节点ID,注册后创捷的ZK节点为临时节点。假如当前Broker的ZK客户端断开和ZK的连接,注册的节点会被删除。
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.2.3.3 启动控制器

控制器(KafkaController)是每一个Broker启动时都会创建的核心对象,用于和ZK之间建立连接并申请自己为整个Kafka集群的Master管理者。假如申请乐成,那么会完成管理者的初始化操作,并建立和其他Broker之间的数据通道吸取各种变乱,进行封装后交给变乱管理器,并界说了process方法,用于真正处理各类变乱。
https://img-blog.csdnimg.cn/direct/403a4ad85cfd4e95b9ec3aaf56c02bd5.png#pic_center
2.2.3.3.1 初始化通道管理器

创建通道管理器(ControllerChannelManager),该管理器维护了Controller和集群所有Broker节点之间的网络连接,并向Broker发送控制类请求及吸取相应。
2.2.3.3.2 初始化变乱管理器

创建变乱管理器(ControllerEventManager)维护了Controller和集群所有Broker节点之间的网络连接,并向Broker发送控制类请求及吸取相应。
2.2.3.3.3 初始化状态管理器

创建状态管理器(ControllerChangeHandler)可以监听 /controller 节点的操作,一旦节点创建(ControllerChange),删除(Reelect),数据发生变化(ControllerChange),那么监听后实行相应的处理。
2.2.3.3.4 启动控制器

控制器对象启动后,会向变乱管理器发送Startup变乱,变乱处理线程吸取到变乱后会通过ZK客户端向ZK申请 /controller 节点,申请乐成后,实行当前节点成为Controller的一些列操作。主要是注册各类 ZooKeeper 监听器、删除日记路径变更和 ISR 副本变更通知变乱、启动 Controller 通道管理器,以及启动副本状态机和分区状态机。
2.2.4 反正到这 我是懵啦~

   我这边阅读了其他人的博客 感谢这位大佬!!! ┭┮﹏┭┮


[*] Kafka服务实例,负责消息的长期化、中转等功能。一个独立的Kafka 服务器被就是一个broker。
[*] broker 是集群的组成部门。每个集群都有一个broker 同时充当了集群控制器Controller的脚色。
[*] kafka使用Zookeeper(ZK)进行元数据管理,保存broker注册信息,包罗主题(Topic)、分区(Partition)信息等,选择分区leader,在低版本kafka消耗者的offset信息也会保存在ZK中。
   简简单单的三句话:大抵总结了好多东西~
在每一个Broker在启动时都会向ZK注册信息,ZK会选取一个最早注册的Broker作为Controller,后面Controller会与ZK进行数据交互获取元数据(即整个Kafka集群的信息,比方有那些Broker,每个Broker中有那些Partition等信息),并负责管理工作,包罗将分区分配给broker 和监控broker,其他Broker是与Controller进行交互进而感知到整个集群的所有信息。
2.2.4.1 Broker总体工作流程

https://img-blog.csdnimg.cn/direct/5db9cb1b079049eea25e0c67ac0ee3b0.png#pic_center
流程如下:

[*]broker 启动后,向 ZK 集群进行注册
[*]各个 broker 的 controller 抢占 ZNode
[*]抢占到的 Controller 监听 brokers 节点的变化
[*]Controller 决定 Leader 的选举规则:在 isr 中存活的 broker,按照 AR 中排在前面的优先
[*]Controller 将节点信息上传到 ZK
[*]其他 controller 从 ZK 同步相关信息
[*]假设此时 broker1 中的 Leader 挂了
[*]Controller 监听到了节点变化
[*]从 ZK 中获取 ISR
[*]选举出新的 Leader
[*]向 ZK 更新 Leader 和 ISR
2.2.4.2 为什么必要Controller

   在Kafka早期版本,对于分区和副本的状态的管理依赖于zookeeper的Watcher和队列:每一个broker都会在zookeeper注册Watcher,所以zookeeper就会出现大量的Watcher, 假如宕机的broker上的partition许多比力多,会造成多个Watcher触发,造成集群内大规模调整;每一个replica都要去再次zookeeper上注册监视器,当集群规模很大的时候,zookeeper负担很重。这种设计很容易出现脑裂和羊群效应以及zookeeper集群过载。
新的版本中该变了这种设计,使用KafkaController,只有KafkaController,Leader会向zookeeper上注册Watcher,其他broker险些不消监听zookeeper的状态变化。
Kafka集群中多个broker,有一个会被选举为controller leader,负责管理整个集群中分区和副本的状态,好比partition的leader 副本故障,由controller 负责为该partition重新选举新的leader 副本;当检测到ISR列表发生变化,有controller通知集群中所有broker更新其MetadataCache信息;大概增长某个topic分区的时候也会由controller管理分区的重新分配工作。
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3 创建主题

   Topic主题是Kafka中消息的逻辑分类,但是这个分类不应该是固定的,而是应该由外部的业务场景进行界说(留意:Kafka中其实是有两个固定的,用于记录消耗者偏移量和事务处理的主题),所以Kafka提供了相应的指令和客户端进行主题操作。
2.3.1 相关概念

2.3.1.1 主题:Topic

Kafka是分布式消息传输系统,采用的数据传输方式为发布,订阅模式,也就是说由消息的生产者发布消息,消耗者订阅消息后获取数据。
   为了对消耗者订阅的消息进行区分,所以对消息在逻辑上进行了分类,这个分类我们称之为主题:Topic。消息的生产者必须将消息数据发送到某一个主题,而消耗者必须从某一个主题中获取消息,并且消耗者可以同时消耗一个或多个主题的数据。Kafka集群中可以存放多个主题的消息数据。
为了防止主题的名称和监控指标的名称产生冲突,官方推荐主题的名称中不要同时包含下划线和点。
https://img-blog.csdnimg.cn/direct/c79c80c29eca48fdbad90fcf9ac9a159.png#pic_center
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3.1.2 分区:Partition

Kafka消息传输采用发布、订阅模式,所以消息生产者必须将数据发送到一个主题,假如发送给这个主题的数据非常多,那么主题所在broker节点的负载和吞吐量就会受到极大的磨练,甚至有可能因为热点题目引起broker节点故障,导致服务不可用。一个好的方案就是将一个主题从物理上分成几块,然后将不同的数据块匀称地分配到不同的broker节点上,这样就可以缓解单节点的负载题目。这个主题的分块我们称之为:分区partition。默认情况下,topic主题创建时分区数目为1,也就是一块分区,可以指定参数–partitions改变。Kafka的分区解决了单一主题topic线性扩展的题目,也解决了负载均衡的题目。
topic主题的每个分区都会用一个编号进行标记,一样平常是从0开始的连续整数数字。Partition分区是物理上的概念,也就意味着会以数据文件的方式真实存在。每个topic包含一个或多个partition,每个partition都是一个有序的队列。partition中每条消息都会分配一个有序的ID,称之为偏移量:Offset
https://img-blog.csdnimg.cn/direct/c215398495554367b119e672a09285d5.png#pic_center
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3.1.3 副本:Replication

分布式系统出现错误是比力常见的,只要保证集群内部依然存在可用的服务节点即可,当然效率会有所降低,不过只要能保证系统可用就可以了。咱们Kafka的topic也存在类似的题目,也就是说,假如一个topic分别了多个分区partition,那么这些分区就会匀称地分布在不同的broker节点上,一旦某一个broker节点出现了题目,那么在这个节点上的分区就会出现题目,那么Topic的数据就不完整了。所以一样平常情况下,为了防止出现数据丢失的情况,我们会给分区数据设定多个备份,这里的备份,我们称之为:副本Replication。
Kafka支持多副本,使得主题topic可以做到更多容错性,牺牲性能与空间去换取更高的可靠性。
https://img-blog.csdnimg.cn/direct/07f809c7e32241d88690d8cf7c05b456.png#pic_center
留意:这里不能将多个备份放置在同一个broker中,因为一旦出现故障,多个副本就都不能用了,那么副本的意义就没有了。
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3.1.4 副本类型:Leader & Follower

假设我们有一份文件,一样平常情况下,我们对副本的明白应该是有一个正式的完整文件,然后这个文件的备份,我们称之为副本。但是在Kafka中,不是这样的,所有的文件都称之为副本,只不过会选择其中的一个文件作为主文件,称之为:Leader(主导)副本,其他的文件作为备份文件,称之为:Follower(追随)副本。在Kafka中,这里的文件就是分区,每一个分区都可以存在1个或多个副本,只有Leader副本才气进行数据的读写,Follower副本只做备份使用。
https://img-blog.csdnimg.cn/direct/c1b3c071b980438f871e14c656f70af1.png#pic_center
https://img-blog.csdnimg.cn/direct/54e394cc64c4452b9b22ecbed09490ac.png#pic_center
但是 还是有点题目:
   还是容易出现IO热点题目:
https://img-blog.csdnimg.cn/direct/edc3d99455fc421284aff3fa1ae1ccac.png#pic_center
更合理:
https://img-blog.csdnimg.cn/direct/783d361e5b224b0ab6ec58e5c3ec96a7.png#pic_center
Kafka!没办法站在上帝的视角分配,只能尽可能的做到负载均衡
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3.1.5 日记:Log

Kafka最开始的应用场景就是日记场景或MQ场景,更多的扮演着一个日记传输和存储系统,这是Kafka立家之本。所以Kafka吸取到的消息数据终极都是存储在log日记文件中的,底层存储数据的文件的扩展名就是log。
主题创建后,会创建对应的分区数据Log日记。并打开文件连接通道,随时预备写入数据。
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3.1(续) IDEA创建主题

package com.atguigu.kafka.test.admin;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
import java.util.Properties;

public class AdminTopicTest {
    public static void main(String[] args) {
      Properties properties = new Properties();
      properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");


      // TODO 管理员对象
      Admin admin = Admin.create(properties);
      // TODO 创建主题需要三个参数
      // 主题名称
      String topicName = "test1";
      // 主题分区的数量
      int partitionCount= 1;
      // 主题分区的副本的因子(数量)
      short replicationCount = 1;
      NewTopic topic1 = new NewTopic(topicName,partitionCount,replicationCount);
      // 主题名称
      String topicName1 = "test2";
      // 主题分区的数量
      int partitionCount1= 2;
      // 主题分区的副本的因子(数量)
      short replicationCount1 = 2;
      NewTopic topic2 = new NewTopic(topicName1,partitionCount1,replicationCount1);


      // TODO 创建主题
      admin.createTopics(
                Arrays.asList(topic1,topic2)
      );
      // TODO 关闭管理者对象
      admin.close();
    }
}

https://img-blog.csdnimg.cn/direct/3bc86aecd3834f6ba233fbfd008f9610.png#pic_center
https://img-blog.csdnimg.cn/direct/9c27a6e084a4407ea9055c8c1fa18f86.png#pic_center
https://img-blog.csdnimg.cn/direct/e4162d016be54116a7d51d16b9126ba0.png#pic_center
https://img-blog.csdnimg.cn/direct/4a0ccb254e88497b83ba8bb201381773.png#pic_center
https://img-blog.csdnimg.cn/direct/1461b1e061294219bd349d8d0af80cdc.png#pic_center
   主题是逻辑上的分类,而分区才是文件夹条理上的区分
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3.2 创建第一个主题

创建主题Topic的方式有许多种:命令行,工具,客户端API,主动创建。在server.properties文件中设置参数auto.create.topics.enable=true时,假如访问的主题不存在,那么Kafka就会主动创建主题,这个操作不在我们的讨论范围内。由于我们学习的重点在于学习原理和基础概念,所以这里我们选择比力基础的命令行方式即可。
我们首先创建的主题,仅仅指明主题的名称即可,其他参数临时无需设定。
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3.2.1 实行指令

https://img-blog.csdnimg.cn/direct/ae452a5bd8814e39b6690f9b72981e90.png#pic_center
$ cd /opt/module/kafka
$ bin/kafka-topics.sh --bootstrap-server kafka-broker1:9092 --create --topic first-topic
2.3.2.2 ZooKeeper节点变化

指令实行后,当前Kafka会增长一个主题,因为指令中没有设置分区和副本参数,所以当前主题分区数目为默认值1,编号为0,副本为1,编号为所在broker的ID值。为了方便集群的管理,创建topic时,会同时在ZK中增长子节点,记录主题相关设置信息:


[*] /config/topics节点中会增长first-topic节点。
https://img-blog.csdnimg.cn/direct/40fd0860dfbc4fe8ae448d5a3a94a834.png#pic_center
[*] /brokers/topics节点中会增长first-topic节点以及相应的子节点。
https://img-blog.csdnimg.cn/direct/b80bed234122410bb46c4c3c92a9e653.png#pic_center
节点节点类型数据名称数据值说明/topics/first-topic长期类型removing_replicas无partitions{“0”:}分区设置topic_id随机字符串adding_replicas无version3/topics/first-topic/partitions长期类型主题分区节点,每个主题都应该设置分区,保存在该节点/topics/first-topic/partitions/0长期类型主题分区副本节点,因为当前主题只有一个分区,所以编号为0/topics/first-topic/partitions/0/state长期类型controller_epoch 7 主题分区副本状态节点leader3Leader副本所在的broker Idversion1leader_epoch0isr副本同步列表,因为当前只有一个副本,所以副本中只有一个副本编号 https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3.2.3 数据存储位置

主题创建后,必要找到一个用于存储分区数据的位置,根据上面ZooKeeper存储的节点设置信息可以知道,当前主题的分区数目为1,副本数目为1,那么数据存储的位置就是副本所在的broker节点,从当前数据来看,数据存储在我们的第三台broker上。
$ cd /opt/module/kafka/datas
$ ll
https://img-blog.csdnimg.cn/direct/c327e4bdce1f4a299237c5f28dfc77ed.png#pic_center
$ cd first-topic-0
$ ll
https://img-blog.csdnimg.cn/direct/5bbb832ac5674efb8b9c84016f7b9d5b.png#pic_center
路径中的00000000000000000000.log文件就是真正存储消息数据的文件,文件名称中的0表现当前文件的起始偏移量为0,index文件和timeindex文件都是数据索引文件,用于快速定位数据。只不过index文件采用偏移量的方式进行定位,而timeindex是采用时间戳的方式。
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3.3 创建第二个主题

接下来我们创建第二个主题,不过创建时,我们必要设定分区参数 --partitions,参数值为3,表现创建3个分区
2.3.3.1 实行指令

https://img-blog.csdnimg.cn/direct/7b15979733264b829ecd502ae810003d.png#pic_center
$ cd /opt/module/kafka
$ bin/kafka-topics.sh --bootstrap-server kafka-broker1:9092 --create --topic second-topic --partitions 3
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3.3.2 ZooKeeper节点变化

指令实行后,当前Kafka会增长一个主题,因为指令中指定了分区数目(–partitions 3),所以当前主题分区数目为3,编号为,副本为1,编号为所在broker的ID值。为了方便集群的管理,创建Topic时,会同时在ZK中增长子节点,记录主题相关设置信息:
 /config/topics节点中会增长second-topic节点。
https://img-blog.csdnimg.cn/direct/e9b5c1d6291149bd93b0dd8635d59da8.png#pic_center
 /brokers/topics节点中会增长second-topic节点以及相应的子节点。
https://img-blog.csdnimg.cn/direct/87cdc53cb9224d6cb442f87164c2f253.png#pic_center
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3.3.3 数据存储位置

主题创建后,必要找到一个用于存储分区数据的位置,根据上面ZooKeeper存储的节点设置信息可以知道,当前主题的分区数目为3,副本数目为1,那么数据存储的位置就是每个分区Leader副本所在的broker节点。
$ cd /opt/module/kafka/datas
$ ll
https://img-blog.csdnimg.cn/direct/a0e1ac355cba43099c1636b99bf3acfa.png#pic_center
$ cd second-topic-0
$ ll
https://img-blog.csdnimg.cn/direct/b8f1f75ae6cb4fb69522095683b732e2.png#pic_center
$ cd /opt/module/kafka/datas
$ ll
https://img-blog.csdnimg.cn/direct/937c299a48e34eb6a43d00386ab3df14.png#pic_center
$ cd second-topic-1
$ ll
https://img-blog.csdnimg.cn/direct/9dbfdf788af1488db2629ae7b5b7702c.png#pic_center
$ cd /opt/module/kafka/datas
$ ll
https://img-blog.csdnimg.cn/direct/37456a5bc2914760ad6fd9474c407753.png#pic_center
$ cd second-topic-2
$ ll
https://img-blog.csdnimg.cn/direct/c9e87941b7184e2db08babf9820fa136.png#pic_center
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3.4 创建第三个主题

接下来我们创建第三个主题,不过创建时,我们必要设定副本参数 --replication-factor,参数值为3,表现每个分区创建3个副本。
2.3.4.1 实行指令

https://img-blog.csdnimg.cn/direct/73ce31914c4b4b0683f6c3b86c474196.png#pic_center
$ cd /opt/module/kafka
$ bin/kafka-topics.sh --bootstrap-server kafka-broker1:9092 --create --topic third-topic --partitions 3 --replication-factor 3
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3.4.2 ZooKeeper节点变化

指令实行后,当前Kafka会增长一个主题,因为指令中指定了分区数目和副本数目(–replication-factor 3),所以当前主题分区数目为3,编号为,副本为3,编号为。为了方便集群的管理,创建Topic时,会同时在ZK中增长子节点,记录主题相关设置信息:
 /config/topics节点中会增长third-topic节点。
https://img-blog.csdnimg.cn/direct/8bcbc265911a4f6e8d0d081d4a7a7652.png#pic_center
 /brokers/topics节点中会增长third-topic节点以及相应的子节点。
https://img-blog.csdnimg.cn/direct/404c68a0f66e4e66a915ec071d847f5d.png#pic_center
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3.4.3 数据存储位置

主题创建后,必要找到一个用于存储分区数据的位置,根据上面ZooKeeper存储的节点设置信息可以知道,当前主题的分区数目为3,副本数目为3,那么数据存储的位置就是每个分区副本所在的broker节点。
$ cd /opt/module/kafka/datas
$ ll
https://img-blog.csdnimg.cn/direct/2c27a0f599df4b6eb47b8788a7ff2ba0.png#pic_center
$ cd third-topic-2
$ ll
https://img-blog.csdnimg.cn/direct/b00c4f9112254c31b4bd94ef0fba6865.png#pic_center
$ cd /opt/module/kafka/datas
$ ll
https://img-blog.csdnimg.cn/direct/11b981858e904c01ab2eb0d89b8472fd.png#pic_center
$ cd third-topic-0
$ ll
https://img-blog.csdnimg.cn/direct/8425bf69c27643719a3a9092e1d1214a.png#pic_center
$ cd /opt/module/kafka/datas
$ ll
https://img-blog.csdnimg.cn/direct/80f88ac206074f11a0dfb662cf5d3cf7.png#pic_center
$ cd third-topic-1
$ ll
https://img-blog.csdnimg.cn/direct/109b13e4f7d1457e93b060597a8614e3.png#pic_center
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3.5 创建主题流程

Kafka中主题、分区以及副本的概念都和数据存储相关,所以是非常重要的。前面咱们演示了一下创建主题的详细操作和现象,那么接下来,我们就通过图解来相识一下Kafka是如何创建主题,并进行分区副本分配的。
2.3.5.1 命令行提交创建指令

https://img-blog.csdnimg.cn/direct/6e7c55fea9ff4316a5ca1ce2b6535078.png#pic_center


[*]通过命令行提交指令,指令中会包含操作类型(–create)、topic的名称(–topic)、主题分区数目(–partitions)、主题分区副本数目(–replication-facotr)、副本分配策略(–replica-assignment)等参数。
[*]指令会提交到客户端进行处理,客户端获取指令后,会首先对指令参数进行校验。

[*]a. 操作类型取值:create、list、alter、describe、delete,只能存在一个。
[*]b. 分区数目为大于1的整数。
[*]c. 主题是否已经存在
[*]d. 分区副本数目大于1且小于Short.MaxValue,一样平常取值小于等于Broker数目。

[*]将参数封装主题对象(NewTopic)。
[*]创建通信对象,设定请求标记(CREATE_TOPICS),查找Controller,通过通信对象向Controller发起创建主题的网络请求。
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3.5.2 Controller吸取创建主题请求

https://img-blog.csdnimg.cn/direct/a6df1e0f24f448bba08a944876b32ac7.png#pic_center
(1) Controller节点吸取到网络请求(Acceptor),并将请求数据封装成请求对象放置在队列(requestQueue)中。
(2) 请求控制器(KafkaRequestHandler)周期性从队列中获取请求对象(BaseRequest)。
(3) 将请求对象转发给请求处理器(KafkaApis),根据请求对象的类型调用创建主题的方法。
https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center
2.3.5.3 创建主题

https://img-blog.csdnimg.cn/direct/a0397c81563e4340a381aa281e5864df.png#pic_center
(1) 请求处理器(KafkaApis)校验主题参数。


[*]假如分区数目没有设置,那么会采用Kafka启动时加载的设置项:

[*]num.partitions(默认值为1)

[*]假如副本数目没有设置,那么会采用Kafka启动时纪录的设置项:

[*]default.replication.factor(默认值为1)

(2) 在创建主题时,假如使用了replica-assignment参数,那么就按照指定的方案来进行分区副本的创建;假如没有指定replica-assignment参数,那么就按照Kafka内部逻辑来分配,内部逻辑按照机架信息分为两种策略:【未指定机架信息】和【指定机架信息】。当前课程中采用的是【未指定机架信息】副本分配策略:


[*]分区起始索引设置0
[*]轮询所有分区,计算每一个分区的所有副本位置:

[*]副本起始索引 = (分区编号 + 随机值) % BrokerID列表长度。
[*]其他副本索引 = 。。。随机值(基本算法为使用随机值实行多次模运算)

##################################################################
# 假设
#   当前分区编号 : 0
#   BrokerID列表 :【1,2,3,4】
#   副本数量 : 4
#   随机值(BrokerID列表长度): 2
#   副本分配间隔随机值(BrokerID列表长度): 2
##################################################################
# 第一个副本索引:
(分区编号 + 随机值)% BrokerID列表长度 =
(0 + 2)% 4 = 2
# 第一个副本所在BrokerID : 3

# 第二个副本索引
(第一个副本索引 + (1 +(副本分配间隔 + 0)% (BrokerID列表长度 - 1))) % BrokerID列表长度 =
(2 +(1+(2+0)%3))% 4 = 1
# 第二个副本所在BrokerID:2

# 第三个副本索引:(第一个副本索引 + (1 +(副本分配间隔 + 1)% (BrokerID列表长度 - 1))) % BrokerID列表长度 = (2 +(1+(2+1)%3))% 4 = 3
# 第三个副本所在BrokerID:4

# 第四个副本索引:(第一个副本索引 + (1 +(副本分配间隔 + 2)% (BrokerID列表长度 - 1))) % BrokerID列表长度 = (2 +(1+(2+2)%3))% 4 = 0
# 第四个副本所在BrokerID:1

# 最终分区0的副本所在的Broker节点列表为【3,2,4,1】
# 其他分区采用同样算法


[*]通过索引位置获取副本节点ID
[*]保存分区以及对应的副本ID列表。
(3) 通过ZK客户端在ZK端创建节点:


[*]在 /config/topics节点下,增长当前主题节点,节点类型为长期类型。
[*]在 /brokers/topics节点下,增长当前主题及相关节点,节点类型为长期类型。
(4) Controller节点启动后,会在/brokers/topics节点增长监听器,一旦节点发生变化,会触发相应的功能:
[*]获取必要新增的主题信息
[*]更新当前Controller节点保存的主题状态数据
[*]更新分区状态机的状态为:NewPartition
[*]更新副本状态机的状态:NewReplica
[*]更新分区状态机的状态为:OnlinePartition,从正常的副本列表中的获取第一个作为分区的Leader副本,所有的副本作为分区的同步副本列表,我们称之为ISR( In-Sync Replica)。在ZK路径/brokers/topics/主题名上增长分区节点/partitions,及状态/state节点。
[*]更新副本状态机的状态:OnlineReplica
(5) Controller节点向主题的各个分区副本所属Broker节点发送LeaderAndIsrRequest请求,向所有的Broker发送UPDATE_METADATA请求,更新自身的缓存。
[*]Controller向分区所属的Broker发送请求
[*]Broker节点吸取到请求后,根据分区状态信息,设定当前的副本为Leader或Follower,并创建底层的数据存储文件目录和空的数据文件。
文件目录名:主题名 + 分区编号
文件名说明0000000000000000.log数据文件,用于存储传输的小心0000000000000000.index索引文件,用于定位数据0000000000000000.timeindex时间索引文件,用于定位数据 https://img-blog.csdnimg.cn/direct/1fcb80a8efb94eb38ecc8c1825dac4d2.png#pic_center

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程