RabbitMQ ③-Spring使用RabbitMQ

打印 上一主题 下一主题

主题 1937|帖子 1937|积分 5811


Spring使用RabbitMQ

创建 Spring 项目后,引入依赖:
  1. <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
  2. <dependency>
  3.     <groupId>org.springframework.boot</groupId>
  4.     <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
复制代码
配置文件 application.yml:
  1. spring:
  2.   application:
  3.     name: spring-rabbitmq-demo
  4.   rabbitmq:
  5. #    host: 47.94.9.33
  6. #    port: 5672
  7. #    username: admin
  8. #    password: admin
  9. #    virtual-host: /
  10.     addresses: amqp://admin:admin@47.94.9.33:5672/
复制代码
Work-Queue(工作队列模式)

声明队列
  1. package com.ljh.mq.springrabbitmqdemo.config;
  2. import com.ljh.mq.springrabbitmqdemo.constants.Constants;
  3. import org.springframework.amqp.core.*;
  4. import org.springframework.beans.factory.annotation.Qualifier;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. /**
  8.  * Spring configuration for the work-queue demo: declares the single queue it uses.
  9.  */
  10. @Configuration
  11. public class RabbitMQConfig {
  12.     /**
  13.      * Work-queue mode: builds a durable queue named {@code Constants.WORK_QUEUE}.
  14.      *
  15.      * @return the queue bean
  16.      */
  17.     @Bean
  18.     public Queue workQueue() {
  19.         Queue queue = QueueBuilder.durable(Constants.WORK_QUEUE).build();
  20.         return queue;
  21.     }
  22. }
复制代码
生产者
  1. package com.ljh.mq.springrabbitmqdemo.controller;
  2. import com.ljh.mq.springrabbitmqdemo.constants.Constants;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.web.bind.annotation.PathVariable;
  8. import org.springframework.web.bind.annotation.RequestMapping;
  9. import org.springframework.web.bind.annotation.RestController;
  10. @RequestMapping("/producer")
  11. @RestController
  12. public class ProducerController {
  13.     private static final Logger log = LoggerFactory.getLogger(ProducerController.class);
  14.     @Autowired
  15.     RabbitTemplate rabbitTemplate;
  16.     @RequestMapping("/work")
  17.     public String work() {
  18.         for (int i = 0; i < 10; i++) {
  19.             String msg = "hello work queue mode~ " + i;
  20.             // ? 当使用默认交换机时,routingKey 和队列名称保持一致
  21.             rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, msg);
  22.         }
  23.         log.info("消息发送成功");
  24.         return "消息发送成功";
  25.     }
  26. }
复制代码
消费者
  1. package com.ljh.mq.springrabbitmqdemo.listener;
  2. import com.ljh.mq.springrabbitmqdemo.constants.Constants;
  3. import com.rabbitmq.client.Channel;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.amqp.core.Message;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.stereotype.Component;
  9. @Component
  10. public class WorkListener {
  11.     private static final Logger log = LoggerFactory.getLogger(WorkListener.class);
  12.     @RabbitListener(queues = Constants.WORK_QUEUE)
  13.     public void process1(Message message, Channel channel) {
  14.         log.info("[process1]:成功接收到消息:[{}]:{}", Constants.WORK_QUEUE, message);
  15.     }
  16.     @RabbitListener(queues = Constants.WORK_QUEUE)
  17.     public void process2(String message) {
  18.         log.info("[process2]:成功接收到消息:[{}]:{}", Constants.WORK_QUEUE, message);
  19.     }
  20. }
复制代码
Publish/Subscribe(发布/订阅模式)

声明队列和交换机
  1. package com.ljh.mq.springrabbitmqdemo.config;
  2. import com.ljh.mq.springrabbitmqdemo.constants.Constants;
  3. import org.springframework.amqp.core.*;
  4. import org.springframework.beans.factory.annotation.Qualifier;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. /**
  8.  * Publish/subscribe demo topology: one durable fanout exchange and two durable queues bound to it.
  9.  */
  10. @Configuration
  11. public class RabbitMQConfig {
  12.     /** Durable fanout exchange named {@code Constants.FANOUT_EXCHANGE}. */
  13.     @Bean("fanoutExchange")
  14.     public FanoutExchange fanoutExchange() {
  15.         return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();
  16.     }
  17.     /** Durable queue named {@code Constants.FANOUT_QUEUE1}. */
  18.     @Bean("fanoutQueue1")
  19.     public Queue fanoutQueue1() {
  20.         Queue queue = QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
  21.         return queue;
  22.     }
  23.     /** Durable queue named {@code Constants.FANOUT_QUEUE2}. */
  24.     @Bean("fanoutQueue2")
  25.     public Queue fanoutQueue2() {
  26.         Queue queue = QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
  27.         return queue;
  28.     }
  29.     /** Binds {@code fanoutQueue1} to the fanout exchange; no routing key is specified. */
  30.     @Bean("bindingFanout1")
  31.     public Binding bindingFanout1(@Qualifier("fanoutExchange") FanoutExchange exchange,
  32.                                   @Qualifier("fanoutQueue1") Queue queue) {
  33.         return BindingBuilder.bind(queue).to(exchange);
  34.     }
  35.     /** Binds {@code fanoutQueue2} to the fanout exchange; no routing key is specified. */
  36.     @Bean("bindingFanout2")
  37.     public Binding bindingFanout2(@Qualifier("fanoutExchange") FanoutExchange exchange,
  38.                                   @Qualifier("fanoutQueue2") Queue queue) {
  39.         return BindingBuilder.bind(queue).to(exchange);
  40.     }
  41. }
复制代码
生产者
  1. package com.ljh.mq.springrabbitmqdemo.controller;
  2. package com.ljh.mq.springrabbitmqdemo.controller;
  3. import com.ljh.mq.springrabbitmqdemo.constants.Constants;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.web.bind.annotation.PathVariable;
  9. import org.springframework.web.bind.annotation.RequestMapping;
  10. import org.springframework.web.bind.annotation.RestController;
  11. @RequestMapping("/producer")
  12. @RestController
  13. public class ProducerController {
  14.     private static final Logger log = LoggerFactory.getLogger(ProducerController.class);
  15.     @Autowired
  16.     RabbitTemplate rabbitTemplate;
  17.     @RequestMapping("/fanout")
  18.     public String fanout() {
  19.         for (int i = 0; i < 10; i++) {
  20.             String msg = "hello publish fanout mode~ " + i;
  21.             // ? 当使用默认交换机时,routingKey 和队列名称保持一致
  22.             rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", msg);
  23.         }
  24.         log.info("消息发送成功");
  25.         return "消息发送成功";
  26.     }
  27. }
复制代码
消费者
  1. package com.ljh.mq.springrabbitmqdemo.listener;
  2. import com.ljh.mq.springrabbitmqdemo.constants.Constants;
  3. import com.rabbitmq.client.Channel;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.amqp.core.Message;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.stereotype.Component;
  9. @Component
  10. public class FanoutListener {
  11.     private static final Logger log = LoggerFactory.getLogger(FanoutListener.class);
  12.     @RabbitListener(queues = Constants.FANOUT_QUEUE1)
  13.     public void process1(String message) {
  14.         log.info("[process1]:成功接收到消息:[{}]:{}", Constants.FANOUT_QUEUE1, message);
  15.     }
  16.     @RabbitListener(queues = Constants.FANOUT_QUEUE2)
  17.     public void process2(String message) {
  18.         log.info("[process2]:成功接收到消息:[{}]:{}", Constants.FANOUT_QUEUE2, message);
  19.     }
  20. }
复制代码
Routing(路由模式)

声明队列和交换机
  1. package com.ljh.mq.springrabbitmqdemo.config;
  2. import com.ljh.mq.springrabbitmqdemo.constants.Constants;
  3. import org.springframework.amqp.core.*;
  4. import org.springframework.beans.factory.annotation.Qualifier;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. /**
  8.  * Routing demo topology: one durable direct exchange and two durable queues.
  9.  * {@code directQueue1} is bound with routing key "orange"; {@code directQueue2} is bound with
  10.  * both "orange" and "black".
  11.  */
  12. @Configuration
  13. public class RabbitMQConfig {
  14.     /** Durable direct exchange named {@code Constants.DIRECT_EXCHANGE}. */
  15.     @Bean("directExchange")
  16.     public DirectExchange directExchange() {
  17.         return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
  18.     }
  19.     /** Durable queue named {@code Constants.DIRECT_QUEUE1}. */
  20.     @Bean("directQueue1")
  21.     public Queue directQueue1() {
  22.         Queue queue = QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
  23.         return queue;
  24.     }
  25.     /** Durable queue named {@code Constants.DIRECT_QUEUE2}. */
  26.     @Bean("directQueue2")
  27.     public Queue directQueue2() {
  28.         Queue queue = QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
  29.         return queue;
  30.     }
  31.     /** Binds {@code directQueue1} to the direct exchange with routing key "orange". */
  32.     @Bean("bindingDirect1")
  33.     public Binding bindingDirect1(@Qualifier("directExchange") DirectExchange exchange,
  34.                                   @Qualifier("directQueue1") Queue queue) {
  35.         return BindingBuilder.bind(queue).to(exchange).with("orange");
  36.     }
  37.     /** Binds {@code directQueue2} to the direct exchange with routing key "orange". */
  38.     @Bean("bindingDirect2")
  39.     public Binding bindingDirect2(@Qualifier("directExchange") DirectExchange exchange,
  40.                                   @Qualifier("directQueue2") Queue queue) {
  41.         return BindingBuilder.bind(queue).to(exchange).with("orange");
  42.     }
  43.     /** Binds {@code directQueue2} to the direct exchange with routing key "black". */
  44.     @Bean("bindingDirect3")
  45.     public Binding bindingDirect3(@Qualifier("directExchange") DirectExchange exchange,
  46.                                   @Qualifier("directQueue2") Queue queue) {
  47.         return BindingBuilder.bind(queue).to(exchange).with("black");
  48.     }
  49. }
复制代码
生产者
  1. package com.ljh.mq.springrabbitmqdemo.controller;
  2. import com.ljh.mq.springrabbitmqdemo.constants.Constants;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.web.bind.annotation.PathVariable;
  8. import org.springframework.web.bind.annotation.RequestMapping;
  9. import org.springframework.web.bind.annotation.RestController;
  10. @RequestMapping("/producer")
  11. @RestController
  12. public class ProducerController {
  13.     private static final Logger log = LoggerFactory.getLogger(ProducerController.class);
  14.     @Autowired
  15.     RabbitTemplate rabbitTemplate;
  16.     @RequestMapping("/direct/{routingKey}")
  17.     public String direct(@PathVariable("routingKey") String routingKey) {
  18.         rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey, "hello routing mode~;routingKey is " + routingKey);
  19.         log.info("消息发送成功:{}", routingKey);
  20.         return "消息发送成功:" + routingKey;
  21.     }
  22. }
复制代码
消费者
  1. package com.ljh.mq.springrabbitmqdemo.listener;
  2. import com.ljh.mq.springrabbitmqdemo.constants.Constants;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class DirectListener {
  9.     private static final Logger log = LoggerFactory.getLogger(DirectListener.class);
  10.     @RabbitListener(queues = Constants.DIRECT_QUEUE1)
  11.     public void process1(String message) {
  12.         log.info("队列[{}]成功接收到消息:{}", Constants.DIRECT_QUEUE1, message);
  13.     }
  14.     @RabbitListener(queues = Constants.DIRECT_QUEUE2)
  15.     public void process2(String message) {
  16.         log.info("队列[{}]成功接收到消息:{}", Constants.DIRECT_QUEUE2, message);
  17.     }
  18. }
复制代码
Topics(通配符模式)

声明队列和交换机
  1. package com.ljh.mq.springrabbitmqdemo.config;
  2. import com.ljh.mq.springrabbitmqdemo.constants.Constants;
  3. import org.springframework.amqp.core.*;
  4. import org.springframework.beans.factory.annotation.Qualifier;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. /**
  8.  * Topics demo topology: one durable topic exchange and two durable queues.
  9.  * {@code topicQueue1} is bound with pattern "*.orange.*"; {@code topicQueue2} is bound with
  10.  * both "*.*.rabbit" and "lazy.#". In topic patterns {@code *} stands for exactly one word and
  11.  * {@code #} for zero or more words.
  12.  */
  13. @Configuration
  14. public class RabbitMQConfig {
  15.     /** Durable topic exchange named {@code Constants.TOPIC_EXCHANGE}. */
  16.     @Bean("topicExchange")
  17.     public TopicExchange topicExchange() {
  18.         return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();
  19.     }
  20.     /** Durable queue named {@code Constants.TOPIC_QUEUE1}. */
  21.     @Bean("topicQueue1")
  22.     public Queue topicQueue1() {
  23.         Queue queue = QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
  24.         return queue;
  25.     }
  26.     /** Durable queue named {@code Constants.TOPIC_QUEUE2}. */
  27.     @Bean("topicQueue2")
  28.     public Queue topicQueue2() {
  29.         Queue queue = QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
  30.         return queue;
  31.     }
  32.     /** Binds {@code topicQueue1} to the topic exchange with pattern "*.orange.*". */
  33.     @Bean("bindingTopic1")
  34.     public Binding bindingTopic1(@Qualifier("topicExchange") TopicExchange exchange,
  35.                                  @Qualifier("topicQueue1") Queue queue) {
  36.         return BindingBuilder.bind(queue).to(exchange).with("*.orange.*");
  37.     }
  38.     /** Binds {@code topicQueue2} to the topic exchange with pattern "*.*.rabbit". */
  39.     @Bean("bindingTopic2")
  40.     public Binding bindingTopic2(@Qualifier("topicExchange") TopicExchange exchange,
  41.                                  @Qualifier("topicQueue2") Queue queue) {
  42.         return BindingBuilder.bind(queue).to(exchange).with("*.*.rabbit");
  43.     }
  44.     /** Binds {@code topicQueue2} to the topic exchange with pattern "lazy.#". */
  45.     @Bean("bindingTopic3")
  46.     public Binding bindingTopic3(@Qualifier("topicExchange") TopicExchange exchange,
  47.                                  @Qualifier("topicQueue2") Queue queue) {
  48.         return BindingBuilder.bind(queue).to(exchange).with("lazy.#");
  49.     }
  50. }
复制代码
生产者
  1. package com.ljh.mq.springrabbitmqdemo.controller;
  2. import com.ljh.mq.springrabbitmqdemo.constants.Constants;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.web.bind.annotation.PathVariable;
  8. import org.springframework.web.bind.annotation.RequestMapping;
  9. import org.springframework.web.bind.annotation.RestController;
  10. @RequestMapping("/producer")
  11. @RestController
  12. public class ProducerController {
  13.     private static final Logger log = LoggerFactory.getLogger(ProducerController.class);
  14.     @Autowired
  15.     RabbitTemplate rabbitTemplate;
  16.     @RequestMapping("/topic/{routingKey}")
  17.     public String topic(@PathVariable("routingKey") String routingKey) {
  18.         rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, "hello topic mode~;routingKey is " + routingKey);
  19.         log.info("消息发送成功:{}", routingKey);
  20.         return "消息发送成功:" + routingKey;
  21.     }
  22. }
复制代码
消费者
  1. package com.ljh.mq.springrabbitmqdemo.listener;
  2. import com.ljh.mq.springrabbitmqdemo.constants.Constants;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class TopicListener {
  9.     private static final Logger log = LoggerFactory.getLogger(TopicListener.class);
  10.     @RabbitListener(queues = Constants.TOPIC_QUEUE1)
  11.     public void process1(String message) {
  12.         log.info("队列[{}]成功接收到消息:{}", Constants.TOPIC_QUEUE1, message);
  13.     }
  14.     @RabbitListener(queues = Constants.TOPIC_QUEUE2)
  15.     public void process2(String message) {
  16.         log.info("队列[{}]成功接收到消息:{}", Constants.TOPIC_QUEUE2, message);
  17.     }
  18. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

乌市泽哥

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表