RabbitMQ手动ACK与死信队列

打印 上一主题 下一主题

主题 529|帖子 529|积分 1587

为了包管消息从队列可靠的达到消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。
默认环境下RabbitMQ在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ本身又没有这条消息了。以是在现实项目中会利用手动Ack。
1、手动应答


  • Channel.basicAck (用于肯定确认):RabbitMQ 已知道该消息成功被处理,可以将其丢弃了。
  • Channel.basicNack (用于否定确认)
  • Channel.basicReject (用于否定确认):与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了。
消费者端的配置,相关属性值改为本身的:
  1. server.port=8082
  2. #rabbitmq服务器ip
  3. spring.rabbitmq.host=localhost
  4. #rabbitmq的端口
  5. spring.rabbitmq.port=5672
  6. #用户名
  7. spring.rabbitmq.username=lonewalker
  8. #密码
  9. spring.rabbitmq.password=XX
  10. #配置虚拟机
  11. spring.rabbitmq.virtual-host=demo
  12. #设置消费端手动 ack   none不确认  auto自动确认  manual手动确认
  13. spring.rabbitmq.listener.simple.acknowledge-mode=manual
复制代码
修改消费代码:请勿复制利用,会卡死
  1. package com.example.consumer.service;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.example.consumer.entity.User;
  4. import com.rabbitmq.client.Channel;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.amqp.core.Message;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.stereotype.Service;
  9. import java.io.IOException;
  10. /**
  11. * @description:
  12. * @author: LoneWalker
  13. * @create: 2022-04-04
  14. **/
  15. @Service
  16. @Slf4j
  17. public class ConsumerService {
  18.     @RabbitListener(queues ="publisher.addUser")
  19.     public void addUser(String userStr,Channel channel,Message message){
  20.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  21.         try {
  22.             log.info("我一直在重试");
  23.             int a = 1/0;
  24.             User user = JSONObject.parseObject(userStr,User.class);
  25.             log.info(user.toString());
  26.             //手动ack  第二个参数为false是表示仅仅确认当前消息 true表示确认之前所有的消息
  27.             channel.basicAck(deliveryTag,false);
  28.         } catch (Exception e) {
  29.             //手动nack 告诉rabbitmq该消息消费失败  第三个参数:如果被拒绝的消息应该被重新请求,而不是被丢弃或变成死信,则为true
  30.             try {
  31.                 channel.basicNack(deliveryTag,false,true);
  32.             } catch (IOException ex) {
  33.                 ex.printStackTrace();
  34.             }
  35.         }
  36.     }
  37. }
复制代码
先启动发布者发送消息,查看控制台:有一条消息待消费·

启动消费端,因为代码中有除0,以是会报错,这里就会出现一条unacked消息:

因为设置的是将消息重新请求,以是它会陷入死循环


防止出现这种环境,可以将basicNack最后一个参数改为false,让消息进去死信队列
2、什么是死信队列

说简单点就是备胎队列,而死信的来源有以下几种:

  • 消息被否定确认,利用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
  • 消息在队列的存活时间凌驾设置的TTL时间。
  • 消息队列的消息数量已经凌驾最大队列长度。

“死信”消息会被RabbitMQ进行特殊处理,假如配置了死信队列信息,那么该消息将会被丢进死信队列中,假如没有配置,则该消息将会被丢弃。
3、配置死信队列

一般会为每个重要的业务队列配置一个死信队列。可以分为以下步骤:

  • 配置业务队列,绑定到业务互换机上
  • 为业务队列配置死信互换机和路由key
  • 为死信互换机配置死信队列
从控制台将之前的互换机都删除,然后修改代码。
首先看一下发布者的配置代码:
  1. package com.example.publisher.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.*;
  4. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  8. import org.springframework.amqp.support.converter.MessageConverter;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import java.util.HashMap;
  12. import java.util.Map;
  13. /**
  14. * @author LoneWalker
  15. * @date 2023/4/8
  16. * @description
  17. */
  18. @Slf4j
  19. @Configuration
  20. public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
  21.     @Bean
  22.     public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
  23.         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  24.         rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
  25.         //设置给rabbitTemplate
  26.         rabbitTemplate.setConfirmCallback(this);
  27.         rabbitTemplate.setReturnsCallback(this);
  28.         rabbitTemplate.setMandatory(true);
  29.         return rabbitTemplate;
  30.     }
  31.     @Bean
  32.     public MessageConverter jackson2JsonMessageConverter() {
  33.         return new Jackson2JsonMessageConverter();
  34.     }
  35.     /************ 正常配置 ******************/
  36.     /**
  37.      * 正常交换机,开启持久化
  38.      */
  39.     @Bean
  40.     DirectExchange normalExchange() {
  41.         return new DirectExchange("normalExchange", true, false);
  42.     }
  43.     @Bean
  44.     public Queue normalQueue() {
  45.         // durable: 是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
  46.         // exclusive: 默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。
  47.         // autoDelete: 是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
  48.         Map<String, Object> args = deadQueueArgs();
  49.         // 队列设置最大长度
  50.         args.put("x-max-length", 5);
  51.         return new Queue("normalQueue", true, false, false, args);
  52.     }
  53.     @Bean
  54.     public Queue ttlQueue() {
  55.         Map<String, Object> args = deadQueueArgs();
  56.         // 队列设置消息过期时间 60 秒
  57.         args.put("x-message-ttl", 60 * 1000);
  58.         return new Queue("ttlQueue", true, false, false, args);
  59.     }
  60.     @Bean
  61.     Binding normalRouteBinding() {
  62.         return BindingBuilder.bind(normalQueue())
  63.                 .to(normalExchange())
  64.                 .with("normalRouting");
  65.     }
  66.     @Bean
  67.     Binding ttlRouteBinding() {
  68.         return BindingBuilder.bind(ttlQueue())
  69.                 .to(normalExchange())
  70.                 .with("ttlRouting");
  71.     }
  72.     /**************** 死信配置 *****************/
  73.     /**
  74.      * 死信交换机
  75.      */
  76.     @Bean
  77.     DirectExchange deadExchange() {
  78.         return new DirectExchange("deadExchange", true, false);
  79.     }
  80.     /**
  81.      * 死信队列
  82.      */
  83.     @Bean
  84.     public Queue deadQueue() {
  85.         return new Queue("deadQueue", true, false, false);
  86.     }
  87.     @Bean
  88.     Binding deadRouteBinding() {
  89.         return BindingBuilder.bind(deadQueue())
  90.                 .to(deadExchange())
  91.                 .with("deadRouting");
  92.     }
  93.     /**
  94.      * 转发到 死信队列,配置参数
  95.      */
  96.     private Map<String, Object> deadQueueArgs() {
  97.         Map<String, Object> map = new HashMap<>();
  98.         // 绑定该队列到死信交换机
  99.         map.put("x-dead-letter-exchange", "deadExchange");
  100.         map.put("x-dead-letter-routing-key", "deadRouting");
  101.         return map;
  102.     }
  103.     /**
  104.      * 消息成功到达交换机会触发
  105.      * @param correlationData
  106.      * @param ack
  107.      * @param cause
  108.      */
  109.     @Override
  110.     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  111.         if (ack) {
  112.             log.info("交换机收到消息成功:" + correlationData.getId());
  113.         }else {
  114.             log.error("交换机收到消息失败:" + correlationData.getId() + "原因:" + cause);
  115.         }
  116.     }
  117.     /**
  118.      * 消息未成功到达队列会触发
  119.      * @param returnedMessage
  120.      */
  121.     @Override
  122.     public void returnedMessage(ReturnedMessage returnedMessage) {
  123.         log.error("{}--消息未成功到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId());
  124.     }
  125. }
复制代码
properties
  1. server.port=8081
  2. #rabbitmq服务ip
  3. spring.rabbitmq.host=localhost
  4. #rabbitmq端口号
  5. spring.rabbitmq.port=5672
  6. #用户名
  7. spring.rabbitmq.username=用户名改为自己的
  8. #密码
  9. spring.rabbitmq.password=密码改为自己的
  10. #虚拟机
  11. spring.rabbitmq.virtual-host=demo
  12. spring.rabbitmq.publisher-confirm-type=correlated
  13. spring.rabbitmq.publisher-returns=true
  14. spring.rabbitmq.template.mandatory=true
复制代码
发送消息:
  1. @RequiredArgsConstructor
  2. @Service
  3. public class PublisherServiceImpl implements PublisherService{
  4.     private final RabbitTemplate rabbitTemplate;
  5.     @Override
  6.     public void addUser(User user) {
  7.         CorrelationData correlationData = new CorrelationData();
  8.         correlationData.setId(UUID.randomUUID().toString());
  9.         rabbitTemplate.convertAndSend("normalExchange","normalRouting",user,correlationData);
  10.     }
  11. }
复制代码
4、模仿场景

4.1消息处理异常

文章开篇说到的消息手动ack,一旦出现异常会陷入死循环,那么不把消息放回原队列,而是放入死信队列,然后抛异常由人工处理:
  1. package com.example.consumer.service;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.example.consumer.entity.User;
  4. import com.rabbitmq.client.Channel;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.amqp.core.Message;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.stereotype.Service;
  9. import java.io.IOException;
  10. /**
  11. * @description:
  12. * @author: LoneWalker
  13. * @create: 2022-04-04
  14. **/
  15. @Service
  16. @Slf4j
  17. public class ConsumerService {
  18.     @RabbitListener(queues ="normalQueue")
  19.     public void addUser(String userStr,Channel channel,Message message){
  20.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  21.         try {
  22.             int a = 1/0;
  23.             User user = JSONObject.parseObject(userStr,User.class);
  24.             log.info(user.toString());
  25.             //手动ack  第二个参数为false是表示仅仅确认当前消息 true表示确认之前所有的消息
  26.             channel.basicAck(deliveryTag,false);
  27.         } catch (Exception e) {
  28.             //手动nack 告诉rabbitmq该消息消费失败  第三个参数:如果被拒绝的消息应该被重新请求,而不是被丢弃或变成死信,则为true
  29.             try {
  30.                 channel.basicNack(deliveryTag,false,false);
  31.             } catch (IOException ex) {
  32.                 throw new RuntimeException("消息处理失败");
  33.             }
  34.         }
  35.     }
  36. }
复制代码
注意basicNack的第三个参数,设置为false后就不会重新请求。

4.2队列达到最大长度

配置上面的代码已经有过了:

测试的话我们发6条消息,加上4.1测试产生的死信,预期死信队列中应该会有两条:

4.3消息TTL过期

过期时间TTL表现可以对消息设置预期的时间,凌驾这个时间就删除大概放入死信队列。修改routingKey为ttlRouting。上述代码中配置过期时间为60s


死信队列中的消息处理和正常的队列没什么区别,就不赘述了。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

水军大提督

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表