渣渣兔 发表于 2023-10-31 09:17:16

Springboot中使用RabbitMq

代码地址: https://gitee.com/Aes_yt/middleware-demo/tree/master/rabbitmq
安装RabbitMq

1. docker拉取镜像

docker pull rabbitmq:3.9.29-management2. 创建rabbitmq容器

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9.29-management3. 访问地址

http://{ip地址}:15672/,可以看到RabbitMq的管理后台界面。账号密码默认 guest
消息生产和消费

rabbitmq-producer


[*]新建module,引入依赖
<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
[*]yaml配置地址
spring:
rabbitmq:
    host: 192.168.67.131
    port: 5672
    username: guest
    password: guest
    virtual-host: /
[*]配置交换机
@Configuration
public class RabbitMqConfig {
    /*定义交换机名称*/
    public static final String USER_INFO_EXCHANGE_NAME = "user_info_exchange";

    @Bean
    public Exchange userInfoExchange() {
      return ExchangeBuilder.topicExchange(USER_INFO_EXCHANGE_NAME).durable(true).build();
    }
}
[*]发送消息
    @Test
    public void sendMessage() {

      String registerMsg = "user register..." + new Date();
      // 1. 发送一条注册消息
      rabbitTemplate.convertAndSend(RabbitMqConfig.USER_INFO_EXCHANGE_NAME,
                "user.register.user1", registerMsg, msg -> {
                  msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                  return msg;
                });
      log.info("消息发送完成:{}", registerMsg);


      String loginMsg = "user login..." + new Date();
      // 2. 发送一条登录消息
      rabbitTemplate.convertAndSend(RabbitMqConfig.USER_INFO_EXCHANGE_NAME,
                "user.login.user1", loginMsg, msg -> {
                  msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                  return msg;
                });
      log.info("消息发送完成:{}", loginMsg);

    }
rabbitmq-consumer


[*]新建module,引入依赖
<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
[*]yaml配置地址
spring:
rabbitmq:
    host: 192.168.67.131
    port: 5672
    username: guest
    password: guest
    virtual-host: /
[*]配置队列绑定关系
@Configuration
public class RabbitMqConfig {
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private Integer port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;

    /*定义交换机名称*/
    public static final String USER_INFO_EXCHANGE_NAME = "user_info_exchange";
    /*定义队列名称*/
    public static final String USER_REGISTER_QUEUE_NAME = "user_register_queue";
    public static final String USER_LOGIN_QUEUE_NAME = "user_login_queue";

    public static final String USER_REGISTER_ROUTING_KEY = "user.register.#";
    public static final String USER_LOGIN_ROUTING_KEY = "user.login.#";

    @Bean
    public Exchange userInfoExchange() {
      return ExchangeBuilder.topicExchange(USER_INFO_EXCHANGE_NAME).durable(true).build();
    }

    @Bean
    public Queue userRegisterQueue() {
      return QueueBuilder.durable(USER_REGISTER_QUEUE_NAME).build();
    }

    @Bean
    public Queue userLoginQueue() {
      return QueueBuilder.durable(USER_LOGIN_QUEUE_NAME).build();
    }

    @Bean
    public Binding userRegisterBinding() {
      return BindingBuilder.bind(userRegisterQueue()).to(userInfoExchange()).with(USER_REGISTER_ROUTING_KEY).noargs();
    }

    @Bean
    public Binding userLoginBinding() {
      return BindingBuilder.bind(userLoginQueue()).to(userInfoExchange()).with(USER_LOGIN_ROUTING_KEY).noargs();
    }

}
[*]监听器
@Component
@Slf4j
public class UserInfoListener {

    @RabbitListener(queues = RabbitMqConfig.USER_REGISTER_QUEUE_NAME)
    public void userRegister(String msg){
      log.info(msg);
    }

    @RabbitListener(queues = RabbitMqConfig.USER_LOGIN_QUEUE_NAME)
    public void userLogin(String msg){
      log.info(msg);
    }
}
[*]结果
启动producer项目和consumer项目,producer发送消息,consumer接收到消息:
producer:
2023-07-08 10:26:10.660INFO 7432 --- com.yt.rabbit.RabbitProducerTest: 消息发送完成:user register...Sat Jul 08 10:26:10 CST 2023
2023-07-08 10:26:10.663INFO 7432 --- com.yt.rabbit.RabbitProducerTest: 消息发送完成:user login...Sat Jul 08 10:26:10 CST 2023consumer:
2023-07-08 10:26:10.661INFO 25108 --- c.yt.rabbitmq.listener.UserInfoListener: user register...Sat Jul 08 10:26:10 CST 2023
2023-07-08 10:26:10.665INFO 25108 --- c.yt.rabbitmq.listener.UserInfoListener: user login...Sat Jul 08 10:26:10 CST 2023
交换器类型

交换器类型有四种,fanout,topic,direct,headers。
接下来在代码中创建三种交换器类型,对应的routingKey和queue绑定如表格所示。发送对应消息,看看是否能接收到 。headers类型不演示。
ExchangeExchangeTypeRoutingKeyMessageKeyQueueReceivefanout_exchangefanoutfanout.test.key1
fanout.#xxx.yyy.zzzfanout_test_queue1
fanout_test_queue2Y
Ytopic_exchangetopictopic.test.#
topic.#
topic.*topic.test.key1
topic_test_queue1
topic_test_queue2
topic_test_queue3Y
Y
Ndirect_exchangedirectdirect.test.key1
direct.test.#
direct.test.key3direct.test.key1
direct.test.key2
direct.test.key3direct_test_queue1
direct_test_queue2
direct_test_queue3 && direct_test_queue4Y
N
Y && Ydirect 的Routingkey是全匹配,通配符不起作用,所以direct_test_queue2没有接收到消息。
topic 的通配符,*正好匹配一个词,#可以匹配一个或多个词,所以topic_test_queue3没有接收到消息。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: Springboot中使用RabbitMq