RabbitMQ之消费者ACK 功能

铁佛  金牌会员 | 2024-7-17 10:36:39 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 542|帖子 542|积分 1626

什么是消费者ACK?
简单说就是 消息确认机制 mq要包管消息能可靠的到达消费者。消费者在消费是是可以指定autoAck参数(自动/手动)的方式告诉mq本身是否确认收到消息。
当 autoAck 参数为 false 时, 队列中的消息分成了两部门: 一部门是等待投递给消费者的消息;一部门是已经投递给消费者,但是还没有收到消费者确认信号的消息。 若 RabbitMQ 服务器端不停没有收到消费者简直认信号,而且消费此消息的消费者已经断开连接, 则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
如下图

可以看到当前队列中的 “Ready” 状态和 “Unacked” 状态的消息数,分别对应等待投递给消费者的消息数和已经投递给消费者但是未收到确认信号的消息数。
上述分析 消息必要确认后消息将会被释放掉 从磁盘/内存中。
所以消息确认分类有两大类 如下图

由上图可知
有发送方确认、接收方确认。
注意:
其中发送方确认又分为:生产者到互换器到确认、互换器到队列简直认
本节主要介绍 消费者消息确认
方式一,自动确认。
方式二,手动确认。
对于自动确认的方式,RabbitMQ Broker 只要将消息写入到 TCP Socket 中乐成,就认为该消息投递乐成,而无需 Consumer 手动确认。
对于手动确认的方式,RabbitMQ Broker 将消息发送给 Consumer 之后,由 Consumer 手动确认之后,才任务消息投递乐成。
实际场景下,因为自动确认存在可能丢失消息的情况,所以在对可靠性有要求的场景下,我们基本采用手动确认。当然,假如允许消息有一定的丢失,对性能有更高的产经下,我们可以思量采用自动确认。
进行实战演练
yml配置类
  1. spring:
  2.   rabbitmq:
  3.           #host为一般模式 若集群模式 将key换成addresses的形式
  4.     host: 192.168.9.104
  5.     port: 5672
  6.     #账号密码自行替换
  7.     username: admin
  8.     password: admin
  9.     # 以下手动提交消息
  10.     listener:
  11.    
  12.       simple:
  13.         acknowledge-mode: manual
  14.       direct:
  15.         acknowledge-mode: manual
复制代码
以上ackoneledge-mode的值有如下:
NONE 对应 Consumer 的自动确认
MANUAL 对应 Consumer 的手动确认,由开发者在消费逻辑中,手动进行确认。
AUTO 对应 Consumer 的手动确认,在消费消息完成(包括正常返回、和抛出非常)后,由 Spring-AMQP 框架来“自动”进行确认。
以下为逻辑代码演示
  1. ================》Direct Exchange 配置
  2. @Configuration
  3. public class DirectExchangeConfiguration {
  4.     /**
  5.      * 创建一个 Queue
  6.      *
  7.      * @return Queue
  8.      */
  9.     @Bean
  10.     public Queue queue05() {
  11.         // Queue:名字 | durable: 是否持久化 | exclusive: 是否排它 | autoDelete: 是否自动删除
  12.         return new Queue(
  13.                 Message05.QUEUE,
  14.                 true,
  15.                 false,
  16.                 false);
  17.     }
  18.     /**
  19.      * 创建 Direct Exchange
  20.      *
  21.      * @return DirectExchange
  22.      */
  23.     @Bean
  24.     public DirectExchange exchange05() {
  25.         // name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它
  26.         return new DirectExchange(Message05.EXCHANGE,
  27.                 true,
  28.                 false);
  29.     }
  30.     /**
  31.      * 创建 Binding
  32.      * Exchange:Message05.EXCHANGE
  33.      * Routing key:Message05.ROUTING_KEY
  34.      * Queue:Message05.QUEUE
  35.      *
  36.      * @return Binding
  37.      */
  38.     @Bean
  39.     public Binding binding05() {
  40.         return BindingBuilder
  41.                 .bind(queue05()).to(exchange05())
  42.                 .with(Message05.ROUTING_KEY);
  43.     }
  44. =======================================》 direct 类型的消息对象
  45. @Data
  46. public class Message05 implements Serializable {
  47.     public static final String QUEUE = "QUEUE_05";
  48.     public static final String EXCHANGE = "EXCHANGE_05";
  49.     public static final String ROUTING_KEY = "ROUTING_KEY_05";
  50.     private String id;
  51. }
  52. =======================================》 生产者逻辑
  53. @Component
  54. public class Producer05 {
  55.     @Resource
  56.     private RabbitTemplate rabbitTemplate;
  57.     public void syncSend(String id) {
  58.         // 创建 Message05 消息
  59.         Message05 message = new Message05();
  60.         message.setId(id);
  61.         // 同步发送消息
  62.         rabbitTemplate.convertAndSend(Message05.EXCHANGE, Message05.ROUTING_KEY, message);
  63.     }
  64. }
  65. =======================================》 消费者逻辑
  66. @Component
  67. @RabbitListener(queues = Message05.QUEUE)
  68. @Slf4j
  69. public class Consumer05 {
  70.     /**
  71.      *  ack 模式 ,
  72.      * 这里需要和 application.yml 中的 acknowledge-mode: manual  对应一起使用
  73.      */
  74.     @RabbitHandler
  75.     public void onMessageAck(Message05 message01, Message message, Channel channel) throws IOException {
  76.         try {
  77.             log.info("[Consumer05 onMessageAck][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message01);
  78.             //  如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
  79.             long deliveryTag = message.getMessageProperties().getDeliveryTag();
  80.             // 取当前时间,达到一个随机效果,测试的话可以多跑几次试试
  81.             if (System.currentTimeMillis() % 2 == 1) {
  82.                 // 通知 MQ 消息已被成功消费,可以ACK了
  83.                 // 第二个参数 multiple ,用于批量确认消息,为了减少网络流量,手动确认可以被批处。
  84.                 // 1. 当 multiple 为 true 时,则可以一次性确认 deliveryTag 小于等于传入值的所有消息
  85.                 // 2. 当 multiple 为 false 时,则只确认当前 deliveryTag 对应的消息
  86.                 channel.basicAck(deliveryTag, false);
  87.                 log.info("[Consumer05 onMessageAck][正常ack:{}]", message01);
  88.             } else {
  89.                 log.info("[Consumer05 onMessageAck][未ack:{}]", message01);
  90.                 throw new RuntimeException("手动异常");
  91.             }
  92.         } catch (Exception e) {
  93.             // 处理失败,重新压入MQ
  94.             channel.basicRecover();
  95.             log.info("[Consumer05 onMessageAck][消息重新压入MQ:{}]", message01);
  96.         }
  97.     }
  98. =======================================》测试类
  99. @Test
  100.     void syncSend() {
  101.         String id = UUID.randomUUID().toString();
  102.         producer05.syncSend(id);
  103.         log.info("[test producer05 syncSend][id:{}] 发送成功", id);
  104.         TimeUnit.SECONDS.sleep(2);
  105.     }
复制代码
以上的是消费者ACK实现的代码 若不相识rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

铁佛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表