【RabbitMQ】RabbitMQ的下载安装及利用

打印 上一主题 下一主题

主题 907|帖子 907|积分 2721

安装RabbitMQ

下载网站:https://www.rabbitmq.com/docs/install-windows

点击后,会直接定位到依赖介绍位置,告诉你需要安装Erlang

下载Erlang

   Erlang也是一种编程语言,只是比较小众,但其拥有极为出色的性能
  

这个网站是到GitHub上下载的,可能需要点魔法,也可以去Erlang官网下载(能下,但慢)

下载RabbitMQ

下载Erlang的同时,也顺便下载RabbitMQ吧

   大概直接利用别人下载好的包,比如我这提供的包
  安装Erlang

运行下载好的exe文件


  • 点击Next即可



  • 选择安装路径,点击Next继承



  • 点击Install安装



  • 安装完后,点击Close即可

安装RabbitMQ

运行下载好的exe文件


  • 点击Next即可



  • 选择安装路径,点击Install安装



  • 安装乐成后,点击Next继承



  • 点击Finish完成安装

安装插件

   找到RabbitMQ目录下的sbin目录,打开CMD控制台,输入rabbitmq-plugins.bat enable rabbitmq_management命令
  

重启RabbitMQ服务后访问http://localhost:15672

   默认账号暗码均为guest
  

利用RabbitMQ



  • 导入依赖
  1. <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
  2. <dependency>
  3.     <groupId>com.rabbitmq</groupId>
  4.     <artifactId>amqp-client</artifactId>
  5.     <version>5.24.0</version>
  6. </dependency>
复制代码
一对一队列模子



  • 生产者发送消息
  1. /**
  2. * 一对一消息队列模型:生产者
  3. */
  4. public class SingleProducer {
  5.     private final static String QUEUE_NAME = "hello";
  6.     public static void main(String[] argv) throws Exception {
  7.         // 创建连接工厂,用于创建到RabbitMQ服务器的连接
  8.         ConnectionFactory factory = new ConnectionFactory();
  9.         // 设置RabbitMQ服务器地址
  10.         factory.setHost("localhost");
  11.         // 创建一个连接,用于和RabbitMQ服务器建立通信通道
  12.         try (Connection connection = factory.newConnection();
  13.              // 创建一个通道
  14.              Channel channel = connection.createChannel()) {
  15.             // 声明一个队列,队列名为hello
  16.             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  17.             String message = "Hello World!";
  18.             // 将消息发布到指定队列中
  19.             channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
  20.             System.out.println(" [x] Sent '" + message + "'");
  21.         }
  22.     }
  23. }
复制代码
运行后,RabbitMQ管理背景会增加一个队列



  • 消耗者消耗消息
  1. /**
  2. * 一对一消息队列模型:消费者
  3. */
  4. public class SingleConsumer {
  5.    // 定义我们正在监听的队列名称
  6.     private final static String QUEUE_NAME = "hello";
  7.     public static void main(String[] argv) throws Exception {
  8.         // 创建连接工厂并配置连接信息
  9.         ConnectionFactory factory = new ConnectionFactory();
  10.         factory.setHost("localhost");
  11.         // 从工厂中获取一个新的连接
  12.         Connection connection = factory.newConnection();
  13.         // 创建一个新的通道
  14.         Channel channel = connection.createChannel();
  15.         // 声明一个队列,在该通道中声明我们要监听的队列
  16.         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  17.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  18.         // 创建一个回调函数,用于处理从队列中接收到的消息
  19.         DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  20.             // 获取消息体并转换为字符串
  21.             String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
  22.             System.out.println(" [x] Received '" + message + "'");
  23.         };
  24.         // 在通道上开始消费队列中的消息,接收的消息会传递给deliverCallback进行处理,会持续阻塞
  25.         channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
  26.     }
  27. }
复制代码
未运行代码前

运行代码后

此时在不中断消耗者代码运行的情况下,再去运行生产者代码,会发现消耗者会连续消耗生产者增加的消息

一对多队列模子



  • 生产者发送消息
  1. /**
  2. * 一对多消息队列:生产者
  3. */
  4. public class MultiProducer {
  5.   private static final String TASK_QUEUE_NAME = "task_queue";
  6.   public static void main(String[] argv) throws Exception {
  7.     ConnectionFactory factory = new ConnectionFactory();
  8.     factory.setHost("localhost");
  9.     try (Connection connection = factory.newConnection();
  10.          Channel channel = connection.createChannel()) {
  11.         // durable参数设置为 true,服务器重启后队列不丢失
  12.          channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
  13.                 // 此处使用scanner循环输入消息,模拟生产者不定时多次生产
  14.         Scanner scanner = new Scanner(System.in);
  15.         while (scanner.hasNext()){
  16.             String message = scanner.nextLine();
  17.             // 指定 MessageProperties.PERSISTENT_TEXT_PLAIN,表示消息持久化
  18.             channel.basicPublish("", TASK_QUEUE_NAME,
  19.                     MessageProperties.PERSISTENT_TEXT_PLAIN,
  20.                     message.getBytes("UTF-8"));
  21.             System.out.println(" [x] Sent '" + message + "'");
  22.         }
  23.     }
  24.   }
  25. }
复制代码
运行代码,可以模拟发送多条数据



  • 消耗者消耗信息
  1. /**
  2. * 一对多消息队列:消费者
  3. */
  4. public class MultiConsumer {
  5.   private static final String TASK_QUEUE_NAME = "task_queue";
  6.   public static void main(String[] argv) throws Exception {
  7.     ConnectionFactory factory = new ConnectionFactory();
  8.     factory.setHost("localhost");
  9.     final Connection connection = factory.newConnection();
  10.     final Channel channel = connection.createChannel();
  11.       // 设置持久化
  12.     channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
  13.     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  14.     channel.basicQos(1);
  15.     DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  16.         String message = new String(delivery.getBody(), "UTF-8");
  17.         try {
  18.             System.out.println(" [x] Received '" + message + "'");
  19.             doWork(message);
  20.         } finally {
  21.             System.out.println(" [x] Done");
  22.             // 手动确认消息
  23.             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  24.         }
  25.     };
  26.     // 开始消费消息,传入队列名称。关闭自动确认,投递回调和消费者取消回调
  27.     channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  28.   }
  29.   // 模拟消息处理,消息中每有一个“.”就让线程暂停10s,模拟复杂的耗时工程
  30.   private static void doWork(String task) {
  31.     for (char ch : task.toCharArray()) {
  32.         if (ch == '.') {
  33.             try {
  34.                 Thread.sleep(10000);
  35.             } catch (InterruptedException _ignored) {
  36.                 Thread.currentThread().interrupt();
  37.             }
  38.         }
  39.     }
  40.   }
  41. }
复制代码
运行消耗者代码

队列中的消息不是一次性全部吸收的,而是需要等待消耗者完成消耗(处理完事件后)并自动确认完成后,才会继承发送下一条消息

这与一对多有什么关系呢?不急,我们停掉消耗者代码运行先,然后让生产者举行天生消息

然后,直接拷贝一份消耗者代码,命名为MultiConsumer2。此时运行两个消耗者看结果

会发现,两个消耗者会轮流吸收队列消息,消耗完(完成使命)后才会确认并吸收新的队列消息,直至队列所有消息被消耗完

下面交换机模子为初学者看法,可能存在理解错误,看看就好,我背面也会深入学习,但大概率不会再修改本文章,所以要专业的、正确的还是得看官方文档,这里会用就行,被误导了概不负责,谢谢
交换机模子

交换机是消息队列中的一个组件,有点雷同于中转站,上面两个模子都是生产者创建消息队列,然后由消耗者去吸收指定消息队列中的消息,而交换机模子中,生产者不再创建指定的消息队列,而是创建一个交换机,由消耗者去绑定交换机并创建消息队列,然后再吸收生产者的消息。由直接变间接。
这就有点像网络路由一样,最初,两台电脑要互发消息,就必须各自开一个网口连接网线,三台电脑要交互就各开两个网口,随着电脑接入的越多,一台电脑上要的网口就越多,网线交织也就越复杂,这时为了更好梳理网线和减少网口,就有了集线器、交互器、路由器等,而RabbitMQ中的交换机也是同样道理,为了方便管理多个消息队列及厥后续变更

交换机有direct, topic, headers 和 fanout四种类型,由于headers交换机的性能较差,不太保举利用,了解有该类型即可
Fanout交换机

fanout有扇出的意思,该类型交换机遇把消息一次性扇出(发布)给所有与该交换机绑定的消息队列,适用于广播消息,如更新文章后,广播消息给所有订阅文章的用户



  • 生产者发送消息
  1. /**
  2. * 交换机模型:生产者
  3. */
  4. public class FanoutProducer {
  5.     // 定义交换机的名称
  6.     private static final String EXCHANGE_NAME = "exchange";
  7.     public static void main(String[] argv) throws Exception {
  8.         // 创建连接工厂
  9.         ConnectionFactory factory = new ConnectionFactory();
  10.         // 设置工厂的主机地址
  11.         factory.setHost("localhost");
  12.         // 创建一个连接和通道
  13.         try (Connection connection = factory.newConnection();
  14.              Channel channel = connection.createChannel()) {
  15.             //  声明一个交换机,交换机名称为exchange,类型为fanout
  16.             channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  17.             // 发布消息
  18.             Scanner scanner = new Scanner(System.in);
  19.             while(scanner.hasNext()){
  20.                 // 获取用户输入
  21.                 String message = scanner.nextLine();
  22.                 // 将消息发送到指定的交换机上,交换机名称为exchange,路由键为空,消息属性为null,消息内容为用户输入的字符串
  23.                 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
  24.                 System.out.println(" [x] Sent '" + message + "'");
  25.             }
  26.         }
  27.     }
  28. }
复制代码
此时运行生产者代码,并输入内容,会发现队列表中没有新增队列

但可以再Exchanges中看到名为exchange、类型为fanout的交换机信息



  • 消耗者消耗消息
  1. /**
  2. * 交换机模型:消费者
  3. */
  4. public class FanoutConsumer {
  5.   // 声明交换机的名称
  6.   private static final String EXCHANGE_NAME = "exchange";
  7.   public static void main(String[] argv) throws Exception {
  8.     // 创建连接工厂
  9.     ConnectionFactory factory = new ConnectionFactory();
  10.     // 设置工厂的主机地址
  11.     factory.setHost("localhost");
  12.     // 创建一个连接和通道
  13.     Connection connection = factory.newConnection();
  14.     Channel channel = connection.createChannel();
  15.     // 声明一个交换机,交换机名称为exchange,类型为fanout
  16.     channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  17.     // String queueName = channel.queueDeclare().getQueue();
  18.     // 创建队列,名称为fanout_queue,并绑定到交换机上
  19.     String queueName = "yg1_queue";
  20.     channel.queueDeclare(queueName, true, false, false, null);
  21.     channel.queueBind(queueName, EXCHANGE_NAME, "");
  22.     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  23.     DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  24.         String message = new String(delivery.getBody(), "UTF-8");
  25.         System.out.println(" [员工1] Received '" + message + "'");
  26.     };
  27.     channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  28.   }
  29. }
复制代码
运行消耗者代码,会发现队列表中新增了yg1_queue队列

此时拷贝多一份消耗者代码,并修改队列名为yg2_queue

运行后,队列表会新增yg2_queue队列

此时生产者发送消息后,会同时发送给两位消耗者举行处理

   需要留意的是,假如生产者先发送消息,再创建消耗者,由于还没有创建存储的消息队列,所以是无法存储消息的,即消耗者无法吸收队列创建前的旧消息;但假如消耗者已经启动过一次了(RabbitMQ中已有其消息队列),那么生产者发送消息后再启动消耗者,还是能吸收到消息的;
  就比如你没有QQ号,那么别人发送不了消息给你,当你创建好QQ号后,无论你是否上线,别人都能发送消息给你
  以一对多队列模子为例


  • 我们删除队列表中所有队列



  • 然后只运行生产者代码,会发现它直接创建好了一个消息队列



  • 此时发送消息后再启动消耗者,消耗者是能吸收到队列消息的

由于消息只在消息队列中通报,交换机只是中间件。这里的生产者只创建交换机,不创建队列,队列有消耗者创建


为什么要用交换机呢? (个人理解)
打个不恰当的比喻:消息队列就是Q群,此时你有n个Q群,你要给每个群都发送一个拼手气红包,让群友去争抢;你自然可以手动一个一个去发,但更好的方式是选择采用脚本(交换机),通过脚本(交换机)去给该账号下的每个群(消息队列)都去发送一个拼手气红包。这样的利益在于,背面不论是有新群还是有群被解散,你都无需理会,你只需在意是否是自己q号上的群(是否绑定在交换机上)
Direct交换机

fanout就像AOE技能,无差别的范围攻击,而Direct就像是指定性单体技能,纵然有多个消息队列绑定在其上,也能根据路由键给指定消息队列发送消息,适用于指派使命,通过路由键分发使命给指定消息队列


  • 生产者发送消息
  1. /**
  2. * direct交换机模型:生产者
  3. */
  4. public class DirectProducer {
  5.   // 定义交换机名称
  6.   private static final String EXCHANGE_NAME = "direct_exchange";
  7.   public static void main(String[] argv) throws Exception {
  8.     // 创建连接工厂
  9.     ConnectionFactory factory = new ConnectionFactory();
  10.     // 设置连接工厂的主机地址为本地主机
  11.     factory.setHost("localhost");
  12.     // 建立连接并创建通道
  13.     try (Connection connection = factory.newConnection();
  14.          Channel channel = connection.createChannel()) {
  15.         // 使用通道声明交换机,类型为direct
  16.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  17.         Scanner scanner = new Scanner(System.in);
  18.         while(scanner.hasNext()){
  19.             // 读取用户输入内容,并以空格分隔
  20.             String userInput = scanner.nextLine();
  21.             String[] parts = userInput.split(" ");
  22.             if(parts.length < 1){
  23.                 continue;
  24.             }
  25.             String message = parts[0];
  26.             // 路由键,用于确定消息被路由到哪个队列
  27.             String severity = parts[1];
  28.             // 发布消息到交换机
  29.             channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
  30.             System.out.println(" [生产者] 发送 '" + severity + "':'" + message + "'");
  31.         }
  32.     }
  33.   }
  34. }
复制代码
运行生产者代码,同样不会天生消息队列而是创建类型为direct的交换机



  • 消耗者消耗消息
  1. /**
  2. * direct交换机模型:消费者
  3. */
  4. public class DirectConsumer {
  5.     // 定义我们正在监听的交换机名称
  6.     private static final String EXCHANGE_NAME = "direct_exchange";
  7.     public static void main(String[] argv) throws Exception {
  8.         // 创建连接工厂
  9.         ConnectionFactory factory = new ConnectionFactory();
  10.         // 设置连接工厂的主机地址为本地主机
  11.         factory.setHost("localhost");
  12.         // 建立与 RabbitMQ 服务器的连接
  13.         Connection connection = factory.newConnection();
  14.         // 创建一个通道
  15.         Channel channel = connection.createChannel();
  16.         // 声明一个 direct 类型的交换机
  17.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  18.         // 声明一个匿名队列,并获取队列名称
  19.         // String queueName = channel.queueDeclare().getQueue();
  20.         // 手动声明队列名称
  21.         String queueName = "cy_queue";
  22.         // 创建队列,并设置队列的持久化属性为 true
  23.         channel.queueDeclare(queueName, true, false, false, null);
  24.         // 绑定队列到交换机上,并指定绑定路由键为“cy”
  25.         channel.queueBind(queueName, EXCHANGE_NAME, "cy");
  26.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  27.         // 创建一个 DeliverCallback 实例来处理接收到的消息
  28.         DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  29.             String message = new String(delivery.getBody(), "UTF-8");
  30.             System.out.println(" [残炎] 收到了 '" +
  31.                     delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
  32.         };
  33.         // 开始消费队列中的消息,设置自动确认消息已被消费
  34.         channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
  35.         });
  36.     }
  37. }
复制代码
运行消耗者代码,会天生指定名称的队列

此时,生产者发送消息并指定路由键后,对应的消耗者会收到消息,而不属于它 的消息不会收到

同样的,拷贝一份消耗者代码并启动

生产者每次发送给不同目的发送消息,都会精准无误地转发到指定目的

其实上面也同样演示了一个题目,那就是发送消息给不存在的路由键目的,也就是还没拷贝第二份消耗者(fy)代码时生产者给fy发送的消息,是直接丢弃的。
direct就像是能输入详细发送目的类型的红包脚本,它答应我们自行选择要发送的目的群类型,而不是账号下的所有群,毕竟有些群可能是不干系的人拉你进的,群友与你没有任何瓜葛,你又何必给他们发呢?又大概你只想给自己所有的家属群发(只要你有标记哪些群是家属群)
direct能否给不同队列发送消息?
可以的,官网明确说了,不同消息队列答应绑定相同的路由键,而我们发送消息只关注路由键是否存在,并不在意有几个队列绑定在同一路由键上,所以我们可以将同类型的消息队列绑定在同一路由键上

Topic交换机

topic交换机与direct交换机类型,也是指定性,只不过它不再是单体指定,而是答应指定多个目的(留意,这里的目的指的是路由键而非详细的消息队列)。


  • 生产者发送消息
  1. /**
  2. * topic交换机模型:生产者
  3. */
  4. public class TopicProducer {
  5.     // 交换机名称
  6.     private static final String EXCHANGE_NAME = "topic_exchange";
  7.     public static void main(String[] argv) throws Exception {
  8.         // 创建连接工厂并设置连接参数
  9.         ConnectionFactory factory = new ConnectionFactory();
  10.         factory.setHost("localhost");
  11.         // 建立连接并创建通道
  12.         try (Connection connection = factory.newConnection();
  13.              Channel channel = connection.createChannel()) {
  14.             // 使用通道声明交换机,类型为topic
  15.             channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  16.             Scanner scanner = new Scanner(System.in);
  17.             while (scanner.hasNext()){
  18.                 String userInput = scanner.nextLine();
  19.                 String[] parts = userInput.split(" ");
  20.                 if (parts.length < 1) {
  21.                     continue;
  22.                 }
  23.                 String message = parts[0];
  24.                 String routingKey = parts[1];
  25.                 // 发布消息到指定的交换机和路由键
  26.                 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
  27.                 System.out.println(" [生产者] 发送 '" + routingKey + "':'" + message + "'");
  28.             }
  29.         }
  30.     }
  31. }
复制代码
运行生产者代码,同样不会天生消息队列而是创建类型为topic的交换机



  • 消耗者消耗消息
  1. /**
  2. * topic交换机模型:消费者
  3. */
  4. public class TopicConsumer {
  5.     // 定义监听的交换机名称
  6.     private static final String EXCHANGE_NAME = "topic_exchange";
  7.     public static void main(String[] argv) throws Exception {
  8.         ConnectionFactory factory = new ConnectionFactory();
  9.         factory.setHost("localhost");
  10.         // 创建连接和通道
  11.         Connection connection = factory.newConnection();
  12.         Channel channel = connection.createChannel();
  13.         // 声明使用的交换机,并指定类型为topic
  14.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  15.         // 创建cy队列
  16.         String queueName = "cy_queue";
  17.         // 声明队列,并设置队列未持久化、非排他、非自动删除
  18.         channel.queueDeclare(queueName, true, false, false, null);
  19.         // 绑定队列到交换机上,并指定路由键模式为“#.cy.#”
  20.         channel.queueBind(queueName, EXCHANGE_NAME, "#.cy.#");
  21.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  22.         DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  23.             String message = new String(delivery.getBody(), "UTF-8");
  24.             System.out.println(" [cy组] 收到 '" +
  25.                     delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
  26.         };
  27.         channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
  28.         });
  29.     }
  30. }
复制代码
同样拷贝一份消耗者代码并运行

与direct对比,除了routing key采用格式不同外,表面上好像并没有太大的区别

但topic可以以组合的发送而direct并不能,如下,cy组和fy组可以以恣意情势组合,并发送对应消息给他们

这样,我们就可以一次性指定多个详细目的去处理指定消息
值得留意的一点,topic答应一个消息队列绑定多个绑定键,然后只要匹配其中一个即可收到消息
我们再拷贝一份消耗者代码,修改如下,这里我们绑定的路由键包含cy与fy

此时无论我们给cy发,还是fy发,cyfy组都能收到消息

且需要留意的是,纵然我们同时给cy和fy发消息(项目3 cy.fy),cyfy组都只会收到一条消息,不同出现重复吸收的情况

topic就像是比direct更高级的脚本(官方说的,topic是比direct更复杂),direct这个脚本只能指定一个群类型,假设我除了想给所有家属群发,还想给我创建的群发,大概我管理的群,很显然,这需要我重新设置目的类型并再一次启动脚本。

这自然没有任何题目,无非就是需要操作多次的题目,可究竟上,可能有些家属群是由我创建的大概由我管理的,那么就会出现一个题目,脚本会在某些群里发送多次红包,这很显然不符合我给指定目的群发送一个拼手气红包的目的。topic便是能处理这个题目的脚本,我能一次性设置家属群、我是群主、我是管理员三个类型,然后一次性给我Q号下满足这些类型的群发送一个拼手气红包,且实时有些群满足多个条件,也只会发送一个。
项目实战

项目中有很多种方法利用RabbitMQ,如利用官方的客户端,大概利用别人封装好的客户端,由于我的项目利用的 SpringBoot 框架,所以这里直接选择 Spring官方提供的封装好的客户端:Spring Boot RabbitMQ Starter


  • 导入依赖
  1. <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
  2. <dependency>
  3.     <groupId>org.springframework.boot</groupId>
  4.     <artifactId>spring-boot-starter-amqp</artifactId>
  5.     <version>2.7.2</version>
  6. </dependency>
复制代码


  • yml配置
  1. spring:
  2.     rabbitmq:
  3.       # rabbitmq 主机地址
  4.       host: localhost
  5.       # 端口
  6.       port: 5672
  7.       # 登录账号和密码
  8.       username: guest
  9.       password: guest
复制代码
在项目利用MQ前,需先创建好交换机和队列


  • MqInitMain
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. /**
  5. * 用于创建测试程序用到的交换机和队列(只用在程序启动前执行一次)
  6. */
  7. public class MqInitMain {
  8.     public static void main(String[] args) {
  9.         try{
  10.             // 创建链接工厂
  11.             ConnectionFactory factory = new ConnectionFactory();
  12.             factory.setHost("localhost");
  13.             // 创建连接
  14.             Connection connection = factory.newConnection();
  15.             // 创建通道
  16.             Channel channel = connection.createChannel();
  17.             // 定义交换机的名称为“code_exchange”
  18.             String EXCHAMGE_NAME = "code_exchange";
  19.             // 声明交换机,并指定类型
  20.             channel.exchangeDeclare(EXCHAMGE_NAME, "direct");
  21.             // 创建队列,随机分配一个队列名称
  22.             String queueName = "code_queue";
  23.             // 声明队列,设置队列持久化、非独占、非自动删除、并传入额外参数为null
  24.             channel.queueDeclare(queueName, true, false, false, null);
  25.             // 将队列绑定到交换机,指定路由键为“my_routingKey”
  26.             channel.queueBind(queueName, EXCHAMGE_NAME, "my_routingKey");
  27.         }
  28.        catch (Exception e){
  29.             e.printStackTrace();
  30.         }
  31.     }
  32. }
复制代码
运行后关闭即可,RabbitMQ背景会增加对应交换机和队列



  • 生产者:MyMessageProducer
  1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2. import org.springframework.stereotype.Component;
  3. import javax.annotation.Resource;
  4. @Component
  5. public class MyMessageProducer {
  6.     @Resource
  7.     private RabbitTemplate rabbitTemplate;
  8.     /**
  9.      * 发送消息
  10.      * @param exchange  交换机名称,指定消息要发送到哪个交换机
  11.      * @param routingKey 路由键,指定消息要根据什么规则路由到对应的队列
  12.      * @param message 消息内容,要发生的具体消息
  13.      */
  14.     public void sendMessage(String exchange, String routingKey, Object message) {
  15.         // 使用rabbitTimplate的convertAndSend方法发送消息到指定交换机(exchange)和路由键(routingKey)
  16.         rabbitTemplate.convertAndSend(exchange, routingKey, message);
  17.     }
  18. }
复制代码


  • 消耗者:MyMessageConsumer
  1. @Component
  2. @Slf4j
  3. public class MyMessageConsumer {
  4.     /***
  5.      *
  6.      * @param message
  7.      * @param channel
  8.      * @param deliveryTag
  9.      */
  10.     @SneakyThrows
  11.     // 使用 @RabbitListener 注解,监听code-queue队列,并设置消息的确认机制为手动确认
  12.     @RabbitListener(queues = {"code_queue"},ackMode = "MANUAL")
  13.     public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
  14.         log.info("接收到消息:{}", message);
  15.         // deliveryTag是一个数字标识,在消息消费者接收到消息后用于向RabbitMQ确认消息的处理状态,告知该消息已经被消费,可以进行确认和从队列中删除。
  16.         channel.basicAck(deliveryTag, false);
  17.     }
  18. }
复制代码
需要留意,填写的队列名肯定得在RabbitMQ服务中存在,否则会直接报错

所以保举把队列名、交换机名、路由键给抽出来作为常数管理
  1. public interface BiMqConstant {
  2.     // 交换机、队列、路由键信息
  3.     String BI_EXCHANGE_NAME = "bi_exchange";
  4.     String BI_QUEUE_NAME = "bi_queue";
  5.     String BI_ROUTING_KEY = "bi_routingKey";
  6. }
复制代码
测试
  1. @SpringBootTest
  2. class MyMessageProducerTest {
  3.     @Resource
  4.     private MyMessageProducer myMessageProducer;
  5.     @Test
  6.     void sendMessage() {
  7.         myMessageProducer.sendMessage("code_exchange", "my_routingKey", "Hello, RabbitMQ!");
  8.     }
  9. }
复制代码
消耗者监听到队列有消息,就会吸收并处理

当我们路由键写错,RabbitMQ没有对应路由键,RabbitMQ会直接丢弃该消息

假如本项目没有对应消息队列的消耗者,那么也不会去消耗该消息队列的消息
项目中利用


  • Controller/server层
  1. @Resource
  2. private BiMessageProducer biMessageProducer;
  3. public BaseResponse<BiResponse> genChartByAiAsyncMq(@RequestPart("file") MultipartFile multipartFile,
  4.                                                   GenChartByAiRequest genChartByAiRequest, HttpServletRequest request){
  5.     // TODO 处理代码逻辑
  6.     ......
  7.     // 这里是先将用户信息处理好后,先存入到数据库中,并标准执行状态为等待
  8.         
  9.     // 在返回结果前先提交一个任务
  10.     // 建议处理任务队列满之后,抛异常的情况(因为提交任务报错了,前端会返回异常)
  11.     biMessageProducer.sendMessage(String.valueOf(newChartId));
  12.     // TODO 处理代码逻辑
  13.     ......
  14.     return ResultUtils.success(biResponse);
  15. }
复制代码


  • BiMessageProducer
  1. /**
  2. * 发送消息(固定发送目标)
  3. * @param message 消息内容,要发生的具体消息
  4. */
  5. public void sendMessage(Object message) {
  6.     // 使用rabbitTimplate的convertAndSend方法发送消息到指定交换机(exchange)和路由键(routingKey)
  7.     rabbitTemplate.convertAndSend(BiMqConstant.BI_EXCHANGE_NAME, BiMqConstant.BI_ROUTING_KEY, message);
  8. }
复制代码


  • BiMessageConsumer
  1. @SneakyThrows
  2. // 使用 @RabbitListener 注解,监听code-queue队列,并设置消息的确认机制为手动确认
  3. @RabbitListener(queues = {BiMqConstant.BI_QUEUE_NAME},ackMode = "MANUAL")
  4. public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
  5.     log.info("接收到消息:{}", message);
  6.     if (StringUtils.isBlank(message)){
  7.         // 如果更多失败,拒绝当前消息,让消息重新进入队列
  8.         channel.basicNack(deliveryTag,false,false);
  9.         throw new BusinessException(ErrorCode.SYSTEM_ERROR,"消息为空");
  10.     }
  11.     // TODP 进行业务逻辑处理
  12.     ......
  13.     // 在需要抛出异常处,即业务处理失败时,拒绝消息
  14.     if (!updateResult){
  15.         // 只要不成功,就拒绝消息
  16.         channel.basicNack(deliveryTag,false,false);
  17.         handleChartUpdateError(chart.getId(), "更新失败");
  18.     }
  19.     // 如果业务处理完了,就确认消息
  20.     channel.basicAck(deliveryTag, false);
  21. }
复制代码
当用户提交一个请求后,会发送一个消息到RabbitMQ上。此时不会理会该消息是否被消耗,而是直接返回一个等待结果给用户;当有进程空闲时,消耗者会开始监听并处理该消息的业务,假如业务处理非常,则拒绝消息,让消息重回队列中等再次被人吸收;否则确认消息已被消耗。此时用户可在背景中看到请求已完成

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

自由的羽毛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表