消息队列(Message Queue),是分布式系统中紧张的组件,其通用的利用场景可以简朴地描述为:当不须要立即得到效果,但是并发量又须要举行控制的时间,差不多就是须要利用消息队列的时间。
一、消息队列
1.什么是消息队列
消息(Message)是指在应用间传送的数据。消息可以非常简朴,比如只包含文本字符串,也可以更复杂,大概包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息利用者只管从 MQ 中取消息而不管是谁发布的。如许发布者和利用者都不用知道对方的存在。
2.消息队列的特性
(1)存储
与依赖于利用套接字的基本 TCP 和 UDP 协议的传统哀求和响应系统不同,消息队列通常将消息存储在某种类型的缓冲区中,直到目标进程读取这些消息或将其从消息队列中显式移除为止。
(2)异步
与哀求和响应系统不同,消息队列通过缓冲消息可以在应用步伐中公开一定程度的异步性,允许源进程发送消息并在队列中累积消息,而目标进程则可以挑选消息举行处理。 如许,应用步伐就可以在某些故障情况下运行,例如毗连断断续续或源进程或目标进程故障。
路由:消息队列还可以提供路由功能,此中多个进程可以在同一队列中读取或写入消息,从而实现广播或单播通信模式。
3.为什么须要消息队列
(1)解耦
允许你独立的扩展或修改双方的处理过程,只要确保它们遵守同样的接口束缚。
(2)冗余
消息队列把数据举行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。很多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,须要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你利用完毕。
(3)扩展性
由于消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要别的增长处理过程即可。
(4)灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然须要继承发挥作用,但是如许的突发流量并不常见。假如为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。利用消息队列可以或许使关键组件顶住突发的访问压力,而不会由于突发的超负荷的哀求而完全崩溃。
(5)可规复性
系统的一部门组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统规复后被处理。
(6)序次保证
在大多利用场景下,数据处理的序次都很紧张。大部门消息队列原来就是排序的,而且能保证数据会按照特定的序次来处理。(Kafka 保证一个 Partition 内的消息的有序性)
(7)缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消耗消息的处理速度不同等的情况。
(8)异步通信
很多时间,用户不想也不须要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在须要的时间再行止理它们。
二、Kafka基础与入门
1.kafka基本概念
Kafka是一种高吞吐量的分布式发布/订阅消息系统,这是官方对kafka的定义,如许说起来,大概不太好理解,这里简朴举个例子:现在是个大数据期间,各种商业、社交、搜索、浏览都会产生大量的数据。那么怎样快速网络这些数据,怎样实时的分析这些数据,是一个必须要解决的问题,同时,这也形成了一个业务需求模子,即生产者生产(produce)各种数据,消耗者(consume)消耗(分析、处理)这些数据。那么面临这些需求,怎样高效、稳固的完成数据的生产和消耗呢?这就须要在生产者与消耗者之间,建立一个通信的桥梁,这个桥梁就是消息系统。从微观层面来说,这种业务需求也可理解为不同的系统之间怎样传递消息。
kafka是Apache构造下的一个开源系统,它的最大的特性就是可以实时的处理大量数据以满意各种需求场景:比如基于hadoop平台的数据分析、低时延的实时系统、storm/spark流式处理引擎等。kafka现在它已被多家大型公司作为多种类型的数据管道和消息系统利用。
2.kafka脚色术语
kafka的一些核心概念和脚色
- Broker:Kafka集群包含一个或多个服务器,每个服务器被称为broker。
- Topic:每条发布到Kafka集群的消息都有一个分类,这个类别被称为Topic(主题)。
- Producer:指消息的生产者,负责发布消息到kafka broker。
- Consumer:指消息的消耗者,从kafka broker拉取数据,并消耗这些已发布的消息。
- Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition,每个partition都是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
- Consumer Group:消耗者组,可以给每个Consumer指定消耗组,若不指定消耗者组,则属于默认的group。
- Message:消息,通信的基本单位,每个producer可以向一个topic发布一些消息。
3.kafka拓扑架构
一个典型的Kafka集群包含若干Producer,若干broker、若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,推举leader,以及在Consumer Group发生变化时举行rebalance。Producer利用push模式将消息发布到broker,Consumer利用pull模式从broker订阅并消耗消息。典型架构如下图所示:
从图中可以看出,典型的消息系统有生产者(Producer),存储系统(broker)和消耗者(Consumer)构成,Kafka作为分布式的消息系统支持多个生产者和多个消耗者,生产者可以将消息分布到集群中不同节点的不同Partition上,消耗者也可以消耗集群中多个节点上的多个Partition。在写消息时允很多个生产者写到同一个Partition中,但是读消息时一个Partition只允许被一个消耗组中的一个消耗者所消耗,而一个消耗者可以消耗多个Partition。也就是说同一个消耗组下消耗者对Partition是互斥的,而不同消耗组之间是共享的
kafka支持消息持久化存储,持久化数据保存在kafka的日记文件中,在生产者生产消息后,kafka不会直接把消息传递给消耗者,而是先要在broker中举行存储,为了减少磁盘写入的次数,broker会将消息暂时缓存起来,当消息的个数或尺寸、巨细达到一定阀值时,再统一写到磁盘上,如许不光进步了kafka的实行服从,也减少了磁盘IO调用次数。
kafka中每条消息写到partition中,是序次写入磁盘的,这个很紧张,由于在机械盘中假如是随机写入的话,服从将是很低的,但是假如是序次写入,那么服从却是非常高,这种序次写入磁盘机制是kafka高吞吐率的一个很紧张的保证。
4.Topic和partition
Kafka中的topic(主题)是以partition的情势存放的,每一个topic都可以设置它的partition数目,Partition的数目决定了构成topic的log的数目。保举partition的数目一定要大于同时运行的consumer的数目。别的,发起partition的数目要小于即是集群broker的数目,如许消息数据就可以匀称的分布在各个broker中
那么,Topic为什么要设置多个Partition呢,这是由于kafka是基于文件存储的,通过配置多个partition可以将消息内容分散存储到多个broker上,如许可以避免文件尺寸达到单机磁盘的上限。同时,将一个topic切分成恣意多个partitions,可以保证消息存储、消息消耗的服从,由于越多的partitions可以容纳更多的consumer,可有效提拔Kafka的吞吐率。因此,将Topic切分成多个partitions的好处是可以将大量的消息分成多批数据同时写到不同节点上,将写哀求分担负载到各个集群节点。
在存储布局上,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件。partiton命名规则为topic名称+序号,第一个partiton序号从0开始,序号最大值为partitions数目减1。
在每个partition(文件夹)中有多个巨细相等的segment(段)数据文件,每个segment的巨细是相同的,但是每条消息的巨细大概不相同,因此segment<br/>数据文件中消息数目不一定相等。segment数据文件有两个部门构成,分别为index file和data file,此两个文件是逐一对应,成对出现,后缀”.index“和“.log”分别表现为segment索引文件和数据文件。
5.Producer生产机制
Producer是消息和数据的生产者,它发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。假如Partition机制设置的合理,所有消息都可以匀称分布到不同的Partition里,如许就实现了数据的负载均衡。假如一个Topic对应一个文件,那这个文件地点的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的进步了吞吐率。
6.Consumer消耗机制
Kafka发布消息通常有两种模式:队列模式(queuing)和发布/订阅模式(publish-subscribe)。在队列模式下,只有一个消耗组,而这个消耗组有多个消耗者,一条消息只能被这个消耗组中的一个消耗者所消耗;而在发布/订阅模式下,可有多个消耗组,每个消耗组只有一个消耗者,同一条消息可被多个消耗组消耗。
Kafka中的Producer和consumer采用的是push、pull的模式,即producer向 broker举行push消息,comsumer从bork举行pull消息,push和pull对于消息的生产和消耗是异步举行的。pull模式的一个好处是consumer可自主控制消耗消息的速率,同时consumer还可以本身控制消耗消息的方式是批量的从broker拉取数据照旧逐条消耗数据。
三、zookeeper概念先容
ZooKeeper是一种分布式协调技术,所谓分布式协调技术紧张是用来解决分布式情况当中多个进程之间的同步控制,让他们有序的去访问某种共享资源,防止造成资源竞争(脑裂)的结果。脑裂是指在主备切换时,由于切换不彻底或其他原因,导致客户端和Slave误以为出现两个activemaster,最终使得整个集群处于混乱状态
这里起首先容下什么是分布式系统,所谓分布式系统就是在不同地区分布的多个服务器,共同构成的一个应用系统来为用户提供服务,在分布式系统中最紧张的是进程的调度,这里假设有一个分布在三个地区的服务器构成的一个应用系统,在第一台机器上挂载了一个资源,然后这三个地区分布的应用进程都要竞争这个资源,但我们又不渴望多个进程同时举行访问,这个时间就须要一个协调器(锁),来让它们有序的来访问这个资源。这个协调器就是分布式系统中经常提到的那个“锁”,例如”进程1“在利用该资源的时间,会先去得到这把锁,”进程1“得到锁以后会对该资源保持独占,此时别的进程就无法访问该资源,“进程1”;在用完该资源以后会将该锁开释掉,以便让别的进程来得到锁。由此可见,通过这个“锁”机制,就可以保证分布式系统中多个进程可以或许有序的访问该共享资源。这里把这个分布式情况下的这个“锁”叫作分布式锁。这个分布式锁就是分布式协调技术实现的核心内容。
现在,在分布式协调技术方面做得比力好的有Google的Chubby,还有Apache的ZooKeeper,它们都是分布式锁的实现者。ZooKeeper所提供锁服务在分布式范畴久经磨练,它的可靠性、可用性都是经过理论和实践验证的。
ZooKeeper是一种为分布式应用所设计的高可用、高性能的开源协调服务,它提供了一项基本服务:分布式锁服务,同时,也提供了数据的维护和管理机制,如:统一命名服务、状态同步服务、集群管理、分布式消息队列、分布式应用配置项的管理等等。
1.zookeeper应用举例
(1)什么是单点故障问题呢?
所谓单点故障,就是在一个主从的分布式系统中,主节点负责使命调度分发,从节点负责使命的处理,而当主节点发生故障时,整个应用系统也就瘫痪了,那么这种故障就称为单点故障。那我们的解决方法就是通过对集群master脚色的选取,来解决分布式系统单点故障的问题。
(2)传统的方式是怎么解决单点故障的?以及有哪些缺点呢?
传统的方式是采用一个备用节点,这个备用节点定期向主节点发送ping包,主节点收到ping包以后向备用节点发送回复Ack信息,当备用节点收到回复的时间就会认为当前主节点运行正常,让它继承提供服务。而当主节点故障时,备用节点就无法收到回复信息了,此时,备用节点就认为主节点宕机,然后接替它成为新的主节点继承提供服务。
这种传统解决单点故障的方法,固然在一定程度上解决了问题,但是有一个隐患,就是网络问题,大概会存在如许一种情况:主节点并没有出现故障,只是在回复ack响应的时间网络发生了故障,如许备用节点就无法收到回复,那么它就会认为主节点出现了故障,接着,备用节点将接管主节点的服务,并成为新的主节点,此时,分布式系统中就出现了两个主节点(双Master节点)的情况,双Master节点的出现,会导致分布式系统的服务发生混乱。如许的话,整个分布式系统将变得不可用。为了防止出现这种情况,就须要引入ZooKeeper来解决这种问题。
2.zookeeper的工作原理是什么?
下面通过三种情况来讲解:
(1)master启动
在分布式系统中引入Zookeeper以后,就可以配置多个主节点,这里以配置两个主节点为例,假定它们是 主节点A 和 主节点B,当两个主节点都启动后,它们都会向ZooKeeper中注册节点信息。我们假设 主节点A 锁注册的节点信息是 master00001 , 主节点B 注册的节点信息是 master00002 ,注册完以后会举行推举,推举有多种算法,这里以编号最小作为推举算法,那么编号最小的节点将在推举中获胜并得到锁成为主节点,也就是 主节点A 将会得到锁成为主节点,然后 主节点B 将被壅闭成为一个备用节点。如许,通过这种方式Zookeeper就完成了对两个Master进程的调度。完成了主、备节点的分配和协作。
(2)master故障
假如 主节点A 发生了故障,这时间它在ZooKeeper所注册的节点信息会被自动删除,而ZooKeeper会自动感知节点的变化,发现 主节点A 故障后,会再次发出推举,这时间 主节点B 将在推举中获胜,替换 主节点A 成为新的主节点,如许就完成了主、被节点的重新推举。
(3)master规复
假如主节点规复了,它会再次向ZooKeeper注册自身的节点信息,只不过这时间它注册的节点信息将会酿成 master00003 ,而不是原来的信息。ZooKeeper会感知节点的变化再次发动推举,这时间 主节点B 在推举中会再次获胜继负担任 主节点 , 主节点A 会担任备用节点。
zookeeper就是通过如许的协调、调度机制如此反复的对集群举行管理和状态同步的。
3.zookeeper集群架构
zookeeper一样平常是通过集群架构来提供服务的,下图是zookeeper的基本架构图。
zookeeper集群紧张脚色有server和client,此中server又分为leader、follower和observer三个脚色,每个脚色的寄义如下:
- Leader:领导者脚色,紧张负责投票的发起和决定,以及更新系统状态。
- follower:跟随着脚色,用于接收客户端的哀求并返回效果给客户端,在推举过程中参与投票。
- observer:观察者脚色,用户接收客户端的哀求,并将写哀求转发给leader,同时同步leader状态,但是不参与投票。Observer目的是扩展系统,进步伸缩性。
- client:客户端脚色,用于向zookeeper发起哀求。
4.zookeeper的工作流程
Zookeeper修改数据的流程:Zookeeper集群中每个Server在内存中存储了一份数据,在Zookeeper启动时,将从实例中推举一个Server作为leader,Leader负责处理数据更新等操作,当且仅当大多数Server在内存中成功修改数据,才认为数据修改成功。
Zookeeper写的流程为:客户端Client起首和一个Server或者Observe通信,发起写哀求,然后Server将写哀求转发给Leader,Leader再将写哀求转发给别的Server,别的Server在接收到写哀求后写入数据并响应Leader,Leader在接收到大多数写成功回应后,认为数据写成功,最后响应Client,完成一次写操作过程。
四、Zookeeper 在 Kafka 中的作用
1.Broker注册
Broker是分布式部署而且相互之间相互独立,但是须要有一个注册系统可以或许将整个集群中的Broker管理起来,此时就利用到了Zookeeper。在Zookeeper上会有一个专门用来举行Broker服务器列表记载的节点:
/brokers/ids
每个Broker在启动时,都会到Zookeeper上举行注册,即到/brokers/ids下创建属于本身的节点,如/brokers/ids/[0...N]。
Kafka利用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须利用不同的Broker ID举行注册,创建完节点后,每个Broker就会将本身的IP地址和端口信息记载到该节点中去。此中,Broker创建的节点类型是暂时节点,一旦Broker宕机,则对应的暂时节点也会被自动删除。
2.Topic注册
在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门的节点来记载,如:/borkers/topics
Kafka中每个Topic都会以/brokers/topics/[topic]的情势被记载,如/brokers/topics/login和/brokers/topics/search等。Broker服务器启动后,会到对应Topic节点(/brokers/topics)上注册本身的Broker ID并写入针对该Topic的分区总数,如/brokers/topics/login/3->2,这个节点表现Broker ID为3的一个Broker服务器,对于"login"这个Topic的消息,提供了2个分区举行消息存储,同样,这个分区节点也是暂时节点。
3.生产者负载均衡
由于同一个Topic消息会被分区并将其分布在多个Broker上,因此,生产者须要将消息合理地发送到这些分布式的Broker上,那么怎样实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。
(1) 四层负载均衡
根据生产者的IP地址和端口来为其确定一个相干联的Broker。通常,一个生产者只会对应单个Broker,然后该生产者产生的消息都发往该Broker。这种方式逻辑简朴,每个生产者不须要同其他系统建立额外的TCP毗连,只须要和Broker维护单个TCP毗连即可。但是,其无法做到真正的负载均衡,由于实际系统中的每个生产者产生的消息量及每个Broker的消息存储量都是不一样的,假如有些生产者产生的消息远多于其他生产者的话,那么会导致不同的Broker接收到的消息总数差别巨大,同时,生产者也无法实时感知到Broker的新增和删除。
(2) 利用Zookeeper举行负载均衡
由于每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更,如许就可以实现动态的负载均衡机制。
4.消耗者负载均衡
与生产者雷同,Kafka中的消耗者同样须要举行负载均衡来实现多个消耗者合理地从对应的Broker服务器上接收消息,每个消耗者分组包含若干消耗者,每条消息都只会发送给分组中的一个消耗者,不同的消耗者分组消耗本身特定的Topic下面的消息,互不干扰。
5.分区 与 消耗者 的关系
消耗组 (Consumer Group)下有多个 Consumer(消耗者)。对于每个消耗者组 (Consumer Group),Kafka都会为其分配一个全局唯一的Group ID,Group 内部的所有消耗者共享该 ID。订阅的topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他group)。
同时,Kafka为每个消耗者分配一个Consumer ID,通常采用"Hostname:UUID"情势表现。
在Kafka中,规定了每个消息分区 只能被同组的一个消耗者举行消耗,因此,须要在 Zookeeper 上记载 消息分区 与 Consumer 之间的关系,每个消耗者一旦确定了对一个消息分区的消耗权力,须要将其Consumer ID 写入到 Zookeeper 对应消息分区的暂时节点上,例如:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
此中,[broker_id-partition_id]就是一个 消息分区 的标识,节点内容就是该 消息分区 上 消耗者的Consumer ID。
6.消息消耗进度Offset 记载
在消耗者对指定消息分区举行消息消耗的过程中,须要定时地将分区消息的消耗进度Offset记载到Zookeeper上,以便在该消耗者举行重启或者其他消耗者重新接管该消息分区的消息消耗后,可以或许从之前的进度开始继承举行消息消耗。Offset在Zookeeper中由一个专门节点举行记载,其节点路径为:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
节点内容就是Offset的值。
7.消耗者注册
消耗者服务器在初始化启动时加入消耗者分组的步骤如下:
(1)注册到消耗者分组
每个消耗者服务器启动时,都会到Zookeeper的指定节点下创建一个属于本身的消耗者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消耗者就会将本身订阅的Topic信息写入该暂时节点。
(2)对 消耗者分组中的消耗者的变化注册监听
每个消耗者都须要关注所属消耗者分组中其他消耗者服务器的变化情况,即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听,一旦发现消耗者新增或减少,就触发消耗者的负载均衡。
(3)对Broker服务器变化注册监听
消耗者须要对/broker/ids/[0-N]中的节点举行监听,假如发现Broker服务器列表发生变化,那么就根据详细情况来决定是否须要举行消耗者负载均衡。
(4)举行消耗者负载均衡
为了让同一个Topic下不同分区的消息只管均衡地被多个 消耗者 消耗而举行 消耗者 与 消息 分区分配的过程,通常,对于一个消耗者分组,假如组内的消耗者服务器发生变更或Broker服务器发生变更,会发出消耗者负载均衡。
五、单节点部署kafka
主机:
kafka1:192.168.10.101
1.安装zookeeper
- [root@kafka1 ~]# yum -y install java
- [root@kafka1 ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz
- [root@kafka1 ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper
- [root@kafka1 ~]# cd /etc/zookeeper/conf
- [root@kafka1 ~]# mv zoo_sample.cfg zoo.cfg
- [root@kafka1 ~]# vim zoo.cfg
- dataDir=/etc/zookeeper/zookeeper-data
- [root@kafka1 ~]# cd /etc/zookeeper/
- [root@kafka1 kafka]# mkdir /etc/zookeeper/zookeeper-data/
- [root@kafka1 zookeeper]# ./bin/zkServer.sh start
- [root@kafka1 zookeeper]# ./bin/zkServer.sh status
复制代码 2.安装kafka
- [root@kafka1 ~]# tar zxvf kafka_2.13-2.4.1.tgz
- [root@kafka1 ~]# mv kafka_2.13-2.4.1 /etc/kafka
- [root@kafka1 ~]# cd /etc/kafka/
- [root@kafka1 kafka]# vim config/server.properties
- log.dirs=/etc/kafka/kafka-logs #60行
- [root@kafka1 kafka]# mkdir /etc/kafka/kafka-logs
- [root@kafka1 kafka]# bin/kafka-server-start.sh config/server.properties &
- 检查两个端口的开启状态
- [root@kafka1 kafka]# netstat -anpt | grep 2181
- [root@kafka1 kafka]# netstat -anpt | grep 9092
- 注意:启动时先启动zookeeper,关闭时先关闭kafka
- 如果要关闭zookeeper
- [root@kafka1 zookeeper]# ./bin/zkServer.sh start
- 如果要关闭kafka
- [root@192 kafka]# bin/kafka-server-stop.sh
- 如果关不了,就kill杀死该进程
复制代码 3.测试
创建topic
- bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test
复制代码 列出topic
- bin/kafka-topics.sh --list --zookeeper kafka1:2181
复制代码 查看topic
- bin/kafka-topics.sh --describe --zookeeper kafka1:2181 --topic test
复制代码 生产消息
- bin/kafka-console-producer.sh --broker-list kafka1:9092 -topic test
复制代码 消耗消息
- bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test
复制代码 删除topic
- bin/kafka-topics.sh --delete --zookeeper kafka1:2181 --topic test
复制代码 六、群集部署kafka
主机
kafka1:192.168.10.101
kafka2:192.168.10.102
kafka3:192.168.10.103
1.zookeeper的部署
(1)安装zookeeper(三个节点的配置相同)
- [root@kafka1 ~]# yum -y install java
- [root@kafka1 ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz
- [root@kafka1 ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper
复制代码 (2)创建数据保存目次(三个节点的配置相同)
- [root@kafka1 ~]# cd /etc/zookeeper/
- [root@kafka1 zookeeper]# mkdir zookeeper-data
复制代码 (3)修改配置文件(三个节点的配置相同)
- [root@kafka1 zookeeper]# cd /etc/zookeeper/conf
- [root@kafka1 ~]# mv zoo_sample.cfg zoo.cfg
- [root@kafka1 ~]# vim zoo.cfg
- dataDir=/etc/zookeeper/zookeeper-data
- clientPort=2181
- server.1=192.168.10.114:2888:3888
- server.2=192.168.10.115:2888:3888
- server.3=192.168.10.116:2888:3888
复制代码 解释:zookeeper只用的端口
2181:对cline端提供服务
3888:推举leader利用
2888:集群内机器通讯利用(Leader监听此端口)
(4)创建节点id文件(按server编号设置这个id,三个机器不同)
- 节点1:
- [root@kafka1 conf]# echo '1' > /etc/zookeeper/zookeeper-data/myid
- 节点2:
- [root@kafka2 conf]# echo '2' > /etc/zookeeper/zookeeper-data/myid
- 节点3:
- [root@kafka3 conf]# echo '3' > /etc/zookeeper/zookeeper-data/myid
复制代码 (5)三个节点启动zookeeper进程
- [root@kafka1 conf]# cd /etc/zookeeper/
- [root@kafka1 zookeeper]# ./bin/zkServer.sh start
- [root@kafka1 zookeeper]# ./bin/zkServer.sh status
复制代码 2.kafka的部署
(1)kafka的安装(三个节点的配置相同)
- [root@kafka1 ~]# tar zxvf kafka_2.13-2.4.1.tgz
- [root@kafka1 ~]# mv kafka_2.13-2.4.1 /etc/kafka
复制代码 (2)修改配置文件
- [root@kafka1 ~]# cd /etc/kafka/
- [root@kafka2 kafka]# vim config/server.properties
- broker.id=1 ##21行 修改,注意其他两个的id分别是2和3
- listeners=PLAINTEXT://192.168.10.114:9092 #31行 修改,其他节点改成各自的IP地址
- log.dirs=/etc/kafka/kafka-logs ## 60行 修改
- num.partitions=1 ##65行 分片数量,不能超过节点数
- zookeeper.connect=192.168.10.114:2181,192.168.10.115:2181,192.168.10.116:2181
复制代码 解释:
9092是kafka的监听端口
(3)创建日记目次(三个节点的配置相同)
- [root@kafka1 kafka]# mkdir /etc/kafka/kafka-logs
复制代码 (4)在所有kafka节点上实行开启下令,天生kafka群集(三个节点的配置相同)
- [root@kafka1 kafka]# ./bin/kafka-server-start.sh config/server.properties &
- 如果启动不了,可以将/etc/kafka/kafka-logs中的数据清除再试试
复制代码 3.测试
创建topic(恣意一个节点)
- bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test
复制代码 列出topic(恣意一个节点)
- bin/kafka-topics.sh --list --zookeeper kafka1:2181
- bin/kafka-topics.sh --list --zookeeper kafka2:2181
- bin/kafka-topics.sh --list --zookeeper kafka3:2181
复制代码 生产消息
- bin/kafka-console-producer.sh --broker-list kafka1:9092 -topic test
复制代码 消耗消息
- bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test
复制代码 删除topic
- bin/kafka-topics.sh --delete --zookeeper kafka1:2181 --topic test
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |