ToB企服应用市场:ToB评测及商务社交产业平台
标题:
kafka(启动集群,使用spring方法,自定义设置)
[打印本页]
作者:
美食家大橙子
时间:
2024-11-11 08:33
标题:
kafka(启动集群,使用spring方法,自定义设置)
Apache Kafka 是一个开源的分布式流处理平台,最初由 LinkedIn 开辟,后来成为 Apache 项目。Kafka 重要用于高吞吐量、低延迟的实时数据流处理,常用于日志收集、实时分析、消息传递等场景。以下是关于 Kafka 的详细讲解:
一、理论知识
1.
Kafka 的基本概念
Kafka 是一个分布式的消息队列体系,通常用于处理大规模的实时数据流。它具有以下几个焦点组件:
Producer(生产者)
:生产者负责将消息发布到 Kafka 集群中的一个或多个主题(Topic)。它可以是任何能够天生数据的应用程序。
Consumer(消费者)
:消费者从 Kafka 中订阅主题并消费消息。消费者可以是一个或多个应用程序,它们处理从 Kafka 吸收到的消息。
Broker(署理)
:Kafka 集群由多个 Broker 组成。每个 Broker 负责存储和管理消息,并处理来自生产者和消费者的哀求。
Topic(主题)
:Kafka 中的消息按照主题进行分类。每个消息都有一个指定的主题,消费者可以基于主题进行订阅。
Partition(分区)
:每个主题可以有多个分区,Kafka 会将消息分散到多个分区中,以实现负载均衡和高可用性。分区的数量在创建主题时指定,数据按照分区顺序进行存储。
Replica(副本)
:Kafka 通过副本机制包管数据的高可用性。每个分区可以有多个副本,分布在不同的 Broker 上。如许可以制止单点故障,提高容错性。
2.
Kafka 的消息传递机制
Kafka 使用发布-订阅模式来传递消息。消息的传递过程包罗以下几个步调:
Producer 发布消息
:生产者将消息发送到 Kafka 中指定的主题。消息可以按照特定的策略(如随机、轮询等)分配到不同的分区中。
Broker 存储消息
:消息会被存储在相应的分区中。每个分区的消息是有序的,并且可以被消费者按顺序消费。
Consumer 消费消息
:消费者订阅一个或多个主题,并消费此中的消息。消费者会根据分区的位移(offset)来追踪已经消费的消息,确保不重复消费。
消息确认与可靠性
:消费者在成功消费消息后会发送确认给 Kafka,表示消息已经被成功处理。Kafka 会根据设置(如 acks 设置)决定消息的确认机制。如果消息在某个阶段发生丢失,Kafka 会利用副本机制进行恢复。
3.
Kafka 主题(Topic)
Kafka 中的
主题(Topic)
是消息的基本分类方式,用于组织和区分消息流。可以把主题看作是 Kafka 中的一种
消息类别
,消费者和生产者通过主题来进行消息的发送和吸收。
生产者向主题发送消息
:Kafka 的生产者将消息发送到指定的主题。每个主题都有一个唯一的名字。
消费者订阅主题
:消费者根据主题名来订阅消息。一个消费者可以订阅多个主题,获取来自不同主题的消息。
多个生产者和消费者可以订阅同一个主题
:一个主题可以被多个生产者写入,同时也可以被多个消费者读取。消费者组的管理
确保每条消息在一个组内只能被一个消费者消费。
4.
分区(Partition)
Kafka 中的
分区(Partition)
是主题的子部分,它是 Kafka 中并行处理的基本单元。每个主题可以有多个分区。分区的作用是将主题的消息分散到多个服务器(Kafka Broker)上存储,从而提高吞吐量和可伸缩性。
消息的顺序性
:每个分区内的消息是有顺序的。Kafka 包管同一分区中的消息按照发送顺序排列。
并行消费
:通过将一个主题划分为多个分区,Kafka 可以支持并行处理。当多个消费者组中的消费者订阅一个主题时,Kafka 会将分区分配给不同的消费者,实现消息的并行消费。
5.
偏移量(Offset)
每个分区中的消息都有一个唯一的编号,称为
偏移量(Offset)
。偏移量是 Kafka 用来跟踪消息消费进度的标识符。
消费进度管理
:消费者通过偏移量来跟踪自己消费到哪条消息。例如,消费者读取消息时会记录末了消费的消息偏移量,下次启动时会从该偏移量处继续消费。
消费者组的偏移量
:每个消费者组有独立的偏移量。不同的消费者组会有不同的消费进度。
6.
生产者(Producer)
Kafka 中的
生产者(Producer)
是负责将消息发送到 Kafka 主题的客户端。
消息发送
:生产者将消息发送到一个主题的一个或多个分区。生产者通常会根据消息的键(key)来决定将消息发送到哪个分区。(可自定义)
7.
消费者(Consumer)
Kafka 中的
消费者(Consumer)
是从 Kafka 中读取消息的客户端。
消息消费
:消费者订阅一个或多个主题后,会从 Kafka 集群中消费消息。消费者通常会消费指定分区中的消息。
8.
消费者组(Consumer Group)
Kafka 中的 消费者组(Consumer Group) 是多个消费者的集合,共同消费一个或多个主题中的消息。消费者组的焦点目标是
并行消费
,确保消息在组内
每个分区只被一个消费者消费。
消费的并行性
:消费者组使得多个消费者可以并行消费多个分区。每个消费者组负责管理自己消费的消息的偏移量。
9.
Kafka 的数据存储与日志机制
Kafka 的存储机制基于日志文件,每个主题的分区都会有一个独立的日志文件。Kafka 的日志机制具有以下特点:
顺序写入
:Kafka 使用顺序写入的方式,全部的消息都会追加到日志文件的末尾,这使得磁盘的 I/O 利用非常高效。
不可修改的日志
:消息一旦被写入日志文件,就不能修改。这种设计简化了分布式体系中的数据一致性问题,同时也为后期的消息回溯和重放提供了便利。
日志保留策略
:Kafka 会根据设置的日志保留策略来决定日志文件的保存时间或大小。可以设置基于时间(如保留 7 天内的日志)或基于大小(如日志文件达到一定大小后删除旧的日志)。
10.
Kafka 的高可用性与容错性
Kafka 具有高度的容错性,重要通过以下几种机制实现:
副本机制
:每个分区的消息会有多个副本,存储在不同的 Broker 上。一个分区的向导者(leader)负责处理全部的读写哀求,而副本(follower)则跟随向导者同步数据。如果向导者故障,Kafka 会自动选举新的向导者,包管数据的可用性。
数据复制
:Kafka 会将消息同步到副本中,以防止单点故障。在副本数设置较高的情况下,即使有多个 Broker 故障,Kafka 也能保持数据的高可用性。
消息确认(ACKs)
:Kafka 提供三种不同的确认策略来包管消息的可靠性:
acks=0:生产者不期待确认,消息可能会丢失。
acks=1:生产者期待至少一个副本确认,消息如果被写入一个副本就会以为成功。
acks=all(或 acks=-1):生产者期待全部副本确认,提供最高的可靠性。
二、启动一个kafka服务和一个zookeeper
假设我本地 D:\kafka 下有下载好的 kafka 和 zookeeper。(kafka默认自带了一个 zookeeper, 这里我下载了一个zookeeper)
1.
启动 Zookeeper
Kafka 使用 Zookeeper 进行集群协调,因此 Zookeeper 是启动 Kafka 的前提。你可以使用 Kafka 自带的 Zookeeper 启动脚本启动一个本地 Zookeeper 实例。(非window体系的话不消进到window目次, 只需要bin目次)
打开一个新的 PowerShell 窗口,进入到 Kafka 解压目次中的 bin\windows 目次:
cd D:\kafka\kafka_2.13-3.9.0\bin\windows
复制代码
启动 Zookeeper:
.\zookeeper-server-start.bat ..\..\config\zookeeper.properties
复制代码
这个命令会启动一个本地的 Zookeeper 实例,并默认监听在 localhost:2181。出现 Zookeeper 启动日志,表明 Zookeeper 服务正在运行:
[2024-11-08 10:00:00,123] INFO binding to port 2181 (org.apache.zookeeper.server.ZooKeeperServer)
复制代码
2.
启动 Kafka
打开另一个新的 PowerShell 窗口,进入 Kafka 解压目次中的 bin\windows 目次:
cd D:\kafka\kafka_2.13-3.9.0\bin\windows
复制代码
启动 Kafka 服务器:
.\kafka-server-start.bat ..\..\config\server.properties
复制代码
默认情况下,Kafka 会监听 localhost:9092,并连接到本地的 Zookeeper 实例(localhost:2181)。出现 Kafka 启动日志,表明 Kafka 服务正在运行:
[2024-11-08 10:05:00,456] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
复制代码
3.
验证 Kafka 是否正常工作
现在,你已经成功启动了 Kafka 服务。你可以通过创建一个 Kafka 主题并发送/吸收消息来验证 Kafka 是否工作正常。
创建 Kafka 主题
(例如,创建一个名为 test 的主题):
.\kafka-topics.bat --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
复制代码
如果命令执行成功,表示 Kafka 已经启动并且主题 test 被创建。
列出全部 Kafka 主题
:
.\kafka-topics.bat --list --bootstrap-server localhost:9092
复制代码
如果显示 test 主题,说明 Kafka 启动正常。
启动生产者
(发送消息到 test 主题):
.\kafka-console-producer.bat --topic test --bootstrap-server localhost:9092
复制代码
输入一些消息并按回车(例如:Hello Kafka),这会将消息发送到 Kafka 服务器的 test 主题。
启动消费者
(消费 test 主题的消息):
.\kafka-console-consumer.bat --topic test --bootstrap-server localhost:9092 --from-beginning
复制代码
如果消费者显示出你输入的消息,说明 Kafka 正常工作。
二、搭建zookeeper集群
1. Zookeeper 集群的概念
Zookeeper 是一个分布式协调服务,它通过一个
集群(ensemble)
来提供高可用性。Zookeeper 集群通常由多个节点组成(一般是 3、5 或 7 个节点),以包管在单个节点失效时,集群仍能正常工作。
Zookeeper 使用
Leader-Follower
模式,集群中的一个节点会被选举为
Leader
,负责处理写哀求,其他节点作为
Follower
,负责处理读哀求。通过这种方式,Zookeeper 包管了分布式体系的一致性和高可用性。
2.
准备工作
确保已经准备好多个服务器,或者在本地启动多个 Zookeeper 实例来模拟集群。
(这里我们接纳本地启动多个实例来演示)
Zookeeper 集群至少需要 3 个节点,建议使用 3、5 或 7 个节点。
每个节点都需要分配一个
唯一的 id
,这对于 Zookeeper 的 Leader 选举非常重要。
设置 myid 文件来标识每个节点。
3.
设置 Zookeeper 集群
我们以三个节点的 Zookeeper 集群为例,演示如何设置集群。
3.1 设置 zoo.cfg
每个 Zookeeper 节点都需要设置一个 zoo.cfg 设置文件。你可以在每个节点的 conf 目次下找到这个文件(默认是zoo_sample.cfg, 这是官方给的例子, 我们可以复制一份, 弄成 zoo.cfg)。以下是一些重要的设置项:
dataDir
:Zookeeper 存储数据和日志的目次。
clientPort
:客户端连接 Zookeeper 的端口。
server.X
:Zookeeper 集群中的服务器列表,此中 X 是每个节点的唯一标识符。
initLimit
和
syncLimit
:这些参数控制 Zookeeper 集群中的节点之间的同步行为。
3.2 示例设置文件
假设你有 3 个 Zookeeper 节点,分别是 localhost:2181、localhost:2182 和 localhost:2183,以下是每个节点的 zoo.cfg 设置文件的示例。
(这里我们是在本地起了三个zookeeper实例, 把不同的实例运行在不同的端口,正常三台服务器的话应该都运行在默认端口2181, 然后 localhost 换成服务器的ip地址)
下面给出三个服务器的 zoo.cfg 设置文件。(这里我们还是本地起的三个实例)
Node 1 (localhost:2181)
:
# 数据存储目录
dataDir=D:/zookeeper/data1 # 服务器的话可以都写成 /zookeeper/data
# 客户端连接端口
clientPort=2181
# 启动服务器的数量
initLimit=5
syncLimit=2
# 集群成员
server.1=localhost:2888:3888 # 三台服务器的话应该是 localhost 换成三个服务器的ip,三个都是2888.3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
复制代码
Node 2 (localhost:2182)
:
dataDir=D:/zookeeper/data2
clientPort=2182
initLimit=5
syncLimit=2
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
复制代码
Node 3 (localhost:2183)
:
dataDir=D:/zookeeper/data3
clientPort=2183
initLimit=5
syncLimit=2
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
复制代码
3.3
myid 文件
每个 Zookeeper 节点都需要一个 myid 文件,文件中存储一个唯一的 ID,用于标识该节点。这是 Zookeeper 集群中的一个重要设置项。
在每个节点的 dataDir 目次下创建一个名为 myid 的文件。
文件内容为节点的 ID。例如:
Node 1
:在 D:/zookeeper/data1 目次下创建一个文件 myid,内容为 1。
Node 2
:在 D:/zookeeper/data2 目次下创建一个文件 myid,内容为 2。
Node 3
:在 D:/zookeeper/data3 目次下创建一个文件 myid,内容为 3。
4.
启动 Zookeeper 集群
启动每个 Zookeeper 节点
:
使用以下命令分别在不同的终端(或者在不同的 PowerShell 窗口)中启动每个 Zookeeper 节点。进入到 Zookeeper 的 bin\windows 目次,然后执行以下命令启动 Zookeeper。
启动第一个节点:
cd D:\zookeeper\zookeeper-3.8.4\bin\windows
.\zkServer.cmd
复制代码
启动第二个节点:
cd D:\zookeeper\zookeeper-3.8.4\bin\windows
.\zkServer.cmd
复制代码
启动第三个节点:
cd D:\zookeeper\zookeeper-3.8.4\bin\windows
.\zkServer.cmd
复制代码
验证集群状态
:
在启动 Zookeeper 节点之后,可以通过以下命令验证集群的状态:
cd D:\zookeeper\zookeeper-3.8.4\bin\windows
.\zkCli.cmd
复制代码
进入 Zookeeper CLI 后,输入以下命令查看集群状态:
status
复制代码
如果一切正常,你应该会看到以下类似的输出,表示 Zookeeper 集群已经成功启动:
Mode: follower
复制代码
说明你当前地点的节点是
Follower
,而集群中可能有一个节点是
Leader
。
5.
如何连接到 Zookeeper 集群
客户端可以通过任何一个 Zookeeper 节点的 clientPort 来连接到集群。
例如,使用以下命令连接到 localhost:2181:
.\zkCli.cmd -server localhost:2181
复制代码
如许就可以连接到 Zookeeper 集群的某个节点,执行干系的利用。
三、搭建kafka集群
在分布式环境中,通常会启动多个 Kafka 实例,形成一个
Kafka 集群
。Kafka 集群中的全部节点都依赖
Zookeeper
来进行协调(如向导者选举、分区分配等)。以下是如何通过 Zookeeper 启动一个简朴的 Kafka 集群。
1.
设置多个 Kafka 实例
一个 Kafka 集群通常由多个 Kafka broker 组成,每个 broker 都会连接到 Zookeeper。假设有 3 个 Kafka broker,它们的设置文件分别是 server-1.properties、server-2.properties 和 server-3.properties。
(这里是我们在一个服务器上运行三个kafka)
每个 server.properties 设置文件需要设置
唯一的 broker.id
,
Zookeeper 地址
,以及其他干系设置。
复制 server.properties 文件
,创建 3 个设置文件,分别定名为 server-1.properties、server-2.properties 和 server-3.properties,并修改以下参数:
broker.id
:每个 broker 必须有一个唯一的 broker.id。
listeners
:指定每个 broker 的监听地址。
log.dirs
:设置每个 broker 存储日志的目次。
zookeeper.connect
:全部 broker 都需要连接到同一个 Zookeeper 实例。
例如,server-1.properties 文件设置如下:
broker.id=1
listeners=PLAINTEXT://localhost:9091
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=localhost:2181,localhost:2182,localhost2183 # 因为我们上面启动的三个zookeeper是同一主机的这三个端口
复制代码
server-2.properties 和 server-3.properties 文件内容相似,只需修改 broker.id 和 listeners 设置:
server-2.properties 中设置 broker.id=2,listeners=PLAINTEXT://localhost:9092。
server-3.properties 中设置 broker.id=3,listeners=PLAINTEXT://localhost:9093。
2.
启动 Kafka 集群中的各个节点
打开多个 PowerShell 窗口,分别进入 Kafka 的 bin\windows 目次。
启动每个 Kafka broker(分别对应 server-1.properties、server-2.properties 和 server-3.properties 设置文件):
.\kafka-server-start.bat ..\..\config\server-1.properties
复制代码
.\kafka-server-start.bat ..\..\config\server-2.properties
复制代码
.\kafka-server-start.bat ..\..\config\server-3.properties
复制代码
3.
验证 Kafka 集群
你可以通过以下命令验证 Kafka 集群是否正常启动:
列出全部主题
:
.\kafka-topics.bat --list --bootstrap-server localhost:9091
复制代码
你应该能看到 Kafka 集群中的全部主题。
创建一个主题并验证其分区
:
.\kafka-topics.bat --create --topic test --bootstrap-server localhost:9091 --partitions 3 --replication-factor 3
复制代码
该命令会创建一个名为 test 的主题,分配 3 个分区,并为每个分区分配 3 个副本。
查看 Kafka 集群的状态
:
你可以使用以下命令来检查 Kafka 集群的康健状态和分区分布。
.\kafka-topics.bat --describe --topic test --bootstrap-server localhost:9091
复制代码
四、kafka客户端开辟
这里我们讲的是spring在kafka原生的api上进行封装后的用法
1. 添加依赖
首先,你需要在 Spring Boot 项目标 pom.xml 中添加 Kafka 干系的依赖:
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-streams</artifactId>
</dependency>
</dependencies>
复制代码
简朴的设置
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test1Group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
复制代码
2. 创建 Kafka 消息生产者
Kafka 消息生产者是向 Kafka 主题发送消息的组件。可以使用 KafkaTemplate 来发送消息。
KafkaTemplate 用来发送消息。send() 方法吸收两个参数:主题名和消息内容。
@Autowired
private KafkaTemplate<String, String> kafkaTemplate; // 消息是 key value的形式发送
// 对于指定key的,kafka会根据key的哈希值把他们放到一个partition中
// kafkaTemplate.send("test-topic", "kafkaKey1" "Kafka value");
// 没有指定key的,Kafka 会将消息发送到一个随机或轮询的分区,具体行为取决于我们使用的 Kafka 生产者配置
kafkaTemplate.send("test-topic", "KafkaValue");
复制代码
3. 创建 Kafka 消息消费者
Kafka 消息消费者是从 Kafka 主题吸收消息的组件。可以使用 @KafkaListener 注解来创建消费者,
消费者会自动监听指定的主题
消费者类示例:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
// @KafkaListener(topics = "test-topic", groupId = "test-group")
// public void listen(ConsumerRecord<String, String> record) {
// System.out.println(record.key());
// System.out.println(record.value());
// }
// 使用 @KafkaListener 注解标注消费的主题和消费者组
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
复制代码
在 @KafkaListener 中,topics 指定消费的主题,groupId 指定消费者组。
4. @KafkaListener
@KafkaListener 是 Spring Kafka 中最常用的注解,用于定义 Kafka 消费者。它标志一个方法为 Kafka 消费者,并指示该方法监听一个或多个 Kafka 主题。
用途
:该注解用于定义一个方法监听 Kafka 主题,自动从 Kafka 消费消息并传递给方法参数。
常用属性
:
topics:指定要监听的 Kafka 主题(可以指定多个主题)。
groupId:指定消费者组 ID(如果未指定,将使用默认值)。
containerFactory:指定自定义的 Kafka 消费者容器工厂,通常用于处理消息的序列化和反序列化。
示例
:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
复制代码
@KafkaListener 注解标志的方法会监听指定的 Kafka 主题(test-topic)并吸收来自该主题的消息。
groupId 定义了消费者组 ID,多个消费者可以属于同一个组,从而实现消息的负载均衡。
5. @KafkaHandler
@KafkaHandler 用于处理复杂消息类型时,将不同的消息类型分配到不同的方法。
用途
:@KafkaHandler 用于区分消息处理方法,通常在处理不同类型的消息时使用。
适用场景
:当同一个 Kafka 监听器需要处理多种不同的消息类型时,你可以使用 @KafkaHandler 标志不同的方法来处理每种类型。
示例
:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
@Service
@EnableKafka
@KafkaListener(topics = "test-topic")
public class KafkaConsumer {
@KafkaHandler
public void handleMessage(String message) {
System.out.println("Received string message: " + message);
}
@KafkaHandler
public void handleOtherMessage(Integer message) {
System.out.println("Received integer message: " + message);
}
}
复制代码
这个例子中,@KafkaListener 注解指定了监听 test-topic 主题,而 @KafkaHandler 注解则将不同类型的消息分配到不同的方法。
如果消息类型是 String,handleMessage 会处理;如果消息类型是 Integer,handleOtherMessage 会处理。
6. @KafkaListeners
@KafkaListeners 是一个容器注解,它允许你在一个类上定义多个 @KafkaListener 注解。
用途
:@KafkaListeners 用于在一个类上同时声明多个 @KafkaListener 注解,如许就能监听多个主题或使用不同的消费者组。
适用场景
:当你需要在一个类中定义多个 Kafka 监听器时,可以用 @KafkaListeners 来批量定义。
示例
:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.KafkaListeners;
import org.springframework.stereotype.Service;
@Service
@KafkaListeners({
@KafkaListener(topics = "topic1", groupId = "group1"),
@KafkaListener(topics = "topic2", groupId = "group2")
})
public class KafkaConsumer {
@KafkaListener(topics = "topic1", groupId = "group1")
public void listenTopic1(String message) {
System.out.println("Received message from topic1: " + message);
}
@KafkaListener(topics = "topic2", groupId = "group2")
public void listenTopic2(String message) {
System.out.println("Received message from topic2: " + message);
}
}
复制代码
@KafkaListeners 允许你在一个类上声明多个 @KafkaListener 注解,分别监听不同的主题,并在方法中处理不同的消息。
五、springboot中 kafka的设置
在 Spring Boot 中使用 Kafka,重要涉及设置 Kafka 的生产者(Producer)和消费者(Consumer)。
1.
Kafka 生产者设置(Producer Configuration)
生产者用于将消息发送到 Kafka 主题。常用的生产者设置项如下:
设置示例(application.properties):
# Kafka 生产者配置
spring.kafka.producer.bootstrap-servers=localhost:9092 # Kafka 集群地址 多个地址用 , 隔开
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer # 默认的键的序列化器
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 默认的值的序列化器
spring.kafka.producer.acks=all # 确认方式
spring.kafka.producer.retries=3 # 生产者发送失败时的重试次数
spring.kafka.producer.batch-size=16384 # 批量发送的最大消息数(单位:字节)16k
spring.kafka.producer.linger-ms=1 # 生产者等待更多消息的时间(单位:毫秒)
spring.kafka.producer.buffer-memory=33554432 # 内存缓冲区的大小(单位:字节)
复制代码
设置项详细讲解:
spring.kafka.producer.bootstrap-servers
:
作用
:指定 Kafka 集群的地址,生产者连接到该地址进行消息发送。可以设置多个 Kafka 服务器地址,包管高可用性。
示例
:localhost:9092。
spring.kafka.producer.key-serializer
:
作用
:指定生产者键(Key)的序列化器。Kafka 传输的数据需要序列化和反序列化。常用的序列化器有 StringSerializer、IntegerSerializer 等。
示例
:org.apache.kafka.common.serialization.StringSerializer。
spring.kafka.producer.value-serializer
:
作用
:指定生产者值(Value)的序列化器。与 key-serializer 类似,value-serializer 用于指定消息体(值)的序列化器。
示例
:org.apache.kafka.common.serialization.StringSerializer。
spring.kafka.producer.acks
:
作用
:指定生产者消息的确认机制。可以选择以下几个值:
0:不期待确认,即消息发送后立刻返回。
1:期待 Leader 节点确认。
all:期待全部副本确认(包管消息可靠性,推荐使用)。
示例
:all。
spring.kafka.producer.retries
:
作用
:指定消息发送失败时,生产者的重试次数。默认值是 0,如果为 -1,则会一直重试,直到成功。
示例
:3。
spring.kafka.producer.batch-size
:
作用
:指定批量发送的消息大小(单位:字节)。当达到该大小时,生产者将立刻发送消息。如果设置得过小,可能会影响吞吐量;如果设置过大,可能会增加延迟。
示例
:16384 字节(16 KB)。
spring.kafka.producer.linger-ms
:
作用
:指定生产者发送消息前期待的时间。默认是 0,即立刻发送。如果设置为正数,生产者会等到这个时间,尝试批量发送更多的消息,提高吞吐量。
示例
:1 毫秒。
spring.kafka.producer.buffer-memory
:
作用
:指定生产者的缓冲区大小(单位:字节)。如果缓冲区填满,生产者会阻塞,直到缓冲区有空间。如果缓冲区太小,会影响吞吐量。
示例
:33554432 字节(32 MB)。
2.
Kafka 消费者设置(Consumer Configuration)
消费者用于从 Kafka 中消费消息。常用的消费者设置项如下:
设置示例(application.properties):
# Kafka 消费者配置
spring.kafka.consumer.bootstrap-servers=localhost:9092 # Kafka 集群地址
spring.kafka.consumer.group-id=my-group1 # 默认的消费者组,如果@KafkaListener注解没有写消费者组就会使用这个消费者组
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 默认的键的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 默认的值的反序列化器
spring.kafka.consumer.auto-offset-reset=earliest # 消费者如何处理未提交的消息
spring.kafka.consumer.enable-auto-commit=true # 是否自动提交消费偏移量
spring.kafka.consumer.fetch-min-bytes=50000 # 从 Kafka 拉取消息的最小字节数
spring.kafka.consumer.max-poll-records=500 # 每次拉取的最大消息数量
spring.kafka.consumer.session-timeout=15000 # 会话超时时间(单位:毫秒)
复制代码
设置项详细讲解:
spring.kafka.consumer.bootstrap-servers
:
作用
:指定 Kafka 集群的地址,消费者连接到该地址进行消息消费。
示例
:localhost:9092。
spring.kafka.consumer.group-id
:
作用
:指定消费者所属的消费组 ID。Kafka 消费者通过消费组来处理消息。如果多个消费者属于同一组,它们会共享消费队列中的消息。不同组的消费者互不干扰。
示例
:my-group。
spring.kafka.consumer.key-deserializer
:
作用
:指定消费者键(Key)的反序列化器。与生产者的 key-serializer 设置相对,消费者需要对消息的键进行反序列化。常用的反序列化器有 StringDeserializer、IntegerDeserializer 等。
示例
:org.apache.kafka.common.serialization.StringDeserializer。
spring.kafka.consumer.value-deserializer
:
作用
:指定消费者值(Value)的反序列化器。与生产者的 value-serializer 设置相对,消费者需要对消息的值进行反序列化。
示例
:org.apache.kafka.common.serialization.StringDeserializer。
spring.kafka.consumer.auto-offset-reset
:
作用
:指定消费者如何处理偏移量(offset)未提交的情况。常见的值有:
earliest:从最早的消息开始消费。
latest:从最新的消息开始消费(默认行为)。
示例
:earliest。
spring.kafka.consumer.enable-auto-commit
:
作用
:是否启用自动提交消息的偏移量。true 表示自动提交,false 表示手动提交。自动提交方式不适合复杂的消费逻辑,手动提交可以确保消费的消息被精确处理。
示例
:true。
spring.kafka.consumer.fetch-min-bytes
:
作用
:指定从 Kafka 拉取消息时的最小字节数。如果消息总量小于该值,消费者会期待更多的数据一起拉取,以减少频繁的拉取哀求。
示例
:50000 字节(50 KB)。
spring.kafka.consumer.max-poll-records
:
作用
:指定每次拉取的最大消息数。这个设置有助于控制消费者每次从 Kafka 中消费多少消息,可以防止消费者一次性拉取过多消息导致内存溢出。
示例
:500。
spring.kafka.consumer.session-timeout
:
作用
:指定消费者会话的超时时间。消费者会定期发送心跳以保持会话活泼。如果在指定时间内没有收到心跳,Kafka 会以为该消费者已死掉,并将其从消费组中移除。
示例
:15000 毫秒(15 秒)。
3.
Kafka 高级设置
设置示例(application.properties):
# Kafka 生产者高级配置
spring.kafka.producer.compression-type=gzip # 压缩类型(支持 gzip, snappy, lz4, zstd)
spring.kafka.producer.max-request-size=1048576 # 请求的最大字节数
# Kafka 消费者高级配置
spring.kafka.consumer.isolation-level=read_committed # 读取事务提交的消息
spring.kafka.consumer.max-poll-interval=300000 # 最大轮询间隔(单位:毫秒)
spring.kafka.consumer.fetch-max-wait=500 # 最大拉取等待时间(单位:毫秒)
复制代码
解释:
spring.kafka.producer.compression-type
:
作用
:生产者消息的压缩类型,可以减少网络传输的开销。常用的压缩格式有 gzip、snappy、lz4 和 zstd。
spring.kafka.producer.max-request-size
:
作用
:设置 Kafka 生产者哀求的最大字节数,防止消息过大导致的哀求失败。
spring.kafka.consumer.isolation-level
:
作用
:指定消费者读取事务消息的隔离级别,read_committed 表示只读取已提交的消息,read_uncommitted 表示可以读取未提交的消息。
spring.kafka.consumer.max-poll-interval
:
作用
:指定消费者每次拉取消息之间的最大时间间隔。如果超过该时间间隔,Kafka 会以为该消费者已经失效。
spring.kafka.consumer.fetch-max-wait
:
作用
:指定消费者在拉取消息时的最大期待时间。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4