花瓣小跑 发表于 2024-11-29 17:48:59

Kafka 下载安装及使用总结

1. 下载安装

官网下载地址:Apache Kafka
下载对应的文件
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=http%3A%2F%2Fimg.fan223.cn%2F2024%2F05%2F20240513094645.png&pos_id=img-RlAoA1Ig-1726626664694
上传到服务器上,解压
tar -xzf kafka_2.13-3.7.0.tgz
目次结果如下
├── bin
│   └── windows
├── config
│   └── kraft
├── libs
├── licenses
└── site-docs
官方文档:Apache Kafka
kafka 有两种启动方式,ZooKeeper 和 KRaft,这里接纳 KRaft 的方式,使用 kraft 目次下的设置文件
config
├── connect-console-sink.properties
├── connect-console-source.properties
├── connect-distributed.properties
├── connect-file-sink.properties
├── connect-file-source.properties
├── connect-log4j.properties
├── connect-mirror-maker.properties
├── connect-standalone.properties
├── consumer.properties
├── kraft
│   ├── broker.properties
│   ├── controller.properties
│   └── server.properties
├── log4j.properties
├── producer.properties
├── server.properties
├── tools-log4j.properties
├── trogdor.conf
└── zookeeper.properties
修改 server.properties 文件
   

[*]process.roles:KRaft 模式角色

[*]broker
[*]controller
[*]broker,controller

[*]node.id:节点 ID,需要为每个节点分配一个唯一的 ID
[*]controller.quorum.voters:Controller 的投票者设置
[*]log.dirs:日记目次
初始化集群,先天生一个 UUID
./bin/kafka-storage.sh random-uuid
再实验下令,使用天生的 UUID 完成集群初始化
./bin/kafka-storage.sh format -t thCDFveGRleJro7zTaOOGA -c ./config/kraft/server.properties
然后启动 Kafka
./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
2. Spring Boot 集成

2.1 引入依赖

<dependencies>
      <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
      </dependency>

      <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>3.1.4</version>
            <exclusions>
                <exclusion>
                  <groupId>org.apache.kafka</groupId>
                  <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
      </dependency>

      <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.7.0</version>
      </dependency>
</dependencies>
2.2 设置文件

spring:
kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: fable-group
2.3 生产者

使用 KafkaTemplate 实现消息的生产
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public ProducerController(KafkaTemplate<String, String> kafkaTemplate) {
      this.kafkaTemplate = kafkaTemplate;
    }

    @GetMapping("/send/message")
    public String sendMessage() {
      kafkaTemplate.send("test", "Hello, Kafka!");
      return "Message sent successfully";
    }
}
2.4 消费者

添加 @KafkaListener 注解,监听消息
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class ConsumerService {

    @KafkaListener(topics = "test", groupId = "fable-group")
    public void listen(ConsumerRecord<String, String> consumerRecord) {
      System.out.println("Received message: " + consumerRecord.value());
    }
}
2.5 启动类

添加 @EnableKafka 注解
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@SpringBootApplication
@EnableKafka
public class FableApplication {
    public static void main(String[] args) {
      SpringApplication.run(FableApplication.class, args);
    }
}
2.6 测试

访问 /send/message 接口,可以看到控制台打印出接收到的消息
Received message: Hello, Kafka!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Kafka 下载安装及使用总结