Springboot中使用RabbitMq

打印 上一主题 下一主题

主题 875|帖子 875|积分 2625

代码地址: https://gitee.com/Aes_yt/middleware-demo/tree/master/rabbitmq
安装RabbitMq

1. docker拉取镜像
  1. docker pull rabbitmq:3.9.29-management
复制代码
2. 创建rabbitmq容器
  1. docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9.29-management
复制代码
3. 访问地址

http://{ip地址}:15672/,可以看到RabbitMq的管理后台界面。账号密码默认 guest
消息生产和消费

rabbitmq-producer


  • 新建module,引入依赖
    1. <dependency>
    2.         <groupId>org.springframework.boot</groupId>
    3.         <artifactId>spring-boot-starter-amqp</artifactId>
    4. </dependency>
    复制代码
  • yaml配置地址
    1. spring:
    2.   rabbitmq:
    3.     host: 192.168.67.131
    4.     port: 5672
    5.     username: guest
    6.     password: guest
    7.     virtual-host: /
    复制代码
  • 配置交换机
    1. @Configuration
    2. public class RabbitMqConfig {
    3.     /*定义交换机名称*/
    4.     public static final String USER_INFO_EXCHANGE_NAME = "user_info_exchange";
    5.     @Bean
    6.     public Exchange userInfoExchange() {
    7.         return ExchangeBuilder.topicExchange(USER_INFO_EXCHANGE_NAME).durable(true).build();
    8.     }
    9. }
    复制代码
  • 发送消息
    1.     @Test
    2.     public void sendMessage() {
    3.         String registerMsg = "user register..." + new Date();
    4.         // 1. 发送一条注册消息
    5.         rabbitTemplate.convertAndSend(RabbitMqConfig.USER_INFO_EXCHANGE_NAME,
    6.                 "user.register.user1", registerMsg, msg -> {
    7.                     msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    8.                     return msg;
    9.                 });
    10.         log.info("消息发送完成:{}", registerMsg);
    11.         String loginMsg = "user login..." + new Date();
    12.         // 2. 发送一条登录消息
    13.         rabbitTemplate.convertAndSend(RabbitMqConfig.USER_INFO_EXCHANGE_NAME,
    14.                 "user.login.user1", loginMsg, msg -> {
    15.                     msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    16.                     return msg;
    17.                 });
    18.         log.info("消息发送完成:{}", loginMsg);
    19.     }
    复制代码
rabbitmq-consumer


  • 新建module,引入依赖
    1. <dependency>
    2.         <groupId>org.springframework.boot</groupId>
    3.         <artifactId>spring-boot-starter-amqp</artifactId>
    4. </dependency>
    复制代码
  • yaml配置地址
    1. spring:
    2.   rabbitmq:
    3.     host: 192.168.67.131
    4.     port: 5672
    5.     username: guest
    6.     password: guest
    7.     virtual-host: /
    复制代码
  • 配置队列绑定关系
    1. @Configuration
    2. public class RabbitMqConfig {
    3.     @Value("${spring.rabbitmq.host}")
    4.     private String host;
    5.     @Value("${spring.rabbitmq.port}")
    6.     private Integer port;
    7.     @Value("${spring.rabbitmq.username}")
    8.     private String username;
    9.     @Value("${spring.rabbitmq.password}")
    10.     private String password;
    11.     /*定义交换机名称*/
    12.     public static final String USER_INFO_EXCHANGE_NAME = "user_info_exchange";
    13.     /*定义队列名称*/
    14.     public static final String USER_REGISTER_QUEUE_NAME = "user_register_queue";
    15.     public static final String USER_LOGIN_QUEUE_NAME = "user_login_queue";
    16.     public static final String USER_REGISTER_ROUTING_KEY = "user.register.#";
    17.     public static final String USER_LOGIN_ROUTING_KEY = "user.login.#";
    18.     @Bean
    19.     public Exchange userInfoExchange() {
    20.         return ExchangeBuilder.topicExchange(USER_INFO_EXCHANGE_NAME).durable(true).build();
    21.     }
    22.     @Bean
    23.     public Queue userRegisterQueue() {
    24.         return QueueBuilder.durable(USER_REGISTER_QUEUE_NAME).build();
    25.     }
    26.     @Bean
    27.     public Queue userLoginQueue() {
    28.         return QueueBuilder.durable(USER_LOGIN_QUEUE_NAME).build();
    29.     }
    30.     @Bean
    31.     public Binding userRegisterBinding() {
    32.         return BindingBuilder.bind(userRegisterQueue()).to(userInfoExchange()).with(USER_REGISTER_ROUTING_KEY).noargs();
    33.     }
    34.     @Bean
    35.     public Binding userLoginBinding() {
    36.         return BindingBuilder.bind(userLoginQueue()).to(userInfoExchange()).with(USER_LOGIN_ROUTING_KEY).noargs();
    37.     }
    38. }
    复制代码
  • 监听器
    1. @Component
    2. @Slf4j
    3. public class UserInfoListener {
    4.     @RabbitListener(queues = RabbitMqConfig.USER_REGISTER_QUEUE_NAME)
    5.     public void userRegister(String msg){
    6.         log.info(msg);
    7.     }
    8.     @RabbitListener(queues = RabbitMqConfig.USER_LOGIN_QUEUE_NAME)
    9.     public void userLogin(String msg){
    10.         log.info(msg);
    11.     }
    12. }
    复制代码
  • 结果
    启动producer项目和consumer项目,producer发送消息,consumer接收到消息:
    producer:
    1. 2023-07-08 10:26:10.660  INFO 7432 --- [main] com.yt.rabbit.RabbitProducerTest: 消息发送完成:user register...Sat Jul 08 10:26:10 CST 2023
    2. 2023-07-08 10:26:10.663  INFO 7432 --- [main] com.yt.rabbit.RabbitProducerTest: 消息发送完成:user login...Sat Jul 08 10:26:10 CST 2023
    复制代码
    consumer:
    1. 2023-07-08 10:26:10.661  INFO 25108 --- [ntContainer#1-1] c.yt.rabbitmq.listener.UserInfoListener  : user register...Sat Jul 08 10:26:10 CST 2023
    2. 2023-07-08 10:26:10.665  INFO 25108 --- [ntContainer#0-1] c.yt.rabbitmq.listener.UserInfoListener  : user login...Sat Jul 08 10:26:10 CST 2023
    复制代码
交换器类型

交换器类型有四种,fanout,topic,direct,headers。
接下来在代码中创建三种交换器类型,对应的routingKey和queue绑定如表格所示。发送对应消息,看看是否能接收到 [Y/N]。headers类型不演示。
ExchangeExchangeTypeRoutingKeyMessageKeyQueueReceivefanout_exchangefanoutfanout.test.key1
fanout.#xxx.yyy.zzzfanout_test_queue1
fanout_test_queue2Y
Ytopic_exchangetopictopic.test.#
topic.#
topic.*topic.test.key1
topic_test_queue1
topic_test_queue2
topic_test_queue3Y
Y
Ndirect_exchangedirectdirect.test.key1
direct.test.#
direct.test.key3direct.test.key1
direct.test.key2
direct.test.key3direct_test_queue1
direct_test_queue2
direct_test_queue3 && direct_test_queue4Y
N
Y && Ydirect 的Routingkey是全匹配,通配符不起作用,所以direct_test_queue2没有接收到消息。
topic 的通配符,*正好匹配一个词,#可以匹配一个或多个词,所以topic_test_queue3没有接收到消息。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

渣渣兔

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表