媒介
Kafka的根本工作原理
我们将消息的发布(publish)称作 producer(生产者),将消息的订阅(subscribe)表述为 consumer(消费者),将中间的存储阵列称作 broker(署理),这样就可以大抵描画出这样一个局面:
生产者将数据生产出来,交给 broker 进行存储,消费者需要消费数据了,就从broker中去拿出数据来,然后完成一系列对数据的处理操纵。
1.引入spring-kafka的jar包
在pom.xml里面导入spring-kafka包
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.7.4</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <groupId>com.example</groupId>
- <artifactId>SpringBootKafka</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <name>SpringBootKafka</name>
- <description>SpringBootKafka</description>
- <properties>
- <java.version>1.8</java.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <!-- pom.xml -->
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.17</version>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-streams</artifactId>
- </dependency>
- </dependencies>
- <repositories>
- <repository>
- <id>central</id>
- <name>aliyun maven</name>
- <url>https://maven.aliyun.com/repository/public/</url>
- <layout>default</layout>
- <!-- 是否开启发布版构件下载 -->
- <releases>
- <enabled>true</enabled>
- </releases>
- <!-- 是否开启快照版构件下载 -->
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- </repositories>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
- </project>
复制代码 2.编写配置文件
在src/main/resources/application.yml里面编写kafka的配置,包括生成者和消费者
- spring:
- kafka:
- bootstrap-servers: 192.168.110.105:9092
- #streams:
- #application-id: my-streams-app
- consumer:
- group-id: myGroupId
- auto-offset-reset: latest
- enable-auto-commit: true
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- producer:
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
- retries: 5
复制代码 3.编写生产者
使用org.springframework.kafka.core.KafkaTemplate来发送消息,这里采用了异步方式,获取了消息的处理结果
- package com.example.springbootkafka.service;
- import com.example.springbootkafka.entity.User;
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.kafka.support.SendResult;
- import org.springframework.stereotype.Service;
- import org.springframework.util.concurrent.ListenableFuture;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.Future;
- @Slf4j
- @Service
- public class KafkaProducer {
- private final KafkaTemplate<String, String> kafkaTemplate;
- private final ObjectMapper objectMapper;
- @Autowired
- public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {
- this.kafkaTemplate = kafkaTemplate;
- this.objectMapper = objectMapper;
- }
- public void sendMessage(String message) {
- log.info("KafkaProducer message:{}", message);
- //kafkaTemplate.send("test", message).addCallback();
- Future<SendResult<String, String>> future = kafkaTemplate.send("test", message);
- CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
- try {
- future.get(); // 等待原始future完成
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- // 使用whenComplete方法
- completableFuture.whenComplete((result, ex) -> {
- if (ex != null) {
- System.out.println("Error occurred: " + ex.getMessage());
- // 成功发送
- } else {
- System.out.println("Completed successfully");
- }
- });
- /*future.whenComplete((result, ex) -> {
- if (ex == null) {
- // 成功发送
- RecordMetadata metadata = result.getRecordMetadata();
- System.out.println("Message sent successfully with offset: " + metadata.offset());
- } else {
- // 发送失败
- System.err.println("Failed to send message due to: " + ex.getMessage());
- }
- });*/
- }
- public void sendUser(User user) throws JsonProcessingException {
- //final ProducerRecord<String, String> record = createRecord(data);
- //ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", message);
- //ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", user);
- String userJson = objectMapper.writeValueAsString(user);
- ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", userJson);
- /*future.addCallback(
- success -> System.out.println("Message sent successfully: " + userJson),
- failure -> System.err.println("Failed to send message: " + failure.getMessage())
- );*/
- CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
- try {
- future.get(); // 等待原始future完成
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- completableFuture.whenComplete((result, ex) -> {
- if (ex != null) {
- System.out.println("Error occurred: " + ex.getMessage());
- // 成功发送
- } else {
- System.out.println("Completed successfully");
- }
- });
- }
- }
复制代码 4.编写消费者
通过org.springframework.kafka.annotation.KafkaListener来监听消息
- package com.example.springbootkafka.service;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.log4j.Logger;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Service;
- @Slf4j
- @Service
- public class KafkaConsumer {
- @KafkaListener(topics = "test", groupId = "myGroupId")
- public void consume(String message) {
- System.out.println("Received message: " + message);
- log.info("KafkaConsumer message:{}", message);
- }
- }
复制代码 5.测试消息的生成与发送
- package com.example.springbootkafka.controller;
- import com.example.springbootkafka.entity.User;
- import com.example.springbootkafka.service.KafkaProducer;
- import com.fasterxml.jackson.core.JsonProcessingException;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
- @Slf4j
- @RestController
- public class MessageController {
- private final KafkaProducer producer;
- @Autowired
- public MessageController(KafkaProducer producer) {
- this.producer = producer;
- }
- @GetMapping("/send-message")
- public String sendMessage() {
- log.info("MessageController sendMessage start!");
- producer.sendMessage("hello, Kafka!");
- log.info("MessageController sendMessage end!");
- return "Message sent successfully.";
- }
- @GetMapping("/send")
- public String sendMessage1() {
- log.info("MessageController sendMessage1 start!");
- User user = User.builder().name("xuxin").dept("IT/DevlopMent").build();
- try {
- producer.sendUser(user);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- log.info("MessageController sendMessage1 end!");
- return "Message sendMessage1 successfully.";
- }
- }
复制代码 6.检察结果:
具体代码见https://gitee.com/dylan_2017/springboot-kafka.git
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |