【MQ】Spring3 中 RabbitMQ 的利用与常见场景

打印 上一主题 下一主题

主题 989|帖子 989|积分 2967

一、初识 MQ

传统的单体架构,分布式架构的同步调用里,无论是方法调用,还是 OpenFeign 难免会有以下问题:

  • 扩展性差(高耦合,需要依赖对应的服务,同样的事件,不停有新需求,这个事件的业务代码会越来越臃肿,这些子业务都写在一起)
  • 性能下降(等待相应,终极整个业务的相应时长就是每次长途调用的实行时长之和)
  • 级联失败(如果是一个事件,一个服务失败就会导致全部回滚,若是分布式事件就更加贫困了,但实在一些行为的失败不应该导致团体回滚)
  • 服务宕机(如果服务调用者未考虑服务提供者的性能,导致提供者由于过度请求而宕机)
但如果不是很要求同步调用,实在也可以用异步调用,如果是单体架构,你可能很快能想到一个办理方案,就是壅闭队列实现消息通知:

但是在分布式架构下,可能就需要一个中间件级别的壅闭队列,这就是我们要学习的 Message Queue 消息队列,简称 MQ,而如今流行的 MQ 还不少,在实现其根本的消息通知功能外,尚有一些不错的扩展
以 RabbitMQ 和 Kafka 为例:
RabbitMQKafka公司/社区RabbitApache开发语言ErlangScala & Java协议支持AMQP,XMPP,SMTP,STOMP自定义协议可用性高高单机吞吐量一样平常非常高(Kafka 亮点)消息延迟微秒级毫秒以内消息可靠性高一样平常 消息延迟指的是,消息到队列,并在队列中“就绪”的时间与预期时间的差距,实在就是数据在中间件中流动的耗时,预期时间可以是如今、几毫秒后、几秒后、几天后…
据统计,目前国内消息队列利用最多的还是 RabbitMQ,再加上其各方面都比较均衡,稳固性也好,因此我们讲堂上选择 RabbitMQ 来学习。
二、RabbitMQ 安装

Docker 安装 RabbitMQ:
  1. mkdir /root/mq
  2. cd /root/mq
  3. docker rm mq-server -f
  4. docker rmi rabbitmq:3.8-management -f
  5. docker volume rm mq-plugins -f
  6. docker pull rabbitmq:3.8-management
  7. # 插件数据卷最好还是直接挂载 volume,而不是挂载我们的目录
  8. docker run \
  9. --name mq-server \
  10. -e RABBITMQ_DEFAULT_USER=xxx \
  11. -e RABBITMQ_DEFAULT_PASS=xxx \
  12. --hostname mq1 \
  13. -v mq-plugins:/plugins \
  14. -p 15672:15672 \
  15. -p 5672:5672 \
  16. -d rabbitmq:3.8-management
复制代码
三、RabbitMQ 根本知识

(1)架构

15672:RabbitMQ 提供的管理控制台的端口
5672:RabbitMQ 的消息发送处置惩罚接口
用户名密码就是安装时,启动容器时指定的用户名密码
MQ 对应的就是这里的消息代理 Broker:

RabbitMQ 具体架构图:

其中包罗几个概念:


  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处置惩罚
  • exchange:互换机,负责消息路由。生产者发送的消息由互换机决定投递到哪个队列。
  • virtual host:假造主机,起到数据隔离的作用。每个假造主机相互独立,有各自的 exchange、queue
如今你可能只熟悉生产者、消费者、队列,其他是什么呢?
实在你可以理解为 MQ 也是存储东西的,存储的就是消息,virtual host 就是数据库,queue 就是表,消息就是一行数据,而 MQ 有特殊的机制,消息先通过 exchange 再决定前去哪个 queue
管理控制台的利用就不多说了
(2)五大模式

这只是最常见的五种模式:

  • 简单模式


  • 工作模式


  • 发布订阅模式
关联互换机的队列都能收到一份消息,广播


  • 路由模式
关联互换机时,提供 routing key(可以是多个,队列之间可以重复),发布消息时提供一个 routing key,由此发送给指定的队列

   值得留意的是,简单模式和工作模式,实在也是有互换机的,任何队列都会绑定一个默认互换机 "",范例是 direct,routing key 为队列的名称
  

  • 主题模式

路由模式的基础上,队列关联互换机时 routing key 可以是带通配符的
   routing key 的单词通过 . 分割, # 匹配 n 个单词(n ≥ 0),* 只匹配一个单词
  例如 #.red:
  

  • 可以匹配的 routing key:p1.red、red、p2.p1.red
  在发布消息时,要利用具体的 routing key,互换机发送给匹配的队列
(3)数据隔离


  • 隔离 virtual host


  • 隔离用户(赋予访问权限)

四、RabbitMQ 根本利用 Spring AMQP

引入 RabbitMQ 相关的 SDK,可以通过创建毗连 Connection、创建通道 Channel,用 Channel 进行操作,担当消息也差不多,不过多演示:
  1. public class PublisherTest {
  2.     @Test
  3.     public void testSendMessage() throws IOException, TimeoutException {
  4.         // 1.建立连接
  5.         ConnectionFactory factory = new ConnectionFactory();
  6.         // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
  7.         factory.setHost("xx.xx.xx.xx");
  8.         factory.setPort(5672);
  9.         factory.setVirtualHost("/");
  10.         factory.setUsername("xxx");
  11.         factory.setPassword("xxx");
  12.         // 1.2.建立连接
  13.         Connection connection = factory.newConnection();
  14.         // 2.创建通道Channel
  15.         Channel channel = connection.createChannel();
  16.         // 3.创建队列
  17.         String queueName = "simple.queue";
  18.         channel.queueDeclare(queueName, false, false, false, null);
  19.         // 4.发送消息
  20.         String message = "hello, rabbitmq!";
  21.         channel.basicPublish("", queueName, null, message.getBytes());
  22.         System.out.println("发送消息成功:【" + message + "】");
  23.         // 5.关闭通道和连接
  24.         channel.close();
  25.         connection.close();
  26.     }
  27. }
复制代码
但比较贫困,Spring AMQP 框架可以自动装配 RabbitMQ 的操作对象 RabbitTemplate,这样我们就可以更方便的操作 MQ,并充分发挥其特性
  1. <!--AMQP依赖,包含RabbitMQ-->
  2. <dependency>
  3.     <groupId>org.springframework.boot</groupId>
  4.     <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
复制代码
默认包罗 RabbitMQ 的实现,如果你想对接其他 AMQP 协议的 MQ,得自己实现其抽象封装的接口
(1)发送消息

留意,下面是 Spring3 的写法,所以会有点不一样,可能看不懂,稍后解释!
消息发送器封装:
  1. @Repository
  2. @RequiredArgsConstructor
  3. @Slf4j
  4. public class RabbitMQSender {
  5.     private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");
  6.     private final RabbitTemplate rabbitTemplate;
  7.    
  8.     @PostConstruct
  9.     public void init() {
  10.         rabbitTemplate.setTaskExecutor(EXECUTOR);
  11.     }
  12.     private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {
  13.         log.error("处理 ack 回执失败, {}", ex.getMessage());
  14.         return null;
  15.     };
  16.     private MessagePostProcessor delayMessagePostProcessor(long delay) {
  17.         return message -> {
  18.             // 小于 0 也是立即执行
  19.             // setDelay 才是给 RabbitMQ 看的,setReceivedDelay 是给 publish-returns 看的
  20.             message.getMessageProperties().setDelay((int) Math.max(delay, 0));
  21.             return message;
  22.         };
  23.     };
  24.     private CorrelationData newCorrelationData() {
  25.         return new CorrelationData(UUIDUtil.uuid32());
  26.     }
  27.     /**
  28.      * @param exchange 交换机
  29.      * @param routingKey routing key
  30.      * @param msg 消息
  31.      * @param delay 延迟时间(如果是延迟交换机,delay 才有效)
  32.      * @param maxRetries 最大重试机会
  33.      * @param <T> 消息的对象类型
  34.      */
  35.     private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){
  36.         log.info("准备发送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}",
  37.                 exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries);
  38.         CorrelationData correlationData = newCorrelationData();
  39.         MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay);
  40.         correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() {
  41.             private int retryCount = 0; // 一次 send 从始至终都用的是一个 Consumer 对象,所以作用的都是同一个计数器
  42.             @Override
  43.             public void accept(CorrelationData.Confirm confirm) {
  44.                 Optional.ofNullable(confirm).ifPresent(c -> {
  45.                     if(c.isAck()) {
  46.                         log.info("ACK {} 消息成功到达,{}", correlationData.getId(), c.getReason());
  47.                     } else {
  48.                         log.warn("NACK {} 消息未能到达,{}", correlationData.getId(), c.getReason());
  49.                         if(retryCount >= maxRetries) {
  50.                             log.error("次数到达上限 {}", maxRetries);
  51.                             return;
  52.                         }
  53.                         retryCount++;
  54.                         log.warn("开始第 {} 次重试", retryCount);
  55.                         CorrelationData cd = newCorrelationData();
  56.                         cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR);
  57.                         rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd);
  58.                     }
  59.                 });
  60.             }
  61.         }, EXECUTOR);
  62.         rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData);
  63.     }
  64.     public void sendMessage(String exchange, String routingKey, Object msg) {
  65.         send(exchange, routingKey, msg, 0, 0);
  66.     }
  67.     public void sendDelayMessage(String exchange, String routingKey, Object msg, long delay){
  68.         send(exchange, routingKey, msg, delay, 0);
  69.     }
  70.     public void sendWithConfirm(String exchange, String routingKey, Object msg, int maxReties) {
  71.         send(exchange, routingKey, msg, 0, maxReties);
  72.     }
  73.     public void sendDelayMessageWithConfirm(String exchange, String routingKey, Object msg, long delay, int maxReties) {
  74.         send(exchange, routingKey, msg, delay, maxReties);
  75.     }
  76. }
复制代码
(2)担当消息

监听器:


  • RabbitTemplate 是可以自动获取消息的,也可以不实时监听,但是一样平常情况都是监听,有消息就实行
  • 监听的是 queue,若 queue 不存在,就会根据注解创建一遍
  1. @RabbitListener(bindings = @QueueBinding(
  2.     value = @Queue(name = "xxx"),
  3.     exchange = @Exchange(name = "xxx", delayed = "true"),
  4.     key = {"xxx"}
  5. ))
  6. public void xxx(X x) {
  7. }
复制代码
(3)声明互换机与队列

可以通过 @Bean 创建 Bean 对象的方式去声明,可以自行搜刮,我更喜欢监听器注解的情势,而且 Bean 的方式,可能会由于配置不完全一样,导致其他配置类的互换机队列无法声明(征象云云,底层为啥我不知道)
(4)消息转换器

消息是一个字符串,但为了满足更多需求,需要将一个对象序列化成一个字符串,但默认的序列化实现貌似用的是 java 对象的序列化,这种方式可能得同一个步伐的 java 类才能反序列化成功,所以我们应该选择分布式的序列化方式,好比 json
  1. @Configuration
  2. @RequiredArgsConstructor
  3. @Slf4j
  4. public class MessageConverterConfig {
  5.     @Bean
  6.     public MessageConverter messageConverter(){
  7.         // 1. 定义消息转换器
  8.         Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(JsonUtil.OBJECT_MAPPER);
  9.         // 2. 配置自动创建消息 id,用于识别不同消息
  10.         jackson2JsonMessageConverter.setCreateMessageIds(Boolean.TRUE);
  11.         return jackson2JsonMessageConverter;
  12.     }
  13. }
复制代码
  这里的 JsonUtil.OBJECT_MAPPER,就是框架的大概自己实现的 ObjectMapper
  (5)配置文件

  1. spring:
  2.   rabbitmq:
  3.     host: ${xxx.mq.host} # rabbitMQ 的 ip 地址
  4.     port: ${xxx.mq.port} # 端口
  5.     username: ${xxx.mq.username}
  6.     password: ${xxx.mq.password}
  7.     virtual-host: ${xxx.mq.virtual-host}
  8.     publisher-confirm-type: correlated
  9.     publisher-returns: true
  10.     template:
  11.       mandatory: true # 若是 false 则直接丢弃了,并不会发送者回执
  12.     listener:
  13.       simple:
  14.         prefetch: 1 # 预取为一个(消费完才能拿下一个)
  15.         concurrency: 2 # 消费者最少 2 个线程
  16.         max-concurrency: 10 # 消费者最多 10 个线程
  17.         auto-startup: true # 为 false 监听者不会实时创建和监听,为 true 监听的过程中,若 queue 不存在,会再根据注解进行创建,创建后只监听 queue,declare = "false" 才是不自动声明
  18.         default-requeue-rejected: false # 拒绝后不 requeue(成为死信,若没有绑定死信交换机,就真的丢了)
  19.         acknowledge-mode: auto # 消费者执行成功 ack、异常 nack(manual 为手动、none 代表无论如何都是 ack)
  20.         retry: # 这个属于 spring amqp 的 retry 机制
  21.           enabled: false # 不开启失败重试
  22. #          initial-interval: 1000
  23. #          multiplier: 2
  24. #          max-attempts: 3
  25. #          stateless: true # true 代表没有状态,若有消费者包含事务,这里改为 false
复制代码
五、常见问题


(1)RabbitMQ 如何保证消息可靠性

保证消息可靠性、不丢失。重要从三个层面考虑
如果报错可以先记录到日记中,再去修复数据(保底)
1、生产者确认机制

生产者确认机制,确保生产者的消息能到达队列

  • publisher-confirm,针对的是消息从发送者到互换机的可靠性,成功则进行下一步,失败返回 NACK
  1. private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");
  2. private final RabbitTemplate rabbitTemplate;
  3. @PostConstruct
  4. public void init() {
  5.     rabbitTemplate.setTaskExecutor(EXECUTOR);
  6. }
  7. private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {
  8.     log.error("处理 ack 回执失败, {}", ex.getMessage());
  9.     return null;
  10. };
  11. private MessagePostProcessor delayMessagePostProcessor(long delay) {
  12.     return message -> {
  13.         // 小于 0 也是立即执行
  14.         // setDelay 才是给 RabbitMQ 看的,setReceivedDelay 是给 publish-returns 看的
  15.         message.getMessageProperties().setDelay((int) Math.max(delay, 0));
  16.         return message;
  17.     };
  18. };
  19. private CorrelationData newCorrelationData() {
  20.     return new CorrelationData(UUIDUtil.uuid32());
  21. }
  22. /**
  23.      * @param exchange 交换机
  24.      * @param routingKey routing key
  25.      * @param msg 消息
  26.      * @param delay 延迟时间(如果是延迟交换机,delay 才有效)
  27.      * @param maxRetries 最大重试机会
  28.      * @param <T> 消息的对象类型
  29.      */
  30. private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){
  31.     log.info("准备发送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}",
  32.              exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries);
  33.     CorrelationData correlationData = newCorrelationData();
  34.     MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay);
  35.     correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() {
  36.         private int retryCount = 0; // 一次 send 从始至终都用的是一个 Consumer 对象,所以作用的都是同一个计数器
  37.         @Override
  38.         public void accept(CorrelationData.Confirm confirm) {
  39.             Optional.ofNullable(confirm).ifPresent(c -> {
  40.                 if(c.isAck()) {
  41.                     log.info("ACK {} 消息成功到达,{}", correlationData.getId(), c.getReason());
  42.                 } else {
  43.                     log.warn("NACK {} 消息未能到达,{}", correlationData.getId(), c.getReason());
  44.                     if(retryCount >= maxRetries) {
  45.                         log.error("次数到达上限 {}", maxRetries);
  46.                         return;
  47.                     }
  48.                     retryCount++;
  49.                     log.warn("开始第 {} 次重试", retryCount);
  50.                     CorrelationData cd = newCorrelationData();
  51.                     cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR);
  52.                     rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd);
  53.                 }
  54.             });
  55.         }
  56.     }, EXECUTOR);
  57.     rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData);
  58. }
复制代码
Spring3 的 RabbitMQ Confirm,需要配置为 correlated,发送消息时提供 CorrelationData,也就是与消息关联的数据,包罗发送者确认时的回调方法

要想提供 Confirm 的回调办法,需要配置 correlationData.getFuture() 返回的 CompletableFuture 对象(新的 JUC 工具类,可以查一查如何利用)
配置后,在未来根据回调函数进行处置惩罚(当然也可以直接设置在 RabbitTemplate 对象的 ConfirmCallBack)
还可以自己实现消息的发送者重试:


  • publisher-returns,针对的是消息从互换机到队列的可靠性,成功则返回 ACK,失败触发 returns 的回调方法
  1. @Component
  2. @RequiredArgsConstructor
  3. @Slf4j
  4. public class PublisherReturnsCallBack implements RabbitTemplate.ReturnsCallback {
  5.     // 不存在 routing key 对应的队列,那在我看来转发到零个是合理的现象,但在这里也认为是路由失败(MQ 认为消息一定至少要进入一个队列,之后才能被处理,这就是可靠性)(反正就是回执了,你爱咋处理是你自己的事情)
  6.     @Override
  7.     public void returnedMessage(ReturnedMessage returnedMessage) {
  8.         // 可能一些版本的 mq 会因为是延时交换机,导致发送者回执,只要没有 NACK 这种情况其实并不是不可靠(其实我也不知道有没有版本会忽略)
  9.         // 但是其实不忽略也不错,毕竟者本来就是特殊情况,一般交换机是不存储的,但是这个临时存储消息
  10.         // 但这样也就代表了,延时后消息路由失败是没法再次处理的(因为我们交给延时交换机后就不管了,可靠性有 mq 自己保持)
  11.         MessageProperties messageProperties = returnedMessage.getMessage().getMessageProperties();
  12.         // 这里的 message 并不是原本的 message,是额外的封装,x-delay 在 publish-returns 里面封装到 receiveDelay 里了
  13.         Integer delay = messageProperties.getReceivedDelay();
  14.         // 如果不是延时交换机,却设置了 delay 大于 0,是不会延时的,所以是其他原因导致的(以防万一把消息记录到日志里)
  15.         if(Objects.nonNull(delay) && delay.compareTo(0) > 0) {
  16.             log.info("交换机 {}, 路由键 {} 消息 {} 延迟 {} s", returnedMessage.getExchange(), returnedMessage.getRoutingKey(), messageProperties, TimeUnit.MILLISECONDS.toSeconds(delay));
  17.             return;
  18.         }
  19.         log.warn("publisher-returns 发送者回执(应答码{},应答内容{})(消息 {} 成功到达交换机 {},但路由失败,路由键为 {})",
  20.                 returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getMessage(),
  21.                 returnedMessage.getExchange(), returnedMessage.getRoutingKey());
  22.     }
  23. }
复制代码
RabbitMQSender:
  1. private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");
  2. private final RabbitTemplate rabbitTemplate;
  3. private final PublisherReturnsCallBack publisherReturnsCallBack;
  4. @PostConstruct
  5. public void init() {
  6.     rabbitTemplate.setTaskExecutor(EXECUTOR);
  7.     // 设置统一的 publisher-returns(confirm 也可以设置统一的,但最好还是在发送时设置在 future 里)
  8.     // rabbitTemplate 的 publisher-returns 同一时间只能存在一个
  9.     // 因为 publisher confirm 后,其实 exchange 有没有转发成功,publisher 没必要每次发送都关注这个 exchange 的内部职责,更多的是“系统与 MQ 去约定”
  10.     rabbitTemplate.setReturnsCallback(publisherReturnsCallBack);
  11. }
复制代码
同理你也可以按照自己的想法进行重试…
   在测试训练阶段里,这个过程是异步回调的,如果是单元测试,发送完消息进程就竣事了,可能就没回调,步伐就竣事了,自然就看不到回调时的日记
  如果既没有 ACK 也没有 NACK,也没有发布者回执,那就相当于这个消息鸣金收兵了,没有任何的回应,那么就会抛出异常,我们可以处置惩罚这个异常,好比打印日记、重发之类的…
  1. private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {
  2.     log.error("处理 ack 回执失败, {}", ex.getMessage());
  3.     return null;
  4. };
复制代码
2、持久化

消息队列的数据持久化,确保消息未消费前在队列中不会丢失,其中的互换机、队列、和消息都要做持久化
默认都是持久化的



3、消费者确认

队列的消息出队列,并不会立即删除,而是等待消费者返回 ACK 大概 NACK
消费者要什么时候发送 ACK 呢?
   

  • 1)RabbitMQ投递消息给消费者
  • 2)消费者获取消息后,返回ACK给RabbitMQ
  • 3)RabbitMQ删除消息
  • 4)消费者宕机,消息尚未处置惩罚
  如果出现这种场景,就是不可靠的,所以应该是消息处置惩罚后,再发送 ACK
Spring AMQP 有三种消费者确认模式:

  • manual,本事 ack,自己用 rabbitTemplate 去发送 ACK/NACK(这个比较贫困,不用 RabbitListener 担当消息才必须用这个)
  • auto,配合 RabbitListener 注解,代码若出现异常,NACK,成功则 ACK
  • none,获得消息后直接 ACK,无论是否实行成功
出现 NACK 后要如那里理(此过程还在我们的服务器):

  • 拒绝(默认)
  • 重新入队列
  • 返回 ACK,消费者重新发布消息指定的互换机
  1. @Configuration
  2. @RequiredArgsConstructor
  3. @Slf4j
  4. public class MessageRecovererConfig {
  5.     @Bean
  6.     public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
  7.         return new RejectAndDontRequeueRecoverer(); // nack、直接 reject 和不 requeue,成为死信(默认)
  8. //        return new ImmediateRequeueMessageRecoverer(); // nack、requeue
  9. //        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); // ack、发送给指定的交换机,confirm 机制需要设置到 rabbitTemplate 里
  10.     }
  11. }
复制代码
Spring 提供的 retry 机制,在消费者出现异常时利用本地重试,而不是无限定的requeue到mq队列。
  1. spring:
  2.   rabbitmq:
  3.     listener:
  4.       simple:
  5.         acknowledge-mode: auto # 消费者执行成功 ack、异常 nack(manual 为手动、none 代表无论如何都是 ack)
  6.         retry: # 这个属于 spring amqp 的 retry 机制
  7.           enabled: false # 不开启失败重试
  8.           initial-interval: 1000 # 第一次重试时间间隔
  9.           multiplier: 3 # 每次重试间隔的倍数
  10.           max-attempts: 4 # 最大接受次数
  11.           stateless: true # true 代表没有状态,若有消费者包含事务,这里改为 false
复制代码
  解释:第一次失败,一秒后重试、第二次失败,三秒后重试,第三次失败,九秒后重试,第四次失败就没机会了(SpringAMQP会抛出异常AmqpRejectAndDontRequeueException)
  失败之后根据对应的处置惩罚计谋进行处置惩罚
(2)死信互换机

消息逾期、消息实行失败而且不重试也不重新入队列,堆积过多等情况,消息会成为死信,若队列绑定死信互换机,则转发给死信互换机,若没有则直接抛弃
   队列1 -> 死信互换机 -> 队列2,这个过程是消息队列内部保证的可靠性,消息也没有包罗原发送者的信息,甚至毗连已经断开了,所以没有 publisher-confirm 也没有 publisher-returns
  

这个机制和 republish 有点像,但是有本质的区别,republish 是消费者重发,而这里是队列将死信转发给死信互换机
死信的情况:

  • nack && requeue == false
  • 超时未消费
  • 队列满了,由于队列的特性,队列头会先成为死信
(3)延迟功能如何实现

刚才提到死信的诞生可能是超时未消费,那么实在这个点也可以简单的实现一个延迟队列:
队列为一个不被监听的专门用来延迟消息发送的缓冲带,其死信互换机才是目标互换机
  1. message.getMessageProperties().setExpiration("1000");
复制代码
设置的是逾期时间,其本意并不是延迟,是可以实现延迟~

另外,队列自己也能设置 ttl 逾期时间,但并不是队列的逾期时间(显然不合理,截止后无论啥都丢了,冤不冤啊,至少我想不到这种场景),而是队列中的消息存活的最大时间,消息的逾期时间和这个取一个最小值才是真实的逾期时间
值得留意的是,固然能实现延时消息的功能,但是

  • 实现复杂
  • 延迟可能不正确,由于队列的特性,如果队列头未出队列,哪怕其后者出现死信,也只能乖乖等前面的先出去之后才能前去死信互换机(例如消息的 ttl 分别为 9s、3s、1s,终极三个消息会被同时转发,由于“最长寿的”排在了前面)
这种方式的顺序优先级大于时间优先级
而 RabbitMQ 也提供了一个插件,叫 DelayExchange 延时互换机,专门用来实现延时功能
Scheduling Messages with RabbitMQ | RabbitMQ


  • 请自行上网下载
延时互换机的声明:
  1. @RabbitListener(bindings = @QueueBinding(
  2.         value = @Queue(name = "delay.queue", durable = "true"),
  3.         exchange = @Exchange(name = "delay.direct", delayed = "true"),
  4.         key = "delay"
  5. ))
  6. public void listenDelayMessage(String msg){
  7.     log.info("接收到delay.queue的延迟消息:{}", msg);
  8. }
复制代码
延时消息的发送:
  1. private MessagePostProcessor delayMessagePostProcessor(long delay) {
  2.     return message -> {
  3.         // 小于 0 也是立即执行
  4.         message.getMessageProperties().setDelay((int) Math.max(delay, 0));
  5.         return message;
  6.     };
  7. };
复制代码
这里设置的是 Delay,不是逾期时间,哪怕超过了时间也不叫做死信
期间一直存在延时互换机的硬存里,延迟消息插件内部会维护一个本地数据库表,同时利用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在弊端。
(4)消息堆积如何办理

死信的成因还可能是堆叠过多
我在实际的开发中,没遇到过这种情况,不过,如果发生了堆积的问题,办理方案也所有许多的

  • 提高消费者的消费能力 ,可以利用多线程消费任务
  • 增加更多消费者,提高消费速度,利用工作队列模式, 设置多个消费者消费消费同一个队列中的消息
  • 扩大队列容积,提高堆积上限
但是,RabbitMQ 队列占的是内存,间接性的落盘,提高上限终极的效果很有可能就是反复落库,特殊不稳固,且并没有办理消息堆积过多的问题
我们可以利用 RabbitMQ 惰性队列,惰性队列的好处重要是

  • 吸收到消息后直接存入磁盘而非内存,固然慢,但没有间歇性的 page-out,性能比较稳固
  • 消费者要消费消息时才会从磁盘中读取并加载到内存,正常消费后就删除了
  • 基于磁盘存储,消息上限高,支持数百万条的消息存储
声明方式:
   而要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:
  1. rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  
复制代码
命令解读:
  

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一个计谋
  • Lazy :计谋名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
  • --apply-to queues :计谋的作用对象,是所有的队列
  

  • x-queue-mode 参数的值为 lazy
  1. @RabbitListener(bindings = @QueueBinding(
  2.     exchange = @Exchange(name = "xxx"),
  3.     value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-mode", value = "lazy")),
  4.     key = "xxx"
  5. ))
复制代码
  互换机、队列扩展属性叫参数,消息的拓展属性叫头部,扩展属性一样平常都以 x- 开头(extra)
  消息堆积问题的办理方案?


  • 队列上绑定多个消费者,提高消费速度
  • 利用惰性队列,可以再mq中生存更多消息
惰性队列的优点有哪些?


  • 基于磁盘存储,消息上限高
  • 没有间歇性的 page-out,性能比较稳固
惰性队列的缺点有哪些?


  • 基于磁盘存储,消息时效性会降低
  • 性能受限于磁盘的IO
(5)高可用如何保证

RabbitMQ 在服务大规模项目时,一样平常情况下不会像数据库那样存储的瓶颈,用惰性队列已经是很顶天了的,其特性和用途不会有太极端的存储压力
更多的是在并发情况下,处置惩罚消息的能力有瓶颈,可能出现节点宕机的情况,而克制单节点宕机,数据丢失、无法提供服务等问题需要办理,也就是需要保证高可用性
Erlang 是一种面向并发的语言,天然支持集群模式,RabbitMQ 的集群有两种模式:

  • 平凡集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力
  • 镜像集群:是一种主从集群,在平凡集群的基础上,添加了主从备份的功能,提高集群的数据可用性
镜像集群固然支持主从,但主从同步并不是强同等的,某些情况下可能有数据丢失的风险(固然重启能办理,但那不是强同等,而是终极同等),因此 RabbitMQ 3.8 以后,推出了新的功能:仲裁队列来取代镜像集群,底层接纳 Raft 协议确保主从的数据同等性
1、平凡集群

各个节点之间,实时同步 MQ 元数据(一些静态的共享的数据):

  • 互换机的信息
  • 队列的信息
但不包罗队列中的消息(动态的数据不同步)
监听队列的时候,如果监听的节点不存在该队列(只是知道元数据),当前节点会访问队列地点的节点,该节点返回数据到当前节点并返回给监听者
队列地点节点宕机,队列中的消息就会“丢失”(是在重启之前,这个消息就消失无法被处置惩罚的意思)
   

  如何部署,上网搜搜就行
  2、镜像集群

各个节点之间,实时同步 MQ 元数据(一些静态的共享的数据):

  • 互换机的信息
  • 队列的信息
本质是主从模式,创建队列的节点为主节点,其他节点为镜像节点,队列中的消息会从主节点备份到镜像节点中
留意


  • 像 Redis 那样的主从集群,同步都是全部同步来着
  • 但 RabbitMQ 集群的主从模式比较特殊,他的粒度是队列,而不是全部
也就是说,一个队列的主节点,可能是另一个队列的镜像节点,所以分析某个场景的时候,要确认是哪个队列,单独进行观察分析讨论


  • 不同队列之间只有交互,不会相互影响数据同步
针对某一个队列,所有写操作都在主节点完成,然后同步给镜像节点,读操作任何一个都 ok
主节点宕机,镜像节成为新的主节点

镜像集群有三种模式:

  • exactly 正确模式,指定副本数 count = 主节点数 1 + 镜像节点数,集群会尽可能的维护这个数值,如果镜像节点出现故障,就在另一个节点上创建镜像,比较发起这种模式,可以设置为 N/2 + 1
  • all 全部模式,count = N,主节点外全部都是镜像节点
  • nodes 模式,指定镜像节点名称列表,随机一个作为主节点,如果列表里的节点都不存在或不可用,则创建队列时的节点作为主节点,之后访问集群,列表中的节点若存在才会创建镜像节点
没有镜像节点实在就相当于平凡模式了
如何配置上网搜搜就行,比较贫困,需要设置计谋,以及匹配的队列(不同队列分开来讨论,可以设置不同的计谋)
3、仲裁队列

RabbitMQ 3.8 以后,推出了新的功能仲裁队列来

  • 取代镜像集群,都是主从模式,支持主从数据同步,默认是 exactly count = 5
  • 约定大于配置,利用非常简单没有复杂的配置,队列的范例选择 Quorum 即可
  • 底层接纳 Raft 协议确保主从的数据强同等性
Spring Boot 配置:

仲裁队列声明:
  1. @RabbitListener(bindings = @QueueBinding(
  2.     exchange = @Exchange(name = "xxx"),
  3.     value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-type", value = "quorum")),
  4.     key = "xxx"
  5. ))
复制代码
队列不声明默认就是平凡集群,这里声明的仲裁队列也只是针对一个队列
(6)消息重复消费问题

在保证MQ消息不重复的情况下,MQ 的一条消息被消费者消费了多次
消费者消费消息成功后,在给MQ发送消息确认的时候出现了网络异常大概是服务宕机,MQ 迟迟没有吸收到 ACK 也没有 NACK,此时 MQ 不会将发送的消息删除,按兵不动,消费者重新监听大概有其他消费者的时候,交由它消费,而这条消息如果在之前就消费过了的话,则会导致重复消费
办理方案:

  • 消息消费的业务自己具有幂等性,再次处置惩罚雷同消息时不会产生副作用,一些时候可能需要用到分布式锁去维护幂等性

    • 好比一个订单的状态设置为竣事,那重复消费的效果同等

  • 记录消息的唯一标识,如果消费过了的,则不再消费

    • 消费成功将 id 缓存起来,消费时查询缓存里是否有这条消息
    • 设置答应的缓存时间时,你不必想得太极端,一样平常很快就有消费者继续监听拿到消息,哪怕真有谁人情况,这里带来的损失大概率可以忽略不记了,一切要联合实际情况!

偶然候两种方案没有严酷的界定

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

怀念夏天

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表