Spring Boot 项目中集成 Kafka-03

打印 上一主题 下一主题

主题 978|帖子 978|积分 2934

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
在 Spring Boot 项目中集成 Kafka 有多种方式,顺应不同的应用场景和需求。以下将详细先容几种常用的集成方法,包罗:

  • 使用 Spring Kafka (KafkaTemplate 和 @KafkaListener)
  • 使用 Spring Cloud Stream 与 Kafka Binder
  • 使用 Spring for Apache Kafka Reactive(基于 Reactor)
  • 手动配置 Producer 和 Consumer Bean
  • 使用 Spring Integration Kafka
  • 在测试中使用嵌入式 Kafka
每种方法都有其特点和适用场景,选择合适的方法可以或许有效提拔开发效率和应用性能。

1. 使用 Spring Kafka (KafkaTemplate 和 @KafkaListener)

这是最常用的 Spring Boot 集成 Kafka 的方式,依赖于 Spring for Apache Kafka 项目,提供了 KafkaTemplate 用于发送消息,以及 @KafkaListener 注解用于接收消息。
步骤一:添加 Maven 依赖

在 pom.xml 中引入 spring-kafka 依赖:
  1. <dependency>
  2.     <groupId>org.springframework.kafka</groupId>
  3.     <artifactId>spring-kafka</artifactId>
  4. </dependency>
复制代码
步骤二:配置 application.properties 或 application.yml

示例 (application.properties):
  1. # Kafka 集群地址
  2. spring.kafka.bootstrap-servers=worker1:9092,worker2:9092,worker3:9092
  3. # 生产者配置
  4. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  5. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  6. spring.kafka.producer.acks=1
  7. spring.kafka.producer.retries=3
  8. spring.kafka.producer.batch-size=16384
  9. spring.kafka.producer.linger.ms=1
  10. # 消费者配置
  11. spring.kafka.consumer.group-id=myGroup
  12. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  13. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  14. spring.kafka.consumer.auto-offset-reset=earliest
  15. spring.kafka.consumer.enable-auto-commit=true
复制代码
步骤三:编写消息生产者

使用 KafkaTemplate 发送消息:
  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.kafka.core.KafkaTemplate;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class ProducerService {
  6.     @Autowired
  7.     private KafkaTemplate<String, String> kafkaTemplate;
  8.     private static final String TOPIC = "topic1";
  9.     public void sendMessage(String message) {
  10.         kafkaTemplate.send(TOPIC, message);
  11.     }
  12. }
复制代码
步骤四:编写消息消费者

使用 @KafkaListener 接收消息:
  1. import org.apache.kafka.clients.consumer.ConsumerRecord;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class ConsumerService {
  6.     @KafkaListener(topics = "topic1", groupId = "myGroup")
  7.     public void listen(ConsumerRecord<?, ?> record) {
  8.         System.out.println("Received message: " + record.value());
  9.     }
  10. }
复制代码
优缺点



  • 长处

    • 简单易用,快速上手。
    • 与 Spring 生态体系无缝集成。
    • 支持事务、幂等性等高级特性。

  • 缺点

    • 适用于传统的阻塞式应用,若需要响应式编程则不够友好。


2. 使用 Spring Cloud Stream 与 Kafka Binder

Spring Cloud Stream 是一个构建消息驱动微服务的框架,通过 Binder(绑定器)与不同的消息中间件集成。使用 Kafka Binder,可以更加简化 Kafka 与 Spring Boot 的集成。
步骤一:添加 Maven 依赖

在 pom.xml 中引入 spring-cloud-starter-stream-kafka 依赖:
  1. <dependency>
  2.     <groupId>org.springframework.cloud</groupId>
  3.     <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  4. </dependency>
复制代码
并确保引入 Spring Cloud 的 BOM 以管理版本:
  1. <dependencyManagement>
  2.     <dependencies>
  3.         <dependency>
  4.             <groupId>org.springframework.cloud</groupId>
  5.             <artifactId>spring-cloud-dependencies</artifactId>
  6.             <version>Hoxton.SR12</version>
  7.             <type>pom</type>
  8.             <scope>import</scope>
  9.         </dependency>
  10.     </dependencies>
  11. </dependencyManagement>
复制代码
步骤二:配置 application.yml

  1. spring:
  2.   cloud:
  3.     stream:
  4.       bindings:
  5.         output:
  6.           destination: topic1
  7.           contentType: application/json
  8.         input:
  9.           destination: topic1
  10.           group: myGroup
  11.       kafka:
  12.         binder:
  13.           brokers: worker1:9092,worker2:9092,worker3:9092
复制代码
步骤三:编写消息生产者

使用 @EnableBinding 和 Source 接口:
  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.cloud.stream.annotation.EnableBinding;
  3. import org.springframework.cloud.stream.messaging.Source;
  4. import org.springframework.messaging.support.MessageBuilder;
  5. import org.springframework.stereotype.Service;
  6. @Service
  7. @EnableBinding(Source.class)
  8. public class StreamProducerService {
  9.     @Autowired
  10.     private Source source;
  11.     public void sendMessage(String message) {
  12.         source.output().send(MessageBuilder.withPayload(message).build());
  13.     }
  14. }
复制代码
步骤四:编写消息消费者

使用 @StreamListener 注解:
  1. import org.springframework.cloud.stream.annotation.EnableBinding;
  2. import org.springframework.cloud.stream.annotation.StreamListener;
  3. import org.springframework.cloud.stream.messaging.Sink;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. @EnableBinding(Sink.class)
  7. public class StreamConsumerService {
  8.     @StreamListener(Sink.INPUT)
  9.     public void handleMessage(String message) {
  10.         System.out.println("Received message: " + message);
  11.     }
  12. }
复制代码
优缺点



  • 长处

    • 高度抽象,淘汰配置与代码量。
    • 更得当微服务架构,支持绑定多个输入输出。
    • 支持多种消息中间件,易于切换。

  • 缺点

    • 抽象层较高,大概难以实现一些细粒度的自定义配置。
    • 学习曲线较陡,需明白 Binder 和 Channel 的概念。


3. 使用 Spring for Apache Kafka Reactive(基于 Reactor)

对于需要响应式编程的应用,可以使用基于 Reactor 的 Spring Kafka Reactive 举行集成,实现非阻塞的消息处理。
步骤一:添加 Maven 依赖

目前,Spring Kafka 本身并未直接提供响应式支持,但可以结合 Project Reactor Kafka 使用。
引入 Reactor Kafka 依赖:
  1. <dependency>
  2.     <groupId>io.projectreactor.kafka</groupId>
  3.     <artifactId>reactor-kafka</artifactId>
  4.     <version>1.3.11</version>
  5. </dependency>
复制代码
步骤二:配置 application.yml

  1. kafka:
  2.   bootstrap-servers: worker1:9092,worker2:9092,worker3:9092
  3.   producer:
  4.     key-serializer: org.apache.kafka.common.serialization.StringSerializer
  5.     value-serializer: org.apache.kafka.common.serialization.StringSerializer
  6.   consumer:
  7.     group-id: myReactiveGroup
  8.     key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  9.     value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  10.     auto-offset-reset: earliest
复制代码
步骤三:编写响应式消息生产者

使用 SenderOptions 和 KafkaSender:
  1. import org.apache.kafka.clients.producer.ProducerConfig;
  2. import org.apache.kafka.common.serialization.StringSerializer;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.stereotype.Service;
  6. import reactor.kafka.sender.KafkaSender;
  7. import reactor.kafka.sender.SenderOptions;
  8. import reactor.kafka.sender.SenderRecord;
  9. import reactor.core.publisher.Mono;
  10. import java.util.HashMap;
  11. import java.util.Map;
  12. @Service
  13. public class ReactiveProducerService {
  14.     @Value("${kafka.bootstrap-servers}")
  15.     private String bootstrapServers;
  16.     @Bean
  17.     public SenderOptions<String, String> senderOptions() {
  18.         Map<String, Object> props = new HashMap<>();
  19.         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  20.         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  21.         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  22.         return SenderOptions.create(props);
  23.     }
  24.     @Bean
  25.     public KafkaSender<String, String> kafkaSender(SenderOptions<String, String> senderOptions) {
  26.         return KafkaSender.create(senderOptions);
  27.     }
  28.     public Mono<Void> sendMessage(String topic, String key, String value) {
  29.         SenderRecord<String, String, Integer> record = SenderRecord.create(
  30.                 new org.apache.kafka.clients.producer.ProducerRecord<>(topic, key, value),
  31.                 1
  32.         );
  33.         return kafkaSender(senderOptions())
  34.                 .send(Mono.just(record))
  35.                 .then();
  36.     }
  37. }
复制代码
步骤四:编写响应式消息消费者

使用 ReceiverOptions 和 KafkaReceiver:
  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.common.serialization.StringDeserializer;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.stereotype.Service;
  6. import reactor.core.publisher.Flux;
  7. import reactor.kafka.receiver.KafkaReceiver;
  8. import reactor.kafka.receiver.ReceiverOptions;
  9. import reactor.kafka.receiver.ReceiverRecord;
  10. import java.util.HashMap;
  11. import java.util.Map;
  12. @Service
  13. public class ReactiveConsumerService {
  14.     @Value("${kafka.bootstrap-servers}")
  15.     private String bootstrapServers;
  16.     @Value("${kafka.consumer.group-id}")
  17.     private String groupId;
  18.     @Bean
  19.     public ReceiverOptions<String, String> receiverOptions() {
  20.         Map<String, Object> props = new HashMap<>();
  21.         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  22.         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  23.         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  24.         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  25.         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  26.         ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(props);
  27.         return receiverOptions.subscription(java.util.Collections.singleton("topic1"));
  28.     }
  29.     @Bean
  30.     public Flux<ReceiverRecord<String, String>> kafkaFlux(ReceiverOptions<String, String> receiverOptions) {
  31.         return KafkaReceiver.create(receiverOptions).receive();
  32.     }
  33.     public void consumeMessages() {
  34.         kafkaFlux(receiverOptions())
  35.                 .doOnNext(record -> {
  36.                     System.out.println("Received: " + record.value());
  37.                     record.receiverOffset().acknowledge();
  38.                 })
  39.                 .subscribe();
  40.     }
  41. }
复制代码
优缺点



  • 长处

    • 支持响应式编程模子,适用于高并发和非阻塞场景。
    • 更高的资源使用率和吞吐量。

  • 缺点

    • 相较于传统阻塞式,开发复杂度更高。
    • 需要明白 Reactor 和响应式编程的基本概念。


4. 手动配置 Producer 和 Consumer Bean

对于需要更高自定义配置的应用,可以手动配置 ProducerFactory, ConsumerFactory, KafkaTemplate 和 ConcurrentKafkaListenerContainerFactory 等 Bean。
步骤一:添加 Maven 依赖

在 pom.xml 中引入 spring-kafka 依赖:
  1. <dependency>
  2.     <groupId>org.springframework.kafka</groupId>
  3.     <artifactId>spring-kafka</artifactId>
  4. </dependency>
复制代码
步骤二:编写 Kafka 配置类

  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.common.serialization.StringDeserializer;
  4. import org.apache.kafka.common.serialization.StringSerializer;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.kafka.annotation.EnableKafka;
  9. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  10. import org.springframework.kafka.core.*;
  11. import java.util.HashMap;
  12. import java.util.Map;
  13. @Configuration
  14. @EnableKafka
  15. public class KafkaManualConfig {
  16.     @Value("${kafka.bootstrap-servers}")
  17.     private String bootstrapServers;
  18.     @Value("${kafka.consumer.group-id}")
  19.     private String groupId;
  20.     // Producer 配置
  21.     @Bean
  22.     public ProducerFactory<String, String> producerFactory() {
  23.         Map<String, Object> configProps = new HashMap<>();
  24.         configProps.put(
  25.                 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
  26.                 bootstrapServers);
  27.         configProps.put(
  28.                 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  29.                 StringSerializer.class);
  30.         configProps.put(
  31.                 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  32.                 StringSerializer.class);
  33.         // 其他自定义配置
  34.         return new DefaultKafkaProducerFactory<>(configProps);
  35.     }
  36.     @Bean
  37.     public KafkaTemplate<String, String> kafkaTemplate() {
  38.         return new KafkaTemplate<>(producerFactory());
  39.     }
  40.     // Consumer 配置
  41.     @Bean
  42.     public ConsumerFactory<String, String> consumerFactory() {
  43.         Map<String, Object> props = new HashMap<>();
  44.         props.put(
  45.                 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
  46.                 bootstrapServers);
  47.         props.put(
  48.                 ConsumerConfig.GROUP_ID_CONFIG,
  49.                 groupId);
  50.         props.put(
  51.                 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  52.                 StringDeserializer.class);
  53.         props.put(
  54.                 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  55.                 StringDeserializer.class);
  56.         // 其他自定义配置
  57.         return new DefaultKafkaConsumerFactory<>(props);
  58.     }
  59.     // KafkaListenerContainerFactory
  60.     @Bean
  61.     public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  62.         ConcurrentKafkaListenerContainerFactory<String, String> factory =
  63.                 new ConcurrentKafkaListenerContainerFactory<>();
  64.         factory.setConsumerFactory(consumerFactory());
  65.         // 其他自定义配置,如并发数、批量消费等
  66.         return factory;
  67.     }
  68. }
复制代码
步骤三:编写消息生产者和消费者

Producer 示例:
  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.kafka.core.KafkaTemplate;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class ManualProducerService {
  6.     @Autowired
  7.     private KafkaTemplate<String, String> kafkaTemplate;
  8.     private static final String TOPIC = "topic1";
  9.     public void sendMessage(String message) {
  10.         kafkaTemplate.send(TOPIC, message);
  11.     }
  12. }
复制代码
Consumer 示例:
  1. import org.springframework.kafka.annotation.KafkaListener;
  2. import org.springframework.stereotype.Component;
  3. @Component
  4. public class ManualConsumerService {
  5.     @KafkaListener(topics = "topic1", groupId = "myGroup")
  6.     public void listen(String message) {
  7.         System.out.println("Received message: " + message);
  8.     }
  9. }
复制代码
优缺点



  • 长处

    • 高度自定义,适用于复杂配置需求。
    • 可以机动配置多个 KafkaTemplate 或 KafkaListenerContainerFactory,顺应多种场景。

  • 缺点

    • 配置较为繁琐,代码量增加。
    • 需要深入明白 Spring Kafka 的配置与使用。


5. 使用 Spring Integration Kafka

Spring Integration 提供了对 Kafka 的集成支持,适用于需要集成多种消息渠道和复杂消息路由的应用。
步骤一:添加 Maven 依赖

在 pom.xml 中引入 spring-integration-kafka 依赖:
  1. <dependency>
  2.     <groupId>org.springframework.integration</groupId>
  3.     <artifactId>spring-integration-kafka</artifactId>
  4.     <version>3.3.5.RELEASE</version>
  5. </dependency>
复制代码
步骤二:编写 Kafka Integration 配置

  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.common.serialization.StringDeserializer;
  4. import org.apache.kafka.common.serialization.StringSerializer;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.integration.annotation.ServiceActivator;
  8. import org.springframework.integration.channel.DirectChannel;
  9. import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
  10. import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
  11. import org.springframework.kafka.core.*;
  12. import org.springframework.messaging.MessageChannel;
  13. import org.springframework.messaging.MessageHandler;
  14. import java.util.HashMap;
  15. import java.util.Map;
  16. @Configuration
  17. public class SpringIntegrationKafkaConfig {
  18.     @Bean
  19.     public ProducerFactory<String, String> producerFactory() {
  20.         Map<String, Object> props = new HashMap<>();
  21.         props.put(
  22.                 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
  23.                 "worker1:9092,worker2:9092,worker3:9092");
  24.         props.put(
  25.                 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  26.                 StringSerializer.class);
  27.         props.put(
  28.                 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  29.                 StringSerializer.class);
  30.         return new DefaultKafkaProducerFactory<>(props);
  31.     }
  32.     @Bean
  33.     public KafkaTemplate<String, String> kafkaTemplate() {
  34.         return new KafkaTemplate<>(producerFactory());
  35.     }
  36.     // 消费者工厂
  37.     @Bean
  38.     public ConsumerFactory<String, String> consumerFactory() {
  39.         Map<String, Object> props = new HashMap<>();
  40.         props.put(
  41.                 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
  42.                 "worker1:9092,worker2:9092,worker3:9092");
  43.         props.put(
  44.                 ConsumerConfig.GROUP_ID_CONFIG,
  45.                 "myGroup");
  46.         props.put(
  47.                 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  48.                 StringDeserializer.class);
  49.         props.put(
  50.                 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  51.                 StringDeserializer.class);
  52.         return new DefaultKafkaConsumerFactory<>(props);
  53.     }
  54.     // 输入通道
  55.     @Bean
  56.     public MessageChannel inputChannel() {
  57.         return new DirectChannel();
  58.     }
  59.     // 消费者适配器
  60.     @Bean
  61.     public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() {
  62.         KafkaMessageDrivenChannelAdapter<String, String> adapter =
  63.                 new KafkaMessageDrivenChannelAdapter<>(consumerFactory(), "topic1");
  64.         adapter.setOutputChannel(inputChannel());
  65.         return adapter;
  66.     }
  67.     // 消费者处理器
  68.     @Bean
  69.     @ServiceActivator(inputChannel = "inputChannel")
  70.     public MessageHandler messageHandler() {
  71.         return message -> {
  72.             String payload = (String) message.getPayload();
  73.             System.out.println("Received message: " + payload);
  74.         };
  75.     }
  76.     // 输出通道
  77.     @Bean
  78.     public MessageChannel outputChannel() {
  79.         return new DirectChannel();
  80.     }
  81.     // 生产者处理器
  82.     @Bean
  83.     @ServiceActivator(inputChannel = "outputChannel")
  84.     public MessageHandler producerMessageHandler(KafkaTemplate<String, String> kafkaTemplate) {
  85.         KafkaProducerMessageHandler<String, String> handler =
  86.                 new KafkaProducerMessageHandler<>(kafkaTemplate);
  87.         handler.setTopicExpressionString("'topic1'");
  88.         return handler;
  89.     }
  90. }
复制代码
步骤三:发送消息到输出通道

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.messaging.MessageChannel;
  3. import org.springframework.messaging.support.MessageBuilder;
  4. import org.springframework.stereotype.Service;
  5. @Service
  6. public class IntegrationProducerService {
  7.     @Autowired
  8.     private MessageChannel outputChannel;
  9.     public void sendMessage(String message) {
  10.         outputChannel.send(MessageBuilder.withPayload(message).build());
  11.     }
  12. }
复制代码
优缺点



  • 长处

    • 强盛的消息路由和转换功能,适用于复杂集成场景。
    • 可以与 Spring Integration 的其他模块无缝协作。

  • 缺点

    • 配置复杂,学习成本较高。
    • 对于简单的 Kafka 集成场景,大概显得过于痴肥。


6. 在测试中使用嵌入式 Kafka

在集成测试中,使用嵌入式 Kafka 可以避免依赖外部 Kafka 集群,提拔测试效率与稳固性。
步骤一:添加 Maven 依赖

在 pom.xml 中引入 spring-kafka-test 依赖:
  1. <dependency>
  2.     <groupId>org.springframework.kafka</groupId>
  3.     <artifactId>spring-kafka-test</artifactId>
  4.     <scope>test</scope>
  5. </dependency>
复制代码
步骤二:编写测试类

使用 @EmbeddedKafka 注解启动嵌入式 Kafka:
  1. import org.apache.kafka.clients.consumer.Consumer;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.common.serialization.StringDeserializer;
  4. import org.junit.jupiter.api.AfterAll;
  5. import org.junit.jupiter.api.BeforeAll;
  6. import org.junit.jupiter.api.Test;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.boot.test.context.SpringBootTest;
  9. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  10. import org.springframework.kafka.test.EmbeddedKafkaBroker;
  11. import org.springframework.kafka.test.context.EmbeddedKafka;
  12. import org.springframework.kafka.test.utils.KafkaTestUtils;
  13. import java.util.Map;
  14. @SpringBootTest
  15. @EmbeddedKafka(partitions = 1, topics = { "topic1" }, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
  16. public class KafkaIntegrationTest {
  17.     @Autowired
  18.     private EmbeddedKafkaBroker embeddedKafkaBroker;
  19.     private static Consumer<String, String> consumer;
  20.     @BeforeAll
  21.     public static void setUp(@Autowired EmbeddedKafkaBroker embeddedKafkaBroker) {
  22.         Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
  23.         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  24.         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  25.         DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
  26.         consumer = consumerFactory.createConsumer();
  27.         embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "topic1");
  28.     }
  29.     @AfterAll
  30.     public static void tearDown() {
  31.         if (consumer != null) {
  32.             consumer.close();
  33.         }
  34.     }
  35.     @Test
  36.     public void testSendAndReceive() {
  37.         // 发送消息
  38.         // 假设有一个 ProducerService 可以发送消息
  39.         // producerService.sendMessage("Hello, Kafka!");
  40.         // 接收消息
  41.         // Consumer Record 验证逻辑
  42.         // 可以使用 KafkaTestUtils 来接收消息并断言
  43.     }
  44. }
复制代码
优缺点



  • 长处

    • 不依赖外部 Kafka 集群,得当 CI/CD 情况。
    • 提拔测试的可重复性与稳固性。

  • 缺点

    • 嵌入式 Kafka 启动较慢,大概影响测试速率。
    • 需要额外配置,测试代码复杂度增加。


总结

在 Spring Boot 中集成 Kafka 有多种方式,每种方式适用于不同的应用场景和需求:

  • Spring Kafka (KafkaTemplate 和 @KafkaListener)
    适用于大多数常规应用,简单易用,与 Spring 生态体系无缝集成。
  • Spring Cloud Stream 与 Kafka Binder
    适用于微服务架构,需处理复杂消息路由与多中间件支持的场景。
  • Spring for Apache Kafka Reactive
    适用于需要响应式编程模子、高并发和非阻塞消息处理的应用。
  • 手动配置 Producer 和 Consumer Bean
    适用于需要高度自定义 Kafka 配置和行为的应用。
  • Spring Integration Kafka
    适用于复杂集成场景,需要与其他消息渠道和体系协作的应用。
  • 嵌入式 Kafka 在测试中使用
    适用于编写集成测试,提拔测试效率和稳固性。
根据项目的具体需求,选择最合适的集成方式可以或许有效提拔开发效率,确保应用的稳固性与可扩展性。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

魏晓东

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表