Spring Cloud Stream 设置及使用(以 RabbitMQ 为例)

打印 上一主题 下一主题

主题 1952|帖子 1952|积分 5856

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
Spring Cloud Stream 设置及使用(以 RabbitMQ 为例)

Spring Cloud Stream 是 Spring 提供的消息驱动微服务框架,它将消息中间件(如 RabbitMQ、Kafka)与 Spring Boot 联合,简化了消息的生产和消费过程。
本文将先容怎样使用 Spring Cloud Stream + RabbitMQ 进行消息的发布和消费。

1. 引入依赖

首先,在 pom.xml 中引入 spring-cloud-stream 和 spring-cloud-starter-stream-rabbit 依赖:
  1. <dependencies>
  2.     <!-- Spring Cloud Stream -->
  3.     <dependency>
  4.         <groupId>org.springframework.cloud</groupId>
  5.         <artifactId>spring-cloud-stream</artifactId>
  6.     </dependency>
  7.    
  8.     <!-- RabbitMQ Binder -->
  9.     <dependency>
  10.         <groupId>org.springframework.cloud</groupId>
  11.         <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  12.     </dependency>
  13. </dependencies>
复制代码

2. 设置 RabbitMQ

Spring Cloud Stream 需要毗连 RabbitMQ,可以在 application.yml 进行设置:
  1. spring:
  2.   cloud:
  3.     stream:
  4.       function:
  5.         definition: myProducer;myConsumer
  6.       bindings:
  7.         myProducer-out-0: # 生产者绑定的通道
  8.           destination: my-exchange  # 绑定的 RabbitMQ 交换机(默认 Direct)
  9.         myConsumer-in-0:  # 消费者绑定的通道
  10.           destination: my-exchange
  11.           group: my-group  # 使消费者持久化(相当于队列)
  12.   rabbitmq:
  13.     host: localhost
  14.     port: 5672
  15.     username: guest
  16.     password: guest
复制代码


  • destination: my-exchange:绑定 RabbitMQ 互换机(RabbitMQ 会自动创建)。
  • group: my-group:消费者分组,相称于 RabbitMQ 队列名称
  • myProducer-out-0 和 myConsumer-in-0 代表 Spring Cloud Stream 输入输出绑定的通道

3. 生产者(发送消息)

在 Spring Cloud Stream 3.x 版本中,推荐使用 函数式编程 的方式来界说消息生产者:
  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.messaging.Message;
  3. import org.springframework.messaging.support.MessageBuilder;
  4. import org.springframework.stereotype.Service;
  5. import reactor.core.publisher.Flux;
  6. import java.util.function.Supplier;
  7. @Service
  8. public class MessageProducer {
  9.     @Bean
  10.     public Supplier<Message<String>> myProducer() {
  11.         return () -> {
  12.             String message = "Hello, Spring Cloud Stream!";
  13.             System.out.println("发送消息:" + message);
  14.             return MessageBuilder.withPayload(message).build();
  15.         };
  16.     }
  17. }
复制代码


  • Supplier<Message<String>>:界说一个 消息生产者,每次调用时发送一条消息。
  • MessageBuilder.withPayload(message).build();:构造一个 Spring 消息 并发送。
Spring Cloud Stream 会自动将 myProducer 绑定到 myProducer-out-0,并通过 RabbitMQ 发送到 my-exchange 互换机。

4. 消费者(汲取消息)

同样,使用 函数式编程 界说消息消费者:
  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.messaging.Message;
  3. import org.springframework.stereotype.Service;
  4. import java.util.function.Consumer;
  5. @Service
  6. public class MessageConsumer {
  7.     @Bean
  8.     public Consumer<Message<String>> myConsumer() {
  9.         return message -> {
  10.             System.out.println("接收到消息:" + message.getPayload());
  11.         };
  12.     }
  13. }
复制代码


  • Consumer<Message<String>>:界说一个 消息消费者,自动消费 my-exchange 互换机的消息。
  • message.getPayload():获取消息内容并处置惩罚。
Spring Cloud Stream 会自动绑定 myConsumer 到 myConsumer-in-0,并监听 my-exchange。

5. 运行 & 测试


  • 启动 RabbitMQ(假如本地没有 RabbitMQ,可使用 Docker 启动):
    1. docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
    复制代码

    • 管理界面:打开浏览器访问 http://localhost:15672/,默认用户名/暗码为 guest/guest。
    • 确保 互换机 my-exchange 和队列 my-group 已经被创建。

  • 启动 Spring Boot 项目,会看到:
    1. 发送消息:Hello, Spring Cloud Stream!
    2. 接收到消息:Hello, Spring Cloud Stream!
    复制代码
  • RabbitMQ Web 管理界面

    • “Exchanges” 选项卡,找到 my-exchange。
    • “Queues” 选项卡,找到 my-group,并查看消息情况。


6. 进阶用法

(1)手动确认消息

默认情况下,Spring Cloud Stream 自动确认 RabbitMQ 消息。假如要 手动确认,可以使用 Acknowledgement:
  1. import org.springframework.amqp.support.AmqpHeaders;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.messaging.Message;
  4. import org.springframework.messaging.MessageHeaders;
  5. import org.springframework.stereotype.Service;
  6. import java.util.function.Consumer;
  7. @Service
  8. public class ManualAckConsumer {
  9.     @Bean
  10.     public Consumer<Message<String>> myConsumer() {
  11.         return message -> {
  12.             MessageHeaders headers = message.getHeaders();
  13.             System.out.println("接收到消息:" + message.getPayload());
  14.             // 手动确认
  15.             org.springframework.amqp.rabbit.support.Acknowledgment acknowledgment =
  16.                     headers.get(AmqpHeaders.ACKNOWLEDGMENT, org.springframework.amqp.rabbit.support.Acknowledgment.class);
  17.             if (acknowledgment != null) {
  18.                 acknowledgment.acknowledge();
  19.                 System.out.println("消息已确认!");
  20.             }
  21.         };
  22.     }
  23. }
复制代码
(2)多消费者负载均衡

假如有多个 myConsumer 消费者实例,默认是 广播模式(每个实例都消费一次)。假如要让多个实例 负载均衡 处置惩罚消息:
  1. spring:
  2.   cloud:
  3.     stream:
  4.       bindings:
  5.         myConsumer-in-0:
  6.           destination: my-exchange
  7.           group: my-group  # 组内消费者负载均衡
复制代码
多个 myConsumer 共享 雷同的 group,消息会在多个实例间 轮询分发
(3)死信队列(DLQ)

RabbitMQ 默认会丢弃消费失败的消息。假如要生存未消费的消息:
  1. spring:
  2.   cloud:
  3.     stream:
  4.       rabbit:
  5.         bindings:
  6.           myConsumer-in-0:
  7.             consumer:
  8.               deadLetterQueueName: my-dlq
  9.               republishToDlq: true
复制代码
假如消费失败,消息会进入 my-dlq 队列。

7. 总结

Spring Cloud Stream 让我们可以 解耦 生产者和消费者,并专注于业务逻辑,而不需要管理 RabbitMQ 毗连、互换机、队列等。


  • 生产者 通过 Supplier 发送消息到 RabbitMQ。
  • 消费者 通过 Consumer 监听 RabbitMQ 并处置惩罚消息。
  • 自动绑定 destination 到 Exchange,简化 RabbitMQ 设置。
  • 支持手动确认、负载均衡、死信队列等高级特性

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

笑看天下无敌手

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表