RabbitMQ的工作队列在Spring Boot中实现(详解常⽤的⼯作模式) ...

打印 上一主题 下一主题

主题 881|帖子 881|积分 2643

上文着重先容RabbitMQ 七种工作模式先容RabbitMQ 七种工作模式先容_rabbitmq 工作模式-CSDN博客
本篇教学如安在Spring环境下进⾏RabbitMQ的开发.(只演⽰部分常⽤的⼯作模式)
目次
引⼊依赖 
一.工作队列模式
二.Publish/Subscribe(发布订阅模式)
三.Routing(路由模式)
四.Topics(通配符模式)

引⼊依赖 

pom.xml 可以导入依赖
  1. <!--Spring MVC相关依赖-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-test</artifactId>
  9. <scope>test</scope>
  10. </dependency>
  11. <!--RabbitMQ相关依赖-->
  12. <dependency>
  13. <groupId>org.springframework.boot</groupId>
  14. <artifactId>spring-boot-starter-amqp</artifactId>
  15. </dependency>
复制代码
大概创建项目时间勾选相应的选项

进入项目第一步先进行分类 三层架构 
进行配置相关rabbitmq属性 


一.工作队列模式



生产者:
  1. @RestController
  2. @RequestMapping("/produce")
  3. public class ProducerController {
  4.     @Autowired
  5.     private RabbitTemplate rabbitTemplate;
  6.     @RequestMapping("/work")
  7.     public String work() {
  8.         rabbitTemplate.convertAndSend("", Constans.WORK_QUEUE,"hello spring amqp:work...");
  9.         return "发送成功";
  10.     }
  11. }
复制代码
  convertAndSendRabbitTemplate类提供的一个重要方法,用于将消息发送到 RabbitMQ 的指定队列中 。
  

  • 第一个参数"":在这里通常表示交换机(Exchange)的名称为空字符串。
  • 第二个参数Constans.WORK_QUEUE
  • 第三个参数"hello spring amqp:work...":这就是要发送的现实消息内容
  

    通过网页进行测试是否发送成功
  


   从rabbitmq上可以看出已经发送成功到队列,等候消耗者进行消耗 
   消耗者:
  1. @Component
  2. public class WorkListener {
  3.     @RabbitListener(queues = Constants.WORK_QUEUE)
  4.     public void queueListener1(Message message) {
  5.         System.out.println("listener 1 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message);
  6.     }
  7.     @RabbitListener(queues = Constants.WORK_QUEUE)
  8.     public void queueListener2(Message message) {
  9.         System.out.println("listener 2 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message);
  10.     }
  11. }
复制代码
  @RabbitListener 是Spring框架中⽤于监听RabbitMQ队列的注解,通过使⽤这个注解,可以定义⼀个⽅法,以便从RabbitMQ队列中接收消息.该注解⽀持多种参数类型,这些参数类型代表了从RabbitMQ接收到的消息和相关信息.
  

  1. @Component
  2. public class WorkListener {
  3.     @RabbitListener(queues = Constants.WORK_QUEUE)
  4.     public void queueListener1(String message) {
  5.         System.out.println("listener 1 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message);
  6.     }
  7.     @RabbitListener(queues = Constants.WORK_QUEUE)
  8.     public void queueListener2(String message) {
  9.         System.out.println("listener 2 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message);
  10.     }
  11. }
复制代码

   1. String 返回消息的内容
2. Message ( org.springframework.amqp.core.Message ):SpringAMQP的
Message 类,返回原始的消息体以及消息的属性,如消息ID,内容,队列信息等.
  
 二.Publish/Subscribe(发布订阅模式)


声明队列,交换机,绑定队列和交换机
  1.     //发布订阅模式
  2.     public static final String FANOUT_QUEUE1 = "fanout.queue1";
  3.     public static final String FANOUT_QUEUE2 = "fanout.queue2";
  4.     public static final String FANOUT_EXCHANGE = "fanout.exchange";
复制代码
  1.     //发布订阅模式
  2.     @Bean("fanoutQueue1")
  3.     public Queue fanoutQueue1() {
  4.         return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
  5.     }
  6.     @Bean("fanoutQueue2")
  7.     public Queue fanoutQueue2() {
  8.         return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
  9.     }
  10.     @Bean("fanoutExchange")
  11.     public FanoutExchange fanoutExchange() {
  12.         return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();
  13.     }
  14.     @Bean("fanoutQueueBinding1")
  15.     public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue){
  16.         return BindingBuilder.bind(queue).to(fanoutExchange);
  17.     }
  18.     @Bean("fanoutQueueBinding2")
  19.     public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue){
  20.         return BindingBuilder.bind(queue).to(fanoutExchange);
  21.     }
复制代码


生产者:
  1.     @RequestMapping("/fanout")
  2.     public String fanout(){
  3.         rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"", "hello spring amqp:fanout...");
  4.         return "发送成功";
  5.     }
复制代码
消耗者:
  1. @Component
  2. public class FanoutListener {
  3.     @RabbitListener(queues = Constants.FANOUT_QUEUE1)
  4.     public void queueListener1(String message) {
  5.         System.out.println("listener 1 ["+Constants.FANOUT_QUEUE1+"] 接收到消息:" +message);
  6.     }
  7.     @RabbitListener(queues = Constants.FANOUT_QUEUE2)
  8.     public void queueListener2(String message) {
  9.         System.out.println("listener 2 ["+Constants.FANOUT_QUEUE2+"] 接收到消息:" +message);
  10.     }
  11. }
复制代码


三.Routing(路由模式)

 

声明队列,交换机,绑定队列和交换机 
  1.     //路由模式
  2.     public static final String DIRECT_QUEUE1 = "direct.queue1";
  3.     public static final String DIRECT_QUEUE2 = "direct.queue2";
  4.     public static final String DIRECT_EXCHANGE = "direct.exchange";
复制代码
  1.     //路由模式
  2.     @Bean("directQueue1")
  3.     public Queue directQueue1() {
  4.         return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
  5.     }
  6.     @Bean("directQueue2")
  7.     public Queue directQueue2() {
  8.         return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
  9.     }
  10.     @Bean("directExchange")
  11.     public DirectExchange directExchange() {
  12.         return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
  13.     }
  14.     @Bean("directQueueBinding1")
  15.     public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue1") Queue queue){
  16.         return BindingBuilder.bind(queue).to(directExchange).with("orange");
  17.     }
  18.     @Bean("directQueueBinding2")
  19.     public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){
  20.         return BindingBuilder.bind(queue).to(directExchange).with("black");
  21.     }
  22.     @Bean("directQueueBinding3")
  23.     public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){
  24.         return BindingBuilder.bind(queue).to(directExchange).with("orange");
  25.     }
复制代码
 


生产者:
  1.     @RequestMapping("/direct/{rountingKey}")
  2.     public String direct(@PathVariable("routingKey") String rountingKey){
  3.         rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,"", "hello spring amqp:direct, my routing key is "+rountingKey);
  4.         return "发送成功";
  5.     }
复制代码
  @PathVariable :用于从哀求的 URL 路径中提取参数值。
  
  

  • 当有一个哀求访问/direct/后面跟着某个具体的值(例如/direct/key1)时,@PathVariable("routingKey") String rountingKey会将key1提取出来,并赋值给rountingKey变量。
  



消耗者:
  1. @Component
  2. public class DirectListener {
  3.     @RabbitListener(queues = Constants.DIRECT_QUEUE1)
  4.     public void queueListener1(String message) {
  5.         System.out.println("listener 1 ["+Constants.DIRECT_QUEUE1+"] 接收到消息:" +message);
  6.     }
  7.     @RabbitListener(queues = Constants.DIRECT_QUEUE2)
  8.     public void queueListener2(String message) {
  9.         System.out.println("listener 2 ["+Constants.DIRECT_QUEUE2+"] 接收到消息:" +message);
  10.     }
  11. }
复制代码


四.Topics(通配符模式)

 

   . 代表一个单词
  # 代码多个单词 
  1.     //通配符模式
  2.     public static final String TOPIC_QUEUE1 = "topics_queue1";
  3.     public static final String TOPIC_QUEUE2 = "topics_queue2";
  4.     public static final String TOPIC_EXCHANGE = "topics_exchange";
复制代码
  1.     //通配符模式
  2.     @Bean("topicQueue1")
  3.     public Queue topicQueue1(){
  4.         return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
  5.     }
  6.     @Bean("topicQueue2")
  7.     public Queue topicQueue2(){
  8.         return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
  9.     }
  10.     @Bean("topicExchange")
  11.     public TopicExchange topicExchange(){
  12.         return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();
  13.     }
  14.     @Bean("topicQueueBinding1")
  15.     public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue1") Queue queue){
  16.         return BindingBuilder.bind(queue).to(topicExchange).with("*.orange.*");
  17.     }
  18.     @Bean("topicQueueBinding2")
  19.     public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){
  20.         return BindingBuilder.bind(queue).to(topicExchange).with("*.*.rabbit");
  21.     }
  22.     @Bean("topicQueueBinding3")
  23.     public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){
  24.         return BindingBuilder.bind(queue).to(topicExchange).with("lazy.#");
  25.     }
复制代码



生产者
  1.     @RequestMapping("/topic/{routingKey}")
  2.     public String topic(@PathVariable("routingKey") String routingKey){
  3.         rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey, "hello spring amqp:topic, my routing key is "+routingKey);
  4.         return "发送成功";
  5.     }
复制代码




消耗者
  1. @Component
  2. public class TopicListener {
  3.     @RabbitListener(queues = Constants.TOPIC_QUEUE1)
  4.     public void queueListener1(String message) {
  5.         System.out.println("listener 1 ["+Constants.TOPIC_QUEUE1+"] 接收到消息:" +message);
  6.     }
  7.     @RabbitListener(queues = Constants.TOPIC_QUEUE2)
  8.     public void queueListener2(String message) {
  9.         System.out.println("listener 2 ["+Constants.TOPIC_QUEUE2+"] 接收到消息:" +message);
  10.     }
  11. }
复制代码



结语: 写博客不仅仅是为了分享学习履历,同时这也有利于我巩固知识点,总结该知识点,由于作者水平有限,对文章有任何问题的还请指出,担当大家的批评,让我改进。同时也盼望读者们不吝啬你们的点赞+收藏+关注,你们的鼓励是我创作的最大动力!  

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

南飓风

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表