1. 下载安装
官网下载地址:Apache Kafka
下载对应的文件
上传到服务器上,解压
- tar -xzf kafka_2.13-3.7.0.tgz
复制代码 目次结果如下
- ├── bin
- │ └── windows
- ├── config
- │ └── kraft
- ├── libs
- ├── licenses
- └── site-docs
复制代码 官方文档:Apache Kafka
kafka 有两种启动方式,ZooKeeper 和 KRaft,这里接纳 KRaft 的方式,使用 kraft 目次下的设置文件
- config
- ├── connect-console-sink.properties
- ├── connect-console-source.properties
- ├── connect-distributed.properties
- ├── connect-file-sink.properties
- ├── connect-file-source.properties
- ├── connect-log4j.properties
- ├── connect-mirror-maker.properties
- ├── connect-standalone.properties
- ├── consumer.properties
- ├── kraft
- │ ├── broker.properties
- │ ├── controller.properties
- │ └── server.properties
- ├── log4j.properties
- ├── producer.properties
- ├── server.properties
- ├── tools-log4j.properties
- ├── trogdor.conf
- └── zookeeper.properties
复制代码 修改 server.properties 文件
- process.roles:KRaft 模式角色
- broker
- controller
- broker,controller
- node.id:节点 ID,需要为每个节点分配一个唯一的 ID
- controller.quorum.voters:Controller 的投票者设置
- log.dirs:日记目次
初始化集群,先天生一个 UUID
- ./bin/kafka-storage.sh random-uuid
复制代码 再实验下令,使用天生的 UUID 完成集群初始化
- ./bin/kafka-storage.sh format -t thCDFveGRleJro7zTaOOGA -c ./config/kraft/server.properties
复制代码 然后启动 Kafka
- ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
复制代码 2. Spring Boot 集成
2.1 引入依赖
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <version>3.1.4</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>3.7.0</version>
- </dependency>
- </dependencies>
复制代码 2.2 设置文件
- spring:
- kafka:
- bootstrap-servers: 127.0.0.1:9092
- consumer:
- group-id: fable-group
复制代码 2.3 生产者
使用 KafkaTemplate 实现消息的生产
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
- @RestController
- public class ProducerController {
- private final KafkaTemplate<String, String> kafkaTemplate;
- @Autowired
- public ProducerController(KafkaTemplate<String, String> kafkaTemplate) {
- this.kafkaTemplate = kafkaTemplate;
- }
- @GetMapping("/send/message")
- public String sendMessage() {
- kafkaTemplate.send("test", "Hello, Kafka!");
- return "Message sent successfully";
- }
- }
复制代码 2.4 消费者
添加 @KafkaListener 注解,监听消息
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Service;
- @Service
- public class ConsumerService {
- @KafkaListener(topics = "test", groupId = "fable-group")
- public void listen(ConsumerRecord<String, String> consumerRecord) {
- System.out.println("Received message: " + consumerRecord.value());
- }
- }
复制代码 2.5 启动类
添加 @EnableKafka 注解
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.kafka.annotation.EnableKafka;
- @SpringBootApplication
- @EnableKafka
- public class FableApplication {
- public static void main(String[] args) {
- SpringApplication.run(FableApplication.class, args);
- }
- }
复制代码 2.6 测试
访问 /send/message 接口,可以看到控制台打印出接收到的消息
- Received message: Hello, Kafka!
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |