1.用户角色配置

自带的guest/guest 超级管理员
五中不同角色配置:
- 普通管理者(management):仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
- 策略制定者(policymaker):可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息。
- 监控者 (monitoring):登录管理控制台 查看所有的信息(Rabbit的相关节点的信息,内存使用信息,磁盘的情况)
- 超级管理员 administrator:登录管理控制台 查看所有的信息 对所有用户的策略进行操作
- 其他:无法登陆管理控制台,通常就是普通的生产者和消费者(仅仅接收信息或者发送消息)。
2.Virtual Hosts配置
每一个 Virtual Hosts相互隔离, 就相当于一个相对独立的RabbitMQ,===> exchange queue message不是互通
可以理解为:一个Mysql又很多数据库,每一个数据库就相当于一个Virtual Hosts
2.1创建Virtual Hosts

2.2权限的分配
点击对应Virtual Hostsv 名字,进入配置页面


该权限配置参数说明:
- user:用户名
- configure :一个正则表达式,用户对符合该正则表达式的所有资源拥有 configure 操作的权限
- write:一个正则表达式,用户对符合该正则表达式的所有资源拥有 write 操作的权限
- read:一个正则表达式,用户对符合该正则表达式的所有资源拥有 read 操作的权限
3.入门案例
使用思路
生产者: --->按照JDBC思路
1. 创建连接工厂
2. 设置的rabbitMq的服务的主机 默认localhost
3. 设置的rabbitMq的服务的端口 默认5672
4. 设置的rabbitMq的服务的虚拟主机
5. 设置用户和密码
6. 创建连接
7. 创建频道
8. 声明队列
9. 创建消息
10. 发送消息
11.关闭资源
(观察者模式)
RabbitMQ 使用的是发布订阅模式

所需依赖
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.6.0</version>
- </dependency>
复制代码 对Rabbit的连接进行封装
- public class RabbitMQConnectionUtils {
- /**
- * 获得Rabbit的连接
- * 类比JDBC
- */
- public static Connection getConection() throws IOException, TimeoutException {
- //1.创建连接工厂
- ConnectionFactory connectionFactory=new ConnectionFactory();
- //2.设置的rabbitMq的服务的主机,默认localhost
- connectionFactory.setHost("localhost");
- //3.设置的rabbitMq的服务的端口,默认为5672
- connectionFactory.setPort(5672);
- //4.设置的rabbitMq的服务的虚拟主机
- connectionFactory.setVirtualHost("testmq");
- //5.设置用户和密码
- connectionFactory.setUsername("chenbuting");
- connectionFactory.setPassword("123");
- //6.创建连接
- Connection connection=connectionFactory.newConnection();
- return connection;
- }
- }
复制代码 基本消息队列
生产者Product相关代码
- public class Product {
- public static void main(String[] args) throws IOException, TimeoutException {
- Connection connection=RabbitMQConnectionUtils.getConection();
- //7.创建频道
- Channel channel= connection.createChannel();
- //8.声明队列(创建队列)
- /*
- * 参数1:指定队列的名称
- * 参数2:指定消息是否持久化,一般设置为true,是否保存到磁盘当中去
- * 参数3:指定是否独占通道
- * 参数4:是否自动删除
- * 参数5:指定额外参数
- */
- channel.queueDeclare("simple_01",true,false,false,null);
- //9.创建消息
- String message="hello ,my name is chenbuting";
- //10.发送消息
- /*
- * 参数1:指定交换机 简单模式中使用默认交换机 指定空字符串
- * 参数2: 指定routingkey 简单模式 只需要指定队列名称即可
- * 参数3: 指定携带的额外的参数 null
- * 参数4:要发送的消息本身(字节数组)
- */
- channel.basicPublish("","simple_01",null,message.getBytes());
- //11.关闭资源
- channel.close();
- connection.close();
- }
- }
复制代码 消费者Consumer相关代码
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //获取连接
- Connection connection= RabbitMQConnectionUtils.getConection();
- //7.创建频道
- Channel channel=connection.createChannel();
- //8.声明队列
- channel.queueDeclare("simple_01",true,false,false,null);
- //9.接受消息
- //处理消息
- DefaultConsumer consumer = new DefaultConsumer(channel){
- /*
- *consumerTag: 消费者标签,用于标识特定的消费者。每个消费者都有一个唯一的标签,它可以用于取消订阅或标识消息是由哪个消费者处理的。
- * envelope: 信封对象,包含与消息传递相关的元数据,如交换机、路由键、传递标签等。
- * properties: AMQP 的基本属性,包含附加的消息属性,如消息持久性、优先级、时间戳等。
- * body: 消息的正文,以字节数组的形式提供。
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("消息本身"+new String(body,"utf-8"));
- System.out.println("exchange"+envelope.getExchange());
- System.out.println("RoutingKey"+envelope.getRoutingKey());
- System.out.println("消息的序号"+envelope.getDeliveryTag());
- }
- };
- //监听器----->观察者设计模式
- channel.basicConsume("simple_01",true,consumer);
- //10.建议不要关闭资源 建议一直监听消息
- }
- }
复制代码
idea允许开启多个实例的方式

工作消息队列
生产者相关代码
- public class WorkProduct {
- public static void main(String[] args) throws IOException, TimeoutException {
- Connection connection=RabbitMQConnectionUtils.getConection();
- //7.创建频道
- Channel channel= connection.createChannel();
- //8.声明队列(创建队列)
- /*
- * 参数1:指定队列的名称
- * 参数2:指定消息是否持久化,一般设置为true,是否保存到磁盘当中去
- * 参数3:指定是否独占通道
- * 参数4:是否自动删除
- * 参数5:指定额外参数
- */
- channel.queueDeclare("working_02",true,false,false,null);
- for (int i=0;i<30;i++) {
- //9.创建消息
- String message = "hello ,my name is CHENBUTING"+i;
- //10.发送消息
- /*
- * 参数1:指定交换机 简单模式中使用默认交换机 指定空字符串
- * 参数2: 指定routingkey 简单模式 只需要指定队列名称即可
- * 参数3: 指定携带的额外的参数 null
- * 参数4:要发送的消息本身(字节数组)
- */
- channel.basicPublish("", "working_02", null, message.getBytes());
- }
- //11.关闭资源
- channel.close();
- connection.close();
- }
- }
复制代码 结果
 
订阅模式
Fanout模式
Fanout生产者的相关代码- public class WorkConsumer_1 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //获取连接
- Connection connection= RabbitMQConnectionUtils.getConection();
- //7.创建频道
- Channel channel=connection.createChannel();
- //8.声明队列
- channel.queueDeclare("working_02",true,false,false,null);
- //9.接受消息
- //处理消息
- DefaultConsumer consumer = new DefaultConsumer(channel){
- /*
- *consumerTag: 消费者标签,用于标识特定的消费者。每个消费者都有一个唯一的标签,它可以用于取消订阅或标识消息是由哪个消费者处理的。
- * envelope: 信封对象,包含与消息传递相关的元数据,如交换机、路由键、传递标签等。
- * properties: AMQP 的基本属性,包含附加的消息属性,如消息持久性、优先级、时间戳等。
- * body: 消息的正文,以字节数组的形式提供。
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("消息本身"+new String(body,"utf-8"));
- System.out.println("exchange"+envelope.getExchange());
- System.out.println("RoutingKey"+envelope.getRoutingKey());
- System.out.println("消息的序号"+envelope.getDeliveryTag());
- }
- };
- //监听器----->观察者设计模式
- channel.basicConsume("working_02",true,consumer);
- //10.建议不要关闭资源 建议一直监听消息
- }
- }
复制代码 Fanout消费者的相关代码- public class FanoutProduct {
- public static void main(String[] args) throws IOException, TimeoutException {
- Connection connection=RabbitMQConnectionUtils.getConection();
- //7.创建频道
- Channel channel= connection.createChannel();
- //创建交换机
- channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT);
- //8.声明队列(创建队列)
- /*
- * 参数1:指定队列的名称
- * 参数2:指定消息是否持久化,一般设置为true,是否保存到磁盘当中去
- * 参数3:指定是否独占通道
- * 参数4:是否自动删除
- * 参数5:指定额外参数
- */
- channel.queueDeclare("fanout_queue1",true,false,false,null);
- channel.queueDeclare("fanout_queue2",true,false,false,null);
- //9.创建消息
- String message = "my name is Fanout";
- //10.发送消息
- /*
- *这段代码使用 RabbitMQ 的 Java 客户端库中的 `queueBind` 方法来将一个队列绑定到一个 fanout 类型的交换机上。下面是各个参数的介绍:
- 1. `fanout_queue1`: 队列名称,表示要绑定的队列的名称。
- 2. `exchange_fanout`: 交换机名称,表示要进行绑定的交换机的名称。
- 3. `""`: 路由键,表示要使用的路由键。
- */
- channel.queueBind("fanout_queue1","exchange_fanout","");
- channel.queueBind("fanout_queue2","exchange_fanout","");
- /*
- * 参数1:指定交换机 简单模式中使用默认交换机 指定空字符串
- * 参数2: 指定routingkey 简单模式 只需要指定队列名称即可
- * 参数3: 指定携带的额外的参数 null
- * 参数4:要发送的消息本身(字节数组)
- */
- channel.basicPublish("exchange_fanout", "", null, message.getBytes());
- //11.关闭资源
- channel.close();
- connection.close();
- }
- }
复制代码 direct模式
direct生产者模式- public class FanoutConsumer_1 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //获取连接
- Connection connection= RabbitMQConnectionUtils.getConection();
- //7.创建频道
- Channel channel=connection.createChannel();
- //8.声明队列
- channel.queueDeclare("fanout_queue1",true,false,false,null);
- //9.接受消息
- //处理消息
- DefaultConsumer consumer = new DefaultConsumer(channel){
- /*
- *consumerTag: 消费者标签,用于标识特定的消费者。每个消费者都有一个唯一的标签,它可以用于取消订阅或标识消息是由哪个消费者处理的。
- * envelope: 信封对象,包含与消息传递相关的元数据,如交换机、路由键、传递标签等。
- * properties: AMQP 的基本属性,包含附加的消息属性,如消息持久性、优先级、时间戳等。
- * body: 消息的正文,以字节数组的形式提供。
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("消息本身"+new String(body,"utf-8"));
- System.out.println("exchange"+envelope.getExchange());
- System.out.println("RoutingKey"+envelope.getRoutingKey());
- System.out.println("消息的序号"+envelope.getDeliveryTag());
- }
- };
- //监听器----->观察者设计模式
- channel.basicConsume("fanout_queue1",true,consumer);
- //10.建议不要关闭资源 建议一直监听消息
- }
- }
复制代码 direct消费者模式- public class DirectProduct {
- public static void main(String[] args) throws IOException, TimeoutException {
- Connection connection=RabbitMQConnectionUtils.getConection();
- //7.创建频道
- Channel channel= connection.createChannel();
- //创建交换机
- channel.exchangeDeclare("exchange_direct", BuiltinExchangeType.DIRECT);
- //8.声明队列(创建队列)
- /*
- * 参数1:指定队列的名称
- * 参数2:指定消息是否持久化,一般设置为true,是否保存到磁盘当中去
- * 参数3:指定是否独占通道
- * 参数4:是否自动删除
- * 参数5:指定额外参数
- */
- channel.queueDeclare("direct_queue1",true,false,false,null);
- channel.queueDeclare("direct_queue2",true,false,false,null);
- //9.创建消息
- String message1 = "this is add";
- String message2 = "this is select";
- //10.发送消息
- /*
- *这段代码使用 RabbitMQ 的 Java 客户端库中的 `queueBind` 方法来将一个队列绑定到一个 fanout 类型的交换机上。下面是各个参数的介绍:
- 1. `fanout_queue1`: 队列名称,表示要绑定的队列的名称。
- 2. `exchange_fanout`: 交换机名称,表示要进行绑定的交换机的名称。
- 3. `""`: 路由键,表示要使用的路由键。
- */
- channel.queueBind("direct_queue1","exchange_direct","user.add");
- channel.queueBind("direct_queue2","exchange_direct","user.select");
- /*
- * 参数1:指定交换机 简单模式中使用默认交换机 指定空字符串
- * 参数2: 指定routingkey 简单模式 只需要指定队列名称即可
- * 参数3: 指定携带的额外的参数 null
- * 参数4:要发送的消息本身(字节数组)
- */
- channel.basicPublish("exchange_direct", "user.add", null, message1.getBytes());
- channel.basicPublish("exchange_direct", "user.select", null, message2.getBytes());
- //11.关闭资源
- channel.close();
- connection.close();
- }
- }
复制代码 Topic模式
生产模式相关代码- public class DirectConsumer_1 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //获取连接
- Connection connection= RabbitMQConnectionUtils.getConection();
- //7.创建频道
- Channel channel=connection.createChannel();
- //8.声明队列
- channel.queueDeclare("direct_queue1",true,false,false,null);
- //9.接受消息
- //处理消息
- DefaultConsumer consumer = new DefaultConsumer(channel){
- /*
- *consumerTag: 消费者标签,用于标识特定的消费者。每个消费者都有一个唯一的标签,它可以用于取消订阅或标识消息是由哪个消费者处理的。
- * envelope: 信封对象,包含与消息传递相关的元数据,如交换机、路由键、传递标签等。
- * properties: AMQP 的基本属性,包含附加的消息属性,如消息持久性、优先级、时间戳等。
- * body: 消息的正文,以字节数组的形式提供。
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("消息本身"+new String(body,"utf-8"));
- System.out.println("exchange"+envelope.getExchange());
- System.out.println("RoutingKey"+envelope.getRoutingKey());
- System.out.println("消息的序号"+envelope.getDeliveryTag());
- }
- };
- //监听器----->观察者设计模式
- channel.basicConsume("direct_queue1",true,consumer);
- //10.建议不要关闭资源 建议一直监听消息
- }
- }
复制代码 消费者相关代码- public class TopicProduct {
- public static void main(String[] args) throws IOException, TimeoutException {
- Connection connection=RabbitMQConnectionUtils.getConection();
- //7.创建频道
- Channel channel= connection.createChannel();
- //创建交换机
- channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC);
- //8.声明队列(创建队列)
- /*
- * 参数1:指定队列的名称
- * 参数2:指定消息是否持久化,一般设置为true,是否保存到磁盘当中去
- * 参数3:指定是否独占通道
- * 参数4:是否自动删除
- * 参数5:指定额外参数
- */
- channel.queueDeclare("topic_queue1",true,false,false,null);
- channel.queueDeclare("topic_queue2",true,false,false,null);
- //9.创建消息
- String message1 = "this is add";
- String message2 = "this is select";
- String message3 = "this is update";
- String message4 = "this is delete";
- //10.发送消息
- /*
- *这段代码使用 RabbitMQ 的 Java 客户端库中的 `queueBind` 方法来将一个队列绑定到一个 fanout 类型的交换机上。下面是各个参数的介绍:
- 1. `fanout_queue1`: 队列名称,表示要绑定的队列的名称。
- 2. `exchange_fanout`: 交换机名称,表示要进行绑定的交换机的名称。
- 3. `""`: 路由键,表示要使用的路由键。
- */
- channel.queueBind("topic_queue1","exchange_topic","user.*");
- channel.queueBind("topic_queue2","exchange_topic","item.*");
- /*
- * 参数1:指定交换机 简单模式中使用默认交换机 指定空字符串
- * 参数2: 指定routingkey 简单模式 只需要指定队列名称即可
- * 参数3: 指定携带的额外的参数 null
- * 参数4:要发送的消息本身(字节数组)
- */
- channel.basicPublish("exchange_topic", "user.add", null, message1.getBytes());
- channel.basicPublish("exchange_topic", "user.select", null, message2.getBytes());
- channel.basicPublish("exchange_topic", "item.update", null, message3.getBytes());
- channel.basicPublish("exchange_topic", "item.delete", null, message4.getBytes());
- //11.关闭资源
- channel.close();
- connection.close();
- }
- }
复制代码 主意:

不要忘记在使用订阅模式时修改该处的交换机分发策略
4. springboot整合RabbitMQ
本次整合需要创建2个springboot项目,一个为生产者,一个为消费者。
direct exchange(直连型交换机)
生产者相关的整合
相关依赖- public class TopicConsumer_1 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //获取连接
- Connection connection= RabbitMQConnectionUtils.getConection();
- //7.创建频道
- Channel channel=connection.createChannel();
- //8.声明队列
- channel.queueDeclare("topic_queue1",true,false,false,null);
- //9.接受消息
- //处理消息
- DefaultConsumer consumer = new DefaultConsumer(channel){
- /*
- *consumerTag: 消费者标签,用于标识特定的消费者。每个消费者都有一个唯一的标签,它可以用于取消订阅或标识消息是由哪个消费者处理的。
- * envelope: 信封对象,包含与消息传递相关的元数据,如交换机、路由键、传递标签等。
- * properties: AMQP 的基本属性,包含附加的消息属性,如消息持久性、优先级、时间戳等。
- * body: 消息的正文,以字节数组的形式提供。
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("消息本身"+new String(body,"utf-8"));
- System.out.println("exchange"+envelope.getExchange());
- System.out.println("RoutingKey"+envelope.getRoutingKey());
- System.out.println("消息的序号"+envelope.getDeliveryTag());
- }
- };
- //监听器----->观察者设计模式
- channel.basicConsume("topic_queue1",true,consumer);
- //10.建议不要关闭资源 建议一直监听消息
- }
- }
复制代码 application相关配置- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
复制代码 配置类- spring:
- #给项目来个名字
- application:
- name: rabbitmq-provider
- #配置rabbitMq 服务器
- rabbitmq:
- host: localhost
- port: 5672
- username: chenbuting
- password: root
- #虚拟host 可以不设置,使用server默认host
- #virtual-host: JCcccHost
复制代码 controller层- @Configuration
- public class DirectRabbitConfig {
- //队列 起名:TestDirectQueue
- @Bean
- public Queue TestDirectQueue() {
- // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
- // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
- // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
- // return new Queue("TestDirectQueue",true,true,false);
- //一般设置一下队列的持久化就好,其余两个就是默认false
- return new Queue("TestDirectQueue",true);
- }
- //Direct交换机 起名:TestDirectExchange
- @Bean
- DirectExchange TestDirectExchange() {
- // return new DirectExchange("TestDirectExchange",true,true);
- return new DirectExchange("TestDirectExchange",true,false);
- }
- //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
- @Bean
- Binding bindingDirect() {
- return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
- }
- //单独的 Direct 类型的交换机
- @Bean
- DirectExchange lonelyDirectExchange() {
- return new DirectExchange("lonelyDirectExchange");
- }
- }
复制代码
消费者相关的整合
所需pom依赖- public class TopicConsumer_1 {
- public static void main(String[] args) throws IOException, TimeoutException {
- //获取连接
- Connection connection= RabbitMQConnectionUtils.getConection();
- //7.创建频道
- Channel channel=connection.createChannel();
- //8.声明队列
- channel.queueDeclare("topic_queue1",true,false,false,null);
- //9.接受消息
- //处理消息
- DefaultConsumer consumer = new DefaultConsumer(channel){
- /*
- *consumerTag: 消费者标签,用于标识特定的消费者。每个消费者都有一个唯一的标签,它可以用于取消订阅或标识消息是由哪个消费者处理的。
- * envelope: 信封对象,包含与消息传递相关的元数据,如交换机、路由键、传递标签等。
- * properties: AMQP 的基本属性,包含附加的消息属性,如消息持久性、优先级、时间戳等。
- * body: 消息的正文,以字节数组的形式提供。
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("消息本身"+new String(body,"utf-8"));
- System.out.println("exchange"+envelope.getExchange());
- System.out.println("RoutingKey"+envelope.getRoutingKey());
- System.out.println("消息的序号"+envelope.getDeliveryTag());
- }
- };
- //监听器----->观察者设计模式
- channel.basicConsume("topic_queue1",true,consumer);
- //10.建议不要关闭资源 建议一直监听消息
- }
- }
复制代码 application相关配置- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
复制代码 相关配置类(消费者单独使用,可以不用添加配置,直接监听就行,直接使用注释监听器监听对应的队列即可,配置的话,则消费者也可当做生产者的身份,这样也就可以推送消息了)- spring:
- #给项目来个名字
- application:
- name: rabbitmq-provider
- #配置rabbitMq 服务器
- rabbitmq:
- host: localhost
- port: 5672
- username: chenbuting
- password: root
- #虚拟host 可以不设置,使用server默认host
- #virtual-host: JCcccHost
复制代码 创建消息监听- @Configuration
- public class DirectRabbitConfig {
- //队列 起名:TestDirectQueue
- @Bean
- public Queue TestDirectQueue() {
- return new Queue("TestDirectQueue",true);
- }
- //Direct交换机 起名:TestDirectExchange
- @Bean
- DirectExchange TestDirectExchange() {
- return new DirectExchange("TestDirectExchange");
- }
- //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
- @Bean
- Binding bindingDirect() {
- return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
- }
- }
复制代码 Topic Exchange主题交换机
生产者相关整合
相关配置类- @Component
- @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
- public class DirectReceiver {
-
- @RabbitHandler
- public void process(Map testMessage) {
- System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString());
- }
-
- }
复制代码 添加两个接口,用于将消息推送到交换机上- @Configuration
- public class TopicRabbitConfig {
- //绑定键
- public final static String man = "topic.man";
- public final static String woman = "topic.woman";
-
- @Bean
- public Queue firstQueue() {
- return new Queue(TopicRabbitConfig.man);
- }
-
- @Bean
- public Queue secondQueue() {
- return new Queue(TopicRabbitConfig.woman);
- }
-
- @Bean
- TopicExchange exchange() {
- return new TopicExchange("topicExchange");
- }
-
-
- //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
- //这样只要是消息携带的路由键是topic.man,才会分发到该队列
- @Bean
- Binding bindingExchangeMessage() {
- return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
- }
-
- //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
- // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
- @Bean
- Binding bindingExchangeMessage2() {
- return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
- }
-
- }
复制代码 消费者相关整合
创建TopicManReceiver- @GetMapping("/sendTopicMessage1")
- public String sendTopicMessage1() {
- String messageId = String.valueOf(UUID.randomUUID());
- String messageData = "message: M A N ";
- String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
- Map<String, Object> manMap = new HashMap<>();
- manMap.put("messageId", messageId);
- manMap.put("messageData", messageData);
- manMap.put("createTime", createTime);
- rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
- return "ok";
- }
-
- @GetMapping("/sendTopicMessage2")
- public String sendTopicMessage2() {
- String messageId = String.valueOf(UUID.randomUUID());
- String messageData = "message: woman is all ";
- String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
- Map<String, Object> womanMap = new HashMap<>();
- womanMap.put("messageId", messageId);
- womanMap.put("messageData", messageData);
- womanMap.put("createTime", createTime);
- rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
- return "ok";
- }
- }
复制代码 创建TopicTotalReceiver- @Component
- @RabbitListener(queues = "topic.man")
- public class TopicManReceiver {
-
- @RabbitHandler
- public void process(Map testMessage) {
- System.out.println("TopicManReceiver消费者收到消息 : " + testMessage.toString());
- }
- }
复制代码 其他类似
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |