王國慶 发表于 2024-8-10 11:24:05

【RabbitMQ】利用手册

RabbitMQ

同步调用

长处:时效性强,期待到结果后才返回
缺点:扩展性差、性能下降(调用链越长耗时越久)、级联失败问题(一个调用点卡住,后面的链路都不能实行)
异步调用

异步调用通常是基于消息通知的方式,包含三个脚色:
消息发送者:投递消息的人,就是原来的调用者
消息吸收者:吸收和处置惩罚消息的人,就是原来的服务提供者
消息署理:管理、暂存、转发消息,你可以把它明确成微佩服务器
https://img-blog.csdnimg.cn/img_convert/63bcee3ef9649ae3dfe386ecc8faa47a.png
长处:
耦合度低,拓展性强
异步调用,无需期待,性能好
故障隔离,下游服务故障不影响上游业务
缓存消息,流量削峰填谷
缺点:
不能立即得到调用结果,时效性差
不确定下游业务实行是否乐成
业务安全依靠于Broker(消息署理)的可靠性
MQ技术选型

https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240620233034775.png&pos_id=img-x4xIcPdb-1720023956407
RabbitMQ安装


[*] 查找镜像:docker search rabbitmq
https://img-blog.csdnimg.cn/img_convert/eaa5de64577db102eb10bd2e7985a70f.png
[*] 拉取镜像:docker pull rabbitmq:3.8.19,指定拉取版本为3.18.19,如果不指定则默认拉取latest
https://img-blog.csdnimg.cn/img_convert/e0a1738d84693b16fee306f1ac1d3973.png
[*] 查看镜像:docker images
https://img-blog.csdnimg.cn/img_convert/4521f5f654d4c1d4799ebdea2fe1b2f3.png
[*] 启动镜像:设置账号登录为admin,登录暗码为admin,不指定镜像版本,默认启动rabbitmq:latest
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
-v mq-plugins:/plugins \
--name mq \
--hostname localhost \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8.19


[*]15672是RabbitMQ的背景管理端口
[*]5672是RabbitMQ的AQMP端口
[*]/plugins是RabbitMQ容器存放插件的路径
https://img-blog.csdnimg.cn/img_convert/db600b5f8f9a68151a0d1992be835633.png

[*] 查看容器:docker ps
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240622142819875.png&pos_id=img-xQRciIZN-1720023956408
[*] 进入RabbitMQ容器:docker exec -it 4df /bin/bash
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240622143041949.png&pos_id=img-vTiCA7ml-1720023956409
[*] 开启RabbitMQ背景访问:rabbitmq-plugins enable rabbitmq_management
https://img-blog.csdnimg.cn/img_convert/ff5e58f616b60ecb74ecdf7ccf31223c.png
[*] 退出容器bash:exit
https://img-blog.csdnimg.cn/img_convert/1f8600fcd7a79dd1d1438f9987d89951.png
[*] 网页访问RabbitMQ背景:访问http://localhost:15672,账号admin,暗码admin
https://img-blog.csdnimg.cn/img_convert/edb4d1802a6988d88d02627a78f8629e.png
常见问题:

[*] 背景管理系统的可视化界面中出现:All stable feature flags must be enabled after completing an upgrade
**办理方案:**点击Admin -> Feature Flags,确保所有稳定的特性标志都是启用状态。如果有任何标志未启用,请将其启用。
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240622214453608.png&pos_id=img-vsJlCegw-1720023956410
[*] 背景管理系统的可视化界面中出现:Stats in management UI are disabled on this node
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=%2FUsers%2Flichunyu%2FLibrary%2FApplication%2520Support%2Ftypora-user-images%2Fimage-20240622214703025.png&pos_id=img-KnMhElC7-1720023956410
**办理方案:**进入RabbitMQ容器,运行命令:echo management_agent.disable_metrics_collector=false>/etc/rabbitmq/conf.d/management_agent.disable_metrics_collector.conf,退出RabbitMQ容器,然后运行docker restart 容器id重启RabbitMQ容器。
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240622215340267.png&pos_id=img-LXo7VCqz-1720023956410
[*] 背景管理系统的可视化界面中 Overview 不显示图形的问题
办理方案:同《2. 背景管理系统的可视化界面中出现:Stats in management UI are disabled on this node》
RabbitMQ介绍



[*] publisher:消息发送者
[*] comsumer:消息消费者
[*] queue:队列-存储消息
[*] exchange:互换机-吸收发送者发送的消息,并将消息路由到与其绑定的队列
[*] virtual-host:虚拟主机-将数据隔离(多个项目利用同一个RabbitMQ时,可以为每个项目创建一个virtual-host,将不同项目之间的exchange和queue隔离)
https://img-blog.csdnimg.cn/img_convert/3e79b6d2baa9bae0921a492fee09c5d6.png
Work Queues(任务模子)

任务模子简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。同一个消息只会被一个消费者处置惩罚。多个消费者绑定到一个队列,可以加快消息处置惩罚速度。
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240622235716183.png&pos_id=img-qROY1uNO-1720023956411
默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处置惩罚完消息,可能出现消息堆积。因此我们需要修改消费者的application.yml,设置preFetch值为1,确保同一时候最多投递给消费者1条消息,消费者处置惩罚完后再投递下一条消息。
https://img-blog.csdnimg.cn/img_convert/1bd154c40287d57425b2aa75d5e80045.png
Fanout互换机

Fanout Exchange 会将吸收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式。 Fanout互换机会将收到的消息复制成n份,然后将消息发送到n个与其绑定的队列中。
https://img-blog.csdnimg.cn/img_convert/4344779e5e12ccac854edf9e10c98517.png
应用场景:用户付出乐成后,生意业务服务更新订单状态,短佩服务通知用户,积分服务为用户增长积分。
实现:生意业务服务的queue、短佩服务的queue、积分服务的queue都绑定到Fanout互换机,用户付出乐成后,付出服务将消息发送到Fanout互换机,这样生意业务服务、短佩服务、积分服务九都能收到这条消息了。
案例演示:
实现思路:

[*]在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
[*]在RabbitMQ控制台中,声明互换机hmall.fanout,将两个队列与其绑定
[*]在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
[*]在publisher中编写测试方法,向hmall.fanout发送消息
代码实现:
https://img-blog.csdnimg.cn/img_convert/783e75422f0b7ece012de5171cb43a96.png
发送者:
@SpringBootTest
class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue(){
      String exchangeName="hmall.fanout";
      String message="hello everyone";                                                    rabbitTemplate.convertAndSend(exchangeName,null,message);
    }
}
消费者:
@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(queues = "fanout.queue1")
    public void listenerWorkQueue1(String message){
      log.info("消费者1接收到消息:{},---{}",message, LocalTime.now());
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenerWorkQueue2(String message){
      log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());
    }
}
消费者输出:
https://img-blog.csdnimg.cn/img_convert/3a2717f5ab4fd1620ac1cb003e63480e.png
Direct互换机

Direct Exchange 会将吸收到的消息根据规则路由到指定的Queue,因此称为定向路由。


[*]每一个Queue都与Exchange设置一个Bindingkey(可以为每一个Queue指定相同的Bindingkey,实现和Fanout互换机相同的功能)。
[*]发布者发送消息时,指定消息的RoutingKey
[*]Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
https://img-blog.csdnimg.cn/img_convert/4b3419f67b3bbd2eb150bedd19a52edb.png
应用场景:用户取消后,只需要给生意业务服务发送消息,通知生意业务服务更新订单状态,而不需要给短佩服务和积分服务发送消息。
案例演示:
实现思路:

[*]在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2
[*]在RabbitMQ控制台中,声明互换机hmall. direct,将两个队列与其绑定,routeKey 为blue时路由到direct.queue1,为yellow时路由到direct.queue2,为red时路由到direct.queue1和direct.queue2
[*]在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
[*]在publisher中编写测试方法,利用不同的RoutingKey向hmall. direct发送消息
代码实现:
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623130217877.png&pos_id=img-k5J1uhUX-1720023956412
消费者
@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(queues = "direct.queue1")
    public void listenerWorkQueue1(String message){
      log.info("消费者1接收到消息:{},---{}",message, LocalTime.now());
    }
    @RabbitListener(queues = "direct.queue2")
    public void listenerWorkQueue2(String message){
      log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());
    }
}
发送者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue(){
//交换机名称
String exchangeName="hmall.direct";
//消息
String message_blue="hello blue";
String message_yellow="hello yellow";
String message_red="hello red";
//发送消息
rabbitTemplate.convertAndSend(exchangeName,"blue",message_blue);
rabbitTemplate.convertAndSend(exchangeName,"yellow",message_yellow);
rabbitTemplate.convertAndSend(exchangeName,"red",message_red);
}
消费者输出:
https://img-blog.csdnimg.cn/img_convert/61cc384ab39030392a72ac0e8d9a0696.png
Topic互换机

TopicExchange也是基于routingkey做消息路由,但是routingkey通常是多个单词的组合,而且以.分割。
Queue与Exchange指定routingkey时可以利用通配符:


[*]#:代指0个或多个单词
[*]*:代指一个单词
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623131430290.png&pos_id=img-Bwwkv4pQ-1720023956413
案例演示:
实现思路:

[*]在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2
[*]在RabbitMQ控制台中,声明互换机hmall. topic,将两个队列与其绑定
[*]在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
[*]在publisher中编写测试方法,利用不同的RoutingKey向hmall. topic发送消息
代码实现:
https://img-blog.csdnimg.cn/img_convert/a2f31f819b64c619f1d7240f9c5a5619.png
消费者:
@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(queues = "topic.queue1")
    public void listenerWorkQueue1(String message){
      log.info("消费者1接收到消息:{},---{}",message, LocalTime.now());
    }
    @RabbitListener(queues = "topic.queue2")
    public void listenerWorkQueue2(String message){
      log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());
    }
}
发送者1:
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue(){
      //交换机名称
      String exchangeName="hmall.topic";
      //消息
      String message="中国新闻";
      //发送消息
      rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
    }
消费者输出:
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623133933171.png&pos_id=img-Dh9Licay-1720023956413
发送者2:
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue(){
      //交换机名称
      String exchangeName="hmall.topic";
      //消息
      String message="中国天气";
      //发送消息
      rabbitTemplate.convertAndSend(exchangeName,"china.weather",message);
    }
消费者输出:
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623134104501.png&pos_id=img-nR7y8fo3-1720023956414
AMQP

Advanced Message Queuing Protocol,是用于在应用步伐之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和吸收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
AmqpTemplate和RabbitTemplate

AmqpTemplate 是一个接口,定义了基本的 AMQP 操纵,如发送消息、吸收消息、转换消息等。它提供了与 AMQP(包括 RabbitMQ)通讯的基本功能的抽象。
RabbitTemplate 是 AmqpTemplate 的默认实现类,专门用于与 RabbitMQ 举行交互。它实现了 AmqpTemplate 接口,并提供了更多与 RabbitMQ 交互的具体功能和配置选项。
RabbitTemplate 比 AmqpTemplate 更加丰富,提供了一些额外的高级特性和配置选项,如事件支持、消息确认机制、消息转换器等。这些功能可以更好地满意与 RabbitMQ 高级交互需求。
综上所述,AmqpTemplate 是一个通用的 AMQP 操纵接口,而 RabbitTemplate 是对其的具体实现,提供了更多与 RabbitMQ 交互的功能和默认配置,使得在 Spring 应用中利用 RabbitMQ 变得更加简单和方便。
RabbitMQ利用

背景可视化界面操纵



[*] 创建用户
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240622221803997.png&pos_id=img-NHcCyZyE-1720023956414
[*] 创建虚拟主机
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240622221157604.png&pos_id=img-9uFKv1jh-1720023956414
[*] 为用户添加可访问的虚拟主机
https://img-blog.csdnimg.cn/img_convert/2273fdbe719e700d5c28d5b0cf1a9846.png
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240622222326991.png&pos_id=img-uFLo70oj-1720023956415
注意:当前登录用户默认有权限访问其创建的所有虚拟主机。
[*] 创建队列
https://img-blog.csdnimg.cn/img_convert/babca32ffe9b72cb2d4f6177e0b86841.png

[*] Durability:
Durable:长期化队列,Rabbit服务器重启后,这个队列还会存在
Transient:暂时队列,Rabbit服务器重启后,这个队列将会被删除

[*] 查看队列的消费者
https://img-blog.csdnimg.cn/img_convert/e5543f15fd7834b4389a345bb188e1e5.png
[*] 向队列中发布消息
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623001452004.png&pos_id=img-CNp0m8Tm-1720023956415
[*] 获取队列中消息
队列中可以存储消息。当队列中的消息未被消费时,消息将存储在队列中,此时可以查看队列中的消息。
https://img-blog.csdnimg.cn/img_convert/1830543a1a4b11e0b424f17d90895db3.png

[*] Act Mode:
Nack message requeue true:获取消息,但是不做ack应答确认,消息重新入队
Ack message requeue false:获取消息,应答确认,消息不重新入队,将会从队列中删除
reject requeue true:拒绝获取消息,消息重新入队
reject requeue false:拒绝获取消息,消息不重新入队,将会被删除
[*] Encoding:可以选择将消息举行base64编码
[*] Messages:从队列中获取的消息数量

[*] 清算消息
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623214725176.png&pos_id=img-kRVRNcSA-1720023956416
代码操纵


[*] 引入依靠
<!-- AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

[*] application.yaml中配置RabbitMQ
spring:
rabbitmq:
    host: 192.168.1.2 # RabbitMQ地址
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: jack # 用户名
    password: jack # 密码

创建队列和互换机

SpringAMQP提供了几个类,用来声明队列、互换机及其绑定关系:


[*]Queue:用于声明队列,可以用工厂类QueueBuilder构建
[*]Exchange:用于声明互换机,可以用工厂类ExchangeBuilder构建
[*]Binding:用于声明队列和互换机的绑定关系,可以用工厂类BindingBuilder构建
https://img-blog.csdnimg.cn/img_convert/486bc3df588086f5dfcb10dd093097a0.png
如果已经存在互换机、队列、绑定关系,运行代码时则不会举行创建,而且也不会报错。
通常发送者只需要关心消息发送,消费者关心队列、互换机、以及绑定关系,所以创建操纵一样平常写在消费者中。
Sping提供了基于java bean和基于@RabbitListener注解两种方式创建。


[*]基于bean代码演示:
package com.itheima.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfiguration {
//声明交换机
    @Bean
    public FanoutExchange fanoutExchange(){
//      方式1
//      return new FanoutExchange("hmall.fanout");
//      方式2
      return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
    }
//声明队列
    @Bean
    public Queue fanoutQueue1(){
//      方式1
//      return new Queue("fanout.queue1",true);
//      方式2
      return QueueBuilder.durable("fanout.queue1").build();
    }
//将队列和交换机绑定
    @Bean
    public Binding fanoutQueue1Binding(Queue fanoutQueue1,FanoutExchange fanoutExchange){//spring会去找beanName='fanoutQueue1'的bean作为参数传进来
      return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    @Bean
    public Queue fanoutQueue2(){
      return QueueBuilder.durable("fanout.queue2").build();
    }
    @Bean
    public Binding fanoutQueue2Binding(Queue fanoutQueue2,FanoutExchange fanoutExchange){//spring会去找beanName='fanoutQueue2'的bean作为参数传进来
      return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}


[*]基于@RabbitListener注解代码演示:
@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(bindings = @QueueBinding( //将交换机和队列绑定
            value = @Queue(name="direct.queue1",durable = "true"), //如果没有队列direct.queue1则创建队列,并监听队列direct.queue1
            exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT), //如果没有交换机hmall.direct则创建交换机
            key = {"blue","red"} //routingKey
            ))
    public void listenerWorkQueue1(String message){
      log.info("消费者1接收到消息:{},---{}",message, LocalTime.now());
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name="direct.queue2",durable = "true"),
            exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
            key = {"yellow","red"}
    ))
    public void listenerWorkQueue2(String message){
      log.info("消费者2接收到消息:{},---{}",message,LocalTime.now());
    }
}
发送消息



[*] 直接发送给队列
方法:public void convertAndSend(String routingKey, final Object object),直接发给队列时,routingKey相称于队列名。
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimp leQueue() {
//队列名称
String queueName = "simple. queue";
//消息
String message = "hello, spring amqp!";
//发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
注意:队列不显示绑定互换机时,默认还是会绑定到defalut exchange上
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623143726683.png&pos_id=img-ErZ7AJLZ-1720023956416
[*] 发送给Fanout Exchange
方法:public void convertAndSend(String exchange, String routingKey, final Object object),利用Fanout Exchange时,routingKey相称于队列名,发送给Fanout Exchange时,routingKey传null或""
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue(){
//交换机名称
String exchangeName="hmall.fanout";
//消息
String message="hello everyone";
//发送消息
rabbitTemplate.convertAndSend(exchangeName,null,message);
}

[*] 发送给direct互换机
方法:public void convertAndSend(String exchange, String routingKey, final Object object),routingKey就是互换机和队列绑定时的routingKey
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue(){
//交换机名称
String exchangeName="hmall.direct";
//消息
String message_blue="hello blue";
String message_yellow="hello yellow";
String message_red="hello red";
//发送消息
rabbitTemplate.convertAndSend(exchangeName,"blue",message_blue);
rabbitTemplate.convertAndSend(exchangeName,"yellow",message_yellow);
rabbitTemplate.convertAndSend(exchangeName,"red",message_red);
}

[*] 发送给topic互换机
方法:方法:public void convertAndSend(String exchange, String routingKey, final Object object)
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue(){
//交换机名称
String exchangeName="hmall.topic";
//消息
String message="中国新闻";
//发送消息
rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
}

吸收消息

@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(queues = "队列名")
    public void listenerSimpleQueue(String message){
      log.info("消费者收到消息:{}",message);
    }
}
配置消息转换器

convertAndSend方法会先将消息举行序列化,然后再发送。
Spring的对消息对象的处置惩罚是由org.springframework.amap.support.converter.Messageconverter来处置惩罚的。而
默认实现是SimpleMessageConverter,如果消息实现了Serializable接口,则会利用serialize方法举行序列化,而serialize方法是基于JDK的Objectoutputstream完成序列化的。存在下列问题:


[*]JDK的序列化有安全风险
[*]JDK序列化的消息太大
[*]JDK序列化的消息可读性差
https://img-blog.csdnimg.cn/img_convert/54ea794b2bdd3e39cb3e687464c92e52.png
建议采用JSON序列化取代默认的JDK序列化,要做两件事情:

[*] 在publisher和consumer中都要引入jackson依靠,发送者和消费者要利用相同的消息转换器:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

[*] 在publisher和consumer中都要配置MessageConverter:
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}

测试:


[*] 利用默认的消息转换器
发送者:
package com.itheima.publisher;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.Serializable;

@SpringBootTest
class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
      User jack = new User("jack", 18);
      rabbitTemplate.convertAndSend("testConvertMessage", jack);
    }
}
@Data
@AllArgsConstructor
class User implements Serializable { //要实现Serializable接口,否则convertAndSend方法进行消息转换时会抛出异常
    private String name;
    private Integer age;
}
查看消息:
https://img-blog.csdnimg.cn/img_convert/8023c7a880af4eae932287970901bbc1.png
消费者:
package com.itheima.consumer.mq;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.Serializable;

@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(queues = "testConvertMessage")
    public void listenerWorkQueue(User message){
      log.info("消费者接收到消息:{}",message);
    }
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class User implements Serializable {
    private String name;
    private Integer age;
}
消费者输出:
https://img-blog.csdnimg.cn/img_convert/788af1dff3152ff535e458e72bf16882.png
[*] 配置消息转换器
发送者:
package com.itheima.publisher;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
      User jack = new User("jack", 18);
      rabbitTemplate.convertAndSend("testConvertMessage", jack);
    }
}
@Data
@AllArgsConstructor
class User {
    private String name;
    private Integer age;
}
查看消息:
https://img-blog.csdnimg.cn/img_convert/dfed493a7b439bc50ec5f16e61514d24.png
消费者:
package com.itheima.consumer.mq;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(queues = "testConvertMessage")
    public void listenerWorkQueue(User message){ //自动将json字符串转为User独享
      log.info("消费者接收到消息:{}",message);
    }
}
@Data
@AllArgsConstructor
@NoArgsConstructor //消费者将消息转为User对象时,User对象一定要有空参构造器
class User {
    private String name;
    private Integer age;
}
消费者输出:
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623155128402.png&pos_id=img-P9OSQIRz-1720023956417
消息可靠性

消息丢失三种情况:


[*]发送者到MQ服务器时消息丢失
[*]MQ服务器宕机导致消息丢失
[*]MQ服务器将消息发送给消费者时消息丢失
发送者的可靠性

发送者重连

有的时候由于网络颠簸,可能会出现发送者连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制,默认重连机制是关闭的。
spring:
rabbitmq:
    connection-timeout: 1s #设置MQ的连接超时时间,超过1秒钟还没有连上MQ则表示连接超时
    template:
      retry:
      enabled: true # 开启超时重试机制
      initial-interval: 1000ms # 失败后的初始等待时间
      multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 initial-interval * multiplier
      max-attempts: 3 # 最大重试次数
案例演示:

[*] 停止MQ
https://img-blog.csdnimg.cn/img_convert/7bd8da20b628223c9bbd8a23734cb1ea.png
[*] 开启重连
spring:
rabbitmq:
    host: 192.168.1.2
    port: 5672
    virtual-host: /hmall
    username: jack
    password: jack
    connection-timeout: 1s
    template:
      retry:
      enabled: true
      initial-interval: 1000ms
      multiplier: 1
      max-attempts: 3

[*] 发送者发送消息
package com.itheima.publisher;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
      rabbitTemplate.convertAndSend("testConvertMessage", "你好");
    }
}

[*] 消息发送失败
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623173414691.png&pos_id=img-uMxw9T4S-1720023956418
注意:当网络不稳定的时候,利用重试机制可以有效进步消息发送的乐成率。不过SpringAMQP提供的重试机制是壅闭式的重试,也就是说多次重试期待的过程中,当前线程是被壅闭的,会影响业务性能,如果对于业务性能有要求,建议禁用重试机制。如果一定要利用,请合理配置期待时长和重试次数,固然也可以考虑利用异步线程来实行发送消息的代码。
发送者确认

SpringAMQP提供了Publisher Confirm和Publisher Return两种确认机制。开启确人机制后,当发送者发送消息给MQ后,MQ会返回确认结果给发送者。返回的结果有以下几种情况:


[*]消息投递到了MQ,但是MQ路由失败。此时会通过PublisherReturn返回路由非常原因,然后PublisherConfirm返回ACK,告知发送者投递乐成
[*]暂时消息投递到了MQ,而且入队乐成,PublisherConfirm返回ACK,告知投递乐成
[*]长期消息投递到了MQ,而且入队且完成长期化,PublisherConfirm返回ACK,告知投递乐成
[*]其它情况都会返回NACK,告知投递失败
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623174629789.png&pos_id=img-uPrjgIAc-1720023956418
开开导送者确认机制:

[*] 开启配置
spring:
rabbitmq:
    publisher-confirm-type: correlated # 开局publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开局publisher return机制
这里publisher-confirm-type有三种模式可选:

[*] none:关闭confirm机制
[*] simple:同步壅闭期待MQ的回执消息
[*] correlated:MQ异步回调方式返回回执消息

[*] 为RabbitTemplate配置ReturnsCallback
每个RabbitTemplate只能配置一个ReturnsCallback,因此需要在项目启动过程中配置:
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623205437418.png&pos_id=img-I2xlFGI7-1720023956418
[*] 每次发送消息时,指定消息ID、消息ConfirmCallback
https://img-blog.csdnimg.cn/img_convert/607b5f201b341e8bf373fba428fd2eba.png
案例演示:

[*] 开开导送者确认配置
spring:
rabbitmq:
    host: 192.168.1.2 # RabbitMQ地址
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: jack # 用户名
    password: jack # 密码
    publisher-confirm-type: correlated # 开局publisher confirm机制,并设置confirm范例    publisher-returns: true # 开局publisher return机制
[*] 定义ReturnsCallback
package com.itheima.publisher;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;

@Configuration
@Slf4j
@AllArgsConstructor
public class MqConfig {
    private final RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init() {
      rabbitTemplate.setReturnsCallback(returnedMessage -> {
            log.info("监听到了消息return callback");
            log.info("exchange: {}", returnedMessage.getExchange());
            log.info("routingKey: {}", returnedMessage.getRoutingKey());
            log.info("message:{}", returnedMessage.getMessage());
            log.info("replyCode: {}", returnedMessage.getReplyCode());
            log.info("replyText: {}", returnedMessage.getReplyText());
      });
    }
}

[*] 定义ConfirmCallback并发送消息
3.1 发送乐成
package com.itheima.publisher;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@SpringBootTest
@Slf4j
class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testConfirmCallback() {
      //0. 创建CorrelationData,并设置消息ID
      CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
      cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("spring amqp 处理确认结果异常", ex);
            }
            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                if (result.isAck()) {
                  log.info("收到ConfirmCallback ack,消息发送成功!");
                } else {
                  log.info("收到ConfirmCallback nack,消息发送失败!", result.getReason());
                }
            }
      });
      //1. 交换机名称
      String exchangeName = "hmall.direct";
      //2. 消息
      String message = "测试发送者确认";
      //3. 发送消息
      rabbitTemplate.convertAndSend(exchangeName, "blue", message, cd);
      //4. 此单元测试方法执行完,main线程就结束了,因此需要睡眠2s接收回调函数
      try {
            TimeUnit.SECONDS.sleep(2);
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
    }
}
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623212257140.png&pos_id=img-nmLz3xjk-1720023956419
3.2 发送失败-路由失败
rabbitTemplate.convertAndSend(exchangeName, "blue22", message, cd);
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623212531629.png&pos_id=img-hYTvoGws-1720023956419
注意:发送者确认机制需要发送者和MQ举行确认,会大大影响消息发送的服从,通常情况下不建议开开导送者确认机制。
MQ的可靠性

在默认情况下,RabbitMQ会将吸收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:


[*]一旦MQ宕机,内存中的消息会丢失
[*]内存空间有限,当消费者故障或处置惩罚过慢时,会导致消息积存,引发MQ壅闭
数据长期化

RabbitMQ实现数据长期化包括3个方面,设置为长期化后,重启MQ,互换机、队列、消息也不会丢失。


[*] 互换机长期化(新建互换机默认就是长期化)
https://img-blog.csdnimg.cn/img_convert/eca6d47a1caadfb9878718adadd4d34d.png
D表现长期化
https://img-blog.csdnimg.cn/img_convert/4e372103f3166a4b14bbcf2c6459da03.png
[*] 队列长期化(新建队列默认就是长期化)
https://img-blog.csdnimg.cn/img_convert/f1d7d301cbece09c86df190ba1091ee9.png
[*] 消息长期化(可视化界面发送消息时默认黑白长期化,SpringAmqp发送消息时默认是长期化的)
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623213936822.png&pos_id=img-wYOxqy3d-1720023956420
案例演示:
MQ吸收非长期化消息
发送者发送1百万条非长期化消息
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623221207869.png&pos_id=img-RLZpKloH-1720023956420
发送耗时:
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623222816310.png&pos_id=img-LhLH4iCI-1720023956420
MQ收到了一百万条非长期化消息
注意:本测试利用的MQ是3.13.3,默认利用的是Lazy Queue模式:所有的消息直接存入磁盘,不再存储到内存,所以In memory显示1。(paged out代表的就是从内存移动到磁盘中的消息的数量,Persistent表现存入磁盘且长期化的消息的数量)
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623221117585.png&pos_id=img-NSjdQmVM-1720023956421
重启MQ后,一百万条非长期化消息全部丢失
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623221416018.png&pos_id=img-8qZcs5m7-1720023956421
MQ吸收长期化消息
发送者发送1百万条长期化消息
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623222209093.png&pos_id=img-sPL7ROiB-1720023956421
发送耗时:
https://img-blog.csdnimg.cn/img_convert/3da9db6d2bff1e4c8e5a384934fcec99.png
MQ收到了一百万条长期化消息
注意:本测试利用的MQ是3.13.3,默认利用的是Lazy Queue模式:所有的消息直接存入磁盘,不再存储到内存,所以In memory显示1。(paged out代表的就是从内存移动到磁盘中的消息的数量,Persistent表现存入磁盘且长期化的消息的数量)
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623222150284.png&pos_id=img-LqmZvNn0-1720023956421
重启MQ后,一百万条长期化消息不会丢失
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623222334921.png&pos_id=img-LVGFJzAk-1720023956422
结论:
在吸收非长期化消息时,MQ收到消息后会先将消息存到内存中的队列中,队列满了之后会把先收到的消息存到磁盘中(这个行为称为paged out,paged out会导致MQ壅闭),然后再继承吸收消息,把消息存进内存中的队列中,队列满了之后再把队列中的消息存入磁盘中,以此类推。
在吸收长期化消息时,MQ会直接将消息存到磁盘中,不会等内存中的队列满了之后再将消息保存到磁盘中。
发送一万万条非长期化消息耗时:
https://img-blog.csdnimg.cn/img_convert/ae387dd8e04c22ca42a4bf2b949536ce.png
发送一万万条长期化消息耗时:
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623223826459.png&pos_id=img-h1Bhy4Ps-1720023956422
从上面发送者发送一百万条消息的耗时来看,发送长期化消息比发送非长期化消息耗时更少(不需要paged out),而且长期化消息在MQ重启后不会丢失,所以建议发送长期化消息。
Lazy Queue

从RabbitMQ的3.6.0版本开始,就增长了Lazy Queue的概念,也就是惰性队列。
惰性队列的特性如下:


[*]吸收到消息后直接存入磁盘,不再存储到内存
[*]消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)
在3.12版本后,所有队列都是Lazy Queue模式,无法更改。
3.12版本之前的MQ设置Lazy Queue模式有三种方式:


[*] 可视化界面设置
要设置一个队列为情性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:
https://img-blog.csdnimg.cn/img_convert/92d9cd1958d1f84ce8c7a840e876366f.png
[*] Spring Bean方式设置
https://img-blog.csdnimg.cn/img_convert/7eec582b5a4cd7a279eb3cd49cf2defe.png
[*] 注解方式设置
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623225259397.png&pos_id=img-w7bClgPT-1720023956423
非Lazy Queue模式+长期化消息和Lazy Queue模式+长期化消息MQ吸收消息速度对比:
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240623225516918.png&pos_id=img-eSpYtPzO-1720023956423
消费者的可靠性

消费者确认机制

消费者确认机制(Consumer Acknowledgement)是为了确认消费者是否乐成处置惩罚消息。MQ将一条消息发送给消费者后,MQ上的这条消息处置惩罚待确认状态,当消费者处置惩罚消息结束后,应该向RabbitMO发送一个回执,告知RabbitMQ自己消息处置惩罚状态:


[*]ack:乐成处置惩罚消息,RabbitMQ从队列中删除该消息
[*]nack:消息处置惩罚失败,RabbitMQ需要再次投递消息
[*]reject:消息处置惩罚失败并拒绝该消息,RabbitMQ从队列中删除该消息
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624001937778.png&pos_id=img-Rmug6CQA-1720023956423
SpringAMQP已经实现了消息确认功能。并答应我们通过配置文件选择ACK处置惩罚方式,有三种方式:


[*] none:不处置惩罚。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议利用
[*] manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
[*] auto:自动模式(默认模式)。SpringAMQP利用 AOP对我们的消息处置惩罚逻辑做了环绕增强,当业务正常实行时则自动返回ack。当业务出现非常时,根据非常判断返回不同结果:

[*]如果是业务非常(好比throw new RuntimeException),会自动返回nack
[*]如果是消息处置惩罚或校验非常,自动返回reject
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=..%2F..%2FLibrary%2FApplication%2520Support%2Ftypora-user-images%2Fimage-20240623231443489.png&pos_id=img-rIMIOuJb-1720023956423

案例演示-自动模式:

[*] 消费者配置
spring:
rabbitmq:
    host: 192.168.1.2 # RabbitMQ地址
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: jack # 用户名
    password: jack # 密码
    listener:      simple:      prefetch: 1      acknowledge-mode: auto
[*] 消费者
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624004654368.png&pos_id=img-UiSpcPEi-1720023956423
[*] 发送者
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624004733266.png&pos_id=img-lr5dtgVJ-1720023956424
查看消息状态:
https://img-blog.csdnimg.cn/img_convert/ae54747f4d630f968f9d21253b06e60e.png

[*] 由于消费者抛出业务非常,所以会给MQ发送nack,然后MQ不停地向消费者投递消息
https://img-blog.csdnimg.cn/img_convert/324406aca1a3540779b5d5dee4278e43.png
查看消息内容
https://img-blog.csdnimg.cn/img_convert/541f93d8f6f5864b02a210fc80c9ce1f.png
   

[*]查看队列中的消息,提示队列是空的,所以得出结论:待确认的消息不保存在队列中

案例演示-手动模式:

[*] 消费者配置
spring:
rabbitmq:
    host: 192.168.1.2 # RabbitMQ地址
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: jack # 用户名
    password: jack # 密码
    listener:      simple:      prefetch: 1      acknowledge-mode: manual
[*] 消费者
https://img-blog.csdnimg.cn/img_convert/b26fdfcafb2819e0480b3f8c68b3756e.png
[*] 发送者
3.1 发送者发送ackxxxx
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624233603263.png&pos_id=img-4rZQktdO-1720023956424
步伐会运行到消费者在21行的断点处,消费者输出
https://img-blog.csdnimg.cn/img_convert/9bbd15b29a868964a788d20acf8f8d9d.png
查看消息状态
https://img-blog.csdnimg.cn/img_convert/30d6e021bbe170b644bd6eec357e29c2.png
放行消费者21行断点,查看消息状态
https://img-blog.csdnimg.cn/img_convert/032f7fe9b4a38127db2468512cf0b1ea.png
3.2 发送者发送nackxxxx,步伐会运行到消费者在26行的断点处,消费者输出
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624235416211.png&pos_id=img-lnhufWIM-1720023956425
查看消息状态
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624235446318.png&pos_id=img-0H7srI9J-1720023956425
放行断点,消费者输出
https://img-blog.csdnimg.cn/img_convert/b03476d2c7343512dc293b60ca770cdc.png
查看消息状态,消息又被重新放回了队列,而且MQ又将消息投递给了消费者
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624235651450.png&pos_id=img-t3Nzyq3U-1720023956426
取消断点26行断点,消费者不停地输出,阐明MQ不停地向消费者举行消息重投
https://img-blog.csdnimg.cn/img_convert/1f4234ff63ff03fa5653e9f7568dd6fe.png
查看消息状态
https://img-blog.csdnimg.cn/img_convert/2039e77c7a9824f0f9d4b4b003024f0d.png
停掉消费者进程,查看消息状态
https://img-blog.csdnimg.cn/img_convert/bbc4ba0ed818143f692bd4571ecb7372.png
3.3 发送者发送xxxx,消费者停在30行的断点处,消费者输出
https://img-blog.csdnimg.cn/img_convert/f98ca4ec18285de9e61fd72af3ad1ec1.png
查看消息状态
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240625001059479.png&pos_id=img-JXDOeAQA-1720023956427
放行断点,查看消息状态
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240625001233037.png&pos_id=img-1F96M8x9-1720023956427
失败重试机制

SpringAMQP提供了消费者失败重试机制,在消费者出现非常时利用当地重试,而不是无穷的requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624003901201.png&pos_id=img-qRcPHxPj-1720023956427
案例演示:

[*]消费者配置
spring:
rabbitmq:
    host: 192.168.1.2 # RabbitMQ地址
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: jack # 用户名
    password: jack # 密码
    listener:      simple:      prefetch: 1      acknowledge-mode: auto      retry:          enabled: true          initial-interval: 1000ms          multiplier: 1          max-attempts: 3          stateless: true
[*] 消费者
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624010523176.png&pos_id=img-N6Yq6sPJ-1720023956427
[*] 发送者
https://img-blog.csdnimg.cn/img_convert/b66c5e4815d6d68280192651d0a1b50e.png
[*] 消费者输出
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624011143793.png&pos_id=img-AOPS8xY8-1720023956428
[*] 查看消息状态
https://img-blog.csdnimg.cn/img_convert/601d491e5fdc9154923546e7e5ae2ab1.png
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处置惩罚,它包含三种不同的实现:


[*]RejectAndDontRequeueRecoverer(默认):重试耗尽后,给MQ返回reject,MQ收到reject后会将消息丢弃。
[*]ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
[*]RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的互换机。
将失败处置惩罚策略改为RepublishMessageRecoverer:

[*] 首先,定义吸收失败消息的互换机、队列及其绑定关系。
[*] 然后,定义RepublishMessageRecoverer:
https://img-blog.csdnimg.cn/img_convert/3392138d0e7917e042241392c3632125.png
案例演示:

[*]定义吸收失败消息的互换机、队列、绑定关系、RepublishMessageRecoverer
https://img-blog.csdnimg.cn/img_convert/6371e7c33ba290d532e7e9a1862ca697.png

[*] 消费者
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624012751968.png&pos_id=img-fVO7MpUz-1720023956428
[*] 消费者输出
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624013013748.png&pos_id=img-i062lpnG-1720023956429
[*] 查看error.queue上的消息
https://img-blog.csdnimg.cn/img_convert/542aacee49ad9da2af9d86cec76eaf6e.png
业务幂等性

幂等是一个数学概念,用函数表达式来形貌是这样的:f(x)=f(f(x)),例如求绝对值的函数。在步伐开发中,则是指同一个业务,实行一次或多次对业务状态的影响是一致的。
https://img-blog.csdnimg.cn/img_convert/deb649a2695552e09171715ccf60cbaa.png
消除非幂等性的手段:


[*] 唯一消息id
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624014152269.png&pos_id=img-I2HqVs4s-1720023956429
案例演示:

[*] 配置消息转换器
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624014540109.png&pos_id=img-77MBMAUm-1720023956429
[*] 发送者发送消息
https://img-blog.csdnimg.cn/img_convert/bed92cbbc0ca37c3f4bce3f2b453590c.png
[*] 查看消息
https://img-blog.csdnimg.cn/img_convert/d968f59d6a07dd86c492f9717ff71e26.png
[*] 消费者利用Message吸收
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624015108951.png&pos_id=img-3wJR0Tzs-1720023956430


[*] 业务判断
https://img-blog.csdnimg.cn/img_convert/196c69dbd88bdb88e837694ae2aed6b7.png
延迟消息

延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延迟任务:设置在一定时间之后才实行的任务
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624021057066.png&pos_id=img-JgOXUshb-1720023956430
死信互换机

当一个队列中的消息满意下列情况之一时,就会成为死信 (dead letter)


[*]消费者利用basic.reject或 basic.nack声明消费失败,而且消息的requeue参数设置为false
[*]消息是一个过期消息(到达了队列或消息本身设置的过期时间),超时无人消费
[*]要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过dead-letter-exchange属性指定了一个互换机,那么该队列中的死信就会投递到这个互换机中。这个互换机称为死信互换机(Dead Letter Exchange,简称DLX)
https://img-blog.csdnimg.cn/img_convert/548aae0439e7a9ac478cb217440c2e52.png
案例演示:

[*] 消费者中定义互换机和队列,并监听
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624022248280.png&pos_id=img-PTGuCUP3-1720023956431
[*] 定义互换机和队列,并将互换机dlx.direct声明为死信互换机,并与队列normal.queue绑定。
https://img-blog.csdnimg.cn/img_convert/782dfc210cf9c0436b06f01bcbcfc55b.png
[*] 查看队列、互换机、绑定关系
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624190201238.png&pos_id=img-6JiTzbit-1720023956431
https://img-blog.csdnimg.cn/img_convert/f602689779ecc6a2f42eb79e84a29142.png
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624190559133.png&pos_id=img-z0LMSOyN-1720023956431
[*] 发送者,发送消息时设置消息的殒命时间
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624191944431.png&pos_id=img-ogNnliTT-1720023956432
[*] 大约10s后消费者收到消息
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624192943620.png&pos_id=img-CoGPyXML-1720023956432
https://img-blog.csdnimg.cn/img_convert/ad61a4bdbda02124313ce225114d07b8.png
[*] 阐明
向互换机normal.direct中投递消息M,指定routingKey为hi,消息M的殒命时间设置为10s,消息M会被路由到队列normal.queue中,由于队列normal.queue没有消费者监听,可巧队列normal.queue绑定了死信互换机dlx.direct,所以投递到队列normal.queue的消息M殒命后,会被转投到死信互换机dlx.direct中,由于指定的routingKey为hi,所以死信互换机dlx.direct会将消息M路由到队列dlx.queue中,而队列dlx.queue有消费者C监听,所以消费者C会消费消息M。这就实现了消息M延迟10秒后被消费。
延迟消息插件

利用死信队列可以实现延迟消息,但这种方法过于繁琐。为了简化这一过程,RabbitMQ的官方推出了一款插件,该插件原生支持延迟消息功能。该插件的运作原理是计划了一种特别的互换机,当消息投递到这种互换机时,它能够暂存一段时间,直到到达设定的延迟时间后再将消息投递到相应的队列。这种计划大大简化了延迟消息的处置惩罚过程,进步了系统的服从和可靠性。
下载:
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
选择下载的版本,插件的版本要和RabbitMQ的版本保持一致


[*] 查看RabbitMQ的版本:
docker run -d --name containerId -p 5672:5672 rabbitmq:3-management
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624201442825.png&pos_id=img-qRUo1Z8L-1720023956432
[*] 选择3.13.x版本的插件
https://img-blog.csdnimg.cn/img_convert/696ab8a87330a7ca34f1235a4e33f68c.png
安装:


[*] 将插件复制到RabbitMQ容器中
docker cp 插件路径 容器ID或名称:/plugins/

[*] 安装
docker exec -it 容器ID或名称 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
https://img-blog.csdnimg.cn/img_convert/3a3f68231dd76c2f1f1ab827bff43456.png
利用:

[*] 创建延迟互换机,三种方式

[*] 图形化界面操纵
https://img-blog.csdnimg.cn/img_convert/06d6e36dab3b49f5734811b5a88e70b6.png
[*] 注解方式
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624204108827.png&pos_id=img-xJmML7Yz-1720023956433
[*] SpringBean方式
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624204146184.png&pos_id=img-di982nSw-1720023956433

[*] 查看延迟互换机
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624215210233.png&pos_id=img-NaimXz6l-1720023956433
[*] 发送消息时需要通过消息头x-delay来设置过期时间
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624204312276.png&pos_id=img-Gqw2JYL9-1720023956433
[*] 消费者大约10s后收到消息
https://img-blog.csdnimg.cn/img_convert/1e3ea35dec8f7cebfa51373a5560c50d.png
https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgitee.com%2Flcyorz%2Fmarkdown-image%2Fraw%2Fmaster%2Fjava-imgs%2Fimage-20240624215410231.png&pos_id=img-2fp4OeSY-1720023956434
本文参考文档

https://b11et3un53m.feishu.cn/wiki/A9SawKUxsikJ6dk3icacVWb4n3g
https://blog.csdn.net/karry_zzj/article/details/119513541
https://blog.csdn.net/weixin_42050545/article/details/121487823

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 【RabbitMQ】利用手册