一、初识 MQ
传统的单体架构,分布式架构的同步调用里,无论是方法调用,还是 OpenFeign 难免会有以下问题:
- 扩展性差(高耦合,需要依赖对应的服务,同样的事件,不停有新需求,这个事件的业务代码会越来越臃肿,这些子业务都写在一起)
- 性能下降(等待相应,终极整个业务的相应时长就是每次长途调用的实行时长之和)
- 级联失败(如果是一个事件,一个服务失败就会导致全部回滚,若是分布式事件就更加贫困了,但实在一些行为的失败不应该导致团体回滚)
- 服务宕机(如果服务调用者未考虑服务提供者的性能,导致提供者由于过度请求而宕机)
但如果不是很要求同步调用,实在也可以用异步调用,如果是单体架构,你可能很快能想到一个办理方案,就是壅闭队列实现消息通知:
但是在分布式架构下,可能就需要一个中间件级别的壅闭队列,这就是我们要学习的 Message Queue 消息队列,简称 MQ,而如今流行的 MQ 还不少,在实现其根本的消息通知功能外,尚有一些不错的扩展
以 RabbitMQ 和 Kafka 为例:
RabbitMQKafka公司/社区RabbitApache开发语言ErlangScala & Java协议支持AMQP,XMPP,SMTP,STOMP自定义协议可用性高高单机吞吐量一样平常非常高(Kafka 亮点)消息延迟微秒级毫秒以内消息可靠性高一样平常 消息延迟指的是,消息到队列,并在队列中“就绪”的时间与预期时间的差距,实在就是数据在中间件中流动的耗时,预期时间可以是如今、几毫秒后、几秒后、几天后…
据统计,目前国内消息队列利用最多的还是 RabbitMQ,再加上其各方面都比较均衡,稳固性也好,因此我们讲堂上选择 RabbitMQ 来学习。
二、RabbitMQ 安装
Docker 安装 RabbitMQ:
- mkdir /root/mq
- cd /root/mq
- docker rm mq-server -f
- docker rmi rabbitmq:3.8-management -f
- docker volume rm mq-plugins -f
- docker pull rabbitmq:3.8-management
- # 插件数据卷最好还是直接挂载 volume,而不是挂载我们的目录
- docker run \
- --name mq-server \
- -e RABBITMQ_DEFAULT_USER=xxx \
- -e RABBITMQ_DEFAULT_PASS=xxx \
- --hostname mq1 \
- -v mq-plugins:/plugins \
- -p 15672:15672 \
- -p 5672:5672 \
- -d rabbitmq:3.8-management
复制代码 三、RabbitMQ 根本知识
(1)架构
15672:RabbitMQ 提供的管理控制台的端口
5672:RabbitMQ 的消息发送处置惩罚接口
用户名密码就是安装时,启动容器时指定的用户名密码
MQ 对应的就是这里的消息代理 Broker:
RabbitMQ 具体架构图:
其中包罗几个概念:
- publisher:生产者,也就是发送消息的一方
- consumer:消费者,也就是消费消息的一方
- queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处置惩罚
- exchange:互换机,负责消息路由。生产者发送的消息由互换机决定投递到哪个队列。
- virtual host:假造主机,起到数据隔离的作用。每个假造主机相互独立,有各自的 exchange、queue
如今你可能只熟悉生产者、消费者、队列,其他是什么呢?
实在你可以理解为 MQ 也是存储东西的,存储的就是消息,virtual host 就是数据库,queue 就是表,消息就是一行数据,而 MQ 有特殊的机制,消息先通过 exchange 再决定前去哪个 queue
管理控制台的利用就不多说了
(2)五大模式
这只是最常见的五种模式:
关联互换机的队列都能收到一份消息,广播
关联互换机时,提供 routing key(可以是多个,队列之间可以重复),发布消息时提供一个 routing key,由此发送给指定的队列
值得留意的是,简单模式和工作模式,实在也是有互换机的,任何队列都会绑定一个默认互换机 "",范例是 direct,routing key 为队列的名称
路由模式的基础上,队列关联互换机时 routing key 可以是带通配符的
routing key 的单词通过 . 分割, # 匹配 n 个单词(n ≥ 0),* 只匹配一个单词
例如 #.red:
- 可以匹配的 routing key:p1.red、red、p2.p1.red
在发布消息时,要利用具体的 routing key,互换机发送给匹配的队列
(3)数据隔离
四、RabbitMQ 根本利用 Spring AMQP
引入 RabbitMQ 相关的 SDK,可以通过创建毗连 Connection、创建通道 Channel,用 Channel 进行操作,担当消息也差不多,不过多演示:
- public class PublisherTest {
- @Test
- public void testSendMessage() throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
- factory.setHost("xx.xx.xx.xx");
- factory.setPort(5672);
- factory.setVirtualHost("/");
- factory.setUsername("xxx");
- factory.setPassword("xxx");
- // 1.2.建立连接
- Connection connection = factory.newConnection();
- // 2.创建通道Channel
- Channel channel = connection.createChannel();
- // 3.创建队列
- String queueName = "simple.queue";
- channel.queueDeclare(queueName, false, false, false, null);
- // 4.发送消息
- String message = "hello, rabbitmq!";
- channel.basicPublish("", queueName, null, message.getBytes());
- System.out.println("发送消息成功:【" + message + "】");
- // 5.关闭通道和连接
- channel.close();
- connection.close();
- }
- }
复制代码 但比较贫困,Spring AMQP 框架可以自动装配 RabbitMQ 的操作对象 RabbitTemplate,这样我们就可以更方便的操作 MQ,并充分发挥其特性
- <!--AMQP依赖,包含RabbitMQ-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
复制代码 默认包罗 RabbitMQ 的实现,如果你想对接其他 AMQP 协议的 MQ,得自己实现其抽象封装的接口
(1)发送消息
留意,下面是 Spring3 的写法,所以会有点不一样,可能看不懂,稍后解释!
消息发送器封装:
- @Repository
- @RequiredArgsConstructor
- @Slf4j
- public class RabbitMQSender {
- private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");
- private final RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- public void init() {
- rabbitTemplate.setTaskExecutor(EXECUTOR);
- }
- private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {
- log.error("处理 ack 回执失败, {}", ex.getMessage());
- return null;
- };
- private MessagePostProcessor delayMessagePostProcessor(long delay) {
- return message -> {
- // 小于 0 也是立即执行
- // setDelay 才是给 RabbitMQ 看的,setReceivedDelay 是给 publish-returns 看的
- message.getMessageProperties().setDelay((int) Math.max(delay, 0));
- return message;
- };
- };
- private CorrelationData newCorrelationData() {
- return new CorrelationData(UUIDUtil.uuid32());
- }
- /**
- * @param exchange 交换机
- * @param routingKey routing key
- * @param msg 消息
- * @param delay 延迟时间(如果是延迟交换机,delay 才有效)
- * @param maxRetries 最大重试机会
- * @param <T> 消息的对象类型
- */
- private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){
- log.info("准备发送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}",
- exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries);
- CorrelationData correlationData = newCorrelationData();
- MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay);
- correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() {
- private int retryCount = 0; // 一次 send 从始至终都用的是一个 Consumer 对象,所以作用的都是同一个计数器
- @Override
- public void accept(CorrelationData.Confirm confirm) {
- Optional.ofNullable(confirm).ifPresent(c -> {
- if(c.isAck()) {
- log.info("ACK {} 消息成功到达,{}", correlationData.getId(), c.getReason());
- } else {
- log.warn("NACK {} 消息未能到达,{}", correlationData.getId(), c.getReason());
- if(retryCount >= maxRetries) {
- log.error("次数到达上限 {}", maxRetries);
- return;
- }
- retryCount++;
- log.warn("开始第 {} 次重试", retryCount);
- CorrelationData cd = newCorrelationData();
- cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR);
- rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd);
- }
- });
- }
- }, EXECUTOR);
- rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData);
- }
- public void sendMessage(String exchange, String routingKey, Object msg) {
- send(exchange, routingKey, msg, 0, 0);
- }
- public void sendDelayMessage(String exchange, String routingKey, Object msg, long delay){
- send(exchange, routingKey, msg, delay, 0);
- }
- public void sendWithConfirm(String exchange, String routingKey, Object msg, int maxReties) {
- send(exchange, routingKey, msg, 0, maxReties);
- }
- public void sendDelayMessageWithConfirm(String exchange, String routingKey, Object msg, long delay, int maxReties) {
- send(exchange, routingKey, msg, delay, maxReties);
- }
- }
复制代码 (2)担当消息
监听器:
- RabbitTemplate 是可以自动获取消息的,也可以不实时监听,但是一样平常情况都是监听,有消息就实行
- 监听的是 queue,若 queue 不存在,就会根据注解创建一遍
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "xxx"),
- exchange = @Exchange(name = "xxx", delayed = "true"),
- key = {"xxx"}
- ))
- public void xxx(X x) {
- }
复制代码 (3)声明互换机与队列
可以通过 @Bean 创建 Bean 对象的方式去声明,可以自行搜刮,我更喜欢监听器注解的情势,而且 Bean 的方式,可能会由于配置不完全一样,导致其他配置类的互换机队列无法声明(征象云云,底层为啥我不知道)
(4)消息转换器
消息是一个字符串,但为了满足更多需求,需要将一个对象序列化成一个字符串,但默认的序列化实现貌似用的是 java 对象的序列化,这种方式可能得同一个步伐的 java 类才能反序列化成功,所以我们应该选择分布式的序列化方式,好比 json
- @Configuration
- @RequiredArgsConstructor
- @Slf4j
- public class MessageConverterConfig {
- @Bean
- public MessageConverter messageConverter(){
- // 1. 定义消息转换器
- Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(JsonUtil.OBJECT_MAPPER);
- // 2. 配置自动创建消息 id,用于识别不同消息
- jackson2JsonMessageConverter.setCreateMessageIds(Boolean.TRUE);
- return jackson2JsonMessageConverter;
- }
- }
复制代码 这里的 JsonUtil.OBJECT_MAPPER,就是框架的大概自己实现的 ObjectMapper
(5)配置文件
- spring:
- rabbitmq:
- host: ${xxx.mq.host} # rabbitMQ 的 ip 地址
- port: ${xxx.mq.port} # 端口
- username: ${xxx.mq.username}
- password: ${xxx.mq.password}
- virtual-host: ${xxx.mq.virtual-host}
- publisher-confirm-type: correlated
- publisher-returns: true
- template:
- mandatory: true # 若是 false 则直接丢弃了,并不会发送者回执
- listener:
- simple:
- prefetch: 1 # 预取为一个(消费完才能拿下一个)
- concurrency: 2 # 消费者最少 2 个线程
- max-concurrency: 10 # 消费者最多 10 个线程
- auto-startup: true # 为 false 监听者不会实时创建和监听,为 true 监听的过程中,若 queue 不存在,会再根据注解进行创建,创建后只监听 queue,declare = "false" 才是不自动声明
- default-requeue-rejected: false # 拒绝后不 requeue(成为死信,若没有绑定死信交换机,就真的丢了)
- acknowledge-mode: auto # 消费者执行成功 ack、异常 nack(manual 为手动、none 代表无论如何都是 ack)
- retry: # 这个属于 spring amqp 的 retry 机制
- enabled: false # 不开启失败重试
- # initial-interval: 1000
- # multiplier: 2
- # max-attempts: 3
- # stateless: true # true 代表没有状态,若有消费者包含事务,这里改为 false
复制代码 五、常见问题
(1)RabbitMQ 如何保证消息可靠性
保证消息可靠性、不丢失。重要从三个层面考虑
如果报错可以先记录到日记中,再去修复数据(保底)
1、生产者确认机制
生产者确认机制,确保生产者的消息能到达队列
- publisher-confirm,针对的是消息从发送者到互换机的可靠性,成功则进行下一步,失败返回 NACK
- private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");
- private final RabbitTemplate rabbitTemplate;
- @PostConstruct
- public void init() {
- rabbitTemplate.setTaskExecutor(EXECUTOR);
- }
- private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {
- log.error("处理 ack 回执失败, {}", ex.getMessage());
- return null;
- };
- private MessagePostProcessor delayMessagePostProcessor(long delay) {
- return message -> {
- // 小于 0 也是立即执行
- // setDelay 才是给 RabbitMQ 看的,setReceivedDelay 是给 publish-returns 看的
- message.getMessageProperties().setDelay((int) Math.max(delay, 0));
- return message;
- };
- };
- private CorrelationData newCorrelationData() {
- return new CorrelationData(UUIDUtil.uuid32());
- }
- /**
- * @param exchange 交换机
- * @param routingKey routing key
- * @param msg 消息
- * @param delay 延迟时间(如果是延迟交换机,delay 才有效)
- * @param maxRetries 最大重试机会
- * @param <T> 消息的对象类型
- */
- private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){
- log.info("准备发送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}",
- exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries);
- CorrelationData correlationData = newCorrelationData();
- MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay);
- correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() {
- private int retryCount = 0; // 一次 send 从始至终都用的是一个 Consumer 对象,所以作用的都是同一个计数器
- @Override
- public void accept(CorrelationData.Confirm confirm) {
- Optional.ofNullable(confirm).ifPresent(c -> {
- if(c.isAck()) {
- log.info("ACK {} 消息成功到达,{}", correlationData.getId(), c.getReason());
- } else {
- log.warn("NACK {} 消息未能到达,{}", correlationData.getId(), c.getReason());
- if(retryCount >= maxRetries) {
- log.error("次数到达上限 {}", maxRetries);
- return;
- }
- retryCount++;
- log.warn("开始第 {} 次重试", retryCount);
- CorrelationData cd = newCorrelationData();
- cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR);
- rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd);
- }
- });
- }
- }, EXECUTOR);
- rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData);
- }
复制代码 Spring3 的 RabbitMQ Confirm,需要配置为 correlated,发送消息时提供 CorrelationData,也就是与消息关联的数据,包罗发送者确认时的回调方法

要想提供 Confirm 的回调办法,需要配置 correlationData.getFuture() 返回的 CompletableFuture 对象(新的 JUC 工具类,可以查一查如何利用)
配置后,在未来根据回调函数进行处置惩罚(当然也可以直接设置在 RabbitTemplate 对象的 ConfirmCallBack)
还可以自己实现消息的发送者重试:
- publisher-returns,针对的是消息从互换机到队列的可靠性,成功则返回 ACK,失败触发 returns 的回调方法
- @Component
- @RequiredArgsConstructor
- @Slf4j
- public class PublisherReturnsCallBack implements RabbitTemplate.ReturnsCallback {
- // 不存在 routing key 对应的队列,那在我看来转发到零个是合理的现象,但在这里也认为是路由失败(MQ 认为消息一定至少要进入一个队列,之后才能被处理,这就是可靠性)(反正就是回执了,你爱咋处理是你自己的事情)
- @Override
- public void returnedMessage(ReturnedMessage returnedMessage) {
- // 可能一些版本的 mq 会因为是延时交换机,导致发送者回执,只要没有 NACK 这种情况其实并不是不可靠(其实我也不知道有没有版本会忽略)
- // 但是其实不忽略也不错,毕竟者本来就是特殊情况,一般交换机是不存储的,但是这个临时存储消息
- // 但这样也就代表了,延时后消息路由失败是没法再次处理的(因为我们交给延时交换机后就不管了,可靠性有 mq 自己保持)
- MessageProperties messageProperties = returnedMessage.getMessage().getMessageProperties();
- // 这里的 message 并不是原本的 message,是额外的封装,x-delay 在 publish-returns 里面封装到 receiveDelay 里了
- Integer delay = messageProperties.getReceivedDelay();
- // 如果不是延时交换机,却设置了 delay 大于 0,是不会延时的,所以是其他原因导致的(以防万一把消息记录到日志里)
- if(Objects.nonNull(delay) && delay.compareTo(0) > 0) {
- log.info("交换机 {}, 路由键 {} 消息 {} 延迟 {} s", returnedMessage.getExchange(), returnedMessage.getRoutingKey(), messageProperties, TimeUnit.MILLISECONDS.toSeconds(delay));
- return;
- }
- log.warn("publisher-returns 发送者回执(应答码{},应答内容{})(消息 {} 成功到达交换机 {},但路由失败,路由键为 {})",
- returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getMessage(),
- returnedMessage.getExchange(), returnedMessage.getRoutingKey());
- }
- }
复制代码 RabbitMQSender:
- private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");
- private final RabbitTemplate rabbitTemplate;
- private final PublisherReturnsCallBack publisherReturnsCallBack;
- @PostConstruct
- public void init() {
- rabbitTemplate.setTaskExecutor(EXECUTOR);
- // 设置统一的 publisher-returns(confirm 也可以设置统一的,但最好还是在发送时设置在 future 里)
- // rabbitTemplate 的 publisher-returns 同一时间只能存在一个
- // 因为 publisher confirm 后,其实 exchange 有没有转发成功,publisher 没必要每次发送都关注这个 exchange 的内部职责,更多的是“系统与 MQ 去约定”
- rabbitTemplate.setReturnsCallback(publisherReturnsCallBack);
- }
复制代码 同理你也可以按照自己的想法进行重试…
在测试训练阶段里,这个过程是异步回调的,如果是单元测试,发送完消息进程就竣事了,可能就没回调,步伐就竣事了,自然就看不到回调时的日记
如果既没有 ACK 也没有 NACK,也没有发布者回执,那就相当于这个消息鸣金收兵了,没有任何的回应,那么就会抛出异常,我们可以处置惩罚这个异常,好比打印日记、重发之类的…
- private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {
- log.error("处理 ack 回执失败, {}", ex.getMessage());
- return null;
- };
复制代码 2、持久化
消息队列的数据持久化,确保消息未消费前在队列中不会丢失,其中的互换机、队列、和消息都要做持久化
默认都是持久化的
3、消费者确认
队列的消息出队列,并不会立即删除,而是等待消费者返回 ACK 大概 NACK
消费者要什么时候发送 ACK 呢?
- 1)RabbitMQ投递消息给消费者
- 2)消费者获取消息后,返回ACK给RabbitMQ
- 3)RabbitMQ删除消息
- 4)消费者宕机,消息尚未处置惩罚
如果出现这种场景,就是不可靠的,所以应该是消息处置惩罚后,再发送 ACK
Spring AMQP 有三种消费者确认模式:
- manual,本事 ack,自己用 rabbitTemplate 去发送 ACK/NACK(这个比较贫困,不用 RabbitListener 担当消息才必须用这个)
- auto,配合 RabbitListener 注解,代码若出现异常,NACK,成功则 ACK
- none,获得消息后直接 ACK,无论是否实行成功
出现 NACK 后要如那里理(此过程还在我们的服务器):
- 拒绝(默认)
- 重新入队列
- 返回 ACK,消费者重新发布消息指定的互换机
- @Configuration
- @RequiredArgsConstructor
- @Slf4j
- public class MessageRecovererConfig {
- @Bean
- public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
- return new RejectAndDontRequeueRecoverer(); // nack、直接 reject 和不 requeue,成为死信(默认)
- // return new ImmediateRequeueMessageRecoverer(); // nack、requeue
- // return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); // ack、发送给指定的交换机,confirm 机制需要设置到 rabbitTemplate 里
- }
- }
复制代码 Spring 提供的 retry 机制,在消费者出现异常时利用本地重试,而不是无限定的requeue到mq队列。
- spring:
- rabbitmq:
- listener:
- simple:
- acknowledge-mode: auto # 消费者执行成功 ack、异常 nack(manual 为手动、none 代表无论如何都是 ack)
- retry: # 这个属于 spring amqp 的 retry 机制
- enabled: false # 不开启失败重试
- initial-interval: 1000 # 第一次重试时间间隔
- multiplier: 3 # 每次重试间隔的倍数
- max-attempts: 4 # 最大接受次数
- stateless: true # true 代表没有状态,若有消费者包含事务,这里改为 false
复制代码 解释:第一次失败,一秒后重试、第二次失败,三秒后重试,第三次失败,九秒后重试,第四次失败就没机会了(SpringAMQP会抛出异常AmqpRejectAndDontRequeueException)
失败之后根据对应的处置惩罚计谋进行处置惩罚
(2)死信互换机
消息逾期、消息实行失败而且不重试也不重新入队列,堆积过多等情况,消息会成为死信,若队列绑定死信互换机,则转发给死信互换机,若没有则直接抛弃
队列1 -> 死信互换机 -> 队列2,这个过程是消息队列内部保证的可靠性,消息也没有包罗原发送者的信息,甚至毗连已经断开了,所以没有 publisher-confirm 也没有 publisher-returns
这个机制和 republish 有点像,但是有本质的区别,republish 是消费者重发,而这里是队列将死信转发给死信互换机
死信的情况:
- nack && requeue == false
- 超时未消费
- 队列满了,由于队列的特性,队列头会先成为死信
(3)延迟功能如何实现
刚才提到死信的诞生可能是超时未消费,那么实在这个点也可以简单的实现一个延迟队列:
队列为一个不被监听的专门用来延迟消息发送的缓冲带,其死信互换机才是目标互换机,
- message.getMessageProperties().setExpiration("1000");
复制代码 设置的是逾期时间,其本意并不是延迟,是可以实现延迟~
另外,队列自己也能设置 ttl 逾期时间,但并不是队列的逾期时间(显然不合理,截止后无论啥都丢了,冤不冤啊,至少我想不到这种场景),而是队列中的消息存活的最大时间,消息的逾期时间和这个取一个最小值才是真实的逾期时间
值得留意的是,固然能实现延时消息的功能,但是
- 实现复杂
- 延迟可能不正确,由于队列的特性,如果队列头未出队列,哪怕其后者出现死信,也只能乖乖等前面的先出去之后才能前去死信互换机(例如消息的 ttl 分别为 9s、3s、1s,终极三个消息会被同时转发,由于“最长寿的”排在了前面)
这种方式的顺序优先级大于时间优先级
而 RabbitMQ 也提供了一个插件,叫 DelayExchange 延时互换机,专门用来实现延时功能
Scheduling Messages with RabbitMQ | RabbitMQ
延时互换机的声明:
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "delay.queue", durable = "true"),
- exchange = @Exchange(name = "delay.direct", delayed = "true"),
- key = "delay"
- ))
- public void listenDelayMessage(String msg){
- log.info("接收到delay.queue的延迟消息:{}", msg);
- }
复制代码 延时消息的发送:
- private MessagePostProcessor delayMessagePostProcessor(long delay) {
- return message -> {
- // 小于 0 也是立即执行
- message.getMessageProperties().setDelay((int) Math.max(delay, 0));
- return message;
- };
- };
复制代码 这里设置的是 Delay,不是逾期时间,哪怕超过了时间也不叫做死信
期间一直存在延时互换机的硬存里,延迟消息插件内部会维护一个本地数据库表,同时利用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在弊端。
(4)消息堆积如何办理
死信的成因还可能是堆叠过多
我在实际的开发中,没遇到过这种情况,不过,如果发生了堆积的问题,办理方案也所有许多的
- 提高消费者的消费能力 ,可以利用多线程消费任务
- 增加更多消费者,提高消费速度,利用工作队列模式, 设置多个消费者消费消费同一个队列中的消息
- 扩大队列容积,提高堆积上限
但是,RabbitMQ 队列占的是内存,间接性的落盘,提高上限终极的效果很有可能就是反复落库,特殊不稳固,且并没有办理消息堆积过多的问题
我们可以利用 RabbitMQ 惰性队列,惰性队列的好处重要是
- 吸收到消息后直接存入磁盘而非内存,固然慢,但没有间歇性的 page-out,性能比较稳固
- 消费者要消费消息时才会从磁盘中读取并加载到内存,正常消费后就删除了
- 基于磁盘存储,消息上限高,支持数百万条的消息存储
声明方式:
而要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:
- rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
复制代码 命令解读:
- rabbitmqctl :RabbitMQ的命令行工具
- set_policy :添加一个计谋
- Lazy :计谋名称,可以自定义
- "^lazy-queue$" :用正则表达式匹配队列的名字
- '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
- --apply-to queues :计谋的作用对象,是所有的队列
- @RabbitListener(bindings = @QueueBinding(
- exchange = @Exchange(name = "xxx"),
- value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-mode", value = "lazy")),
- key = "xxx"
- ))
复制代码 互换机、队列扩展属性叫参数,消息的拓展属性叫头部,扩展属性一样平常都以 x- 开头(extra)
消息堆积问题的办理方案?
- 队列上绑定多个消费者,提高消费速度
- 利用惰性队列,可以再mq中生存更多消息
惰性队列的优点有哪些?
- 基于磁盘存储,消息上限高
- 没有间歇性的 page-out,性能比较稳固
惰性队列的缺点有哪些?
- 基于磁盘存储,消息时效性会降低
- 性能受限于磁盘的IO
(5)高可用如何保证
RabbitMQ 在服务大规模项目时,一样平常情况下不会像数据库那样存储的瓶颈,用惰性队列已经是很顶天了的,其特性和用途不会有太极端的存储压力
更多的是在并发情况下,处置惩罚消息的能力有瓶颈,可能出现节点宕机的情况,而克制单节点宕机,数据丢失、无法提供服务等问题需要办理,也就是需要保证高可用性
Erlang 是一种面向并发的语言,天然支持集群模式,RabbitMQ 的集群有两种模式:
- 平凡集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力
- 镜像集群:是一种主从集群,在平凡集群的基础上,添加了主从备份的功能,提高集群的数据可用性
镜像集群固然支持主从,但主从同步并不是强同等的,某些情况下可能有数据丢失的风险(固然重启能办理,但那不是强同等,而是终极同等),因此 RabbitMQ 3.8 以后,推出了新的功能:仲裁队列来取代镜像集群,底层接纳 Raft 协议确保主从的数据同等性
1、平凡集群
各个节点之间,实时同步 MQ 元数据(一些静态的共享的数据):
但不包罗队列中的消息(动态的数据不同步)
监听队列的时候,如果监听的节点不存在该队列(只是知道元数据),当前节点会访问队列地点的节点,该节点返回数据到当前节点并返回给监听者
队列地点节点宕机,队列中的消息就会“丢失”(是在重启之前,这个消息就消失无法被处置惩罚的意思)
如何部署,上网搜搜就行
2、镜像集群
各个节点之间,实时同步 MQ 元数据(一些静态的共享的数据):
本质是主从模式,创建队列的节点为主节点,其他节点为镜像节点,队列中的消息会从主节点备份到镜像节点中
留意
- 像 Redis 那样的主从集群,同步都是全部同步来着
- 但 RabbitMQ 集群的主从模式比较特殊,他的粒度是队列,而不是全部
也就是说,一个队列的主节点,可能是另一个队列的镜像节点,所以分析某个场景的时候,要确认是哪个队列,单独进行观察分析讨论
针对某一个队列,所有写操作都在主节点完成,然后同步给镜像节点,读操作任何一个都 ok
主节点宕机,镜像节成为新的主节点
镜像集群有三种模式:
- exactly 正确模式,指定副本数 count = 主节点数 1 + 镜像节点数,集群会尽可能的维护这个数值,如果镜像节点出现故障,就在另一个节点上创建镜像,比较发起这种模式,可以设置为 N/2 + 1
- all 全部模式,count = N,主节点外全部都是镜像节点
- nodes 模式,指定镜像节点名称列表,随机一个作为主节点,如果列表里的节点都不存在或不可用,则创建队列时的节点作为主节点,之后访问集群,列表中的节点若存在才会创建镜像节点
没有镜像节点实在就相当于平凡模式了
如何配置上网搜搜就行,比较贫困,需要设置计谋,以及匹配的队列(不同队列分开来讨论,可以设置不同的计谋)
3、仲裁队列
RabbitMQ 3.8 以后,推出了新的功能仲裁队列来
- 取代镜像集群,都是主从模式,支持主从数据同步,默认是 exactly count = 5
- 约定大于配置,利用非常简单没有复杂的配置,队列的范例选择 Quorum 即可
- 底层接纳 Raft 协议确保主从的数据强同等性
Spring Boot 配置:
仲裁队列声明:
- @RabbitListener(bindings = @QueueBinding(
- exchange = @Exchange(name = "xxx"),
- value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-type", value = "quorum")),
- key = "xxx"
- ))
复制代码 队列不声明默认就是平凡集群,这里声明的仲裁队列也只是针对一个队列
(6)消息重复消费问题
在保证MQ消息不重复的情况下,MQ 的一条消息被消费者消费了多次
消费者消费消息成功后,在给MQ发送消息确认的时候出现了网络异常大概是服务宕机,MQ 迟迟没有吸收到 ACK 也没有 NACK,此时 MQ 不会将发送的消息删除,按兵不动,消费者重新监听大概有其他消费者的时候,交由它消费,而这条消息如果在之前就消费过了的话,则会导致重复消费
办理方案:
- 消息消费的业务自己具有幂等性,再次处置惩罚雷同消息时不会产生副作用,一些时候可能需要用到分布式锁去维护幂等性
- 好比一个订单的状态设置为竣事,那重复消费的效果同等
- 记录消息的唯一标识,如果消费过了的,则不再消费
- 消费成功将 id 缓存起来,消费时查询缓存里是否有这条消息
- 设置答应的缓存时间时,你不必想得太极端,一样平常很快就有消费者继续监听拿到消息,哪怕真有谁人情况,这里带来的损失大概率可以忽略不记了,一切要联合实际情况!
偶然候两种方案没有严酷的界定
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |