RabbitMQ高级特性--消息确认机制

打印 上一主题 下一主题

主题 1884|帖子 1884|积分 5652

目次


一、消息确认
1.消息确认机制
2.手动确认方法
二、代码示例
1. AcknowledgeMode.NONE
1.1 配置文件
1.2 生产者
 1.3 消费者
1.4 运行程序 
 2.AcknowledgeMode.AUTO
3.AcknowledgeMode.MANUAL





一、消息确认

1.消息确认机制

生产者发送消息之后,到达消费端之后,大概会有以下环境:
1. 消息处理成功;
2. 消息处理异常。

RabbitMQ向消费者发送消息后,就会把这条消息删除掉,那么第二种环境就会造成消息丢失。
那么如何确保消息端已经被成功接收了并且被正确处理了呢?
为了确保消息从队列可靠的到达消费者,RabbitMQ提供了消息确认机制(Messageacknowledment)。
消费者在订阅队列时,可以指定autoAck参数,根据这个参数,消息确认机制分为以下两种:
自动确认:当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(大概磁盘)中删除,而不管消费者是否真正的接收到消息,自动确认模式实用于对于消息可靠性要求不高的场景。
手动确认:当autoAck等于false时,RabbitMQ会等候消费者显式的调用BasicAck命令,复兴确认信号后才从内存(大概磁盘)中删除,这种方式实用于对消息可靠性要求较高的场景。
自动确认代码示例:
  1. DefaultConsumer consumer = new DefaultConsumer(channel) {
  2. @Override
  3. public void handleDelivery(String consumerTag, Envelope envelope,
  4. AMQP.BasicProperties properties, byte[] body) throws IOException {
  5. System.out.println("接收到消息: " + new String(body));
  6. }
  7. };
  8. channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);
复制代码
当autoAck参数置为false,对于RabbitMQ服务器来说,队列中的消息分为了两个部门:
一是等候发送给消费者的消息;二是已经发送给消费者,但是还没收到消费者确认信号的消息。
如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开毗连,则RabbitMQ会重新安排这条消息进入队列,等候投递给下一个消费者,当然也有大概是原来的那个消费者。

从RabbitMQ的Web管理平台上也可以看到当前队列中Ready状态和Unacked状态的消息数。 

Ready:等候投递给消费者的消息数。
Unacked:已经投递给消费者,但是未收到消费者确认信号的消息数。
2.手动确认方法

消费者在收到消息后,可以选择确认,也可以选择跳过大概直接拒绝确认,RabbitMQ也提供了不同的确认方法,消费者客户端可以调用与其对应的channel的干系方法,共有以下三种:
  1. 肯定确认: Channel.basicAck(long deliveryTag, boolean multiple);
复制代码
RabbitMQ 已经知道该消息并且成功的处理消息,可以将其抛弃。
参数说明:
deliveryTag:消息的唯一标识,它是一个单调递增的64位的长整型值,deliveryTag是每个信道(Channel)独立维护的,以是在每个信道上都是唯一的,当消费者确认(ack)一条消息时,必须使用对应的信道进行确认。
multiple:是否批量确认,在某些环境下,为了减少网络流量,可以对一系列一连的deliveryTag进行批量确认,值为true则会一次性ack以是小于等于指定deliveryTag的消息,值为false,则只确认当前deliveryTag的消息。

   deliveryTag   是RabbitMQ中消息确认机制的⼀个告急组成部门, 它确保了消息传递的可靠性和顺     序性。   
  1. 否定确认: Channel.basicReject(long deliveryTag, boolean requeue);
复制代码
参数说明:
  deliveryTag:参考上文。
  requeue:表现拒绝后,这条消息该如何处理,如果值为true那么,则RabbitMQ会将这条消息重新入队,重新发送给下一个订阅的消费者,值为false,则RabbitMQ会把这条消息从队列中移除,不会再发送给消费者。
  1. 否定确认: Channel.basicNack(long deliveryTag, boolean multiple,
  2. boolean requeue);
复制代码
参数说明:
  参考上文 
  multiple参数设置为true则表⽰拒绝deliveryTag编号之前所有未被当前消费者确认的消息。
  二、代码示例

  我们基于SpringBoot来演示消息的确认机制,使用方式和方法与RabbitMQ Java Client有一定差别,
  Spring AMQP对消息确认提供了三种策略:
  1. public enum AcknowledgeMode {
  2. NONE,
  3. MANUAL,
  4. AUTO;
  5. }
复制代码
    .    AcknowledgeMode.NONE:         这种模式下, 消息⼀旦投递给消费者, 不管消费者是否成功处理了消息, RabbitMQ 就会⾃动确认        消息, 从RabbitMQ队列中移除消息. 如果消费者处理消息失败, 消息大概会丢失.              AcknowledgeMode.AUTO(默认):         这种模式下, 消费者在消息处理成功时会⾃动确认消息, 但如果处理过程中抛出了异常, 则不会确        认消息.              AcknowledgeMode.MANUAL:         ⼿动确认模式下, 消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息. 如果消        息未被确认, RabbitMQ 会认为消息尚未被成功处理, 并且会在消费者可⽤时重新投递该消息, 这        种模式提⾼了消息处理的可靠性, 因为纵然消费者处理消息后失败, 消息也不会丢失, ⽽是可以被        重新处理.   
1. AcknowledgeMode.NONE

1.1 配置文件

  1. spring:
  2. rabbitmq:
  3. addresses: amqp:
  4. listener:
  5. simple:
  6. acknowledge-mode: none
复制代码
1.2 生产者

  1. public class Constant {
  2. public static final String ACK_EXCHANGE_NAME = "ack_exchange";
  3. public static final String ACK_QUEUE = "ack_queue";
  4. }
复制代码
  1. /*
  2. 以下为消费端⼿动应答代码⽰例配置
  3. */
  4. @Bean("ackExchange")
  5. public Exchange ackExchange(){
  6. return
  7. ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()
  8. ;
  9. }
  10. //2. 队列
  11. @Bean("ackQueue")
  12. public Queue ackQueue() {
  13. return QueueBuilder.durable(Constant.ACK_QUEUE).build();
  14. }
  15. //3. 队列和交换机绑定 Binding
  16. @Bean("ackBinding")
  17. public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange,
  18. @Qualifier("ackQueue") Queue queue) {
  19. return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();
  20. }
复制代码
  1. import com.xiaowu.rabbitmq.constant.Constant;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. @RestController
  7. @RequestMapping("/producer")
  8. public class ProductController {
  9. @Autowired
  10. private RabbitTemplate rabbitTemplate;
  11. @RequestMapping("/ack")
  12. public String ack(){
  13. rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack",
  14. "consumer ack test...");
  15. return "发送成功!";
  16. }
  17. }
复制代码
 1.3 消费者


  1. import com.xiaowu.rabbitmq.constant.Constant;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class AckQueueListener {
  8. //指定监听队列的名称
  9. @RabbitListener(queues = Constant.ACK_QUEUE)
  10. public void ListenerQueue(Message message, Channel channel) throws
  11. Exception {
  12. System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
  13. String(message.getBody(),"UTF-8"),
  14. message.getMessageProperties().getDeliveryTag());
  15. //模拟处理失败
  16. int num = 3/0;
  17. System.out.println("处理完成");
  18. }
  19. }
复制代码
1.4 运行程序 

启动生产者可以从RabbitMQ Web管理界面看到如下:

再启动消费者,控制台输出:
  1. 接收到消息: consumer ack test..., deliveryTag: 1
  2. 2024-04-29T17:03:57.797+08:00 WARN 16952 --- [ntContainer#0-1]
  3. s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
  4. listener failed.
  5. org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
  6. Listener method 'public void
  7. com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
  8. mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
  9. threw exception
  10. //....
复制代码
管理界面:

可以看到消息处理失败但是消息已经从管理界面移除。 
 2.AcknowledgeMode.AUTO

将配置文件修改为:
  1. spring:
  2. rabbitmq:
  3. addresses: amqp:
  4. listener:
  5. simple:
  6. acknowledge-mode: auto
复制代码
再次启动程序,控制台不绝输堕落误信息:
  1. 接收到消息: consumer ack test..., deliveryTag: 1
  2. 2024-04-29T17:07:06.114+08:00 WARN 16488 --- [ntContainer#0-1]
  3. s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
  4. listener failed.
  5. org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
  6. Listener method 'public void
  7. com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
  8. mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
  9. threw exception
  10. 接收到消息: consumer ack test..., deliveryTag: 2
  11. 2024-04-29T17:07:07.161+08:00 WARN 16488 --- [ntContainer#0-1]
  12. s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
  13. listener failed.
  14. org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
  15. Listener method 'public void
  16. com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
  17. mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
  18. threw exception
  19. 接收到消息: consumer ack test..., deliveryTag: 3
  20. 2024-04-29T17:07:08.208+08:00 WARN 16488 --- [ntContainer#0-1]
  21. s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
  22. listener failed.
  23. org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
  24. Listener method 'public void
  25. com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
  26. mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
  27. threw exception
  28. 接收到消息: consumer ack test..., deliveryTag: 4
  29. 2024-04-29T17:07:09.254+08:00 WARN 16488 --- [ntContainer#0-1]
  30. s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
  31. listener failed.
  32. org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
  33. Listener method 'public void
  34. com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
  35. mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
  36. threw exception
复制代码
  从⽇志上可以看出, 当消费者出现异常时, RabbitMQ会不绝的重发. 由于异常,多次重试还是失败,消 息没被确认,也无法nack,就⼀直是unacked状态,导致消息积压。   3.AcknowledgeMode.MANUAL

  1. spring:
  2. rabbitmq:
  3. addresses: amqp:
  4. listener:
  5. simple:
  6. acknowledge-mode: manual
复制代码
消费者手动确认逻辑:
  1. import com.xiaowu.rabbitmq.constant.Constant;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class AckQueueListener {
  8. //指定监听队列的名称
  9. @RabbitListener(queues = Constant.ACK_QUEUE)
  10. public void ListenerQueue(Message message, Channel channel) throws
  11. Exception {
  12. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  13. try {
  14. //1. 接收消息
  15. System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
  16. String(message.getBody(),"UTF-8"),
  17. message.getMessageProperties().getDeliveryTag());
  18. //2. 处理业务逻辑
  19. System.out.println("处理业务逻辑");
  20. //⼿动设置⼀个异常, 来测试异常拒绝机制
  21. // int num = 3/0;
  22. //3. ⼿动签收
  23. channel.basicAck(deliveryTag, true);
  24. } catch (Exception e) {
  25. //4. 异常了就拒绝签收
  26. //第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false,
  27. 则直接丢弃
  28. channel.basicNack(deliveryTag, true,true);
  29. }
  30. }
  31. }
复制代码
 这个代码运行的结果是正常的, 运行后消息会被签收: Ready为0, unacked为0。
异常时拒绝:
  1. import com.xiaowu.rabbitmq.constant.Constant;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class AckQueueListener {
  8. //指定监听队列的名称
  9. @RabbitListener(queues = Constant.ACK_QUEUE)
  10. public void ListenerQueue(Message message, Channel channel) throws
  11. Exception {
  12. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  13. try {
  14. //1. 接收消息
  15. System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
  16. String(message.getBody(),"UTF-8"),
  17. message.getMessageProperties().getDeliveryTag());
  18. //2. 处理业务逻辑
  19. System.out.println("处理业务逻辑");
  20. //⼿动设置⼀个异常, 来测试异常拒绝机制
  21. int num = 3/0;
  22. //3. ⼿动签收
  23. channel.basicAck(deliveryTag, true);
  24. } catch (Exception e) {
  25. //4. 异常了就拒绝签收
  26. //第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false,
  27. 则直接丢弃
  28. channel.basicNack(deliveryTag, true,true);
  29. }
  30. }
  31. }
复制代码
  运⾏结果: 消费异常时不绝重试, deliveryTag 从1递增     控制台日志:   
  1. 接收到消息: consumer ack test..., deliveryTag: 1
  2. 处理业务逻辑
  3. 接收到消息: consumer ack test..., deliveryTag: 2
  4. 处理业务逻辑
  5. 接收到消息: consumer ack test..., deliveryTag: 3
  6. 处理业务逻辑
  7. 接收到消息: consumer ack test..., deliveryTag: 4
  8. 处理业务逻辑
  9. 接收到消息: consumer ack test..., deliveryTag: 5
  10. 处理业务逻辑
  11. 接收到消息: consumer ack test..., deliveryTag: 6
  12. 处理业务逻辑
复制代码
管理页面上unacked也是1:
  

   







免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

宝塔山

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表