水军大提督 发表于 2024-8-6 06:22:16

RabbitMQ手动ACK与死信队列

为了包管消息从队列可靠的达到消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。
默认环境下RabbitMQ在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ本身又没有这条消息了。以是在现实项目中会利用手动Ack。
1、手动应答


[*]Channel.basicAck (用于肯定确认):RabbitMQ 已知道该消息成功被处理,可以将其丢弃了。
[*]Channel.basicNack (用于否定确认)
[*]Channel.basicReject (用于否定确认):与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了。
消费者端的配置,相关属性值改为本身的:
server.port=8082
#rabbitmq服务器ip
spring.rabbitmq.host=localhost
#rabbitmq的端口
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=lonewalker
#密码
spring.rabbitmq.password=XX
#配置虚拟机
spring.rabbitmq.virtual-host=demo
#设置消费端手动 ack   none不确认auto自动确认manual手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual 修改消费代码:请勿复制利用,会卡死
package com.example.consumer.service;

import com.alibaba.fastjson.JSONObject;
import com.example.consumer.entity.User;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
* @description:
* @author: LoneWalker
* @create: 2022-04-04
**/
@Service
@Slf4j
public class ConsumerService {


    @RabbitListener(queues ="publisher.addUser")
    public void addUser(String userStr,Channel channel,Message message){
      long deliveryTag = message.getMessageProperties().getDeliveryTag();
      try {
            log.info("我一直在重试");
            int a = 1/0;
            User user = JSONObject.parseObject(userStr,User.class);
            log.info(user.toString());
            //手动ack第二个参数为false是表示仅仅确认当前消息 true表示确认之前所有的消息
            channel.basicAck(deliveryTag,false);
      } catch (Exception e) {
            //手动nack 告诉rabbitmq该消息消费失败第三个参数:如果被拒绝的消息应该被重新请求,而不是被丢弃或变成死信,则为true
            try {
                channel.basicNack(deliveryTag,false,true);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
      }
    }
} 先启动发布者发送消息,查看控制台:有一条消息待消费·
https://i-blog.csdnimg.cn/blog_migrate/f8ef66448ba8c33c4185a6a57167f127.png
启动消费端,因为代码中有除0,以是会报错,这里就会出现一条unacked消息:
https://i-blog.csdnimg.cn/blog_migrate/a34aaf6c5fa468d6f8e021776ace9dd7.png
因为设置的是将消息重新请求,以是它会陷入死循环
https://i-blog.csdnimg.cn/blog_migrate/adae594f52e8b9378560b105e76cc40b.png
https://i-blog.csdnimg.cn/blog_migrate/ad5c53aa7843ca1e4f538b9f3e4125df.png
防止出现这种环境,可以将basicNack最后一个参数改为false,让消息进去死信队列
2、什么是死信队列

说简单点就是备胎队列,而死信的来源有以下几种:

[*]消息被否定确认,利用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
[*]消息在队列的存活时间凌驾设置的TTL时间。
[*]消息队列的消息数量已经凌驾最大队列长度。
https://i-blog.csdnimg.cn/blog_migrate/45e70ce133372c4422f52b10147ba6f8.png
“死信”消息会被RabbitMQ进行特殊处理,假如配置了死信队列信息,那么该消息将会被丢进死信队列中,假如没有配置,则该消息将会被丢弃。
3、配置死信队列

一般会为每个重要的业务队列配置一个死信队列。可以分为以下步骤:

[*]配置业务队列,绑定到业务互换机上
[*]为业务队列配置死信互换机和路由key
[*]为死信互换机配置死信队列
从控制台将之前的互换机都删除,然后修改代码。
首先看一下发布者的配置代码:
package com.example.publisher.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
* @author LoneWalker
* @date 2023/4/8
* @description
*/
@Slf4j
@Configuration
public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
      RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
      rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
      //设置给rabbitTemplate
      rabbitTemplate.setConfirmCallback(this);
      rabbitTemplate.setReturnsCallback(this);
      rabbitTemplate.setMandatory(true);
      return rabbitTemplate;
    }

    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
      return new Jackson2JsonMessageConverter();
    }
    /************ 正常配置 ******************/
    /**
   * 正常交换机,开启持久化
   */
    @Bean
    DirectExchange normalExchange() {
      return new DirectExchange("normalExchange", true, false);
    }

    @Bean
    public Queue normalQueue() {
      // durable: 是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
      // exclusive: 默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。
      // autoDelete: 是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
      Map<String, Object> args = deadQueueArgs();
      // 队列设置最大长度
      args.put("x-max-length", 5);
      return new Queue("normalQueue", true, false, false, args);
    }

    @Bean
    public Queue ttlQueue() {
      Map<String, Object> args = deadQueueArgs();
      // 队列设置消息过期时间 60 秒
      args.put("x-message-ttl", 60 * 1000);
      return new Queue("ttlQueue", true, false, false, args);
    }

    @Bean
    Binding normalRouteBinding() {
      return BindingBuilder.bind(normalQueue())
                .to(normalExchange())
                .with("normalRouting");
    }

    @Bean
    Binding ttlRouteBinding() {
      return BindingBuilder.bind(ttlQueue())
                .to(normalExchange())
                .with("ttlRouting");
    }


    /**************** 死信配置 *****************/
    /**
   * 死信交换机
   */
    @Bean
    DirectExchange deadExchange() {
      return new DirectExchange("deadExchange", true, false);
    }

    /**
   * 死信队列
   */
    @Bean
    public Queue deadQueue() {
      return new Queue("deadQueue", true, false, false);
    }

    @Bean
    Binding deadRouteBinding() {
      return BindingBuilder.bind(deadQueue())
                .to(deadExchange())
                .with("deadRouting");
    }

    /**
   * 转发到 死信队列,配置参数
   */
    private Map<String, Object> deadQueueArgs() {
      Map<String, Object> map = new HashMap<>();
      // 绑定该队列到死信交换机
      map.put("x-dead-letter-exchange", "deadExchange");
      map.put("x-dead-letter-routing-key", "deadRouting");
      return map;
    }


    /**
   * 消息成功到达交换机会触发
   * @param correlationData
   * @param ack
   * @param cause
   */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
      if (ack) {
            log.info("交换机收到消息成功:" + correlationData.getId());
      }else {
            log.error("交换机收到消息失败:" + correlationData.getId() + "原因:" + cause);
      }
    }

    /**
   * 消息未成功到达队列会触发
   * @param returnedMessage
   */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
      log.error("{}--消息未成功到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId());
    }
} properties
server.port=8081
#rabbitmq服务ip
spring.rabbitmq.host=localhost
#rabbitmq端口号
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=用户名改为自己的
#密码
spring.rabbitmq.password=密码改为自己的
#虚拟机
spring.rabbitmq.virtual-host=demo

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true 发送消息:
@RequiredArgsConstructor
@Service
public class PublisherServiceImpl implements PublisherService{

    private final RabbitTemplate rabbitTemplate;

    @Override
    public void addUser(User user) {

      CorrelationData correlationData = new CorrelationData();
      correlationData.setId(UUID.randomUUID().toString());

      rabbitTemplate.convertAndSend("normalExchange","normalRouting",user,correlationData);
    }
} 4、模仿场景

4.1消息处理异常

文章开篇说到的消息手动ack,一旦出现异常会陷入死循环,那么不把消息放回原队列,而是放入死信队列,然后抛异常由人工处理:
package com.example.consumer.service;

import com.alibaba.fastjson.JSONObject;
import com.example.consumer.entity.User;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
* @description:
* @author: LoneWalker
* @create: 2022-04-04
**/
@Service
@Slf4j
public class ConsumerService {


    @RabbitListener(queues ="normalQueue")
    public void addUser(String userStr,Channel channel,Message message){
      long deliveryTag = message.getMessageProperties().getDeliveryTag();
      try {
            int a = 1/0;
            User user = JSONObject.parseObject(userStr,User.class);
            log.info(user.toString());
            //手动ack第二个参数为false是表示仅仅确认当前消息 true表示确认之前所有的消息
            channel.basicAck(deliveryTag,false);
      } catch (Exception e) {
            //手动nack 告诉rabbitmq该消息消费失败第三个参数:如果被拒绝的消息应该被重新请求,而不是被丢弃或变成死信,则为true
            try {
                channel.basicNack(deliveryTag,false,false);
            } catch (IOException ex) {
                throw new RuntimeException("消息处理失败");
            }
      }
    }
} 注意basicNack的第三个参数,设置为false后就不会重新请求。
https://i-blog.csdnimg.cn/blog_migrate/f617bfdfe9f1fb6d273335d0b44cd0eb.png
4.2队列达到最大长度

配置上面的代码已经有过了:
https://i-blog.csdnimg.cn/blog_migrate/71466af1327273fe7a6928cacdb4d212.png
测试的话我们发6条消息,加上4.1测试产生的死信,预期死信队列中应该会有两条:
https://i-blog.csdnimg.cn/blog_migrate/58ce2d26fa186a72b76cfe703d640b3f.png
4.3消息TTL过期

过期时间TTL表现可以对消息设置预期的时间,凌驾这个时间就删除大概放入死信队列。修改routingKey为ttlRouting。上述代码中配置过期时间为60s
https://i-blog.csdnimg.cn/blog_migrate/96a1211913cc4227af378df37463de33.png
https://i-blog.csdnimg.cn/blog_migrate/019cea1644de2de1d545372c5cce72eb.png
死信队列中的消息处理和正常的队列没什么区别,就不赘述了。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: RabbitMQ手动ACK与死信队列