ToB企服应用市场:ToB评测及商务社交产业平台

标题: kafka(启动集群,使用spring方法,自定义设置) [打印本页]

作者: 美食家大橙子    时间: 2024-11-11 08:33
标题: kafka(启动集群,使用spring方法,自定义设置)
Apache Kafka 是一个开源的分布式流处理平台,最初由 LinkedIn 开辟,后来成为 Apache 项目。Kafka 重要用于高吞吐量、低延迟的实时数据流处理,常用于日志收集、实时分析、消息传递等场景。以下是关于 Kafka 的详细讲解:
一、理论知识

1. Kafka 的基本概念

Kafka 是一个分布式的消息队列体系,通常用于处理大规模的实时数据流。它具有以下几个焦点组件:
2. Kafka 的消息传递机制

Kafka 使用发布-订阅模式来传递消息。消息的传递过程包罗以下几个步调:
3. Kafka 主题(Topic)

Kafka 中的 主题(Topic) 是消息的基本分类方式,用于组织和区分消息流。可以把主题看作是 Kafka 中的一种 消息类别,消费者和生产者通过主题来进行消息的发送和吸收。
4. 分区(Partition)

Kafka 中的 分区(Partition) 是主题的子部分,它是 Kafka 中并行处理的基本单元。每个主题可以有多个分区。分区的作用是将主题的消息分散到多个服务器(Kafka Broker)上存储,从而提高吞吐量和可伸缩性。
5. 偏移量(Offset)

每个分区中的消息都有一个唯一的编号,称为 偏移量(Offset)。偏移量是 Kafka 用来跟踪消息消费进度的标识符。
6. 生产者(Producer)

Kafka 中的 生产者(Producer) 是负责将消息发送到 Kafka 主题的客户端。
7. 消费者(Consumer)

Kafka 中的 消费者(Consumer) 是从 Kafka 中读取消息的客户端。
8. 消费者组(Consumer Group)

Kafka 中的 消费者组(Consumer Group) 是多个消费者的集合,共同消费一个或多个主题中的消息。消费者组的焦点目标是 并行消费,确保消息在组内每个分区只被一个消费者消费。
9. Kafka 的数据存储与日志机制

Kafka 的存储机制基于日志文件,每个主题的分区都会有一个独立的日志文件。Kafka 的日志机制具有以下特点:
10. Kafka 的高可用性与容错性

Kafka 具有高度的容错性,重要通过以下几种机制实现:
二、启动一个kafka服务和一个zookeeper

假设我本地 D:\kafka 下有下载好的 kafka 和 zookeeper。(kafka默认自带了一个 zookeeper, 这里我下载了一个zookeeper)

1. 启动 Zookeeper

Kafka 使用 Zookeeper 进行集群协调,因此 Zookeeper 是启动 Kafka 的前提。你可以使用 Kafka 自带的 Zookeeper 启动脚本启动一个本地 Zookeeper 实例。(非window体系的话不消进到window目次, 只需要bin目次)
2. 启动 Kafka

3. 验证 Kafka 是否正常工作

现在,你已经成功启动了 Kafka 服务。你可以通过创建一个 Kafka 主题并发送/吸收消息来验证 Kafka 是否工作正常。
二、搭建zookeeper集群

1. Zookeeper 集群的概念

Zookeeper 是一个分布式协调服务,它通过一个 集群(ensemble) 来提供高可用性。Zookeeper 集群通常由多个节点组成(一般是 3、5 或 7 个节点),以包管在单个节点失效时,集群仍能正常工作。
Zookeeper 使用 Leader-Follower 模式,集群中的一个节点会被选举为 Leader,负责处理写哀求,其他节点作为 Follower,负责处理读哀求。通过这种方式,Zookeeper 包管了分布式体系的一致性和高可用性。
2. 准备工作

确保已经准备好多个服务器,或者在本地启动多个 Zookeeper 实例来模拟集群。(这里我们接纳本地启动多个实例来演示)
3. 设置 Zookeeper 集群

我们以三个节点的 Zookeeper 集群为例,演示如何设置集群。
3.1 设置 zoo.cfg

每个 Zookeeper 节点都需要设置一个 zoo.cfg 设置文件。你可以在每个节点的 conf 目次下找到这个文件(默认是zoo_sample.cfg, 这是官方给的例子, 我们可以复制一份, 弄成 zoo.cfg)。以下是一些重要的设置项:
3.2 示例设置文件

假设你有 3 个 Zookeeper 节点,分别是 localhost:2181、localhost:2182 和 localhost:2183,以下是每个节点的 zoo.cfg 设置文件的示例。(这里我们是在本地起了三个zookeeper实例, 把不同的实例运行在不同的端口,正常三台服务器的话应该都运行在默认端口2181, 然后 localhost 换成服务器的ip地址)
下面给出三个服务器的 zoo.cfg 设置文件。(这里我们还是本地起的三个实例)
  1. # 数据存储目录
  2. dataDir=D:/zookeeper/data1         # 服务器的话可以都写成 /zookeeper/data
  3. # 客户端连接端口
  4. clientPort=2181
  5. # 启动服务器的数量
  6. initLimit=5
  7. syncLimit=2
  8. # 集群成员
  9. server.1=localhost:2888:3888                # 三台服务器的话应该是 localhost 换成三个服务器的ip,三个都是2888.3888
  10. server.2=localhost:2889:3889
  11. server.3=localhost:2890:3890
复制代码
  1. dataDir=D:/zookeeper/data2
  2. clientPort=2182
  3. initLimit=5
  4. syncLimit=2
  5. server.1=localhost:2888:3888
  6. server.2=localhost:2889:3889
  7. server.3=localhost:2890:3890
复制代码
  1. dataDir=D:/zookeeper/data3
  2. clientPort=2183
  3. initLimit=5
  4. syncLimit=2
  5. server.1=localhost:2888:3888
  6. server.2=localhost:2889:3889
  7. server.3=localhost:2890:3890
复制代码
3.3 myid 文件

每个 Zookeeper 节点都需要一个 myid 文件,文件中存储一个唯一的 ID,用于标识该节点。这是 Zookeeper 集群中的一个重要设置项。
4. 启动 Zookeeper 集群

5. 如何连接到 Zookeeper 集群

三、搭建kafka集群

在分布式环境中,通常会启动多个 Kafka 实例,形成一个 Kafka 集群。Kafka 集群中的全部节点都依赖 Zookeeper 来进行协调(如向导者选举、分区分配等)。以下是如何通过 Zookeeper 启动一个简朴的 Kafka 集群。
1. 设置多个 Kafka 实例

一个 Kafka 集群通常由多个 Kafka broker 组成,每个 broker 都会连接到 Zookeeper。假设有 3 个 Kafka broker,它们的设置文件分别是 server-1.properties、server-2.properties 和 server-3.properties。(这里是我们在一个服务器上运行三个kafka)
每个 server.properties 设置文件需要设置 唯一的 broker.idZookeeper 地址,以及其他干系设置。
2. 启动 Kafka 集群中的各个节点

3. 验证 Kafka 集群

你可以通过以下命令验证 Kafka 集群是否正常启动:
四、kafka客户端开辟


这里我们讲的是spring在kafka原生的api上进行封装后的用法
1. 添加依赖

首先,你需要在 Spring Boot 项目标 pom.xml 中添加 Kafka 干系的依赖:
  1. <dependencies>
  2.    
  3.     <dependency>
  4.         <groupId>org.springframework.kafka</groupId>
  5.         <artifactId>spring-kafka</artifactId>
  6.     </dependency>
  7.    
  8.     <dependency>
  9.         <groupId>org.springframework.kafka</groupId>
  10.         <artifactId>spring-kafka-streams</artifactId>
  11.     </dependency>
  12. </dependencies>
复制代码
简朴的设置
  1. spring.kafka.producer.bootstrap-servers=localhost:9092
  2. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  3. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  4. spring.kafka.consumer.bootstrap-servers=localhost:9092
  5. spring.kafka.consumer.group-id=test1Group
  6. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  7. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
复制代码
2. 创建 Kafka 消息生产者

Kafka 消息生产者是向 Kafka 主题发送消息的组件。可以使用 KafkaTemplate 来发送消息。
KafkaTemplate 用来发送消息。send() 方法吸收两个参数:主题名和消息内容。
  1. @Autowired
  2. private KafkaTemplate<String, String> kafkaTemplate; // 消息是 key value的形式发送
  3. // 对于指定key的,kafka会根据key的哈希值把他们放到一个partition中
  4. // kafkaTemplate.send("test-topic", "kafkaKey1" "Kafka value");  
  5. // 没有指定key的,Kafka 会将消息发送到一个随机或轮询的分区,具体行为取决于我们使用的 Kafka 生产者配置
  6. kafkaTemplate.send("test-topic", "KafkaValue");
复制代码
3. 创建 Kafka 消息消费者

Kafka 消息消费者是从 Kafka 主题吸收消息的组件。可以使用 @KafkaListener 注解来创建消费者,
消费者会自动监听指定的主题
消费者类示例:
  1. import org.springframework.kafka.annotation.KafkaListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class KafkaConsumer {
  5.         // @KafkaListener(topics = "test-topic", groupId = "test-group")
  6.     // public void listen(ConsumerRecord<String, String> record) {
  7.     //     System.out.println(record.key());
  8.     //     System.out.println(record.value());
  9.     // }
  10.     // 使用 @KafkaListener 注解标注消费的主题和消费者组
  11.     @KafkaListener(topics = "test-topic", groupId = "test-group")
  12.     public void listen(String message) {
  13.         System.out.println("Received message: " + message);
  14.     }
  15. }
复制代码
在 @KafkaListener 中,topics 指定消费的主题,groupId 指定消费者组。
4. @KafkaListener

@KafkaListener 是 Spring Kafka 中最常用的注解,用于定义 Kafka 消费者。它标志一个方法为 Kafka 消费者,并指示该方法监听一个或多个 Kafka 主题。
示例
  1. import org.springframework.kafka.annotation.KafkaListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class KafkaConsumer {
  5.     @KafkaListener(topics = "test-topic", groupId = "test-group")
  6.     public void listen(String message) {
  7.         System.out.println("Received message: " + message);
  8.     }
  9. }
复制代码
5. @KafkaHandler

@KafkaHandler 用于处理复杂消息类型时,将不同的消息类型分配到不同的方法。
示例
  1. import org.springframework.kafka.annotation.KafkaListener;
  2. import org.springframework.kafka.annotation.KafkaHandler;
  3. import org.springframework.kafka.annotation.EnableKafka;
  4. import org.springframework.stereotype.Service;
  5. @Service
  6. @EnableKafka
  7. @KafkaListener(topics = "test-topic")
  8. public class KafkaConsumer {
  9.     @KafkaHandler
  10.     public void handleMessage(String message) {
  11.         System.out.println("Received string message: " + message);
  12.     }
  13.     @KafkaHandler
  14.     public void handleOtherMessage(Integer message) {
  15.         System.out.println("Received integer message: " + message);
  16.     }
  17. }
复制代码
6. @KafkaListeners

@KafkaListeners 是一个容器注解,它允许你在一个类上定义多个 @KafkaListener 注解。
示例
  1. import org.springframework.kafka.annotation.KafkaListener;
  2. import org.springframework.kafka.annotation.KafkaListeners;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. @KafkaListeners({
  6.     @KafkaListener(topics = "topic1", groupId = "group1"),
  7.     @KafkaListener(topics = "topic2", groupId = "group2")
  8. })
  9. public class KafkaConsumer {
  10.     @KafkaListener(topics = "topic1", groupId = "group1")
  11.     public void listenTopic1(String message) {
  12.         System.out.println("Received message from topic1: " + message);
  13.     }
  14.     @KafkaListener(topics = "topic2", groupId = "group2")
  15.     public void listenTopic2(String message) {
  16.         System.out.println("Received message from topic2: " + message);
  17.     }
  18. }
复制代码
五、springboot中 kafka的设置

在 Spring Boot 中使用 Kafka,重要涉及设置 Kafka 的生产者(Producer)和消费者(Consumer)。
1. Kafka 生产者设置(Producer Configuration)

生产者用于将消息发送到 Kafka 主题。常用的生产者设置项如下:
设置示例(application.properties):
  1. # Kafka 生产者配置
  2. spring.kafka.producer.bootstrap-servers=localhost:9092  # Kafka 集群地址 多个地址用 , 隔开
  3. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer  # 默认的键的序列化器
  4. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer  # 默认的值的序列化器
  5. spring.kafka.producer.acks=all  # 确认方式
  6. spring.kafka.producer.retries=3  # 生产者发送失败时的重试次数
  7. spring.kafka.producer.batch-size=16384  # 批量发送的最大消息数(单位:字节)16k
  8. spring.kafka.producer.linger-ms=1  # 生产者等待更多消息的时间(单位:毫秒)
  9. spring.kafka.producer.buffer-memory=33554432  # 内存缓冲区的大小(单位:字节)
复制代码
设置项详细讲解:

2. Kafka 消费者设置(Consumer Configuration)

消费者用于从 Kafka 中消费消息。常用的消费者设置项如下:
设置示例(application.properties):
  1. # Kafka 消费者配置
  2. spring.kafka.consumer.bootstrap-servers=localhost:9092  # Kafka 集群地址
  3. spring.kafka.consumer.group-id=my-group1  # 默认的消费者组,如果@KafkaListener注解没有写消费者组就会使用这个消费者组
  4. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 默认的键的反序列化器
  5. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 默认的值的反序列化器
  6. spring.kafka.consumer.auto-offset-reset=earliest  # 消费者如何处理未提交的消息
  7. spring.kafka.consumer.enable-auto-commit=true  # 是否自动提交消费偏移量
  8. spring.kafka.consumer.fetch-min-bytes=50000  # 从 Kafka 拉取消息的最小字节数
  9. spring.kafka.consumer.max-poll-records=500  # 每次拉取的最大消息数量
  10. spring.kafka.consumer.session-timeout=15000  # 会话超时时间(单位:毫秒)
复制代码
设置项详细讲解:

3. Kafka 高级设置

设置示例(application.properties):
  1. # Kafka 生产者高级配置
  2. spring.kafka.producer.compression-type=gzip  # 压缩类型(支持 gzip, snappy, lz4, zstd)
  3. spring.kafka.producer.max-request-size=1048576  # 请求的最大字节数
  4. # Kafka 消费者高级配置
  5. spring.kafka.consumer.isolation-level=read_committed  # 读取事务提交的消息
  6. spring.kafka.consumer.max-poll-interval=300000  # 最大轮询间隔(单位:毫秒)
  7. spring.kafka.consumer.fetch-max-wait=500  # 最大拉取等待时间(单位:毫秒)
复制代码
解释:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4