商道如狼道 发表于 2024-7-18 08:06:45

zookeeper + kafka消息队列

zookeeper + kafka 消息队列

一、消息队列简介

1、什么是消息队列

消息队列(Message Queue)是一种用于跨进程或分布式系统中通报消息的通信机制。消息队列在异步通信、系统解耦、负载均衡和容错方面具有重要作用。
(1)特性


[*] 异步通信:发送方将消息发送到队列中后,不需要等候接收方处理完毕即刻返回继续执行。接收方可以在需要时从队列中读取并处理消息。
[*] 解耦:消息队列在发送方和接收方之间充当中介,允许它们独立运行。这样,纵然其中一个部分暂时不可用,系统的团体功能依然可以保持正常。
[*] 负载均衡:通过消息队列,多个消费者可以分担消息处理的工作量,进步系统的吞吐量。
[*] 可靠性:消息队列可以确保消息在通报过程中不丢失,纵然在系统出现故障时也能包管消息被妥善处理。
[*] 扩展性:可以根据需要增长消息生产者或消费者,从而轻松扩展系统。
[*] 缓冲和流量控制:在高并发的场景下,系统的差别部分可能无法以相同的速率处理请求。消息队列可以作为缓冲区,调节生产者和消费者之间的速率差异,防止系统过载或瓦解。
(2)用途


[*]使命队列:在需要异步处理使命的情况下,消息队列非常有用。例如,Web 应用在接收到用户请求后将耗时操作(如视频处理、文件转换)放入队列中,立刻返回相应,后续处来由工作线程完成。
[*]事故通知:在事故驱动系统中,组件之间可以通过消息队列进行通信和通知。例如,当用户在网站上完成购买操作后,消息队列可以通知库存管理系统更新库存。
[*]日志收集:可以将系统中产生的日志消息通过消息队列发送到会集式日志处理系统中,以便进行及时分析和监控。
[*]数据流处理:用于处理数据流的系统,如及时分析平台,可以通过消息队列来通报和处理数据。
2、常见的消息队列软件

(1)RabbitMQ

特点:


[*]基于AMQP(Advanced Message Queuing Protocol)协议,具有高度灵活性。
[*]支持多种消息通报协议,包括 STOMP、MQTT 和 HTTP。
[*]提供可靠的消息通报机制,包括消息确认、长期化、发布确认等。
[*]支持复杂的路由机制,如使用交换机(Exchanges)和绑定(Bindings)来实现消息路由。
[*]易于管理和监控,提供了丰富的管理工具和插件。
用途:


[*]企业级应用步伐集成。
[*]使命调理系统。
[*]及时消息通知系统。
优点:


[*]功能强大,支持多种协议和复杂的消息路由。
[*]稳固性高,适用于企业级应用。
[*]良好的文档和社区支持。
缺点:


[*]对资源要求较高,配置和维护相对复杂。
[*]吞吐量相对较低,不适合高吞吐量的及时数据流处理。
(2)Apache Kafka

特点:


[*]分布式流处理平台,提供高吞吐量、低延迟的消息通报。
[*]支持长期化,所有消息都被长期化到磁盘上,可以通过配置保留时间。
[*]基于发布-订阅模型,消费者可以独立地读取和处理消息。
[*]支持分区(Partition)和副本(Replica),包管高可用性和容错性。
[*]强大的扩展性,能够轻松扩展以处理大规模数据。
用途:


[*]大数据及时处理和分析。
[*]日志收集和处理。
[*]流式数据处理(如物联网数据、金融交易数据)。
优点:


[*]高吞吐量,适合处理大量及时数据。
[*]低延迟,消息通报和处理速率快。
[*]可扩展性强,适合大规模分布式系统。
缺点:


[*]学习曲线较陡,配置和管理相对复杂。
[*]消费者需要处理消息的次序性和幂等性,增长了应用步伐的复杂性。
(3)ActiveMQ

特点:


[*]基于JMS(Java Message Service)规范的开源消息中央件。
[*]支持多种消息通报协议,包括 OpenWire、STOMP、MQTT、AMQP、REST、WebSocket 等。
[*]提供丰富的特性,如消息长期化、事务、消息优先级等。
[*]易于嵌入 Java 应用步伐,支持多语言客户端(如 C++、.NET、Python、Perl、PHP)。
[*]提供了管理控制台和监控工具,便于管理和监控消息系统。
用途:


[*]企业应用集成。
[*]使命调理和主动化流程。
[*]消息驱动的微服务架构。
优点:


[*]功能全面,支持多种协议和语言。
[*]与 Java 应用步伐集成良好。
[*]社区活泼,文档丰富。
缺点:


[*]性能相对较低,不适合高吞吐量的场景。
[*]在处理大规模分布式系统时,扩展性有限。
(4)Amazon SQS

特点:


[*]AWS 提供的完全托管的消息队列服务,无需管理服务器和底子设施。
[*]提供标准队列和 FIFO(先入先出)队列,满足差别的消息通报需求。
[*]支持主动扩展,能够处理恣意数目的消息。
[*]与 AWS 生态系统无缝集成,易于与其他 AWS 服务(如 Lambda、S3、SNS)集成。
[*]提供消息可见性超时、消息延迟、消息批处理等功能。
用途:


[*]分布式系统中的异步使命处理。
[*]微服务架构中的消息通报。
[*]日志和事故驱动的工作流。
优点:


[*]托管服务,无需运维,简朴易用。
[*]高可用性和可靠性,由 AWS 提供保障。
[*]可与 AWS 生态系统中的其他服务轻松集成。
缺点:


[*]成本可能较高,特别是在高消息吞吐量的场景中。
[*]对于非常高性能和定制化需求的场景,灵活性可能不足。
二、kafka

Apache Kafka 是一个分布式流处理平台,最初由LinkedIn开发,并于2011年成为Apache开源项目。Kafka重要用于构建及时数据管道和流处理应用步伐,它具有高吞吐量、低延迟、可扩展性和容错性。
1、kafka 架构

Kafka 的架构重要包括以下几个焦点组件:
1. Producer(生产者)

生产者是负责发布消息到 Kafka 主题的客户端。生产者可以选择将消息发送到特定的分区,也可以通过键进行分区路由。
2. Consumer(消费者)

消费者是负责从 Kafka 主题中读取消息的客户端。消费者可以订阅一个或多个主题,并以流的方式处理数据。消费者通常会组成一个消费者组(Consumer Group),每个消费者组可以同时读取和处理消息,实现负载均衡。
3. Broker(代理)

Kafka 集群中的每个服务器称为一个代理(Broker)。每个代理负责接收、存储和转发消息。一个 Kafka 集群通常由多个代理组成,以实现高可用性和容错性。
4. Topic(主题)



[*]主题是 Kafka 中数据的分类容器。每个主题可以有多个生产者和消费者。主题可以分为多个分区,每个分区内的消息是有序的,但跨分区的消息次序不包管。
[*]在 Kafka 中,一个代理(Broker)可以承载多个主题(Topic)。具体有多少个主题取决于 Kafka 集群的配置和使用场景。
(1)分区内的次序性



[*]在一个分区内,消息是严格有序的,这意味着消息的生产次序与消费次序同等。消费者在消费消息时,按照偏移量次序读取消息。
(2)跨分区的无序性



[*]固然分区内的消息是有序的,但差别分区之间的消息次序不包管。跨分区的消息无序性是由于分区是独立的并行处理单元,生产者可以同时向多个分区发送消息,导致团体次序无法包管。
(3)特性



[*] 多生产者和多消费者:一个主题可以有多个生产者发布消息,也可以有多个消费者订阅和消费消息。这种特性使得 Kafka 可以轻松实现多对多的数据通报。
[*] 逻辑分组:主题用于将相同类别的数据进行逻辑分组,便于管理和处理。例如,可以创建一个主题来存储网站的访问日志,另一个主题来存储订单信息。
5. Partition(分区)

分区是 Kafka 中并行处理的基本单元。每个分区在磁盘上是一个日志文件,消息以追加的方式写入。分区提供了高吞吐量和并行处理本领。
(1)特性


[*]次序性:在一个分区内,消息是严格有序的。每条消息都有一个唯一的偏移量(Offset),表现消息在分区中的位置。消费者在消费消息时,会按照偏移量次序逐条读取消息。
[*]并行处理:主题的分区允许 Kafka 在多个服务器上并行处理消息。生产者可以将消息分布到差别的分区,消费者也可以从多个分区并行读取消息,从而实现负载均衡和高吞吐量。
[*]副本机制:每个分区可以有多个副本(Replica),其中一个为向导者(Leader),其余为跟随者(Follower)。所有的读写操作都由向导者处理,跟随者从向导者复制数据。副本机制包管了数据的高可用性和容错性。
6. Zookeeper

Zookeeper 负责存储集群的元数据,如代理信息、分区状态等。它还负责推选 Kafka 集群的控制器,管理分区的副本分配和故障恢复。
2、partition的路由规则与默认计谋

(1) Partition 的数据路由规则



[*] 数据路由规则决定了生产者如何将消息发送到 Kafka Topic 的各个 Partition。Kafka 提供了几种常见的 Partitioning 计谋,具体如下:
[*] Round-robin(轮询):这是最简朴的计谋,生产者轮流将消息发送到差别的 Partition。假如所有 Partition 都有相似的负载和数据量,这种计谋可以实现基本的负载均衡。但是,它不能包管消息的相干性或有序性。
[*] Hash-based(基于哈希):生产者使用消息的某个属性(如 key)盘算哈希值,然后根据哈希值将消息路由到对应的 Partition。这种计谋可以包管具有相同 key 的消息总是被发送到同一个 Partition,从而包管了这些消息的次序性。
[*] Custom Partitioner(自界说分区器):开发者可以根据自己的需求实现自界说的 Partitioner 接口,来控制消息的分区逻辑。这种方式灵活性最高,可以根据业务需求界说非常复杂的分区逻辑。
(2) 默认的 Partitioning 计谋



[*] 在 Kafka 中,默认的 Partitioning 计谋是基于消息的 key 进行哈希分区。具体步调如下:
[*] 假如消息有 key,则使用 key 进行哈希盘算,然后将哈希值与 Topic 的 Partition 数取模,以确定消息发送到哪个 Partition。
[*] 假如消息没有 key,则使用轮询计谋,即将消息依次发送到每个 Partition,实现简朴的负载均衡。
3、消费者组具体先容

(1)先容


[*] 消费者组成员:

[*]消费者组由多个消费者实例(Consumer Instance)组成。
[*]每个消费者实例通常运行在差别的进程大概差别的盘算机上。

[*] 主题分区分配:

[*]消费者组的每个实例会订阅一个或多个主题。
[*]每个主题被分为多个分区(Partitions),每个分区只能由消费者组中的一个实例进行消费。
[*]Kafka 通过分区的分配计谋将分区均匀分配给消费者组中的各个实例,确保每个分区只被一个消费者实例消费,但一个消费者实例可以消费多个分区。

[*] 消费者组协调器:

[*]Kafka 集群中有一个特殊的 Broker 作为消费者组的协调器(Consumer Group Coordinator)。
[*]每个消费者组有一个唯一的消费者组 ID,由协调器管理和分配。

(2)消费者组的工作机制


[*] 消费者协调与分配:

[*]当消费者组中的一个新消费者实例加入或离开时,大概分区发生重新分配时,消费者组的协调器负责重新分配分区给消费者实例。
[*]分区的分配基于消费者实例的订阅和分配计谋(Partition Assignment Strategy),如范围分配、轮询分配等。

[*] 消息处理并行性:

[*]每个分区内的消息是有序的,而差别分区之间的消息次序不包管。
[*]消费者组中的每个消费者实例在处理分区内的消息时是单线程的,但是多个实例可以并行处理差别分区的消息,从而进步团体的消费吞吐量。

[*] 消费者偏移提交:

[*]消费者组中的每个消费者实例会周期性地提交消费偏移(Offset)到 Kafka 集群,以记录其对消息的消费进度。
[*]Kafka 提供了主动和手动两种提交偏移的方式,确保消息处理的可靠性和同等性。

(3)消费者组的应用场景



[*] 消费者组在 Kafka 中有多种应用场景,包括但不限于:
[*] 并行处理:多个消费者实例并行处理同一主题的消息,进步消费吞吐量和效率。
[*] 水平扩展:通过增长消费者实例,可以水平扩展消费者组的处理本领,顺应大规模数据流的需求。
[*] 容错和高可用性:当一个消费者实例故障或下线时,协调器会重新分配其负责的分区给其他实例,确保消息的一连性和可用性。
4、偏移量具体先容

(1)偏移量概念



[*] 偏移量是一个64位的整数,用来唯一标识消费者在一个特定分区中已经消费过的消息位置。每个消费者都会为每个分区维护一个偏移量。偏移量的作用包括:
[*] 消费位置的记录:偏移量表现消费者已经处理并成功提交的消息位置。消费者会定期地更新偏移量,以记录自己的消费进度。
[*] 消息处理的次序性:Kafka 包管每个分区内的消息次序,消费者通过记录偏移量来确保消息的有序消费,避免重复消费或消息丢失。
[*] 消费者的恢复:假如消费者实例停止或重启,它可以利用存储的偏移量来恢复消费位置,从上次离开的地方继续消费,而不会丢失消息。
(2)偏移量的管理方式

在 Kafka 中,偏移量的管理可以通过以下几种方式实现:


[*]主动提交偏移量:

[*]消费者可以选择开启主动提交偏移量的功能,Kafka 将定期(根据配置的间隔)主动将消费者的偏移量提交到 Kafka 集群中。
[*]这种方式简朴方便,但可能会因为提交频率不合适而导致消息的重复消费或丢失。

[*]手动提交偏移量:

[*]消费者可以显式地在适当的时候手动提交当前的偏移量。
[*]手动提交偏移量可以精确地控制偏移量的提交时机,避免主动提交可能出现的题目,如重复消费或消息丢失。

[*]偏移量存储:

[*]消费者通常会将偏移量存储在外部系统中,如数据库或文件系统。这样做的好处是,纵然消费者实例重新启动或扩展,也能够方便地恢复之前的消费进度。

(3)偏移量的生命周期

偏移量的生命周期包括:


[*]消费者组内偏移量管理:

[*]每个消费者组内的每个消费者实例会独立地管理自己负责的分区的偏移量。
[*]每次消费者处理完消息后,会更新并提交偏移量。这个过程可以是主动的(由 Kafka 管理)大概手动的(由应用步伐控制)。

[*]偏移量的保留和删除:

[*]Kafka 默认会保留消费者组的偏移量信息一段时间,以便在消费者重新加入或恢复时使用。
[*]可以通过 Kafka 配置来调解偏移量的保留时间(retention period)或存储方式。

[*]偏移量的提交:

[*]偏移量提交到 Kafka 集群后,会被长期化存储,确保在 Kafka 集群故障或消费者重启时,偏移量信息不会丢失。

5、kafka中的副本同步

在 Apache Kafka 中,副本(Replica)的同步是通过一种基于日志的复制机制来实现的,具体过程如下:
(1) 副本同步基本原理



[*]向导者和跟随者:

[*]每个分区(Partition)都有一个向导者(Leader)和若干个跟随者(Follower)。
[*]向导者负责处理所有的读写请求,并维护分区的写入次序。
[*]跟随者从向导者复制数据,并保持与向导者的数据同步。

(2) 副本同步过程

副本同步的过程重要分为两个阶段:起首是向导者将数据写入本地日志(Leader Log),然后跟随者从向导者的日志中复制数据。
(2.1) 向导者写入本地日志(Leader Log)


[*] 生产者发送消息:

[*]当生产者向 Kafka 发送消息时,消息起首写入分区的向导者副本(Leader Replica)的本地日志中。

[*] 消息确认:

[*] 消息发送确认范例:

[*] Kafka 提供了三种消息发送确认范例,分别是:

[*]acks=0:生产者发送消息后,不等候服务端的任何确认,消息被以为已发送成功。
[*]acks=1:生产者发送消息后,等候服务端的向导者副本确认接收到消息后,即以为消息发送成功。
[*]acks=all(或 acks=-1):生产者发送消息后,等候服务端的所有 ISR(同步副本)确认接收到消息后,即以为消息发送成功。


[*] 消息长期性包管:

[*] 根据差别的确认范例,Kafka 提供了差别级别的消息长期性包管:

[*]acks=0:消息可能会丢失,适用于对数据及时性要求高,但可以容忍少量消息丢失的场景。
[*]acks=1:消息被确认写入向导者副本后即以为成功,数据不会丢失,但可能会存在一定水平的重复发送。
[*]acks=all:消息被所有 ISR 中的副本确认写入后才以为成功,提供了最高级别的数据长期性包管,但延迟较高。


[*] 生产者确认机制:

[*]当生产者发送消息后,可以通过设置 acks 参数来选择确认级别。生产者可以通过配置来均衡消息通报的可靠性和延迟。

[*] 消息确认机制的工作流程

[*]生产者发送消息到 Kafka 集群的指定主题和分区。
[*]根据生产者配置的 acks 参数,生产者可能会等候向导者副本或所有 ISR 中的副本确认消息的接收。
[*]假如确认级别设为 acks=all,生产者会等候所有 ISR 中的副本都确认接收消息,然后才会收到确认。
[*]假如确认级别设为 acks=1,生产者会等候向导者副本确认接收消息,并且不会等候其他 ISR 中的副本确认。


(2.2) 跟随者从向导者复制数据


[*] 跟随者复制数据:

[*]跟随者定期从向导者获取数据块(batch)并复制到本地的日志中。
[*]跟随者使用 Kafka 协议从向导者拉取数据,确保向导者和跟随者之间的数据同等性。

[*] 同步方式:

[*] Kafka 支持两种范例的复制同步方式:
[*] 同步复制

[*]在 Kafka 中,同步复制是指跟随者副本在接收到数据后必须向向导者发送确认,向导者才会继续处理新的写入请求。
[*]这种方式确保了每条消息在所有的 ISR 中的副本都得到了写入确认,包管了数据的可靠性和同等性,但会增长延迟。

[*] 异步复制

[*]异步复制是指跟随者副本在接收到数据后不会向向导者发送确认,向导者可以立刻继续处理新的写入请求。
[*]跟随者会在后台异步地复制数据,这样可以降低写入操作的延迟,但可能会造成一段时间内向导者和跟随者之间的数据差别等。


(3) 同步机制细节



[*] 数据批次:

[*]Kafka 使用数据批次(batches)来淘汰网络开销和进步效率。向导者会将多个消息组合成一个批次,然后一次性发送给跟随者。

[*] 保序性:

[*]Kafka 包管同一分区的消息在所有的副本之间的次序同等性。纵然跨越多个副本,消息也会按照写入的次序进行复制和提交。

[*] ISR 机制:

[*]ISR 是指与向导者保持同步的副本集合。只有在 ISR 中的副本才能到场到消息的读写操作中,确保了高可用性和同等性。

(4) 故障处理



[*] 向导者故障:

[*]假如向导者宕机,Kafka 会从 ISR 中选择一个新的向导者。
[*]新的向导者会继续从上一个向导者复制未提交的数据,并负责后续的写入操作。

[*] 跟随者故障:

[*]假如跟随者宕机,向导者会继续向其他跟随者发送数据,直到该跟随者重新加入 ISR 并恢复复制。

6、ISR 的界说和作用

(1)ISR 的界说:



[*]ISR 是指与分区的向导者副本保持同步的一组副本。向导者会将消息写入 ISR 中的所有副本,确保数据的同等性和可用性。
[*]ISR 中的副本在向导者视角下是“同步的”,即它们已经接收并复制了向导者的所有写入操作。
(2)ISR 的作用:



[*]确保高可用性:当向导者失效时,Kafka 可以从 ISR 中选择一个新的向导者,而无需等候其他副本(不在 ISR 中的副本)复制完备份数据。
[*]进步性能:只有 ISR 中的副本才到场到读写请求的处理中,这样可以淘汰网络延迟和进步性能。
7、kafka文件存储机制概述

(1)日志分段(Log Segments)



[*]概念:

[*]Kafka 将每个主题的每个分区的消息以日志分段(Log Segments)的情势进行长期化存储。每个分区都有一个或多个日志分段,每个日志分段对应一个日志段文件。这种存储模型包括日志文件(Log Segments)和索引文件(Index Files),其存储在同一个文件夹下的差别文件中。

[*]日志分段文件:

[*]每个日志分段文件都是一个独立的文件,用于存储一定时间范围内或达到一定大小的消息数据。
[*]Kafka 使用预先配置的参数(例如 log.segment.bytes 控制默认大小,默以为 1GB)来确定何时创建新的日志分段文件。

[*]日志分段文件的特点:

[*]每个日志分段文件都有一个唯一的起始偏移量(Offset)和一个范围,记录了该段文件内包含的消息的偏移量范围。
[*]新消息会追加到当前活泼的日志分段文件中,当文件大小达到预设阈值时,Kafka 将关闭该文件并创建一个新的日志分段文件。

(2) 索引文件(Index Files)



[*]概念:

[*]为了快速查找消息,Kafka 使用索引文件(Index Files)来存储消息偏移量和物理位置之间的映射关系。
[*]索引文件按照预设大小(默以为 4KB)分别成索引条目,每个条目存储一段消息的偏移量和物理位置。

[*]索引文件的作用:

[*]当消费者需要读取消息时,它可以通过索引文件快速定位到对应消息的物理位置,而无需扫描整个日志文件。
[*]索引文件大大进步了消息的读取效率,特别是在大型分区和高吞吐量的情况下。

(3) 清理计谋(Retention Policy)



[*]概念

[*]Kafka 支持基于时间和大小的消息保留计谋,通过配置参数来控制消息在日志分段中的保留时长和大小。
[*]重要参数包括:

[*]log.retention.hours:指定消息在分段中保留的时间,默以为 7 天。
[*]log.retention.bytes:指定分段文件的最大大小,默以为 -1(无限制)。


[*]清理过程

[*]根据配置的保留计谋,Kafka 定期扫描日志分段文件,并删除过期或超出大小限制的分段文件。
[*]清理计谋确保了存储空间的有用利用,同时包管了数据的有用性和可靠性。

(4) 日志压缩(Log Compaction)



[*]概念:

[*]对于一些需要保留最新状态的数据,Kafka 提供了日志压缩(Log Compaction)功能。
[*]日志压缩会定期查抄主题的消息,保留每个键的最新消息,而删除过时的或重复的消息,从而节流存储空间。

[*]应用场景:

[*]日志压缩特别适合用于存储键值对的场景,如状态存储或事故日志,确保最新状态的数据不会因为汗青数据的堆积而被覆盖。

(5)具体配置



[*] 默认情况下,Kafka 的日志文件和索引文件存储在一个或多个目次中,这些目次由 log.dirs 参数指定。
[*] log.dirs 参数可以配置为一个或多个目次路径,多个路径之间用逗号分隔。这样做的目标是为了提供数据的冗余备份和进步性能。
[*] 当 Kafka 创建新的日志分段文件或索引文件时,它会依次选择配置的目次路径之一来存储文件。
[*] 配置示例
在 Kafka 的配置文件(通常是 server.properties)中,可以配置 log.dirs 参数,例如:
log.dirs=/path/to/kafka/logs
假如需要配置多个存储路径,可以用逗号分隔:
log.dirs=/path/to/kafka/logs
1,/path/to/kafka/logs2 8、LEO(Log End Offset)和 HW(High Watermark)

(1)概念



[*]LEO(Log End Offset):指的是每个副本(包括 leader 和 follower)当前最大的消息偏移量(offset)。即,当前副本中最新消息的 offset。
[*]HW(High Watermark):指的是消费者能够见到的最大的 offset,也就是所有副本中最小的 LEO。HW 之前的所有消息被以为是已经提交和可靠的。
(2)处理机制

1. Follower 故障处理



[*]故障情况

[*]假如一个 follower 发生故障,它会暂时被踢出 ISR(In-Sync Replicas,同步副本集合)。
[*]当 follower 恢复后,它会从本地磁盘读取上次的 HW,即从已经被确认的最高偏移量开始。
[*]follower 将本地 log 文件中高于 HW 的部分截掉,然后从 HW 开始向 leader 进行数据同步。
[*]一旦 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader,就可以重新加入 ISR,到场到消息的复制和同步中。

2. Leader 故障处理



[*]故障情况

[*]假如 leader 发生故障,Kafka 会从 ISR 中选出一个新的 leader。
[*]新的 leader 被选出后,其余的 follower 会进行以下操作:

[*]起首,它们会将自己本地 log 文件中高于 HW 的部分截掉,保留 HW 之前的数据。
[*]然后,它们开始重新的 leader 处获取数据进行同步,确保数据的同等性和可靠性。


9、kafka常用命令

(1)Kafka 根目次结构

Kafka 的安装目次结构通常如下所示:
kafka/
├── bin/             # 包含所有 Kafka 命令行工具的目录
├── config/          # 存放 Kafka 配置文件的目录
├── libs/            # 存放 Kafka 所需的库文件
├── logs/            # 存放 Kafka 日志文件的目录
└── ...
(2) 常用 Kafka 命令及使用示例

(2.1)创建和管理主题(Topics)



[*] 创建主题:
./bin/kafka-topics.sh --create --topic <topic_name> --partitions <num_partitions> --replication-factor <replication_factor> --bootstrap-server <broker_list>

--replication-factor <replication_factor>:指定每个分区的副本数。
--bootstrap-server <broker_list>:指定连接的 Kafka Broker 列表。


[*] 示例:
./bin/kafka-topics.sh --create --topic myTopic --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092

[*] 说明:在 Kafka 安装目次下的 bin/ 目次中执行命令。

[*] 查看主题列表:
./bin/kafka-topics.sh --list --bootstrap-server <broker_list>


[*]示例:./bin/kafka-topics.sh --list --bootstrap-server localhost:9092


[*] 查看主题具体信息:
./bin/kafka-topics.sh --describe --topic <topic_name> --bootstrap-server <broker_list>


[*]示例:./bin/kafka-topics.sh --describe --topic myTopic --bootstrap-server localhost:9092



(2.2)生产者和消费者操作



[*] 生产者发送消息:
./bin/kafka-console-producer.sh --topic <topic_name> --bootstrap-server <broker_list>


[*] 示例:
./bin/kafka-console-producer.sh --topic myTopic --bootstrap-server localhost:9092


[*] 消费者消费消息:
./bin/kafka-console-consumer.sh --topic <topic_name> --from-beginning --bootstrap-server <broker_list>


[*] 示例:
./bin/kafka-console-consumer.sh --topic myTopic --from-beginning --bootstrap-server localhost:9092


(2.3 )消费者组管理



[*] 查看消费者组列表:
./bin/kafka-consumer-groups.sh --list --bootstrap-server <broker_list>


[*] 示例:
./bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092


[*] 查看消费者组消费情况:
./bin/kafka-consumer-groups.sh --describe --group <group_name> --bootstrap-server <broker_list>


[*] 示例:
./bin/kafka-consumer-groups.sh --describe --group myConsumerGroup --bootstrap-server localhost:9092


(2.4) 其他管理和调试命令



[*] 查看集群信息:
./bin/kafka-topics.sh --describe --topic <topic_name> --bootstrap-server <broker_list>


[*] 示例:
./bin/kafka-topics.sh --describe --topic myTopic --bootstrap-server localhost:9092



[*] 查看 Broker 日志:
tail -f logs/server.log



[*] 示例:在 Kafka 根目次下执行,查看当前 Kafka Broker 的日志。
tail -f logs/server.log



三、zookeeper



[*]ZooKeeper(动物园管理员)是一个开源的分布式协调服务,旨在提供一个高度可靠的协作底子,用于分布式应用步伐中的数据管理、配置管理、定名服务等使命。
1、zookeeper的工作机制

ZooKeeper 的工作机制重要围绕其计划的分布式同等性服务和协作底子展开,它通过一些关键的计划和算法来实现高可靠性和可扩展性。以下是 ZooKeeper 的工作机制的具体解释。
(1)集群模式



[*] ZooKeeper 集群:ZooKeeper 以集群的方式运行,每个集群节点称为 ZooKeeper 服务器。集群中的节点数目通常为奇数,例如 3、5、7 等,以实现多数派推选和容错性。
[*] 客户端毗连:客户端通过毗连到恣意一个 ZooKeeper 服务器来与整个集群进行交互,这些服务器相互之间通过 TCP/IP 进行通信。
(2)数据模型



[*] ZooKeeper 的数据模型:ZooKeeper 提供了类似文件系统的数据模型,称为 ZooKeeper 数据树(ZooKeeper Data Tree)。

[*] /
├── apps
│   ├── app1
│   └── app2
├── config
│   ├── global
│   │   ├── setting1
│   │   └── setting2
│   └── local
│       └── setting3
└── services
    ├── service1
    │   ├── instances
    │   │   ├── instance1
    │   │   └── instance2
    │   └── status
    └── service2


[*] 根节点 / 是 ZooKeeper 数据树的起始点。
apps、config 和 services 是一级子节点。
apps 下有 app1 和 app2 两个子节点。
config 下有 global 和 local 两个子节点,global 下包含 setting1 和 setting2。
services 下有 service1 和 service2,service1 下有 instances 和 status 两个子节点,instances 下有 instance1 和 instance2。

[*] 节点(ZNode):ZooKeeper 数据树中的每个节点称为 ZNode,类似于文件系统中的目次或文件。每个 ZNode 可以存储一小段数据,并且可以关联 ACL(访问控制列表)来限制访问权限。
(3) 原子广播协议(ZAB)



[*] ZAB 协议:ZooKeeper 使用 ZAB(ZooKeeper Atomic Broadcast)协议来实现数据的同等性和可靠性。
[*] 基本概念:

[*]Leader 推选:ZAB 协议确保集群中只有一个 Leader 负责处理客户端请求和更新。
[*]原子广播:当 Leader 接收到客户端的写请求时,它将请求广播给所有的 Follower,并等候大多数 Follower 的确认,然后才将操作应用到自身状态。
[*]事务 ID:每个事务都有一个全局唯一的事务 ID,用于标识和排序。

(4)同等性和长期性



[*] 同等性包管:ZooKeeper 包管了在分布式情况下的数据同等性,所有的更新操作都是原子性的,并且按照客户端的次序执行。
[*] 长期性存储:ZooKeeper 将数据存储在磁盘上,并使用写前日志(Write-Ahead Log,WAL)来包管纵然在节点故障后也不会丢失数据。
(5)事故监听和通知



[*] Watch 机制:ZooKeeper 允许客户端注册 Watcher 监听某个 ZNode 的状态变化。
[*] 事故通知:一旦 ZNode 的状态发生变化(如数据更新、ZNode 删除等),ZooKeeper 将通知所有注册了 Watcher 的客户端,使得客户端可以及时相应状态变化。
2、应用场景

ZooKeeper 的工作机制使其在以下场景中广泛应用:


[*] 分布式锁:通过创建暂时次序节点实现分布式锁,确保在分布式情况下的资源竞争题目。
[*] 推选机制:通过 ZAB 协议实现 Leader 推选,确保分布式系统中只有一个主节点处理请求。
[*] 配置管理:存储和管理配置信息,各个节点通过 Watcher 及时感知配置的变化。
[*] 服务注册与发现:将服务节点作为 ZNode 注册到 ZooKeeper 中,客户端通过 Watcher 发现服务的上线和下线状态。
3、特点

分布式协调:


[*]ZooKeeper 提供了一个分布式情况下的同等性服务,资助应用步伐协调多个节点之间的操作和状态。
数据管理:


[*]应用步伐可以使用 ZooKeeper 存储和管理关键的元数据、配置信息以及其他动态数据。这些数据可以被多个节点共享和访问。
配置管理:


[*]ZooKeeper 提供了一个轻量级的分布式配置管理系统,可以会集管理和动态更新应用步伐的配置。
定名服务:


[*]作为一个定名服务,ZooKeeper 允许应用步伐注册和发现服务、节点大概资源的信息,从而简化了分布式系统中的服务发现和通信。
分布式锁:


[*]ZooKeeper 提供了高效的分布式锁机制,允许多个进程大概线程在分布式情况中安全地竞争资源访问。
事故通知:


[*]客户端可以注册监听器以接收关于数据变更和状态变化的通知,这使得应用步伐可以及时相应系统状态的变化。
4、zookeeper 推选机制

(1)角色先容



[*]Leader:负责处理所有的客户端写请求,并进行数据更新操作。Leader 也负责协调和同步所有的 Follower 节点。
[*]Follower:复制 Leader 的状态,并处理客户端的读请求。假如 Leader 失效,Follower 可以到场推选新的 Leader。
[*]Observer:观察者节点,接收并复制 Leader 的状态,但不到场投票和推选过程。用于扩展集群的读取本领,淘汰对 Leader 的读取压力。
(2)zookeeper 节点状态



[*] LOOKING(推选状态):

[*] 节点正在探求新的 Leader,即处于推选过程中。
[*] 发起推选请求,实行成为新的 Leader。

[*] FOLLOWING(跟随状态):

[*] Follower 节点的状态。
[*] 节点已经确定当前的 Leader,并跟随 Leader 处理请求。

[*] LEADING(向导状态):

[*] Leader 节点的状态。
[*] 节点负责处理和协调解个集群的写请求和事务处理。

[*] OBSERVING(观察状态):

[*] Observer 节点的状态。
[*] 类似于 Follower,但专门用于扩展集群的读取本领。
[*] 不到场 Leader 的推选过程,仅处理读请求,提拔系统的性能和吞吐量。

(3)第一次启动推选机制流程


[*] 启动阶段:

[*]当一个新的 ZooKeeper 节点首次启动时,它会进入初始状态,并实行加入到已经存在的集群中。
[*]假云云时集群中还没有 Leader(即第一次启动大概之前的 Leader 已经失效),新节点会启动推选过程。

[*] 节点状态:

[*]新节点将自己设置为 LOOKING 状态,表现它正在探求新的 Leader。

[*] 投票:

[*]新节点会向集群中的其他节点发送推选请求,请求其他节点投票支持自己成为 Leader。
[*]每个节点会为自己投票,并将投票结果广播给整个集群。

[*] 多数派原则:

[*]根据 ZooKeeper 的多数派原则,假如新节点能够获得超过半数节点的投票支持(N/2 + 1),它将成为新的 Leader。
[*]这确保了选出的 Leader 能够获得集群的大部分节点的认可和支持。

[*] Leader 确定:

[*]一旦新节点获得了多数节点的投票支持,它将成为新的 Leader。
[*]其他节点将根据新的 Leader 的通知和广播更新自己的状态,标志新的 Leader。

[*] 系统启动:

[*]一旦推选完成,ZooKeeper 系统将正式启动,新的 Leader 节点开始处理客户端的写请求,并协调解个集群的状态。

(4)注意事项



[*] 节点数和半数原则:

[*]ZooKeeper 集群中至少需要有奇数个节点,这样才能确保在推选中始终有一半以上的节点支持新的 Leader 的推选。

[*] 超时机制:

[*]在推选过程中,每个节点会设置一个超时时间。假如在超时时间内没有告竣半数投票,节点将重新发起新一轮的推选。

[*] 长期化:

[*]ZooKeeper 通过长期化存储和写前日志(WAL)来包管推选操作和数据更新的长期性,确保纵然在节点故障或重启后,系统状态可以恢复到之前的精确状态。

(5)非第一次推选机制



[*]非第一次推选机制是指在已经运行的 ZooKeeper 集群中,当现有的 Leader 出现故障或失效时,集群如何重新推选新的 Leader 的过程。这种推选机制的执行确保了集群在面对节点故障或网络分区等情况时能够快速恢复并保持高可用性。

[*] Leader 失效检测:

[*]集群中的各个节点(包括 Followers 和 Observers)会定期检测 Leader 节点的健康状态。
[*]假如节点发现 Leader 失效(例如无法与 Leader 通信或未能及时相应),则以为当前 Leader 失效。

[*] 触发推选:

[*]一旦有足够多的节点检测到 Leader 失效,它们会主动启动推选过程。
[*]触发推选的节点会向集群中广播推选请求,盼望其他节点投票支持自己成为新的 Leader。

[*] 投票过程:

[*]每个节点收到推选请求后会为自己投票,并将投票信息广播给集群中的其他节点。
[*]节点根据收到的投票数目来判断是否获得了多数派的支持(超过半数节点的投票支持)。

[*] 推选规则:

[*] EPOCH(推选周期)优先:

[*]每个服务器都有一个称为EPOCH的推选周期标识。EPOCH较大的服务器将优先成为新的Leader。
[*]这确保了假如有多个服务器同时实行成为Leader,具有最新推选周期的服务器将胜出。
事务ID(ZXID)优先:
   

[*]假如多个服务器的EPOCH相同,ZooKeeper将比力它们最近的事务ID(ZXID)。
[*]具有较大ZXID的服务器将优先成为Leader。ZXID表现ZooKeeper事务的全局唯一标识符,通常较大的ZXID表现服务器上存储的最新状态。
服务器ID(SID)优先:
   

[*]假如EPOCH和ZXID都相同,ZooKeeper将比力服务器的ID(SID)。
[*]具有较大SID的服务器将成为新的Leader。服务器ID是在集群配置时分配的唯一标识符,较大的SID通常意味着服务器在集群中具有更高的优先级。


[*] 多数派原则:

[*]根据 ZooKeeper 的多数派原则,只有获得多数节点的投票支持的节点才能成为新的 Leader。
[*]这确保了选出的新 Leader 能够获得集群中大部分节点的认可和支持,从而确保系统的同等性和可用性。

[*] 新 Leader 确定:

[*]一旦某个节点获得了足够的投票支持,它将成为新的 Leader。
[*]其他节点将根据新 Leader 的通知和广播更新自己的状态,标志新的 Leader。

[*] 系统恢复:

[*]新的 Leader 被推选出来后,它将接受集群的所有写请求和事务处理,确保集群状态的同等性和可用性。
[*]集群恢复正常运行,继续处理客户端的请求和事务操作。

(6)注意事项



[*]超时机制:

[*]在推选过程中,每个节点会设置一个超时时间。假如在超时时间内没有告竣半数投票,节点将重新发起新一轮的推选,确保集群能够迅速恢复和相应。

[*]节点状态变更:

[*]节点在从 FOLLOWING 或 OBSERVING 状态切换到 LOOKING 状态后,开始到场推选过程。
[*]推选完成后,节点可能会切换到 FOLLOWING 或 OBSERVING 状态,根据其在集群中的角色来处理读取请求或观察集群状态。

[*]长期化和日志:

[*]ZooKeeper 通过长期化存储和写前日志(WAL)来包管推选操作和数据更新的长期性,纵然在节点故障或重启后,系统状态也能够快速恢复到之前精确的状态。

5、zookeeper常用命令

(1)启动和管理 ZooKeeper 服务


[*] 启动 ZooKeeper 服务
bin/zkServer.sh start

[*] 停止 ZooKeeper 服务
bin/zkServer.sh stop

[*] 重启 ZooKeeper 服务
bin/zkServer.sh restart

[*] 查看 ZooKeeper 服务状态
bin/zkServer.sh status

(2)使用 ZooKeeper 客户端


[*] 启动 ZooKeeper 客户端
bin/zkCli.sh

[*] 毗连到特定的 ZooKeeper 服务器
bin/zkCli.sh
-server 127.0.0.1:2181
(3)ZooKeeper 客户端命令

在启动了 ZooKeeper 客户端后,可以使用以下命令:

[*] 创建 znode
create /my-node "some data"

[*] 获取 znode 数据
get /my-node

[*] 设置 znode 数据
set /my-node "new data"

[*] 删除 znode
delete /my-node

[*] 列出 znode
ls /

[*] 查看 znode 状态
stat /my-node

(4)高级管理命令


[*] 设置集群模式 在 conf/zoo.cfg 中配置多台服务器:
server.1=server1:2888:3888
server.2=server2:2888:3888
server.3=server3:2888:3888

[*] 管理 ACL(访问控制列表)
setAcl /my-node world:anyone:r

(5)系统服务管理

将 ZooKeeper 设置为系统服务可以简化管理:

[*] 创建 Systemd 服务文件 在 /etc/systemd/system 目次下创建 zookeeper.service 文件:
Description=ZooKeeperAfter=network.targetType=forkingExecStart=/path/to/zookeeper/bin/zkServer.sh start
ExecStop=/path/to/zookeeper/bin/zkServer.sh stop
ExecReload=/path/to/zookeeper/bin/zkServer.sh restart
User=zookeeperGroup=zookeeperRestart=on-failureWantedBy=multi-user.target
[*] 启动和启用 ZooKeeper 服务
systemctl daemon-reload
systemctl start zookeeper
systemctl enable zookeeper

四、zookeeper + kafka集群搭建

1、项目需求

服务器摆设192.168.20.140zookeeper、kafka192.168.20.141zookeeper、kafka192.168.20.142zookeeper、kafka 2、zookeeper集群搭建

(1)关闭防火墙

systemctl stop firewalld
setenforce 0
(2)安装jdk和zookeeper

https://img-blog.csdnimg.cn/direct/21a8ef3f95884fe3bddca0b05732ae3f.png
(3)移动zookeeper到/usr/local下

mv zookeeper /usr/local/zookeeper
(4)配置zookeeper

# 复制模板配置文件
cp zoo_sample.cfg zoo.cfg

#通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
tickTime=2000
#Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s
initLimit=10
#Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认 为Follwer死掉,并从服务器列表中删除Follwer
syncLimit=5
#修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
dataDir=/tmp/zookeeper   
#添加, 指定存放日志的目录,目录需要单独创建
dataLogDir=/tmp/logs
#客户端连接端口
clientPort=2181
#添加集群信息
server.1=192.168.20.140:3188:3288
server.2=192.168.20.141:3188:3288
server.3=192.168.20.142:3188:3288



server.A=B:C:D
●A是一个数字,表示这个是第几号服务器。集群模式下需要在zoo.cfg中dataDir指定的目录下创建一个文件
myid,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面
的配置信息比较从而判断到底是哪个server
●c是这个服务器Follower与集群中的Leader服务器交换信息的端口
●D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就
是用来执行选举时服务器相互通信的端口
如果指定节点不参加选举,在末尾加observer
server.3=192.168.19.102:3188:3288:observer




#在每个节点上创建数据目录和日志目录
mkdir /tmp/zookeeper
mkdir /tmp/logs
#在每个节点的dataDir指定的目录下创建一个myid的文件
echo 1 > /tmp/zookeeper/myid
echo 2 > /tmp/zookeeper/myid
echo 3 > /tmp/zookeeper/myid
(5)将zookeeper加入到系统服务管理

cd /etc/systemd/systemvim zookeeper.serviceDescription=ZooKeeperAfter=network.targetType=forkingExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart
Restart=on-failureWantedBy=multi-user.target# 写入后加载配置systemctl daemon-reload https://img-blog.csdnimg.cn/direct/ddecf2cbabe04c348a940e3f55a97474.png
3、kafka集群搭建

(1)解压kafka压缩包,将其移动到/usr/local目次下

https://img-blog.csdnimg.cn/direct/b69aa13f3f8d44878680000126817f4e.png
(2)配置kafka的配置文件

cd /kafka/config
# 备份配置文件
cp server.properties{,.bak}

vim server.properties
broker.id=0   
#21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置
broker.id=1、broker.id=2
listeners=PLAINTEXT://192.168.19.100:9092   
broker的IP需区分开来,也可保持默认配置不用修改
num.network.threads=3   
#42行,broker 处理网络请求的线程数量,一般情况下不需要去修改
num.io.threads=8         
#45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数
socket.send.buffer.bytes=102400      
#48行,发送套接字的缓冲区大小
socket.receive.buffer.bytes=102400   
socket.request.max.bytes=104857600   
log.dirs=/usr/local/kafka/logs      
#51行,接收套接字的缓冲区大小
#54行,请求套接字的缓冲区大小
#31行,指定监听的IP和端口,可以修改每个
#60行,kafka运行日志存放的路径,也是数据存放的路径
num.partitions=1   

#65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
num.recovery.threads.per.data.dir=1   
log.retention.hours=168   
#69行,用来恢复和清理data下数据的线程数量
#103行,segment文件(数据文件)保留的最长时间,单位为小时,默
认为7天,超时将被删除
log.segment.bytes=1073741824   
#110行,一个segment文件最大的大小,默认为 1G,超出将新建
一个新的segment文件
zookeeper.connect=192.168.20.140:2181,192.168.20.141:2181,192.168.20.142:2181   
#123行,配置连接Zookeeper集群地址
# 如果设备延迟高,可以将zookeeper的连接超时时间改高一些
zookeeper.connection.timeout.ms=30000
(3)配置kafka启动脚本

#修改环境变量
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
#配置 Kafka 启动脚本
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'

case $1 in
start)
    echo "---------- Kafka 启动 ------------"
    ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
    ;;
stop)
    echo "---------- Kafka 停止 ------------"
    ${KAFKA_HOME}/bin/kafka-server-stop.sh
    ;;
restart)
    $0 stop
    $0 start
    ;;
status)
    echo "---------- Kafka 状态 ------------"
    count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
    if [ "$count" -eq 0 ]; then
      echo "kafka is not running"
    else
      echo "kafka is running"
    fi
    ;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
    exit 1
    ;;
esac


# 添加权限
chmod +x /etc/init.d/kafka
(4)启动kafka,创建topic

service kafka start

# 创建topic
./kafka-topics.sh --create --zookeeper 192.168.20.140:2181,192.168.20.141:2181,192.168.20.142:2181 --replication-factor 2 --partitions 3 --topic test

https://img-blog.csdnimg.cn/direct/738b74919ccf4a7c8d8a5ae02053714d.png

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: zookeeper + kafka消息队列