Spring使用RabbitMQ
创建 Spring 项目后,引入依赖:
- <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
复制代码 配置文件 application.yml:
- spring:
- application:
- name: spring-rabbitmq-demo
- rabbitmq:
- # host: 47.94.9.33
- # port: 5672
- # username: admin
- # password: admin
- # virtual-host: /
- addresses: amqp://admin:admin@47.94.9.33:5672/
复制代码 Work-Queue(工作队列模式)
声明队列
- package com.ljh.mq.springrabbitmqdemo.config;
- import com.ljh.mq.springrabbitmqdemo.constants.Constants;
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class RabbitMQConfig {
- // * 工作队列模式
- @Bean
- public Queue workQueue() {
- return QueueBuilder.durable(Constants.WORK_QUEUE)
- .build();
- }
- }
复制代码 生产者
- package com.ljh.mq.springrabbitmqdemo.controller;
- import com.ljh.mq.springrabbitmqdemo.constants.Constants;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
- @RequestMapping("/producer")
- @RestController
- public class ProducerController {
- private static final Logger log = LoggerFactory.getLogger(ProducerController.class);
- @Autowired
- RabbitTemplate rabbitTemplate;
- @RequestMapping("/work")
- public String work() {
- for (int i = 0; i < 10; i++) {
- String msg = "hello work queue mode~ " + i;
- // ? 当使用默认交换机时,routingKey 和队列名称保持一致
- rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, msg);
- }
- log.info("消息发送成功");
- return "消息发送成功";
- }
- }
复制代码 消耗者
- package com.ljh.mq.springrabbitmqdemo.listener;
- import com.ljh.mq.springrabbitmqdemo.constants.Constants;
- import com.rabbitmq.client.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- @Component
- public class WorkListener {
- private static final Logger log = LoggerFactory.getLogger(WorkListener.class);
- @RabbitListener(queues = Constants.WORK_QUEUE)
- public void process1(Message message, Channel channel) {
- log.info("[process1]:成功接收到消息:[{}]:{}", Constants.WORK_QUEUE, message);
- }
- @RabbitListener(queues = Constants.WORK_QUEUE)
- public void process2(String message) {
- log.info("[process2]:成功接收到消息:[{}]:{}", Constants.WORK_QUEUE, message);
- }
- }
复制代码 Publish/Subscribe(发布/订阅模式)
声明队列和交换机
- package com.ljh.mq.springrabbitmqdemo.config;
- import com.ljh.mq.springrabbitmqdemo.constants.Constants;
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class RabbitMQConfig {
- // * 发布订阅模式
- @Bean("fanoutExchange")
- public FanoutExchange fanoutExchange() {
- return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE)
- .durable(true)
- .build();
- }
- @Bean("fanoutQueue1")
- public Queue fanoutQueue1 () {
- return QueueBuilder.durable(Constants.FANOUT_QUEUE1)
- .build();
- }
- @Bean("fanoutQueue2")
- public Queue fanoutQueue2 () {
- return QueueBuilder.durable(Constants.FANOUT_QUEUE2)
- .build();
- }
- @Bean("bindingFanout1")
- public Binding bindingFanout1(@Qualifier("fanoutExchange") FanoutExchange exchange, @Qualifier("fanoutQueue1") Queue queue) {
- return BindingBuilder.bind(queue)
- .to(exchange);
- }
- @Bean("bindingFanout2")
- public Binding bindingFanout2(@Qualifier("fanoutExchange") FanoutExchange exchange, @Qualifier("fanoutQueue2") Queue queue) {
- return BindingBuilder.bind(queue)
- .to(exchange);
- }
- }
复制代码 生产者
- package com.ljh.mq.springrabbitmqdemo.controller;
- package com.ljh.mq.springrabbitmqdemo.controller;
- import com.ljh.mq.springrabbitmqdemo.constants.Constants;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
- @RequestMapping("/producer")
- @RestController
- public class ProducerController {
- private static final Logger log = LoggerFactory.getLogger(ProducerController.class);
- @Autowired
- RabbitTemplate rabbitTemplate;
- @RequestMapping("/fanout")
- public String fanout() {
- for (int i = 0; i < 10; i++) {
- String msg = "hello publish fanout mode~ " + i;
- // ? 当使用默认交换机时,routingKey 和队列名称保持一致
- rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", msg);
- }
- log.info("消息发送成功");
- return "消息发送成功";
- }
- }
复制代码 消耗者
- package com.ljh.mq.springrabbitmqdemo.listener;
- import com.ljh.mq.springrabbitmqdemo.constants.Constants;
- import com.rabbitmq.client.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- @Component
- public class FanoutListener {
- private static final Logger log = LoggerFactory.getLogger(FanoutListener.class);
- @RabbitListener(queues = Constants.FANOUT_QUEUE1)
- public void process1(String message) {
- log.info("[process1]:成功接收到消息:[{}]:{}", Constants.FANOUT_QUEUE1, message);
- }
- @RabbitListener(queues = Constants.FANOUT_QUEUE2)
- public void process2(String message) {
- log.info("[process2]:成功接收到消息:[{}]:{}", Constants.FANOUT_QUEUE2, message);
- }
- }
复制代码 Routing(路由模式)
声明队列和交换机
- package com.ljh.mq.springrabbitmqdemo.config;
- import com.ljh.mq.springrabbitmqdemo.constants.Constants;
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class RabbitMQConfig {
- // * 路由模式
- @Bean("directExchange")
- public DirectExchange directExchange() {
- return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE)
- .durable(true)
- .build();
- }
- @Bean("directQueue1")
- public Queue directQueue1() {
- return QueueBuilder.durable(Constants.DIRECT_QUEUE1)
- .build();
- }
- @Bean("directQueue2")
- public Queue directQueue2() {
- return QueueBuilder.durable(Constants.DIRECT_QUEUE2)
- .build();
- }
- @Bean("bindingDirect1")
- public Binding bindingDirect1(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue1") Queue queue) {
- return BindingBuilder.bind(queue)
- .to(exchange)
- .with("orange");
- }
- @Bean("bindingDirect2")
- public Binding bindingDirect2(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue2") Queue queue) {
- return BindingBuilder.bind(queue)
- .to(exchange)
- .with("orange");
- }
- @Bean("bindingDirect3")
- public Binding bindingDirect3(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue2") Queue queue) {
- return BindingBuilder.bind(queue)
- .to(exchange)
- .with("black");
- }
- }
复制代码 生产者
- package com.ljh.mq.springrabbitmqdemo.controller;
- import com.ljh.mq.springrabbitmqdemo.constants.Constants;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
- @RequestMapping("/producer")
- @RestController
- public class ProducerController {
- private static final Logger log = LoggerFactory.getLogger(ProducerController.class);
- @Autowired
- RabbitTemplate rabbitTemplate;
- @RequestMapping("/direct/{routingKey}")
- public String direct(@PathVariable("routingKey") String routingKey) {
- rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey, "hello routing mode~;routingKey is " + routingKey);
- log.info("消息发送成功:{}", routingKey);
- return "消息发送成功:" + routingKey;
- }
- }
复制代码 消耗者
- package com.ljh.mq.springrabbitmqdemo.listener;
- import com.ljh.mq.springrabbitmqdemo.constants.Constants;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- @Component
- public class DirectListener {
- private static final Logger log = LoggerFactory.getLogger(DirectListener.class);
- @RabbitListener(queues = Constants.DIRECT_QUEUE1)
- public void process1(String message) {
- log.info("队列[{}]成功接收到消息:{}", Constants.DIRECT_QUEUE1, message);
- }
- @RabbitListener(queues = Constants.DIRECT_QUEUE2)
- public void process2(String message) {
- log.info("队列[{}]成功接收到消息:{}", Constants.DIRECT_QUEUE2, message);
- }
- }
复制代码 Topics(通配符模式)
声明队列和交换机
- package com.ljh.mq.springrabbitmqdemo.config;
- import com.ljh.mq.springrabbitmqdemo.constants.Constants;
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class RabbitMQConfig {
- // * 通配符模式
- @Bean("topicExchange")
- public TopicExchange topicExchange() {
- return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE)
- .durable(true)
- .build();
- }
- @Bean("topicQueue1")
- public Queue topicQueue1() {
- return QueueBuilder.durable(Constants.TOPIC_QUEUE1)
- .build();
- }
- @Bean("topicQueue2")
- public Queue topicQueue2() {
- return QueueBuilder.durable(Constants.TOPIC_QUEUE2)
- .build();
- }
- @Bean("bindingTopic1")
- public Binding bindingTopic1(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue1") Queue queue) {
- return BindingBuilder.bind(queue)
- .to(exchange)
- .with("*.orange.*");
- }
- @Bean("bindingTopic2")
- public Binding bindingTopic2(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue2") Queue queue) {
- return BindingBuilder.bind(queue)
- .to(exchange)
- .with("*.*.rabbit");
- }
- @Bean("bindingTopic3")
- public Binding bindingTopic3(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue2") Queue queue) {
- return BindingBuilder.bind(queue)
- .to(exchange)
- .with("lazy.#");
- }
- }
复制代码 生产者
- package com.ljh.mq.springrabbitmqdemo.controller;
- import com.ljh.mq.springrabbitmqdemo.constants.Constants;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
- @RequestMapping("/producer")
- @RestController
- public class ProducerController {
- private static final Logger log = LoggerFactory.getLogger(ProducerController.class);
- @Autowired
- RabbitTemplate rabbitTemplate;
- @RequestMapping("/topic/{routingKey}")
- public String topic(@PathVariable("routingKey") String routingKey) {
- rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, "hello topic mode~;routingKey is " + routingKey);
- log.info("消息发送成功:{}", routingKey);
- return "消息发送成功:" + routingKey;
- }
- }
复制代码 消耗者
- package com.ljh.mq.springrabbitmqdemo.listener;
- import com.ljh.mq.springrabbitmqdemo.constants.Constants;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- @Component
- public class TopicListener {
- private static final Logger log = LoggerFactory.getLogger(TopicListener.class);
- @RabbitListener(queues = Constants.TOPIC_QUEUE1)
- public void process1(String message) {
- log.info("队列[{}]成功接收到消息:{}", Constants.TOPIC_QUEUE1, message);
- }
- @RabbitListener(queues = Constants.TOPIC_QUEUE2)
- public void process2(String message) {
- log.info("队列[{}]成功接收到消息:{}", Constants.TOPIC_QUEUE2, message);
- }
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |