Springboot-RabbitMQ 消息队列使用

花瓣小跑  金牌会员 | 2024-6-24 06:06:18 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 874|帖子 874|积分 2622

一、概念先容:

RabbitMQ中几个重要的概念先容:


  • Channels:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接本地虚拟连接,AMQP 下令都是通过信道发出去的,不管是发布消息、订阅队列还是吸收消息,这些动作都是通过信道完成。因为对于操作体系来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  • Exchanges:交换器,用来吸收生产者发送的消息并将这些消息路由给服务器中的队列。

    • 交换机范例重要有以下几种:
    • Direct Exchange(直连交换机):这种范例的交换机根据消息的Routing Key(路由键)进行准确匹配,只有绑定了相同路由键的队列才会收到消息。适用于点对点的消息通报场景。
    • Fanout Exchange(扇形交换机):这种范例的交换机采用广播模式,它会将消息发送给所有绑定到该交换机的队列,不管消息的路由键是什么。适用于消息必要被多个消耗者处理的场景。
    • Topic Exchange(主题交换机):这种范例的交换机支持基于模式匹配的路由键,可以使用通配符*(匹配一个单词)和#(匹配零个或多个单词)进行匹配。适用于实现更复杂的消息路由逻辑。
    • Headers Exchange(头交换机):这种范例的交换机不处理路由键,而是根据发送的消息内容中的headers属性进行匹配。适用于必要在消息头中携带额外信息的场景。

  • Queues:消息队列,用来保存消息直到发送给消耗者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列内里,等待消耗者连接到这个队列将其取走。
二、引入依赖:

  1. <dependency>
  2.       <groupId>org.springframework.boot</groupId>
  3.       <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
复制代码
三、添加配置信息

  1. spring:
  2.   rabbitmq:
  3.     host: 127.0.0.1
  4.     port: 5672
  5.     username: guest
  6.     password: guest
  7.     listener:
  8.       simple:
  9.         acknowledge-mode: manual  # 手动提交
复制代码
四、Direct Exchange(直连交换机)模式

1、新建配置文件 RabbitDirectConfig类

  1. package com.example.direct;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. * @author wasin
  10. * @version 1.0
  11. * @date 2024/6/4
  12. * @description: 直连交换机--这种类型的交换机根据消息的Routing Key(路由键)进行精确匹配,
  13. * 只有绑定了相同路由键的队列才会收到消息。适用于点对点的消息传递场景
  14. */
  15. @Configuration
  16. public class RabbitDirectConfig {
  17.     /**
  18.      * 队列名称
  19.      */
  20.     public static final String QUEUE_MESSAGE ="QUEUE_MESSAGE";
  21.     public static final String QUEUE_USER ="QUEUE_USER";
  22.     /**
  23.      * 交换机
  24.      */
  25.     public static final String EXCHANGE="EXCHANGE_01";
  26.     /**
  27.      * 路由
  28.      */
  29.     public static final String ROUTING_KEY="ROUTING_KEY_01";
  30.     @Bean
  31.     public Queue queue01() {
  32.         return new Queue(QUEUE_MESSAGE, //队列名称
  33.                 true, //是否持久化
  34.                 false, //是否排他
  35.                 false //是否自动删除
  36.         );
  37.     }
  38.     @Bean
  39.     public Queue queue02() {
  40.         return new Queue(QUEUE_USER, //队列名称
  41.                 true, //是否持久化
  42.                 false, //是否排他
  43.                 false //是否自动删除
  44.         );
  45.     }
  46.     @Bean
  47.     public DirectExchange exchange01() {
  48.         return new DirectExchange(EXCHANGE,
  49.                 true, //是否持久化
  50.                 false //是否排他
  51.         );
  52.     }
  53.     @Bean
  54.     public Binding demoBinding() {
  55.         return BindingBuilder.bind(queue01()).to(exchange01()).with(ROUTING_KEY);
  56.     }
  57.     @Bean
  58.     public Binding demoBinding2() {
  59.         return BindingBuilder.bind(queue02()).to(exchange01()).with(ROUTING_KEY);
  60.     }
  61. }
复制代码
2、添加消息生产者 Producer类

  1. package com.example.direct;
  2. import com.example.entity.User;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.stereotype.Component;
  5. import javax.annotation.Resource;
  6. /**
  7. * @author wasin
  8. * @version 1.0
  9. * @date 2024/6/4
  10. * @description:
  11. */
  12. @Component
  13. public class Producer {
  14.     @Resource
  15.     RabbitTemplate rabbitTemplate;
  16.     public void sendMessageByExchangeANdRoute(String message){
  17.         rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE, RabbitDirectConfig.ROUTING_KEY,message);
  18.     }
  19.     /**
  20.      * 默认交换器,隐式地绑定到每个队列,路由键等于队列名称。
  21.      * @param message
  22.      */
  23.     public void sendMessageByQueue(String message){
  24.         rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_MESSAGE,message);
  25.     }
  26.     public void sendMessage(User user){
  27.         rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_USER,user);
  28.     }
  29. }
复制代码
3、添加消息消耗者

  1. package com.example.direct;
  2. import com.example.entity.User;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @author wasin
  7. * @version 1.0
  8. * @date 2024/6/4
  9. * @description:
  10. */
  11. @Component
  12. public class Consumer {
  13.     @RabbitListener(queues = RabbitDirectConfig.QUEUE_USER)
  14.     public void onMessage(User user){
  15.         System.out.println("收到的实体bean消息:"+user);
  16.     }
  17.     @RabbitListener(queues = RabbitDirectConfig.QUEUE_MESSAGE)
  18.     public void onMessage2(String message){
  19.         System.out.println("收到的字符串消息:"+message);
  20.     }
  21. }
复制代码
4、 测试

  1. package com.example;
  2. import com.example.entity.User;
  3. import com.example.direct.Producer;
  4. import com.example.fanout.FanoutProducer;
  5. import com.example.topic.TopicProducer;
  6. import org.junit.jupiter.api.Test;
  7. import org.springframework.boot.test.context.SpringBootTest;
  8. import javax.annotation.Resource;
  9. @SpringBootTest
  10. class SpringbootRabbitMqApplicationTests {
  11.     @Resource
  12.     Producer producer;
  13.    
  14.     @Test
  15.     public void sendMessage() throws InterruptedException {
  16.         producer.sendMessageByQueue("哈哈");
  17.         producer.sendMessage(new User().setAge(10).setName("wasin"));
  18.     }
  19. }
复制代码
五、Topic Exchange(主题交换机)模式

1、新建RabbitTopicConfig类

  1. package com.example.topic;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * @author wasin
  7. * @version 1.0
  8. * @date 2024/6/4
  9. * @description: 主题交换机--这种类型的交换机支持基于模式匹配的路由键,
  10. * 可以使用通配符*(匹配一个单词)和#(匹配零个或多个单词)进行匹配。适用于实现更复杂的消息路由逻辑。
  11. */
  12. @Configuration
  13. public class RabbitTopicConfig {
  14.     /**
  15.      * 交换机
  16.      */
  17.     public static final String EXCHANGE = "EXCHANGE_TOPIC1";
  18.     /**
  19.      * 队列名称
  20.      */
  21.     public static final String QUEUE_TOPIC1 = "QUEUE_TOPIC";
  22.     /**
  23.      * 路由
  24.      * "*" 与 "#",用于做模糊匹配。其中 "*" 用于匹配一个单词,"#" 用于匹配多个单词(可以是零个)
  25.      * 可以匹配 aa.wasin.aa.bb  wasin.aa.bb  wasin.aa ....
  26.      * aa.bb.wasin.cc 无法匹配
  27.      */
  28.     public static final String ROUTING_KEY1 = "*.wasin.#";
  29.     @Bean
  30.     public Queue queue() {
  31.         return new Queue(QUEUE_TOPIC1, //队列名称
  32.                 true, //是否持久化
  33.                 false, //是否排他
  34.                 false //是否自动删除
  35.         );
  36.     }
  37.     @Bean
  38.     public TopicExchange exchange() {
  39.         return new TopicExchange(EXCHANGE,
  40.                 true, //是否持久化
  41.                 false //是否排他
  42.         );
  43.     }
  44.     @Bean
  45.     public Binding binding() {
  46.         return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY1);
  47.     }
  48. }
复制代码
2、新建 消息生产者和发送者



  • TopicProducer类
  1. package com.example.topic;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.stereotype.Component;
  4. import javax.annotation.Resource;
  5. /**
  6. * @author wasin
  7. * @version 1.0
  8. * @date 2024/6/4
  9. * @description:
  10. */
  11. @Component
  12. public class TopicProducer {
  13.     @Resource
  14.     RabbitTemplate rabbitTemplate;
  15.     /**
  16.      * @param routeKey 路由
  17.      * @param message 消息
  18.      */
  19.     public void sendMessageByQueue(String routeKey, String message){
  20.         rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE,routeKey,message);
  21.     }
  22. }
复制代码


  • TopicConsumer类
  1. package com.example.topic;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @author wasin
  7. * @version 1.0
  8. * @date 2024/6/4
  9. * @description:
  10. */
  11. @Slf4j
  12. @Component
  13. public class TopicConsumer {
  14.     @RabbitListener(queues = RabbitTopicConfig.QUEUE_TOPIC1)
  15.     public void onMessage2(String message){
  16.         log.info("topic收到的字符串消息:{}",message);
  17.     }
  18. }
复制代码
六、Fanout Exchange(扇形交换机)模式

1、 新建 RabbitFanoutConfig类

  1. package com.example.fanout;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * @author wasin
  7. * @version 1.0
  8. * @date 2024/6/4
  9. * @description: 扇形交换机--这种类型的交换机采用广播模式,它会将消息发送给所有绑定到该交换机的队列,
  10. * 不管消息的路由键是什么。适用于消息需要被多个消费者处理的场景。
  11. */
  12. @Configuration
  13. public class RabbitFanoutConfig {
  14.     /**
  15.      * 交换机
  16.      */
  17.     public static final String EXCHANGE = "EXCHANGE_FANOUT";
  18.     /**
  19.      * 队列名称
  20.      */
  21.     public static final String QUEUE_FANOUT1 = "QUEUE_FANOUT";
  22.     /**
  23.      * 队列名称
  24.      */
  25.     public static final String QUEUE_FANOUT2 = "QUEUE_FANOUT2";
  26.     @Bean
  27.     public Queue queueFanout1() {
  28.         return new Queue(QUEUE_FANOUT1, //队列名称
  29.                 true, //是否持久化
  30.                 false, //是否排他
  31.                 false //是否自动删除
  32.         );
  33.     }
  34.     @Bean
  35.     public Queue queueFanout2() {
  36.         return new Queue(QUEUE_FANOUT2, //队列名称
  37.                 true, //是否持久化
  38.                 false, //是否排他
  39.                 false //是否自动删除
  40.         );
  41.     }
  42.     @Bean
  43.     public FanoutExchange exchangeFanout() {
  44.         return new FanoutExchange(EXCHANGE,
  45.                 true, //是否持久化
  46.                 false //是否排他
  47.         );
  48.     }
  49.     @Bean
  50.     public Binding bindingFanout() {
  51.         return BindingBuilder.bind(queueFanout1()).to(exchangeFanout());
  52.     }
  53.     @Bean
  54.     public Binding bindingFanout2() {
  55.         return BindingBuilder.bind(queueFanout2()).to(exchangeFanout());
  56.     }
  57. }
复制代码
2、新建 消息生产者和发送者



  • FanoutProducer类:
  1. package com.example.fanout;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.stereotype.Component;
  4. import javax.annotation.Resource;
  5. /**
  6. * @author wasin
  7. * @version 1.0
  8. * @date 2024/6/4
  9. * @description:
  10. */
  11. @Component
  12. public class FanoutProducer {
  13.     @Resource
  14.     RabbitTemplate rabbitTemplate;
  15.    
  16.     /**
  17.      * @param message 消息
  18.      */
  19.     public void sendMessageByQueue(String message) {
  20.         rabbitTemplate.convertAndSend(RabbitFanoutConfig.EXCHANGE, "", message);
  21.     }
  22. }
复制代码


  • FanoutConsumer类
  1. package com.example.fanout;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.amqp.support.AmqpHeaders;
  6. import org.springframework.messaging.handler.annotation.Header;
  7. import org.springframework.stereotype.Component;
  8. import java.io.IOException;
  9. /**
  10. * @author wasin
  11. * @version 1.0
  12. * @date 2024/6/4
  13. * @description:
  14. */
  15. @Slf4j
  16. @Component
  17. public class FanoutConsumer {
  18.     /**
  19.      * 手动提交
  20.      * @param message
  21.      * @param channel
  22.      * @param tag
  23.      * @throws IOException
  24.      */
  25.     @RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT1)
  26.     public void onMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
  27.         log.info("fanout1收到的字符串消息:{}",message);
  28.         channel.basicAck(tag,false);
  29.     }
  30.     @RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT2)
  31.     public void onMessage2(String message){
  32.         log.info("fanout2到的字符串消息:{}",message);
  33.     }
  34. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

花瓣小跑

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表