安装RabbitMQ
下载网站:https://www.rabbitmq.com/docs/install-windows
点击后,会直接定位到依赖介绍位置,告诉你需要安装Erlang
下载Erlang
Erlang也是一种编程语言,只是比较小众,但其拥有极为出色的性能
这个网站是到GitHub上下载的,可能需要点魔法,也可以去Erlang官网下载(能下,但慢)
下载RabbitMQ
下载Erlang的同时,也顺便下载RabbitMQ吧
大概直接利用别人下载好的包,比如我这提供的包
安装Erlang
运行下载好的exe文件
安装RabbitMQ
运行下载好的exe文件
安装插件
找到RabbitMQ目录下的sbin目录,打开CMD控制台,输入rabbitmq-plugins.bat enable rabbitmq_management命令
重启RabbitMQ服务后访问http://localhost:15672
默认账号暗码均为guest
利用RabbitMQ
- <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.24.0</version>
- </dependency>
复制代码 一对一队列模子
- /**
- * 一对一消息队列模型:生产者
- */
- public class SingleProducer {
- private final static String QUEUE_NAME = "hello";
- public static void main(String[] argv) throws Exception {
- // 创建连接工厂,用于创建到RabbitMQ服务器的连接
- ConnectionFactory factory = new ConnectionFactory();
- // 设置RabbitMQ服务器地址
- factory.setHost("localhost");
- // 创建一个连接,用于和RabbitMQ服务器建立通信通道
- try (Connection connection = factory.newConnection();
- // 创建一个通道
- Channel channel = connection.createChannel()) {
- // 声明一个队列,队列名为hello
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- String message = "Hello World!";
- // 将消息发布到指定队列中
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
- System.out.println(" [x] Sent '" + message + "'");
- }
- }
- }
复制代码 运行后,RabbitMQ管理背景会增加一个队列

- /**
- * 一对一消息队列模型:消费者
- */
- public class SingleConsumer {
- // 定义我们正在监听的队列名称
- private final static String QUEUE_NAME = "hello";
- public static void main(String[] argv) throws Exception {
- // 创建连接工厂并配置连接信息
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- // 从工厂中获取一个新的连接
- Connection connection = factory.newConnection();
- // 创建一个新的通道
- Channel channel = connection.createChannel();
- // 声明一个队列,在该通道中声明我们要监听的队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- // 创建一个回调函数,用于处理从队列中接收到的消息
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- // 获取消息体并转换为字符串
- String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
- System.out.println(" [x] Received '" + message + "'");
- };
- // 在通道上开始消费队列中的消息,接收的消息会传递给deliverCallback进行处理,会持续阻塞
- channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
- }
- }
复制代码 未运行代码前
运行代码后
此时在不中断消耗者代码运行的情况下,再去运行生产者代码,会发现消耗者会连续消耗生产者增加的消息
一对多队列模子
- /**
- * 一对多消息队列:生产者
- */
- public class MultiProducer {
- private static final String TASK_QUEUE_NAME = "task_queue";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- // durable参数设置为 true,服务器重启后队列不丢失
- channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
- // 此处使用scanner循环输入消息,模拟生产者不定时多次生产
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()){
- String message = scanner.nextLine();
- // 指定 MessageProperties.PERSISTENT_TEXT_PLAIN,表示消息持久化
- channel.basicPublish("", TASK_QUEUE_NAME,
- MessageProperties.PERSISTENT_TEXT_PLAIN,
- message.getBytes("UTF-8"));
- System.out.println(" [x] Sent '" + message + "'");
- }
- }
- }
- }
复制代码 运行代码,可以模拟发送多条数据

- /**
- * 一对多消息队列:消费者
- */
- public class MultiConsumer {
- private static final String TASK_QUEUE_NAME = "task_queue";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- final Connection connection = factory.newConnection();
- final Channel channel = connection.createChannel();
- // 设置持久化
- channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- channel.basicQos(1);
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- try {
- System.out.println(" [x] Received '" + message + "'");
- doWork(message);
- } finally {
- System.out.println(" [x] Done");
- // 手动确认消息
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- }
- };
- // 开始消费消息,传入队列名称。关闭自动确认,投递回调和消费者取消回调
- channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
- }
- // 模拟消息处理,消息中每有一个“.”就让线程暂停10s,模拟复杂的耗时工程
- private static void doWork(String task) {
- for (char ch : task.toCharArray()) {
- if (ch == '.') {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException _ignored) {
- Thread.currentThread().interrupt();
- }
- }
- }
- }
- }
复制代码 运行消耗者代码

队列中的消息不是一次性全部吸收的,而是需要等待消耗者完成消耗(处理完事件后)并自动确认完成后,才会继承发送下一条消息

这与一对多有什么关系呢?不急,我们停掉消耗者代码运行先,然后让生产者举行天生消息

然后,直接拷贝一份消耗者代码,命名为MultiConsumer2。此时运行两个消耗者看结果

会发现,两个消耗者会轮流吸收队列消息,消耗完(完成使命)后才会确认并吸收新的队列消息,直至队列所有消息被消耗完

下面交换机模子为初学者看法,可能存在理解错误,看看就好,我背面也会深入学习,但大概率不会再修改本文章,所以要专业的、正确的还是得看官方文档,这里会用就行,被误导了概不负责,谢谢
交换机模子
交换机是消息队列中的一个组件,有点雷同于中转站,上面两个模子都是生产者创建消息队列,然后由消耗者去吸收指定消息队列中的消息,而交换机模子中,生产者不再创建指定的消息队列,而是创建一个交换机,由消耗者去绑定交换机并创建消息队列,然后再吸收生产者的消息。由直接变间接。
这就有点像网络路由一样,最初,两台电脑要互发消息,就必须各自开一个网口连接网线,三台电脑要交互就各开两个网口,随着电脑接入的越多,一台电脑上要的网口就越多,网线交织也就越复杂,这时为了更好梳理网线和减少网口,就有了集线器、交互器、路由器等,而RabbitMQ中的交换机也是同样道理,为了方便管理多个消息队列及厥后续变更
交换机有direct, topic, headers 和 fanout四种类型,由于headers交换机的性能较差,不太保举利用,了解有该类型即可
Fanout交换机
fanout有扇出的意思,该类型交换机遇把消息一次性扇出(发布)给所有与该交换机绑定的消息队列,适用于广播消息,如更新文章后,广播消息给所有订阅文章的用户
- /**
- * 交换机模型:生产者
- */
- public class FanoutProducer {
- // 定义交换机的名称
- private static final String EXCHANGE_NAME = "exchange";
- public static void main(String[] argv) throws Exception {
- // 创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 设置工厂的主机地址
- factory.setHost("localhost");
- // 创建一个连接和通道
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- // 声明一个交换机,交换机名称为exchange,类型为fanout
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- // 发布消息
- Scanner scanner = new Scanner(System.in);
- while(scanner.hasNext()){
- // 获取用户输入
- String message = scanner.nextLine();
- // 将消息发送到指定的交换机上,交换机名称为exchange,路由键为空,消息属性为null,消息内容为用户输入的字符串
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
- System.out.println(" [x] Sent '" + message + "'");
- }
- }
- }
- }
复制代码 此时运行生产者代码,并输入内容,会发现队列表中没有新增队列

但可以再Exchanges中看到名为exchange、类型为fanout的交换机信息

- /**
- * 交换机模型:消费者
- */
- public class FanoutConsumer {
- // 声明交换机的名称
- private static final String EXCHANGE_NAME = "exchange";
- public static void main(String[] argv) throws Exception {
- // 创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 设置工厂的主机地址
- factory.setHost("localhost");
- // 创建一个连接和通道
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 声明一个交换机,交换机名称为exchange,类型为fanout
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- // String queueName = channel.queueDeclare().getQueue();
- // 创建队列,名称为fanout_queue,并绑定到交换机上
- String queueName = "yg1_queue";
- channel.queueDeclare(queueName, true, false, false, null);
- channel.queueBind(queueName, EXCHANGE_NAME, "");
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [员工1] Received '" + message + "'");
- };
- channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
- }
- }
复制代码 运行消耗者代码,会发现队列表中新增了yg1_queue队列

此时拷贝多一份消耗者代码,并修改队列名为yg2_queue

运行后,队列表会新增yg2_queue队列

此时生产者发送消息后,会同时发送给两位消耗者举行处理

需要留意的是,假如生产者先发送消息,再创建消耗者,由于还没有创建存储的消息队列,所以是无法存储消息的,即消耗者无法吸收队列创建前的旧消息;但假如消耗者已经启动过一次了(RabbitMQ中已有其消息队列),那么生产者发送消息后再启动消耗者,还是能吸收到消息的;
就比如你没有QQ号,那么别人发送不了消息给你,当你创建好QQ号后,无论你是否上线,别人都能发送消息给你
以一对多队列模子为例
- 然后只运行生产者代码,会发现它直接创建好了一个消息队列
- 此时发送消息后再启动消耗者,消耗者是能吸收到队列消息的
由于消息只在消息队列中通报,交换机只是中间件。这里的生产者只创建交换机,不创建队列,队列有消耗者创建
为什么要用交换机呢? (个人理解)
打个不恰当的比喻:消息队列就是Q群,此时你有n个Q群,你要给每个群都发送一个拼手气红包,让群友去争抢;你自然可以手动一个一个去发,但更好的方式是选择采用脚本(交换机),通过脚本(交换机)去给该账号下的每个群(消息队列)都去发送一个拼手气红包。这样的利益在于,背面不论是有新群还是有群被解散,你都无需理会,你只需在意是否是自己q号上的群(是否绑定在交换机上)
Direct交换机
fanout就像AOE技能,无差别的范围攻击,而Direct就像是指定性单体技能,纵然有多个消息队列绑定在其上,也能根据路由键给指定消息队列发送消息,适用于指派使命,通过路由键分发使命给指定消息队列
- /**
- * direct交换机模型:生产者
- */
- public class DirectProducer {
- // 定义交换机名称
- private static final String EXCHANGE_NAME = "direct_exchange";
- public static void main(String[] argv) throws Exception {
- // 创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 设置连接工厂的主机地址为本地主机
- factory.setHost("localhost");
- // 建立连接并创建通道
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- // 使用通道声明交换机,类型为direct
- channel.exchangeDeclare(EXCHANGE_NAME, "direct");
- Scanner scanner = new Scanner(System.in);
- while(scanner.hasNext()){
- // 读取用户输入内容,并以空格分隔
- String userInput = scanner.nextLine();
- String[] parts = userInput.split(" ");
- if(parts.length < 1){
- continue;
- }
- String message = parts[0];
- // 路由键,用于确定消息被路由到哪个队列
- String severity = parts[1];
- // 发布消息到交换机
- channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
- System.out.println(" [生产者] 发送 '" + severity + "':'" + message + "'");
- }
- }
- }
- }
复制代码 运行生产者代码,同样不会天生消息队列而是创建类型为direct的交换机

- /**
- * direct交换机模型:消费者
- */
- public class DirectConsumer {
- // 定义我们正在监听的交换机名称
- private static final String EXCHANGE_NAME = "direct_exchange";
- public static void main(String[] argv) throws Exception {
- // 创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 设置连接工厂的主机地址为本地主机
- factory.setHost("localhost");
- // 建立与 RabbitMQ 服务器的连接
- Connection connection = factory.newConnection();
- // 创建一个通道
- Channel channel = connection.createChannel();
- // 声明一个 direct 类型的交换机
- channel.exchangeDeclare(EXCHANGE_NAME, "direct");
- // 声明一个匿名队列,并获取队列名称
- // String queueName = channel.queueDeclare().getQueue();
- // 手动声明队列名称
- String queueName = "cy_queue";
- // 创建队列,并设置队列的持久化属性为 true
- channel.queueDeclare(queueName, true, false, false, null);
- // 绑定队列到交换机上,并指定绑定路由键为“cy”
- channel.queueBind(queueName, EXCHANGE_NAME, "cy");
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- // 创建一个 DeliverCallback 实例来处理接收到的消息
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [残炎] 收到了 '" +
- delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
- };
- // 开始消费队列中的消息,设置自动确认消息已被消费
- channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
- });
- }
- }
复制代码 运行消耗者代码,会天生指定名称的队列

此时,生产者发送消息并指定路由键后,对应的消耗者会收到消息,而不属于它 的消息不会收到

同样的,拷贝一份消耗者代码并启动

生产者每次发送给不同目的发送消息,都会精准无误地转发到指定目的

其实上面也同样演示了一个题目,那就是发送消息给不存在的路由键目的,也就是还没拷贝第二份消耗者(fy)代码时生产者给fy发送的消息,是直接丢弃的。
direct就像是能输入详细发送目的类型的红包脚本,它答应我们自行选择要发送的目的群类型,而不是账号下的所有群,毕竟有些群可能是不干系的人拉你进的,群友与你没有任何瓜葛,你又何必给他们发呢?又大概你只想给自己所有的家属群发(只要你有标记哪些群是家属群)
direct能否给不同队列发送消息?
可以的,官网明确说了,不同消息队列答应绑定相同的路由键,而我们发送消息只关注路由键是否存在,并不在意有几个队列绑定在同一路由键上,所以我们可以将同类型的消息队列绑定在同一路由键上
Topic交换机
topic交换机与direct交换机类型,也是指定性,只不过它不再是单体指定,而是答应指定多个目的(留意,这里的目的指的是路由键而非详细的消息队列)。
- /**
- * topic交换机模型:生产者
- */
- public class TopicProducer {
- // 交换机名称
- private static final String EXCHANGE_NAME = "topic_exchange";
- public static void main(String[] argv) throws Exception {
- // 创建连接工厂并设置连接参数
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- // 建立连接并创建通道
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- // 使用通道声明交换机,类型为topic
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()){
- String userInput = scanner.nextLine();
- String[] parts = userInput.split(" ");
- if (parts.length < 1) {
- continue;
- }
- String message = parts[0];
- String routingKey = parts[1];
- // 发布消息到指定的交换机和路由键
- channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
- System.out.println(" [生产者] 发送 '" + routingKey + "':'" + message + "'");
- }
- }
- }
- }
复制代码 运行生产者代码,同样不会天生消息队列而是创建类型为topic的交换机

- /**
- * topic交换机模型:消费者
- */
- public class TopicConsumer {
- // 定义监听的交换机名称
- private static final String EXCHANGE_NAME = "topic_exchange";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- // 创建连接和通道
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 声明使用的交换机,并指定类型为topic
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- // 创建cy队列
- String queueName = "cy_queue";
- // 声明队列,并设置队列未持久化、非排他、非自动删除
- channel.queueDeclare(queueName, true, false, false, null);
- // 绑定队列到交换机上,并指定路由键模式为“#.cy.#”
- channel.queueBind(queueName, EXCHANGE_NAME, "#.cy.#");
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [cy组] 收到 '" +
- delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
- };
- channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
- });
- }
- }
复制代码 同样拷贝一份消耗者代码并运行

与direct对比,除了routing key采用格式不同外,表面上好像并没有太大的区别

但topic可以以组合的发送而direct并不能,如下,cy组和fy组可以以恣意情势组合,并发送对应消息给他们

这样,我们就可以一次性指定多个详细目的去处理指定消息
值得留意的一点,topic答应一个消息队列绑定多个绑定键,然后只要匹配其中一个即可收到消息
我们再拷贝一份消耗者代码,修改如下,这里我们绑定的路由键包含cy与fy

此时无论我们给cy发,还是fy发,cyfy组都能收到消息

且需要留意的是,纵然我们同时给cy和fy发消息(项目3 cy.fy),cyfy组都只会收到一条消息,不同出现重复吸收的情况

topic就像是比direct更高级的脚本(官方说的,topic是比direct更复杂),direct这个脚本只能指定一个群类型,假设我除了想给所有家属群发,还想给我创建的群发,大概我管理的群,很显然,这需要我重新设置目的类型并再一次启动脚本。

这自然没有任何题目,无非就是需要操作多次的题目,可究竟上,可能有些家属群是由我创建的大概由我管理的,那么就会出现一个题目,脚本会在某些群里发送多次红包,这很显然不符合我给指定目的群发送一个拼手气红包的目的。topic便是能处理这个题目的脚本,我能一次性设置家属群、我是群主、我是管理员三个类型,然后一次性给我Q号下满足这些类型的群发送一个拼手气红包,且实时有些群满足多个条件,也只会发送一个。
项目实战
项目中有很多种方法利用RabbitMQ,如利用官方的客户端,大概利用别人封装好的客户端,由于我的项目利用的 SpringBoot 框架,所以这里直接选择 Spring官方提供的封装好的客户端:Spring Boot RabbitMQ Starter
- <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- <version>2.7.2</version>
- </dependency>
复制代码
- spring:
- rabbitmq:
- # rabbitmq 主机地址
- host: localhost
- # 端口
- port: 5672
- # 登录账号和密码
- username: guest
- password: guest
复制代码 在项目利用MQ前,需先创建好交换机和队列
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- /**
- * 用于创建测试程序用到的交换机和队列(只用在程序启动前执行一次)
- */
- public class MqInitMain {
- public static void main(String[] args) {
- try{
- // 创建链接工厂
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- // 创建连接
- Connection connection = factory.newConnection();
- // 创建通道
- Channel channel = connection.createChannel();
- // 定义交换机的名称为“code_exchange”
- String EXCHAMGE_NAME = "code_exchange";
- // 声明交换机,并指定类型
- channel.exchangeDeclare(EXCHAMGE_NAME, "direct");
- // 创建队列,随机分配一个队列名称
- String queueName = "code_queue";
- // 声明队列,设置队列持久化、非独占、非自动删除、并传入额外参数为null
- channel.queueDeclare(queueName, true, false, false, null);
- // 将队列绑定到交换机,指定路由键为“my_routingKey”
- channel.queueBind(queueName, EXCHAMGE_NAME, "my_routingKey");
- }
- catch (Exception e){
- e.printStackTrace();
- }
- }
- }
复制代码 运行后关闭即可,RabbitMQ背景会增加对应交换机和队列

- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- @Component
- public class MyMessageProducer {
- @Resource
- private RabbitTemplate rabbitTemplate;
- /**
- * 发送消息
- * @param exchange 交换机名称,指定消息要发送到哪个交换机
- * @param routingKey 路由键,指定消息要根据什么规则路由到对应的队列
- * @param message 消息内容,要发生的具体消息
- */
- public void sendMessage(String exchange, String routingKey, Object message) {
- // 使用rabbitTimplate的convertAndSend方法发送消息到指定交换机(exchange)和路由键(routingKey)
- rabbitTemplate.convertAndSend(exchange, routingKey, message);
- }
- }
复制代码
- @Component
- @Slf4j
- public class MyMessageConsumer {
- /***
- *
- * @param message
- * @param channel
- * @param deliveryTag
- */
- @SneakyThrows
- // 使用 @RabbitListener 注解,监听code-queue队列,并设置消息的确认机制为手动确认
- @RabbitListener(queues = {"code_queue"},ackMode = "MANUAL")
- public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
- log.info("接收到消息:{}", message);
- // deliveryTag是一个数字标识,在消息消费者接收到消息后用于向RabbitMQ确认消息的处理状态,告知该消息已经被消费,可以进行确认和从队列中删除。
- channel.basicAck(deliveryTag, false);
- }
- }
复制代码 需要留意,填写的队列名肯定得在RabbitMQ服务中存在,否则会直接报错
所以保举把队列名、交换机名、路由键给抽出来作为常数管理
- public interface BiMqConstant {
- // 交换机、队列、路由键信息
- String BI_EXCHANGE_NAME = "bi_exchange";
- String BI_QUEUE_NAME = "bi_queue";
- String BI_ROUTING_KEY = "bi_routingKey";
- }
复制代码 测试
- @SpringBootTest
- class MyMessageProducerTest {
- @Resource
- private MyMessageProducer myMessageProducer;
- @Test
- void sendMessage() {
- myMessageProducer.sendMessage("code_exchange", "my_routingKey", "Hello, RabbitMQ!");
- }
- }
复制代码 消耗者监听到队列有消息,就会吸收并处理
当我们路由键写错,RabbitMQ没有对应路由键,RabbitMQ会直接丢弃该消息
假如本项目没有对应消息队列的消耗者,那么也不会去消耗该消息队列的消息
项目中利用
- @Resource
- private BiMessageProducer biMessageProducer;
- public BaseResponse<BiResponse> genChartByAiAsyncMq(@RequestPart("file") MultipartFile multipartFile,
- GenChartByAiRequest genChartByAiRequest, HttpServletRequest request){
- // TODO 处理代码逻辑
- ......
- // 这里是先将用户信息处理好后,先存入到数据库中,并标准执行状态为等待
-
- // 在返回结果前先提交一个任务
- // 建议处理任务队列满之后,抛异常的情况(因为提交任务报错了,前端会返回异常)
- biMessageProducer.sendMessage(String.valueOf(newChartId));
- // TODO 处理代码逻辑
- ......
- return ResultUtils.success(biResponse);
- }
复制代码
- /**
- * 发送消息(固定发送目标)
- * @param message 消息内容,要发生的具体消息
- */
- public void sendMessage(Object message) {
- // 使用rabbitTimplate的convertAndSend方法发送消息到指定交换机(exchange)和路由键(routingKey)
- rabbitTemplate.convertAndSend(BiMqConstant.BI_EXCHANGE_NAME, BiMqConstant.BI_ROUTING_KEY, message);
- }
复制代码
- @SneakyThrows
- // 使用 @RabbitListener 注解,监听code-queue队列,并设置消息的确认机制为手动确认
- @RabbitListener(queues = {BiMqConstant.BI_QUEUE_NAME},ackMode = "MANUAL")
- public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
- log.info("接收到消息:{}", message);
- if (StringUtils.isBlank(message)){
- // 如果更多失败,拒绝当前消息,让消息重新进入队列
- channel.basicNack(deliveryTag,false,false);
- throw new BusinessException(ErrorCode.SYSTEM_ERROR,"消息为空");
- }
- // TODP 进行业务逻辑处理
- ......
- // 在需要抛出异常处,即业务处理失败时,拒绝消息
- if (!updateResult){
- // 只要不成功,就拒绝消息
- channel.basicNack(deliveryTag,false,false);
- handleChartUpdateError(chart.getId(), "更新失败");
- }
- // 如果业务处理完了,就确认消息
- channel.basicAck(deliveryTag, false);
- }
复制代码 当用户提交一个请求后,会发送一个消息到RabbitMQ上。此时不会理会该消息是否被消耗,而是直接返回一个等待结果给用户;当有进程空闲时,消耗者会开始监听并处理该消息的业务,假如业务处理非常,则拒绝消息,让消息重回队列中等再次被人吸收;否则确认消息已被消耗。此时用户可在背景中看到请求已完成
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |