RabbitMQ实现保证消息可靠性

打印 上一主题 下一主题

主题 926|帖子 926|积分 2778

本篇文章不再介绍RabbitMQ详细实现原理,直接介绍如何保证消息的可靠性题目。所谓可靠性,指消息不重不漏。
生产者消费者模子
  生产者-消费者模子用于描述两类进程(生产者和消费者)之间的数据交互。可以被认为是独立的服务,生产者负责生成数据,消费者负责处置惩罚这些数据。在分布式体系中,队列在其中扮演了消息(数据)传递的功能。

关于消息队列的作用,一般解读为:
解耦:生产者和消费者独立运作,无需知道对方的运行状态。
异步:并非实时,生产者不必关注消费端的消费情况。
削峰:限定流量,防止消费者过载。
消息丢失
  这其实不难懂白,就像生存中下单-快递-签收的过程。这个过程和上边的生产者-消费者模子恰有异曲同工之妙。

这个过程中,
下单用户(生产者)
快递小哥(队列)
签收人(消费者)
快件(消息)
假如包裹被大略的认为是一条消息,那么快件在邮寄过程中丢失了,就是消息丢失。快件从发货到签收,我们不用去关心中间发生了什么。但是要是充公到货,那得给我个理由。
如何排查?
  就上边的快件丢失题目,怎么知道快递为何没有收到?很简朴,一段一段的排查:
商家是否有发货?
快递公司是否揽收?
查察快递小哥是否放入代收点
相应的,假如生产环境中突然发现诸如:告警、服务宕机、数据流转非常等题目时,我们也会在链路上(A、B、C三处)逐一排查。
产生缘故因由及办理方案
1、生产端可靠性投递
为确保消息从生产端可靠地投递到RabbitMQ,我们必要思量以下几个关键点:
网络故障:消息可能在传输过程中因网络题目而丢失。
RabbitMQ故障:假如RabbitMQ宕机,消息也可能丢失。

对应办理方案:
开启事务机制
事务在RabbitMQ中可能会影响性能,因为它们必要在全部节点上同步状态。因此,RabbitMQ尽量克制利用事务。焦点代码:
  1. private static void executeTransaction(Channel channel) throws IOException {
  2.         boolean transactionSuccess = false;
  3.         try {
  4.             // 开启事务
  5.             channel.txSelect();
  6.             // 执行一系列消息操作,例如:channel.basicPublish(exchange, routingKey, message);
  7.             // 提交事务
  8.             channel.txCommit();
  9.             transactionSuccess = true;
  10.         } catch (ShutdownSignalException | IOException e) {
  11.             // 回滚事务
  12.             if (!transactionSuccess) {
  13.                 channel.txRollback();
  14.             }
  15.             throw e;
  16.         }
  17.     }
复制代码
生产者确认机制
发布者确认机制答应发布者知道消息是否已经被RabbitMQ成功接收:
  1. public static void sendPersistentMessage(String host, String queueName, String message) {
  2.         try (Connection connection = new ConnectionFactory().setHost(host).newConnection();
  3.              Channel channel = connection.createChannel()) {
  4.             // 启用发布者确认
  5.             channel.confirmSelect();
  6.            // 将消息设置为持久化
  7.             AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  8.                     .deliveryMode(2)
  9.                     .build();
  10.                     
  11.             // 添加确认监听器
  12.             channel.addConfirmListener(new ConfirmListener() {
  13.                 @Override
  14.                 public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  15.                     System.out.println("消息已确认: " + deliveryTag);
  16.                     // 消息正确到达Broker时的处理逻辑
  17.                 }
  18.                 @Override
  19.                 public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  20.                     System.out.println("消息未确认: " + deliveryTag);
  21.                     // 因为内部错误导致消息丢失时的处理逻辑
  22.                 }
  23.             });
  24.             channel.basicPublish("", queueName, properties, message.getBytes());
  25.             // 等待消息确认,或者超时
  26.             boolean allConfirmed = channel.waitForConfirms();
  27.             
  28.             if (allConfirmed) {
  29.                 //所有消息都已确认
  30.             } else {
  31.                 //超时或其它
  32.             }
  33.            
  34.         } catch (IOException | TimeoutException | InterruptedException e) {
  35.             e.printStackTrace();
  36.         }
  37. }
复制代码
2、消息持久化
在RabbitMQ中,消息的持久化它确保消息不仅存储在内存中,而且也安全地保存在磁盘上。这样,纵然在RabbitMQ服务崩溃或重启的情况下,消息也不会丢失,可以从磁盘恢复。

消息到达RabbitMQ后通过Exchange交换机,路由给queue队列,末了发送给消费端。

从RabbitMQ设计上看,消息的持久化应该从以下方面入手:
Exchange持久化:
  1. // 设置 durable = true;
  2. channel.exchangeDeclare(exchangeName, "direct", durable);
复制代码
消息持久化:
  1. // 设置 MessageProperties.PERSISTENT_TEXT_PLAIN
  2. channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
复制代码
Queue持久化:
  1. //设置 boolean durable = true;
  2. channel.queueDeclare(queueName, durable, exclusive, false, null);
复制代码
这样,假如RabbitMQ收到消息后挂了,重启后会自行从磁盘上恢复消息。
3、消费者确认机制
假如上述生产端、消息队列都精确投递,那么题目出现在消费端是否可以精确消费?

消费者在成功处置惩罚了一条消息后通知RabbitMQ,这样RabbitMQ在收到确认后才会移除队列中的消息。
默认情况下,以下3种缘故因由导致消息丢失:
1、 网络故障:消费端还没接收到消息之前,发生网络故障导致消息丢失;
2、 未接收消息前服务宕机:消费端突然挂机未接收到消息,此时消息会丢失;
3、 处置惩罚过程中服务宕机:消费端精确接收到消息,但在处置惩罚消息的过程中发生非常或宕机了,消息也会丢失。
这是因为RabbitMQ的自动ack机制,即默认RabbitMQ在消息发出后,不管消费端是否接收到,是否处置惩罚完,就立刻删除这条消息,导致消息丢失。

应对方案
将自动ack机制改为手动ack机制。
  1. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  2.     try {
  3.         //接收消息,业务处理
  4.         //设置手动确认
  5.         channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  6.     } catch (Exception e) {
  7.         //发生异常时,可以选择重新发送消息或进行错误处理
  8.         // 例如,可以选择负确认(nack),让消息重回队列
  9.         // channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
  10.     }
  11. };
  12. //设置autoAck为false,表示关闭自动确认机制,改为手动确认
  13. channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
复制代码
4、消息补偿机制
以上3种办理办法理论上可靠,但是体系的非常或者故障比较偶然,我们没法做到100%消息不丢失。因此必要加入补偿机制或者人工干预。这是我们的末了一道防线。
如何做消息补偿呢?其实就是将消息入库,通过定时任务重新发送失败的消息。详细流程如下:

生产端发送消息;
确认失败,将消息保存到数据库中,并设置初始状态0;
定时任务以一定频率扫描数据库中status=0 的消息(失败消息);
重发消息,可多次;
重发成功,更新数据库:status=1;
凌驾固定次数重发仍然失败,人工干预。
标注:
凌驾最大失败次数后,对于无法被正常消费的消息可移入死信队列。
可人工干预手动排查
也可自动重试,必要实现一个消费者来从死信队列中获取消息,并根据业务逻辑来决定是否以及如何重新发送消息。这里涉及到消息去重、幂等性处置惩罚等。
以上,我们知道了消息丢失题目如那边理?那么对于消息重复的题目,下面做个介绍。
消息重复消费
消息重复消费是指在消息队列中,同一条消息被不同的消费者多次消费处置惩罚。
产生缘故因由:
网络题目:消费者处置惩罚完消息后,因网络题目导致确认信息未能成功发送回消息队列。
服务停止:消费者在确认消息之前服务崩溃,消息队列未收到确认信号。
确认机制:自动确认模式下,假如确认在消息处置惩罚完成前发生,消息可能会被重复消费
对应办理方案:
1. 幂等性设计
设计消费者的消息处置惩罚逻辑时,要保证纵然消息被多次消费,也不会对体系状态产生不良影响。幂等性可以通过以下方式实现:
数据库唯一约束:利用数据库的主键约束或唯一索引防止插入重复记载。
业务逻辑检查:在执行业务操作前,先检查是否已经处置惩罚过该消息。
2. 消息去重策略
利用唯一标识符(如订单号、massageID)来识别消息,并在消费者中实现去重逻辑:
缓存检查:利用内存缓存(如Redis)存储已处置惩罚的消息ID。
持久化存储:将消息ID与处置惩罚状态保存在数据库中,以便跨服务重启后仍然有效。
3. 手动确认与重试机制
通过手动确认消息,控制消息何时从队列中移除:
手动确认:在消息成功处置惩罚后,显式调用channel.basicAck()方法确认消息。
重试机制:假如消息处置惩罚失败,可以选择将消息重新入队(channel.basicReject(requeue=true))或扬弃(channel.basicReject(requeue=false))。
代码演示:
消费者端去重逻辑
  1. @RabbitListener(queues = "queueName", acknowledgeMode = "MANUAL")
  2. public void receiveMessage(Message message, Channel channel) throws IOException {
  3.     String messageId = message.getMessageProperties().getMessageId();
  4.    
  5.     // 检查消息是否已消费
  6.     if (messageAlreadyProcessed(messageId)) {
  7.         // 消息已消费,确认消息并返回
  8.         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  9.         return;
  10.     }
  11.    
  12.     // 处理消息
  13.     try {
  14.         processMessage(message);
  15.         // 消息处理成功,持久化消息ID并确认消息
  16.         persistMessageId(messageId);
  17.         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  18.     } catch (Exception e) {
  19.         // 处理失败,可以选择重新入队或丢弃
  20.         boolean requeue = shouldRequeue(message);
  21.         channel.basicReject(message.getMessageProperties().getDeliveryTag(), requeue);
  22.     }
  23. }
复制代码
生产者端发布确认
  1. void sendWithConfirm(AmqpTemplate amqpTemplate, Message message) throws IOException {
  2.     ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
  3.         if (!ack) {
  4.             // 处理消息发送失败的逻辑
  5.             // ...
  6.         }
  7.     };
  8.     amqpTemplate.setConfirmCallback(confirmCallback);
  9.     amqpTemplate.convertAndSend("exchangeName", "routingKey", message);
  10. }
复制代码
详细实现必要根据现实业务逻辑和RabbitMQ配置举行调整。
总结
以上介绍了RabbitMQ保证消息可靠性的题目、产生缘故因由、办理方案等。不足之处,欢迎指正。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王海鱼

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表