马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
×
目次
生产者丢失:
方法一: 发送数据之前开启RabbitMQ 变乱channel.txSelect(不保举
方法二:开启confirm机制 (保举,异步
代码演示:
MQ中丢失:
方法一:开启 RabbitMQ 的恒久化
设置恒久化有两个步调:
消耗端丢失:
方法一: 关闭自动ack,开启手动ack
自动ack和手动ack区别:
自动ack:
手动ack:
完备代码演示:
在MQ当中消息丢失有几种差异的场景,大概出现在生产者、MQ、消耗者中。
生产者丢失:
方法一: 发送数据之前开启RabbitMQ 变乱channel.txSelect(不保举
但是这种方法是不保举的,由于变乱机制是同步的,提交一个变乱后会壅闭,大大低沉了体系的吞吐量,消耗性能。
方法二:开启confirm机制 (保举,异步
- 在生产者那边设置开启confirm模式之后,你每次写的消息都会分配一个唯一的 id,
- 然后假如写入了 RabbitMQ 中,RabbitMQ 会给你回传一个ack消息,告诉你说这个消息 ok 了。
- 假如 RabbitMQ 没能处置惩罚这个消息,会回调你一个nack接口,告诉你这个消息吸收失败,你可以重试。
- 可以团结这个机制本身在内存里维护每个消息 id 的状态,假如高出肯定时间还没吸收到这个消息的回调,那么你可以重发。
代码演示:
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.ConfirmCallback;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.util.Collections;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.TimeoutException;
- public class ConfirmProducer {
- private static final String QUEUE_NAME = "confirm_queue";
-
- // 维护消息 ID 和消息内容的映射
- private static final Map<Long, String> messageMap = new ConcurrentHashMap<>();
- // 存储未确认的消息 ID
- private static final Set<Long> unconfirmedSet = Collections.newSetFromMap(new ConcurrentHashMap<>());
- public static void main(String[] args) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost"); // 连接 RabbitMQ 服务器
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
-
- // 声明队列
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- // 开启 confirm 模式
- channel.confirmSelect();
- // 处理确认消息的回调
- ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
- if (multiple) {
- unconfirmedSet.removeIf(tag -> tag <= deliveryTag);
- } else {
- unconfirmedSet.remove(deliveryTag);
- }
- messageMap.remove(deliveryTag);
- Syst
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
|