ToB企服应用市场:ToB评测及商务社交产业平台

标题: 消息队列RabbitMQ [打印本页]

作者: 我可以不吃啊    时间: 2024-10-3 08:57
标题: 消息队列RabbitMQ
1. 简介与安装

RabbitMQ是基于Erlang语言开辟的开源消息通信中间件,支持AMQP,XMPP,SMTP,STOMP协议,消息延迟时微秒级别的。
Ubuntu系统RabbitMQ的安装

2. 基本概念

3. SpringAMQP

4. 交换机范例

5. 消息转换器

5.1 默认转换器

在数据传输时,发送的消息被序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 只不过,默认情况下Spring接纳的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

5.2 设置JSON转换器

6 生产者的可靠性

一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况。少数情况下,大概出现投递的消息没有成功入队。
6.1 生产者超时重连机制

在生产者服务中进行如下设置
  1. spring:
  2.   rabbitmq:
  3.     connection-timeout: 1s # 连接超时时间
  4.     template:
  5.       retry:
  6.         enabled: true # 开启超时重连机制
  7.         initial-interval: 1000ms # 初始等待时间
  8.         multiplier: 1 # 等待时长倍数,下次等待时长 initial-interval * multiplier
  9.         max-attempts: 3 # 重试次数
复制代码
当网络不稳定时,超时重连机制可以提高消息的发送成功率,但是SpringAMQP提供的重连机制时阻塞式的。不发起开启该功能,若业务必要,必要设置合理的等候时间和重试次数,也可以使用异步线程来实验发送消息的代码。
6.2 生产者确认机制

设置文件设置选项
  1. spring:
  2.   rabbitmq:
  3.     publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
  4.     # none:关闭confirm机制; simple:同步阻塞等待MQ的回执; correlated:MQ异步回调返回回执(推荐)
  5.     publisher-returns: true # 开启publisher return机制
复制代码
6. MQ的可靠性

消息到达MQ以后,如果MQ不能实时生存,也会导致消息丢失。
- MQ宕机;
- 内存空间不敷,引发MQ阻塞实验持久化;
6.1 数据持久化

6.2 惰性队列 Lazy Queue

7. 消耗者的可靠性

当RabbitMQ向消耗者投递消息以后,必要知道消耗者的处理状态如何。因为消耗者消耗消息大概出现故障,比如:

7.1 消耗者确认机制

RabbitMQ提供了消耗者确认机制(Consumer Acknowledgement),当消耗者处理消息后,向RabbitMQ发送一个回执,告知RabbitMQ本身消息处理状态。回执有三种可选值:

消息确认机制的实现方式

7.2 失败重试机制

开启消耗者确认机制后,如果消息处理不停返回NACK,那么消息会反复进行入队和处理,会导致MQ压力飙升。
而开启失败重试机制后,消息会在当地重试,而不是重新入队,当地重试达到最大次数后,默认会返回reject丢弃消息。
在消耗者服务的设置文件中进行设置
  1. spring:
  2.   rabbitmq:
  3.     listener:
  4.       simple:
  5.         retry:
  6.           enabled: true # 开启消费者失败重试
  7.           initial-interval: 1000ms # 初识的失败等待时长为1秒
  8.           multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
  9.           max-attempts: 3 # 最大重试次数
  10.           stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
复制代码
7.3 失败处理战略

当地重试达到最大次数后,默认会返回reject丢弃消息,而有些业务显然无法接受消息的丢失。MQ支持之界说重试次数耗尽后的处理战略

  1. @Configuration
  2. public class MqErrorConfig {
  3.     private final static String ERROR_EXCHANGE = "error.direct";
  4.     private final static String ERROR_QUEUE = "error.queue";
  5.     private final static String ERROR_ROTING_KEY = "error";
  6.     /**
  7.      * 创建处理失败消息的交换机
  8.      * @return
  9.      */
  10.     @Bean
  11.     public DirectExchange errorExchange() {
  12.         return new DirectExchange(ERROR_EXCHANGE);
  13.     }
  14.     /**
  15.      * 创建存放失败消息的队列
  16.      * @return
  17.      */
  18.     @Bean
  19.     public Queue errorQueue() {
  20.         return new Queue(ERROR_QUEUE);
  21.     }
  22.     /**
  23.      * 交换机与队列绑定
  24.      * @param errorQueue
  25.      * @param errorExchange
  26.      * @return
  27.      */
  28.     @Bean
  29.     public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
  30.         return BindingBuilder.bind(errorQueue).to(errorExchange).with(ERROR_ROTING_KEY);
  31.     }
  32.     /**
  33.      * 注册处理失败消息处理策略
  34.      * @param rabbitTemplate
  35.      * @return
  36.      */
  37.     @Bean
  38.     public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
  39.         return new RepublishMessageRecoverer(rabbitTemplate, ERROR_EXCHANGE, ERROR_ROTING_KEY);
  40.     }
  41. }
复制代码
7.4 业务幂等性方案

在步伐开辟中,则是指同一个业务,实验一次或多次对业务状态的影响是一致的。
7.4.1 唯一消息ID


  1. @Bean
  2. public MessageConverter messageConverter(){
  3.     // 1.定义消息转换器
  4.     Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
  5.     // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
  6.     jjmc.setCreateMessageIds(true);
  7.     return jjmc;
  8. }
复制代码
7.4.2 业务判定

非幂等性业务,会对数据进行更改,那么我们在实验业务逻辑前,可先判定数据记录是否处于未处理状态,比如可以根据订单的状态。
7.5 兜底战略

开启定时任务自动去查询数据库,判定数据有必要处理的数据。
8. 延迟消息

8.1 死信交换机

设计两个队列两个交换机,当消息过期时,消息会被投递到死信队列,只需监听死信队列即可。通过设置队列dead-letter-exchange指定过期的消息投递的交换机,也就是死信交换机。对于消息,通过expration指定过期时间。
然而,RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不肯定会被移除或投递到死信交换机,而是在消息恰恰处于队首时才会被处理。 当队列中消息堆积很多的时候,过期消息大概不会被按时处理,因此你设置的TTL时间不肯定正确。
8.2 DelayExchange插件

开启队列的delayed设置,而且在投递消息时设置delay时长。
延迟消息插件内部会维护一个当地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,大概会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。 因此,不发起设置延迟时间过长的延迟消息。
改进战略,将消息的delay时长分段,比如将延迟时间切割成10s 10s 10s 15s 15s …,大部分消息在前30s内就已经可以被消耗,不必要等到30分钟,可以有效防止消息堆积。
参考资料:https://www.bilibili.com/video/BV1mN4y1Z7t9/

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4