MQ消息丢失办理方案

[复制链接]
发表于 2025-10-21 04:07:56 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

×
目次
生产者丢失:
方法一: 发送数据之前开启RabbitMQ 变乱channel.txSelect(不保举
方法二:开启confirm机制 (保举,异步
代码演示:
MQ中丢失:
  方法一:开启 RabbitMQ 的恒久化
设置恒久化有两个步调:
消耗端丢失:
方法一: 关闭自动ack,开启手动ack
自动ack和手动ack区别:
自动ack:
  手动ack:
完备代码演示:

在MQ当中消息丢失有几种差异的场景,大概出现在生产者、MQ、消耗者中。

生产者丢失:

方法一: 发送数据之前开启RabbitMQ 变乱channel.txSelect(不保举

但是这种方法是不保举的,由于变乱机制是同步的,提交一个变乱后会壅闭,大大低沉了体系的吞吐量,消耗性能

方法二:开启confirm机制 (保举,异步

      
  • 在生产者那边设置开启confirm模式之后,你每次写的消息都会分配一个唯一的 id,  
  • 然后假如写入了 RabbitMQ 中,RabbitMQ 会给你回传一个ack消息,告诉你说这个消息 ok 了。  
  • 假如 RabbitMQ 没能处置惩罚这个消息,会回调你一个nack接口,告诉你这个消息吸收失败,你可以重试。   
  • 可以团结这个机制本身在内存里维护每个消息 id 的状态,假如高出肯定时间还没吸收到这个消息的回调,那么你可以重发。
代码演示:

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.ConfirmCallback;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.util.Collections;
  6. import java.util.Map;
  7. import java.util.Set;
  8. import java.util.concurrent.ConcurrentHashMap;
  9. import java.util.concurrent.TimeoutException;
  10. public class ConfirmProducer {
  11.     private static final String QUEUE_NAME = "confirm_queue";
  12.    
  13.     // 维护消息 ID 和消息内容的映射
  14.     private static final Map<Long, String> messageMap = new ConcurrentHashMap<>();
  15.     // 存储未确认的消息 ID
  16.     private static final Set<Long> unconfirmedSet = Collections.newSetFromMap(new ConcurrentHashMap<>());
  17.     public static void main(String[] args) throws Exception {
  18.         ConnectionFactory factory = new ConnectionFactory();
  19.         factory.setHost("localhost"); // 连接 RabbitMQ 服务器
  20.         try (Connection connection = factory.newConnection();
  21.              Channel channel = connection.createChannel()) {
  22.             
  23.             // 声明队列
  24.             channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  25.             // 开启 confirm 模式
  26.             channel.confirmSelect();
  27.             // 处理确认消息的回调
  28.             ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
  29.                 if (multiple) {
  30.                     unconfirmedSet.removeIf(tag -> tag <= deliveryTag);
  31.                 } else {
  32.                     unconfirmedSet.remove(deliveryTag);
  33.                 }
  34.                 messageMap.remove(deliveryTag);
  35.                 Syst
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告
回复

使用道具 举报

×
登录参与点评抽奖,加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表