瑞星 发表于 2024-8-30 21:31:30

RabbitMQ二、RabbitMQ的六种模式

一、RabbitMQ的六种模式


[*]RabbitMQ共有六种工作模式:

[*]简单模式(Simple)
[*]工作队列模式(Work Queue)
[*]发布订阅模式(Publish/Subscribe)
[*]路由模式(Routing)
[*]通配符模式(Topics)
[*]远程调用模式(RPC,不常用,不对此模式进行讲解)

1、RabbitMQ的简单模式

https://img-blog.csdnimg.cn/direct/8427ad17e65e437583cafe5fed6ab3f3.png

[*]rabbitmq简单模式的特点:

[*]
[*]一个生产者对应一个消费者,通过队列进行消息转达。

[*]
[*]该模式使用direct互换机,direct互换机是RabbitMQ默认互换机。


JMS


[*]由于MQ产品很多,操作方式各有差别,于是JAVA提供了一套规则——JMS,用于操作消息中间件。
[*]JMS即Java消息服务(JavaMessage Service)应用步伐接口,是一个Java平台中关于面向消息中间件的API。
[*]JMS是JavaEE规范中的一种,类比JDBC。很多MQ产品都实现了JMS规范,比方ActiveMQ等产品。

[*]RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包。

java操作rabbitmq的简单模式


[*]操作之前记得启动rabbitmq服务(rabbitmq的客户端不用开启也行)
docker start rabbitmq

[*]注意:启动rabbitmq的web客户端是为了能访问它的客户端界面
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management
第一步:创建项目(使用简单的maven项目即可)并添加RabbitMQ依靠

https://img-blog.csdnimg.cn/direct/e276c331ae1f4f60a5d09388c83c3036.png


[*]依靠
<dependencies>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.0</version>
</dependency>
</dependencies>
第二步:生产者和消费者代码的编写



[*]创建生产者(producer)代码
package com.knife.demo01.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 生产者
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
      // 1.创建连接工厂
      ConnectionFactory connectionFactory = new ConnectionFactory();
      connectionFactory.setHost("192.168.70.130"); // 虚拟机ip
      connectionFactory.setPort(5672);
      connectionFactory.setUsername("admin");
      connectionFactory.setPassword("admin");
      connectionFactory.setVirtualHost("/");

      // 2.创建连接
      Connection connection = connectionFactory.newConnection();

      // 3.建立信道
      Channel channel = connection.createChannel();

      // 4.创建队列,如果队列已存在,则使用该队列
      /**
         * 参数1:队列名
         * 参数2:是否持久化,true表示MQ重启后队列还在。
         * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
         * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
         * 参数5:其他额外参数
         */
      channel.queueDeclare("simple_queue",false,false,false,null);

      // 5.发送消息
      String message = "hello!rabbitmq!";
      /**
         * 参数1:交换机名,""表示默认交换机direct
         * 参数2:路由键,简单模式就是队列名
         * 参数3:其他额外参数
         * 参数4:要传递的消息字节数组
         */
      channel.basicPublish("","simple_queue",null,message.getBytes());

      // 6.关闭信道和连接
      channel.close();
      connection.close();
      System.out.println("===发送成功===");
    }
}


运行生产者代码效果:
https://img-blog.csdnimg.cn/direct/36cf5a6de5a749e1a0e5c38fe8915321.png
https://img-blog.csdnimg.cn/direct/c8b1fbca97154db5b32ac7b740fdbd4b.png


[*]创建消费者(consumer)代码
package com.knife.demo01.simple;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 消费者
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
      // 1.创建连接工厂
      ConnectionFactory connectionFactory = new ConnectionFactory();
      connectionFactory.setHost("192.168.70.130"); // 虚拟机ip
      connectionFactory.setPort(5672); // 端口号
      connectionFactory.setUsername("admin");
      connectionFactory.setPassword("admin");
      connectionFactory.setVirtualHost("/");
      // 2.创建连接
      Connection connection = connectionFactory.newConnection();
      // 3.建立信道
      Channel channel = connection.createChannel();
      // 4.监听队列
      /**
         * 参数1:监听的队列名
         * 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
         * 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
         */
      channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                接收消息
                String message = new String(body, "UTF-8");
                System.out.println("接受消息,消息为:"+message);
            }
      });
    }
}


运行消费者代码效果:
https://img-blog.csdnimg.cn/direct/55afa71fb0c849a0bc746305e64fd342.png
2、RabbitMQ的工作队列模式(相比简单模式,处理消息的消费者增多了)

https://img-blog.csdnimg.cn/direct/c8df8ce350ae49728e7e298c88c52cd0.png

[*] 工作队列模式(Work Queue)与简单模式相比,多了一些消费者,该模式也使用direct互换机(默认使用的互换机),应用于处理消息较多的环境。特点如下:

[*]
[*]一个队列对应多个消费者。

[*]
[*]一条消息只会被一个消费者消费。

[*]
[*]消息队列默认采取轮询的方式将消息均匀发送给消费者。

[*]
[*]在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系


[*] Work Queues 对于使命过重或使命较多环境使用工作队列可以提高使命处理的速度。比方:短信服务部署多个,只需要有一个节点成功发送即可
https://img-blog.csdnimg.cn/direct/f4519aa095bd48cb9372ef87433d7a9b.png
java操作rabbitmq的工作队列模式


[*]Work Queues 的入门步伐与上面简单模式的代码几乎是一样的。可以完全复制,只是比简单模式的消费者多几个而已,可以让多个消费者同时对消费消息的测试。
[*]在简单模式的基础下编写代码(直接用同一个项目)
https://img-blog.csdnimg.cn/direct/3934fa1868a347a0b5b022ceae2acd9b.png


[*]消息生产者代码:
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
//      创建工厂连接
      ConnectionFactory cf = new ConnectionFactory();
      cf.setHost("192.168.70.130"); //虚拟机地址
      cf.setPort(5672); //rabbitmq的端口号
      cf.setUsername("admin"); // 用户名
      cf.setPassword("admin"); // 密码
      cf.setVirtualHost("/");

//      创建连接
      Connection connection = cf.newConnection();

//      创建信道
      Channel channel = connection.createChannel();

//      创建队列
      /**
         * 参数1:队列名称
         * 参数二:是否持久化
         * 参数三:是否私有化
         * 参数4:队列使用完毕后是否自动删除
         */
      channel.queueDeclare("work_queue",true,false,false,null);

//      发送消息
      /**
         * 参数1:交换机名,""表示默认交换机direct
         * 参数2:路由键,简单模式就是队列名
         * 参数3:其他额外参数
         * 参数4:要传递的消息字节数组
         */
      for (int i = 1; i <= 10; i++) {
            channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,
                  ("你好,这是今天的第"+i+"条消息").getBytes());
      }

//      关闭资源
      channel.close();
      connection.close();
      System.out.println("消息发送成功====");
    }
}



[*]消息消费者代码(多个消费者可以代码复用,复用下面的代码,多创几个类)
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
      //1.创建连接工程
      ConnectionFactory cf = new ConnectionFactory();
      cf.setHost("192.168.70.130");
      cf.setPort(5672);
      cf.setUsername("admin");
      cf.setPassword("admin");
      cf.setVirtualHost("/");
      //2.创建连接
      Connection conn = cf.newConnection();

      //3.创建信道
      Channel channel = conn.createChannel();

//      4. 接收消息
      channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("消费者1 = " + msg);
            }
      });

      channel.close();
      conn.close();
    }

}
代码运行的效果和上面简单模式的雷同。
3、RabbitMQ的发布订阅模式(相比工作队列模式,互换机使用的范例是Fanout(广播范例))

https://img-blog.csdnimg.cn/direct/58e06c0b45114ca29d2efd1dee54b7f7.png


[*]图讲授明

[*]P:生产者,向 Exchange 发送消息
[*]X:Exchange(互换机),接收生产者的消息,然后把消息递交给与互换机绑定的队列
[*]C1、C2:消费者,其所在队列要与互换机进行绑定


[*]在开发过程中,有一些消息需要差别的消费者进行差别的处理

[*]如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe)

[*]在订阅模型中,多了一个Exchange角色,而且过程略有变化:

[*]Producer:生产者,也就是要发送消息的步伐,但是不再发送到队列中,而是发给X(互换机)
[*]Consumer:消费者,消息的接收者,会一直等候消息到来
[*]Queue:消息队列,接收消息、缓存消息
[*]Exchange:互换机(X)。一方面,接收生产者发送的消息。另一方面,知道如那边理消息,比方递交给某个特殊队列、递交给所有队列、或是将消息抛弃。到底如何操作,取决于Exchange的范例。Exchange有常见以下3种范例:

[*]Fanout:广播范例,将消息交给所有绑定到互换机的队列
[*]Direct:定向范例,把消息交给符合指定routing key 的队列
[*]Topic:通配符范例,把消息交给符合routing pattern(路由模式) 的队列,Exchange(互换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!


[*]特点:

[*]
[*]生产者(Producer)将消息发送给互换机,互换机将消息转发到绑定此互换机的每个队列中(即可以转发到多个队列中)。

[*]
[*]工作队列模式的互换机只能将消息发送给一个队列,发布订阅模式的互换性能将消息发送给多个队列。发布订阅模式使用fanout互换机。


java操作rabbitmq的发布订阅模式


[*]在简单模式的基础下编写代码(直接用同一个项目)
https://img-blog.csdnimg.cn/direct/d29e00ce5e014a73a29d8964ea8efdff.png
其实代码照旧差不多的,只是使用到的互换机不一样了而已。


[*]Producer
public class Producer {
    // 生产者
    public static void main(String[] args) throws IOException, TimeoutException {
      // 1.创建连接工厂
      ConnectionFactory connectionFactory = new ConnectionFactory();
      connectionFactory.setHost("192.168.70.130");
      connectionFactory.setPort(5672);
      connectionFactory.setUsername("admin");
      connectionFactory.setPassword("admin");
      connectionFactory.setVirtualHost("/");
      // 2.创建连接
      Connection connection = connectionFactory.newConnection();
      // 3.建立信道
      Channel channel = connection.createChannel();
      // 4.创建交换机
      /**
         * 参数1:交换机名
         * 参数2:交换机类型
         * 参数3:交换机持久化
         */
//      BuiltinExchangeType.FANOUT:表示广播模式
      channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT, true);

      // 5.创建队列
//      队列1
      channel.queueDeclare("SEND_MAIL", true, false, false, null);
//      队列2
      channel.queueDeclare("SEND_MESSAGE", true, false, false, null);
//      队列3
      channel.queueDeclare("SEND_STATION", true, false, false, null);

      // 6.交换机绑定队列
      /**
         * 参数1:队列的名称
         * 参数2:交换机名
         * 参数3:路由关键字,发布订阅模式写""即可
         */
      channel.queueBind("SEND_MAIL", "exchange_fanout", "");
      channel.queueBind("SEND_MESSAGE", "exchange_fanout", "");
      channel.queueBind("SEND_STATION", "exchange_fanout", "");

      // 7.发送消息
      channel.basicPublish("exchange_fanout", "", null,
                ("618商品开抢了!").getBytes(StandardCharsets.UTF_8));
//      for (int i = 1; i <= 10; i++) {
//            channel.basicPublish("exchange_fanout", "", null,
//                  ("你好,尊敬的用户,秒杀商品开抢了!" + i).getBytes(StandardCharsets.UTF_8));
//      }

      // 8.关闭资源
      channel.close();
      connection.close();
    }
}



[*]ConsumerMail(消费者代码都雷同)
public class ConsumerMail {
    public static void main(String[] args) throws IOException, TimeoutException {
      //1.创建连接工程
      ConnectionFactory cf = new ConnectionFactory();
      cf.setHost("192.168.70.130");
      cf.setPort(5672);
      cf.setUsername("admin");
      cf.setPassword("admin");
      cf.setVirtualHost("/");
      //2.创建连接
      Connection conn = cf.newConnection();
      //3.创建信道
      Channel channel = conn.createChannel();

      //4.监听队列
      channel.basicConsume("SEND_MAIL",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送邮件消息 = " + msg);
            }
      });
    }
}



[*]ConsumerMesage
public class ConsumerMessage {
    public static void main(String[] args) throws IOException, TimeoutException {
      //1.创建连接工程
      ConnectionFactory cf = new ConnectionFactory();
      cf.setHost("192.168.70.130");
      cf.setPort(5672);
      cf.setUsername("admin");
      cf.setPassword("admin");
      cf.setVirtualHost("/");
      //2.创建连接
      Connection conn = cf.newConnection();
      //3.创建信道
      Channel channel = conn.createChannel();
      //4.监听队列
      channel.basicConsume("SEND_MESSAGE",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送短信消息 = " + msg);
            }
      });
    }
}



[*]ConsumerStation
public class ConsumerStation {
    public static void main(String[] args) throws IOException, TimeoutException {
      //1.创建连接工程
      ConnectionFactory cf = new ConnectionFactory();
      cf.setHost("192.168.70.130");
      cf.setPort(5672);
      cf.setUsername("admin");
      cf.setPassword("admin");
      cf.setVirtualHost("/");
      //2.创建连接
      Connection conn = cf.newConnection();
      //3.创建信道
      Channel channel = conn.createChannel();
      //4.监听队列
      channel.basicConsume("SEND_STATION",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送站內信 = " + msg);
            }
      });
    }
}

4、RabbitMQ的路由模式(相比发布订阅模式,多了个Route Key)


[*]使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时候,不是所有消息都无差别的发布到所有队列中。好比电商网站的促销活动,双十一大促可能会发布到所有队列;而一些小的促销活动为了节流成本,只发布到站内信队列。此时需要使用路由模式(Routing)完成这一需求。
https://img-blog.csdnimg.cn/direct/1700534a687b4349b8df48229c79e897.png


[*]图讲授明

[*]P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
[*]X:Exchange(互换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
[*]C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
[*]C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息


[*]队列与互换机的绑定,不能是恣意绑定了,而是要指定一个 RoutingKey(路由key),消息的发送方在向Exchange发送消息时,也必须指定消息的 RoutingKey,Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey 与消息的 Routing key 完全同等,才会接收到消息。
java操作rabbitmq的路由模式


[*]在简单模式的基础下编写代码(直接用同一个项目)
https://img-blog.csdnimg.cn/direct/d5b0d262984b44b2af2c79b7b3177df0.png
其实代码照旧差不多的,只是使用到的互换机不一样了,多了一个route Key。


[*]Producer
public class Producer {
    // 生产者
    public static void main(String[] args) throws IOException, TimeoutException {
      // 1.创建连接工厂
      ConnectionFactory connectionFactory = new ConnectionFactory();
      connectionFactory.setHost("192.168.70.130");
      connectionFactory.setPort(5672);
      connectionFactory.setUsername("admin");
      connectionFactory.setPassword("admin");
      connectionFactory.setVirtualHost("/");
      // 2.创建连接
      Connection connection = connectionFactory.newConnection();
      // 3.建立信道
      Channel channel = connection.createChannel();
      // 4.创建交换机
      /**
         * 参数1:交换机名
         * 参数2:交换机类型
         * 参数3:交换机持久化
         */
//      BuiltinExchangeType.DIRECT
      channel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT, true);

      // 5.创建队列
//      队列1
      channel.queueDeclare("SEND_MAILRoute", true, false, false, null);
//      队列2
      channel.queueDeclare("SEND_MESSAGERoute", true, false, false, null);
//      队列3
      channel.queueDeclare("SEND_STATIONRoute", true, false, false, null);

      // 6.交换机绑定队列
      /**
         * 参数1:队列的名称
         * 参数2:交换机名
         * 参数3:路由关键字(路由名称)
         */
      channel.queueBind("SEND_MAILRoute", "routing_exchange", "import");
      channel.queueBind("SEND_MESSAGERoute", "routing_exchange", "import");
      channel.queueBind("SEND_STATIONRoute", "routing_exchange", "import");
      channel.queueBind("SEND_STATIONRoute", "routing_exchange", "normal");

      // 7.发送消息
      channel.basicPublish("routing_exchange", "import", null,
                ("618商品开抢了!").getBytes(StandardCharsets.UTF_8));

      channel.basicPublish("routing_exchange", "normal", null,
                ("normal路由的息").getBytes(StandardCharsets.UTF_8));
//      for (int i = 1; i <= 10; i++) {
//            channel.basicPublish("exchange_fanout", "", null,
//                  ("你好,尊敬的用户,秒杀商品开抢了!" + i).getBytes(StandardCharsets.UTF_8));
//      }

      // 8.关闭资源
      channel.close();
      connection.close();
    }
}



[*]ConsumerMail(消费者代码都雷同)
public class ConsumerMail {
    public static void main(String[] args) throws IOException, TimeoutException {
      //1.创建连接工程
      ConnectionFactory cf = new ConnectionFactory();
      cf.setHost("192.168.70.130");
      cf.setPort(5672);
      cf.setUsername("admin");
      cf.setPassword("admin");
      cf.setVirtualHost("/");
      //2.创建连接
      Connection conn = cf.newConnection();
      //3.创建信道
      Channel channel = conn.createChannel();

      //4.监听队列
      channel.basicConsume("SEND_MAILRoute",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送邮件消息 = " + msg);
            }
      });
    }
}


[*]ConsumerMesage
public class ConsumerMessage {
    public static void main(String[] args) throws IOException, TimeoutException {
      //1.创建连接工程
      ConnectionFactory cf = new ConnectionFactory();
      cf.setHost("192.168.70.130");
      cf.setPort(5672);
      cf.setUsername("admin");
      cf.setPassword("admin");
      cf.setVirtualHost("/");
      //2.创建连接
      Connection conn = cf.newConnection();
      //3.创建信道
      Channel channel = conn.createChannel();
      //4.监听队列
      channel.basicConsume("SEND_MESSAGERoute",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送短信消息 = " + msg);
            }
      });
    }
}


[*]ConsumerStation
public class ConsumerStation {
    public static void main(String[] args) throws IOException, TimeoutException {
      //1.创建连接工程
      ConnectionFactory cf = new ConnectionFactory();
      cf.setHost("192.168.70.130");
      cf.setPort(5672);
      cf.setUsername("admin");
      cf.setPassword("admin");
      cf.setVirtualHost("/");
      //2.创建连接
      Connection conn = cf.newConnection();
      //3.创建信道
      Channel channel = conn.createChannel();
      //4.监听队列
      channel.basicConsume("SEND_STATIONRoute",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送站內信 = " + msg);
            }
      });
    }
}
5、RabbitMQ的通配符模式(相比路由模式,路由的组成中带上了通配符)

https://img-blog.csdnimg.cn/direct/c80dc3a78e5445a5a602d5d713797515.png

[*]通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic互换机。
[*]通配符规则:

[*]
[*]消息设置RoutingKey时,RoutingKey由多个单词构成,中间以.分割。

[*]
[*]队列设置RoutingKey时,#可以匹配恣意多个单词,*可以匹配恣意一个单词。
https://img-blog.csdnimg.cn/direct/ed9e639621db49648ccdf4c69512a78e.png




[*]赤色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
[*]黄色 Queue:绑定的是 #.news ,因此凡是以 .news 末端的 routing key 都会被匹配
java操作rabbitmq的通配符模式


[*]在简单模式的基础下编写代码(直接用同一个项目)
https://img-blog.csdnimg.cn/direct/21395e52e3da470fa389099bdaef523e.png
其实代码照旧差不多的,只是route Key的组成多了通配符。


[*]Producer
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
      //1.创建连接工厂
      ConnectionFactory cf = new ConnectionFactory();
      cf.setHost("192.168.126.10");
      cf.setPort(5672);
      cf.setUsername("admin");
      cf.setPassword("admin");
      cf.setVirtualHost("/");
      //2.创建连接
      Connection conn = cf.newConnection();
      //3.创建信道
      Channel channel = conn.createChannel();
      //4.创建交换机
      /**
         * 参数1:交换机名
         * 参数2:交换机类型
         * 参数3:交换机持久化
         */
      channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC, true);
      /**
         * 参数1:队列名
         * 参数2:是否持久化,true表示MQ重启后队列还在。
         * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
         * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
         * 参数5:其他额外参数
         */

      //5.创建队列(举例订单给手机发送,邮箱,站内)发送消息,3个队列
      channel.queueDeclare("SEND_MAIL3", true, false, false, null);
      channel.queueDeclare("SEND_MESSAGE3", true, false, false, null);
      channel.queueDeclare("SEND_STATION3", true, false, false, null);
      //6.将队列和交换机绑定
      /**
         * 参数1:队列名
         * 参数2:交换机名
         * 参数3:路由关键字,发布订阅模式写""即可
         */
      channel.queueBind("SEND_MAIL3", "exchange_topic", "#.mail.#");
      channel.queueBind("SEND_MESSAGE3", "exchange_topic", "#.message.#");
      channel.queueBind("SEND_STATION3", "exchange_topic", "#.station.#");
      //8.发送消息
      channel.basicPublish("exchange_topic", "mail.message.station", null, "618大促销活动".getBytes(StandardCharsets.UTF_8));
      channel.basicPublish("exchange_topic", "station", null, "618小促销活动".getBytes(StandardCharsets.UTF_8));
      //9.关闭资源
      channel.close();
      conn.close();
      System.out.println("发送消息成功");
    }
}



[*]ConsumerMail(消费者代码都雷同)
public class ConsumerMail {
    public static void main(String[] args) throws IOException, TimeoutException {
      //1.创建连接工程
      ConnectionFactory cf = new ConnectionFactory();
      cf.setHost("192.168.126.10");
      cf.setPort(5672);
      cf.setUsername("admin");
      cf.setPassword("admin");
      cf.setVirtualHost("/");
      //2.创建连接
      Connection conn = cf.newConnection();
      //3.创建信道
      Channel channel = conn.createChannel();
      //4.监听队列
      channel.basicConsume("SEND_MAIL3",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送邮件消息 = " + msg);
            }
      });
    }
}


[*]ConsumerMesage
public class ConsumerMessage {
    public static void main(String[] args) throws IOException, TimeoutException {
      //1.创建连接工程
      ConnectionFactory cf = new ConnectionFactory();
      cf.setHost("192.168.126.10");
      cf.setPort(5672);
      cf.setUsername("admin");
      cf.setPassword("admin");
      cf.setVirtualHost("/");
      //2.创建连接
      Connection conn = cf.newConnection();
      //3.创建信道
      Channel channel = conn.createChannel();
      //4.监听队列
      channel.basicConsume("SEND_MESSAGE3",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送短信消息 = " + msg);
            }
      });
    }
}


[*]ConsumerStation
public class ConsumerStation {
    public static void main(String[] args) throws IOException, TimeoutException {
      //1.创建连接工程
      ConnectionFactory cf = new ConnectionFactory();
      cf.setHost("192.168.126.10");
      cf.setPort(5672);
      cf.setUsername("admin");
      cf.setPassword("admin");
      cf.setVirtualHost("/");
      //2.创建连接
      Connection conn = cf.newConnection();
      //3.创建信道
      Channel channel = conn.createChannel();
      //4.监听队列
      channel.basicConsume("SEND_STATION3",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("发送站內信 = " + msg);
            }
      });
    }
}

RabbitMQ一、RabbitMQ的先容与安装(docker)
RabbitMQ三、springboot整合rabbitmq(消息可靠性、高级特性)

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: RabbitMQ二、RabbitMQ的六种模式