RabbitMQ基本配置

打印 上一主题 下一主题

主题 864|帖子 864|积分 2592

1.用户角色配置


自带的guest/guest 超级管理员
五中不同角色配置:

  • 普通管理者(management):仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
  • 策略制定者(policymaker):可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息。
  • 监控者 (monitoring):登录管理控制台  查看所有的信息(Rabbit的相关节点的信息,内存使用信息,磁盘的情况)
  • 超级管理员 administrator:登录管理控制台  查看所有的信息  对所有用户的策略进行操作
  • 其他:无法登陆管理控制台,通常就是普通的生产者和消费者(仅仅接收信息或者发送消息)。

2.Virtual Hosts配置

每一个 Virtual Hosts相互隔离, 就相当于一个相对独立的RabbitMQ,===> exchange queue  message不是互通
可以理解为:一个Mysql又很多数据库,每一个数据库就相当于一个Virtual Hosts

2.1创建Virtual Hosts



2.2权限的分配

点击对应Virtual Hostsv 名字,进入配置页面


该权限配置参数说明:

  • user:用户名
  • configure :一个正则表达式,用户对符合该正则表达式的所有资源拥有 configure 操作的权限
  • write:一个正则表达式,用户对符合该正则表达式的所有资源拥有 write 操作的权限
  • read:一个正则表达式,用户对符合该正则表达式的所有资源拥有 read 操作的权限

3.入门案例


使用思路

生产者:  --->按照JDBC思路
1. 创建连接工厂
2. 设置的rabbitMq的服务的主机 默认localhost
3. 设置的rabbitMq的服务的端口 默认5672
4. 设置的rabbitMq的服务的虚拟主机
5. 设置用户和密码
6. 创建连接
7. 创建频道
8. 声明队列
9. 创建消息
10. 发送消息
11.关闭资源
(观察者模式)
RabbitMQ 使用的是发布订阅模式


所需依赖
  1. <dependency>
  2.   <groupId>com.rabbitmq</groupId>
  3.   <artifactId>amqp-client</artifactId>
  4.   <version>5.6.0</version>
  5. </dependency>
复制代码
对Rabbit的连接进行封装
  1. public class RabbitMQConnectionUtils {
  2.     /**
  3.      * 获得Rabbit的连接
  4.      * 类比JDBC
  5.      */
  6.     public static Connection getConection() throws IOException, TimeoutException {
  7.         //1.创建连接工厂
  8.         ConnectionFactory connectionFactory=new ConnectionFactory();
  9.         //2.设置的rabbitMq的服务的主机,默认localhost
  10.         connectionFactory.setHost("localhost");
  11.         //3.设置的rabbitMq的服务的端口,默认为5672
  12.         connectionFactory.setPort(5672);
  13.         //4.设置的rabbitMq的服务的虚拟主机
  14.         connectionFactory.setVirtualHost("testmq");
  15.         //5.设置用户和密码
  16.         connectionFactory.setUsername("chenbuting");
  17.         connectionFactory.setPassword("123");
  18.         //6.创建连接
  19.         Connection connection=connectionFactory.newConnection();
  20.         return connection;
  21.     }
  22. }
复制代码
基本消息队列


生产者Product相关代码
  1. public class Product {
  2.     public static void main(String[] args) throws IOException, TimeoutException {
  3.         Connection connection=RabbitMQConnectionUtils.getConection();
  4.         //7.创建频道
  5.         Channel channel= connection.createChannel();
  6.         //8.声明队列(创建队列)
  7.         /*
  8.          * 参数1:指定队列的名称
  9.          * 参数2:指定消息是否持久化,一般设置为true,是否保存到磁盘当中去
  10.          * 参数3:指定是否独占通道
  11.          * 参数4:是否自动删除
  12.          * 参数5:指定额外参数
  13.          */
  14.         channel.queueDeclare("simple_01",true,false,false,null);
  15.         //9.创建消息
  16.         String message="hello ,my name is chenbuting";
  17.         //10.发送消息
  18.         /*
  19.          * 参数1:指定交换机  简单模式中使用默认交换机  指定空字符串
  20.          * 参数2: 指定routingkey  简单模式  只需要指定队列名称即可
  21.          * 参数3: 指定携带的额外的参数  null
  22.          * 参数4:要发送的消息本身(字节数组)
  23.          */
  24.         channel.basicPublish("","simple_01",null,message.getBytes());
  25.         //11.关闭资源
  26.         channel.close();
  27.         connection.close();
  28.     }
  29. }
复制代码
消费者Consumer相关代码
  1. public class Consumer {
  2.     public static void main(String[] args) throws IOException, TimeoutException {
  3.         //获取连接
  4.         Connection connection= RabbitMQConnectionUtils.getConection();
  5.         //7.创建频道
  6.         Channel channel=connection.createChannel();
  7.         //8.声明队列
  8.         channel.queueDeclare("simple_01",true,false,false,null);
  9.         //9.接受消息
  10.         //处理消息
  11.         DefaultConsumer consumer = new DefaultConsumer(channel){
  12.             /*
  13.              *consumerTag: 消费者标签,用于标识特定的消费者。每个消费者都有一个唯一的标签,它可以用于取消订阅或标识消息是由哪个消费者处理的。
  14.              * envelope: 信封对象,包含与消息传递相关的元数据,如交换机、路由键、传递标签等。
  15.              * properties: AMQP 的基本属性,包含附加的消息属性,如消息持久性、优先级、时间戳等。
  16.              * body: 消息的正文,以字节数组的形式提供。
  17.              */
  18.             @Override
  19.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20.                 System.out.println("消息本身"+new String(body,"utf-8"));
  21.                 System.out.println("exchange"+envelope.getExchange());
  22.                 System.out.println("RoutingKey"+envelope.getRoutingKey());
  23.                 System.out.println("消息的序号"+envelope.getDeliveryTag());
  24.             }
  25.         };
  26.         //监听器----->观察者设计模式
  27.         channel.basicConsume("simple_01",true,consumer);
  28.         //10.建议不要关闭资源     建议一直监听消息
  29.     }
  30. }
复制代码


idea允许开启多个实例的方式



工作消息队列


生产者相关代码
  1. public class WorkProduct {
  2.     public static void main(String[] args) throws IOException, TimeoutException {
  3.         Connection connection=RabbitMQConnectionUtils.getConection();
  4.         //7.创建频道
  5.         Channel channel= connection.createChannel();
  6.         //8.声明队列(创建队列)
  7.         /*
  8.          * 参数1:指定队列的名称
  9.          * 参数2:指定消息是否持久化,一般设置为true,是否保存到磁盘当中去
  10.          * 参数3:指定是否独占通道
  11.          * 参数4:是否自动删除
  12.          * 参数5:指定额外参数
  13.          */
  14.         channel.queueDeclare("working_02",true,false,false,null);
  15.         for (int i=0;i<30;i++) {
  16.             //9.创建消息
  17.             String message = "hello ,my name is CHENBUTING"+i;
  18.             //10.发送消息
  19.             /*
  20.              * 参数1:指定交换机  简单模式中使用默认交换机  指定空字符串
  21.              * 参数2: 指定routingkey  简单模式  只需要指定队列名称即可
  22.              * 参数3: 指定携带的额外的参数  null
  23.              * 参数4:要发送的消息本身(字节数组)
  24.              */
  25.             channel.basicPublish("", "working_02", null, message.getBytes());
  26.         }
  27.         //11.关闭资源
  28.         channel.close();
  29.         connection.close();
  30.     }
  31. }
复制代码
结果


订阅模式


Fanout模式

Fanout生产者的相关代码
  1. public class WorkConsumer_1 {
  2.     public static void main(String[] args) throws IOException, TimeoutException {
  3.         //获取连接
  4.         Connection connection= RabbitMQConnectionUtils.getConection();
  5.         //7.创建频道
  6.         Channel channel=connection.createChannel();
  7.         //8.声明队列
  8.         channel.queueDeclare("working_02",true,false,false,null);
  9.         //9.接受消息
  10.         //处理消息
  11.         DefaultConsumer consumer = new DefaultConsumer(channel){
  12.             /*
  13.              *consumerTag: 消费者标签,用于标识特定的消费者。每个消费者都有一个唯一的标签,它可以用于取消订阅或标识消息是由哪个消费者处理的。
  14.              * envelope: 信封对象,包含与消息传递相关的元数据,如交换机、路由键、传递标签等。
  15.              * properties: AMQP 的基本属性,包含附加的消息属性,如消息持久性、优先级、时间戳等。
  16.              * body: 消息的正文,以字节数组的形式提供。
  17.              */
  18.             @Override
  19.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20.                 System.out.println("消息本身"+new String(body,"utf-8"));
  21.                 System.out.println("exchange"+envelope.getExchange());
  22.                 System.out.println("RoutingKey"+envelope.getRoutingKey());
  23.                 System.out.println("消息的序号"+envelope.getDeliveryTag());
  24.             }
  25.         };
  26.         //监听器----->观察者设计模式
  27.         channel.basicConsume("working_02",true,consumer);
  28.         //10.建议不要关闭资源     建议一直监听消息
  29.     }
  30. }
复制代码
Fanout消费者的相关代码
  1. public class FanoutProduct {
  2.     public static void main(String[] args) throws IOException, TimeoutException {
  3.         Connection connection=RabbitMQConnectionUtils.getConection();
  4.         //7.创建频道
  5.         Channel channel= connection.createChannel();
  6.         //创建交换机
  7.         channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT);
  8.         //8.声明队列(创建队列)
  9.         /*
  10.          * 参数1:指定队列的名称
  11.          * 参数2:指定消息是否持久化,一般设置为true,是否保存到磁盘当中去
  12.          * 参数3:指定是否独占通道
  13.          * 参数4:是否自动删除
  14.          * 参数5:指定额外参数
  15.          */
  16.         channel.queueDeclare("fanout_queue1",true,false,false,null);
  17.         channel.queueDeclare("fanout_queue2",true,false,false,null);
  18.         //9.创建消息
  19.         String message = "my name is Fanout";
  20.         //10.发送消息
  21.         /*
  22.          *这段代码使用 RabbitMQ 的 Java 客户端库中的 `queueBind` 方法来将一个队列绑定到一个 fanout 类型的交换机上。下面是各个参数的介绍:
  23.                 1. `fanout_queue1`: 队列名称,表示要绑定的队列的名称。
  24.                 2. `exchange_fanout`: 交换机名称,表示要进行绑定的交换机的名称。
  25.                 3. `""`: 路由键,表示要使用的路由键。
  26.          */
  27.         channel.queueBind("fanout_queue1","exchange_fanout","");
  28.         channel.queueBind("fanout_queue2","exchange_fanout","");
  29.         /*
  30.          * 参数1:指定交换机  简单模式中使用默认交换机  指定空字符串
  31.          * 参数2: 指定routingkey  简单模式  只需要指定队列名称即可
  32.          * 参数3: 指定携带的额外的参数  null
  33.          * 参数4:要发送的消息本身(字节数组)
  34.          */
  35.         channel.basicPublish("exchange_fanout", "", null, message.getBytes());
  36.         //11.关闭资源
  37.         channel.close();
  38.         connection.close();
  39.     }
  40. }
复制代码
direct模式

direct生产者模式
  1. public class FanoutConsumer_1 {
  2.     public static void main(String[] args) throws IOException, TimeoutException {
  3.         //获取连接
  4.         Connection connection= RabbitMQConnectionUtils.getConection();
  5.         //7.创建频道
  6.         Channel channel=connection.createChannel();
  7.         //8.声明队列
  8.         channel.queueDeclare("fanout_queue1",true,false,false,null);
  9.         //9.接受消息
  10.         //处理消息
  11.         DefaultConsumer consumer = new DefaultConsumer(channel){
  12.             /*
  13.              *consumerTag: 消费者标签,用于标识特定的消费者。每个消费者都有一个唯一的标签,它可以用于取消订阅或标识消息是由哪个消费者处理的。
  14.              * envelope: 信封对象,包含与消息传递相关的元数据,如交换机、路由键、传递标签等。
  15.              * properties: AMQP 的基本属性,包含附加的消息属性,如消息持久性、优先级、时间戳等。
  16.              * body: 消息的正文,以字节数组的形式提供。
  17.              */
  18.             @Override
  19.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20.                 System.out.println("消息本身"+new String(body,"utf-8"));
  21.                 System.out.println("exchange"+envelope.getExchange());
  22.                 System.out.println("RoutingKey"+envelope.getRoutingKey());
  23.                 System.out.println("消息的序号"+envelope.getDeliveryTag());
  24.             }
  25.         };
  26.         //监听器----->观察者设计模式
  27.         channel.basicConsume("fanout_queue1",true,consumer);
  28.         //10.建议不要关闭资源     建议一直监听消息
  29.     }
  30. }
复制代码
direct消费者模式
  1. public class DirectProduct {
  2.     public static void main(String[] args) throws IOException, TimeoutException {
  3.         Connection connection=RabbitMQConnectionUtils.getConection();
  4.         //7.创建频道
  5.         Channel channel= connection.createChannel();
  6.         //创建交换机
  7.         channel.exchangeDeclare("exchange_direct", BuiltinExchangeType.DIRECT);
  8.         //8.声明队列(创建队列)
  9.         /*
  10.          * 参数1:指定队列的名称
  11.          * 参数2:指定消息是否持久化,一般设置为true,是否保存到磁盘当中去
  12.          * 参数3:指定是否独占通道
  13.          * 参数4:是否自动删除
  14.          * 参数5:指定额外参数
  15.          */
  16.         channel.queueDeclare("direct_queue1",true,false,false,null);
  17.         channel.queueDeclare("direct_queue2",true,false,false,null);
  18.         //9.创建消息
  19.         String message1 = "this is add";
  20.         String message2 = "this is select";
  21.         //10.发送消息
  22.         /*
  23.          *这段代码使用 RabbitMQ 的 Java 客户端库中的 `queueBind` 方法来将一个队列绑定到一个 fanout 类型的交换机上。下面是各个参数的介绍:
  24.                 1. `fanout_queue1`: 队列名称,表示要绑定的队列的名称。
  25.                 2. `exchange_fanout`: 交换机名称,表示要进行绑定的交换机的名称。
  26.                 3. `""`: 路由键,表示要使用的路由键。
  27.          */
  28.         channel.queueBind("direct_queue1","exchange_direct","user.add");
  29.         channel.queueBind("direct_queue2","exchange_direct","user.select");
  30.         /*
  31.          * 参数1:指定交换机  简单模式中使用默认交换机  指定空字符串
  32.          * 参数2: 指定routingkey  简单模式  只需要指定队列名称即可
  33.          * 参数3: 指定携带的额外的参数  null
  34.          * 参数4:要发送的消息本身(字节数组)
  35.          */
  36.         channel.basicPublish("exchange_direct", "user.add", null, message1.getBytes());
  37.         channel.basicPublish("exchange_direct", "user.select", null, message2.getBytes());
  38.         //11.关闭资源
  39.         channel.close();
  40.         connection.close();
  41.     }
  42. }
复制代码
Topic模式

生产模式相关代码
  1. public class DirectConsumer_1 {
  2.     public static void main(String[] args) throws IOException, TimeoutException {
  3.         //获取连接
  4.         Connection connection= RabbitMQConnectionUtils.getConection();
  5.         //7.创建频道
  6.         Channel channel=connection.createChannel();
  7.         //8.声明队列
  8.         channel.queueDeclare("direct_queue1",true,false,false,null);
  9.         //9.接受消息
  10.         //处理消息
  11.         DefaultConsumer consumer = new DefaultConsumer(channel){
  12.             /*
  13.              *consumerTag: 消费者标签,用于标识特定的消费者。每个消费者都有一个唯一的标签,它可以用于取消订阅或标识消息是由哪个消费者处理的。
  14.              * envelope: 信封对象,包含与消息传递相关的元数据,如交换机、路由键、传递标签等。
  15.              * properties: AMQP 的基本属性,包含附加的消息属性,如消息持久性、优先级、时间戳等。
  16.              * body: 消息的正文,以字节数组的形式提供。
  17.              */
  18.             @Override
  19.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20.                 System.out.println("消息本身"+new String(body,"utf-8"));
  21.                 System.out.println("exchange"+envelope.getExchange());
  22.                 System.out.println("RoutingKey"+envelope.getRoutingKey());
  23.                 System.out.println("消息的序号"+envelope.getDeliveryTag());
  24.             }
  25.         };
  26.         //监听器----->观察者设计模式
  27.         channel.basicConsume("direct_queue1",true,consumer);
  28.         //10.建议不要关闭资源     建议一直监听消息
  29.     }
  30. }
复制代码
消费者相关代码
  1. public class TopicProduct {
  2.     public static void main(String[] args) throws IOException, TimeoutException {
  3.         Connection connection=RabbitMQConnectionUtils.getConection();
  4.         //7.创建频道
  5.         Channel channel= connection.createChannel();
  6.         //创建交换机
  7.         channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC);
  8.         //8.声明队列(创建队列)
  9.         /*
  10.          * 参数1:指定队列的名称
  11.          * 参数2:指定消息是否持久化,一般设置为true,是否保存到磁盘当中去
  12.          * 参数3:指定是否独占通道
  13.          * 参数4:是否自动删除
  14.          * 参数5:指定额外参数
  15.          */
  16.         channel.queueDeclare("topic_queue1",true,false,false,null);
  17.         channel.queueDeclare("topic_queue2",true,false,false,null);
  18.         //9.创建消息
  19.         String message1 = "this is add";
  20.         String message2 = "this is select";
  21.         String message3 = "this is update";
  22.         String message4 = "this is delete";
  23.         //10.发送消息
  24.         /*
  25.          *这段代码使用 RabbitMQ 的 Java 客户端库中的 `queueBind` 方法来将一个队列绑定到一个 fanout 类型的交换机上。下面是各个参数的介绍:
  26.                 1. `fanout_queue1`: 队列名称,表示要绑定的队列的名称。
  27.                 2. `exchange_fanout`: 交换机名称,表示要进行绑定的交换机的名称。
  28.                 3. `""`: 路由键,表示要使用的路由键。
  29.          */
  30.         channel.queueBind("topic_queue1","exchange_topic","user.*");
  31.         channel.queueBind("topic_queue2","exchange_topic","item.*");
  32.         /*
  33.          * 参数1:指定交换机  简单模式中使用默认交换机  指定空字符串
  34.          * 参数2: 指定routingkey  简单模式  只需要指定队列名称即可
  35.          * 参数3: 指定携带的额外的参数  null
  36.          * 参数4:要发送的消息本身(字节数组)
  37.          */
  38.         channel.basicPublish("exchange_topic", "user.add", null, message1.getBytes());
  39.         channel.basicPublish("exchange_topic", "user.select", null, message2.getBytes());
  40.         channel.basicPublish("exchange_topic", "item.update", null, message3.getBytes());
  41.         channel.basicPublish("exchange_topic", "item.delete", null, message4.getBytes());
  42.         //11.关闭资源
  43.         channel.close();
  44.         connection.close();
  45.     }
  46. }
复制代码
主意:


不要忘记在使用订阅模式时修改该处的交换机分发策略

4.        springboot整合RabbitMQ

本次整合需要创建2个springboot项目,一个为生产者,一个为消费者。

direct exchange(直连型交换机)


生产者相关的整合

相关依赖
  1. public class TopicConsumer_1 {
  2.     public static void main(String[] args) throws IOException, TimeoutException {
  3.         //获取连接
  4.         Connection connection= RabbitMQConnectionUtils.getConection();
  5.         //7.创建频道
  6.         Channel channel=connection.createChannel();
  7.         //8.声明队列
  8.         channel.queueDeclare("topic_queue1",true,false,false,null);
  9.         //9.接受消息
  10.         //处理消息
  11.         DefaultConsumer consumer = new DefaultConsumer(channel){
  12.             /*
  13.              *consumerTag: 消费者标签,用于标识特定的消费者。每个消费者都有一个唯一的标签,它可以用于取消订阅或标识消息是由哪个消费者处理的。
  14.              * envelope: 信封对象,包含与消息传递相关的元数据,如交换机、路由键、传递标签等。
  15.              * properties: AMQP 的基本属性,包含附加的消息属性,如消息持久性、优先级、时间戳等。
  16.              * body: 消息的正文,以字节数组的形式提供。
  17.              */
  18.             @Override
  19.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20.                 System.out.println("消息本身"+new String(body,"utf-8"));
  21.                 System.out.println("exchange"+envelope.getExchange());
  22.                 System.out.println("RoutingKey"+envelope.getRoutingKey());
  23.                 System.out.println("消息的序号"+envelope.getDeliveryTag());
  24.             }
  25.         };
  26.         //监听器----->观察者设计模式
  27.         channel.basicConsume("topic_queue1",true,consumer);
  28.         //10.建议不要关闭资源     建议一直监听消息
  29.     }
  30. }
复制代码
application相关配置
  1. <dependency>
  2.   <groupId>org.springframework.boot</groupId>
  3.   <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  5. <dependency>
  6.   <groupId>org.springframework.boot</groupId>
  7.   <artifactId>spring-boot-starter-web</artifactId>
  8. </dependency>
复制代码
配置类
  1. spring:
  2.   #给项目来个名字
  3.   application:
  4.     name: rabbitmq-provider
  5.   #配置rabbitMq 服务器
  6.   rabbitmq:
  7.     host: localhost
  8.     port: 5672
  9.     username: chenbuting
  10.     password: root
  11.     #虚拟host 可以不设置,使用server默认host
  12.     #virtual-host: JCcccHost
复制代码
controller层
  1. @Configuration
  2. public class DirectRabbitConfig {
  3.     //队列 起名:TestDirectQueue
  4.     @Bean
  5.     public Queue TestDirectQueue() {
  6.         // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
  7.         // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
  8.         // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
  9.         //   return new Queue("TestDirectQueue",true,true,false);
  10.         //一般设置一下队列的持久化就好,其余两个就是默认false
  11.         return new Queue("TestDirectQueue",true);
  12.     }
  13.     //Direct交换机 起名:TestDirectExchange
  14.     @Bean
  15.     DirectExchange TestDirectExchange() {
  16.         //  return new DirectExchange("TestDirectExchange",true,true);
  17.         return new DirectExchange("TestDirectExchange",true,false);
  18.     }
  19.     //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
  20.     @Bean
  21.     Binding bindingDirect() {
  22.         return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
  23.     }
  24.     //单独的 Direct 类型的交换机
  25.     @Bean
  26.     DirectExchange lonelyDirectExchange() {
  27.         return new DirectExchange("lonelyDirectExchange");
  28.     }
  29. }
复制代码


消费者相关的整合

所需pom依赖
  1. public class TopicConsumer_1 {
  2.     public static void main(String[] args) throws IOException, TimeoutException {
  3.         //获取连接
  4.         Connection connection= RabbitMQConnectionUtils.getConection();
  5.         //7.创建频道
  6.         Channel channel=connection.createChannel();
  7.         //8.声明队列
  8.         channel.queueDeclare("topic_queue1",true,false,false,null);
  9.         //9.接受消息
  10.         //处理消息
  11.         DefaultConsumer consumer = new DefaultConsumer(channel){
  12.             /*
  13.              *consumerTag: 消费者标签,用于标识特定的消费者。每个消费者都有一个唯一的标签,它可以用于取消订阅或标识消息是由哪个消费者处理的。
  14.              * envelope: 信封对象,包含与消息传递相关的元数据,如交换机、路由键、传递标签等。
  15.              * properties: AMQP 的基本属性,包含附加的消息属性,如消息持久性、优先级、时间戳等。
  16.              * body: 消息的正文,以字节数组的形式提供。
  17.              */
  18.             @Override
  19.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20.                 System.out.println("消息本身"+new String(body,"utf-8"));
  21.                 System.out.println("exchange"+envelope.getExchange());
  22.                 System.out.println("RoutingKey"+envelope.getRoutingKey());
  23.                 System.out.println("消息的序号"+envelope.getDeliveryTag());
  24.             }
  25.         };
  26.         //监听器----->观察者设计模式
  27.         channel.basicConsume("topic_queue1",true,consumer);
  28.         //10.建议不要关闭资源     建议一直监听消息
  29.     }
  30. }
复制代码
application相关配置
  1. <dependency>
  2.   <groupId>org.springframework.boot</groupId>
  3.   <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  5. <dependency>
  6.   <groupId>org.springframework.boot</groupId>
  7.   <artifactId>spring-boot-starter-web</artifactId>
  8. </dependency>
复制代码
相关配置类(消费者单独使用,可以不用添加配置,直接监听就行,直接使用注释监听器监听对应的队列即可,配置的话,则消费者也可当做生产者的身份,这样也就可以推送消息了
  1. spring:
  2.   #给项目来个名字
  3.   application:
  4.     name: rabbitmq-provider
  5.   #配置rabbitMq 服务器
  6.   rabbitmq:
  7.     host: localhost
  8.     port: 5672
  9.     username: chenbuting
  10.     password: root
  11.     #虚拟host 可以不设置,使用server默认host
  12.     #virtual-host: JCcccHost
复制代码
创建消息监听
  1. @Configuration
  2. public class DirectRabbitConfig {
  3.     //队列 起名:TestDirectQueue
  4.     @Bean
  5.     public Queue TestDirectQueue() {
  6.         return new Queue("TestDirectQueue",true);
  7.     }
  8.     //Direct交换机 起名:TestDirectExchange
  9.     @Bean
  10.     DirectExchange TestDirectExchange() {
  11.         return new DirectExchange("TestDirectExchange");
  12.     }
  13.     //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
  14.     @Bean
  15.     Binding bindingDirect() {
  16.         return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
  17.     }
  18. }
复制代码
Topic Exchange主题交换机


生产者相关整合

相关配置类
  1. @Component
  2. @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
  3. public class DirectReceiver {
  4.     @RabbitHandler
  5.     public void process(Map testMessage) {
  6.         System.out.println("DirectReceiver消费者收到消息  : " + testMessage.toString());
  7.     }
  8. }
复制代码
添加两个接口,用于将消息推送到交换机上
  1. @Configuration
  2. public class TopicRabbitConfig {
  3.     //绑定键
  4.     public final static String man = "topic.man";
  5.     public final static String woman = "topic.woman";
  6.     @Bean
  7.     public Queue firstQueue() {
  8.         return new Queue(TopicRabbitConfig.man);
  9.     }
  10.     @Bean
  11.     public Queue secondQueue() {
  12.         return new Queue(TopicRabbitConfig.woman);
  13.     }
  14.     @Bean
  15.     TopicExchange exchange() {
  16.         return new TopicExchange("topicExchange");
  17.     }
  18.     //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
  19.     //这样只要是消息携带的路由键是topic.man,才会分发到该队列
  20.     @Bean
  21.     Binding bindingExchangeMessage() {
  22.         return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
  23.     }
  24.     //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
  25.     // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
  26.     @Bean
  27.     Binding bindingExchangeMessage2() {
  28.         return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
  29.     }
  30. }
复制代码
消费者相关整合

创建TopicManReceiver
  1.     @GetMapping("/sendTopicMessage1")
  2.     public String sendTopicMessage1() {
  3.         String messageId = String.valueOf(UUID.randomUUID());
  4.         String messageData = "message: M A N ";
  5.         String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  6.         Map<String, Object> manMap = new HashMap<>();
  7.         manMap.put("messageId", messageId);
  8.         manMap.put("messageData", messageData);
  9.         manMap.put("createTime", createTime);
  10.         rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
  11.         return "ok";
  12.     }
  13.     @GetMapping("/sendTopicMessage2")
  14.     public String sendTopicMessage2() {
  15.         String messageId = String.valueOf(UUID.randomUUID());
  16.         String messageData = "message: woman is all ";
  17.         String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  18.         Map<String, Object> womanMap = new HashMap<>();
  19.         womanMap.put("messageId", messageId);
  20.         womanMap.put("messageData", messageData);
  21.         womanMap.put("createTime", createTime);
  22.         rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
  23.         return "ok";
  24.     }
  25. }
复制代码
创建TopicTotalReceiver
  1. @Component
  2. @RabbitListener(queues = "topic.man")
  3. public class TopicManReceiver {
  4.     @RabbitHandler
  5.     public void process(Map testMessage) {
  6.         System.out.println("TopicManReceiver消费者收到消息  : " + testMessage.toString());
  7.     }
  8. }
复制代码
其他类似

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

雁过留声

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表