RabbitMq手动ack的超简单案例+Confirm和Return机制的配置和利用 ...

打印 上一主题 下一主题

主题 245|帖子 245|积分 735

最简单的例子

先简单介绍一下这三个方法
basicAck

表现确认乐成,利用此方法后,消息会被rabbitmq broker删除



 
basicNack

表现失败确认,一样平常在消费消息业务异常时用到此方法,可以将消息重新投递入队列




basicReject

拒绝消息,与basickNack区别在于不能举行批量操纵,其他用法很相似


形参

multiple表现是否批量处理
requeue表现是否重复入队

deliverTag:表现消息投递序号,每次消费消息或者消息重新投递后,deliverTag都会增长。手动消息确认模式下,我们可以对指定deliverTag的消息举行ack、nack、reject等操纵。
mutiple:是否批量确认,值为 true 则会一次性 ack全部小于当前消息 delivertag 的消息

依赖

  1.   <dependencies>
  2.         <dependency>
  3.             <groupId>org.springframework.boot</groupId>
  4.             <artifactId>spring-boot-starter-web</artifactId>
  5.         </dependency>
  6.         <dependency>
  7.             <groupId>org.springframework.boot</groupId>
  8.             <artifactId>spring-boot-starter-test</artifactId>
  9.             <scope>test</scope>
  10.         </dependency>
  11.         <dependency>
  12.             <groupId>org.springframework.amqp</groupId>
  13.             <artifactId>spring-amqp</artifactId>
  14.         </dependency>
  15.         <dependency>
  16.             <groupId>org.projectlombok</groupId>
  17.             <artifactId>lombok</artifactId>
  18.         </dependency>
  19.         <dependency>
  20.             <groupId>com.alibaba.fastjson2</groupId>
  21.             <artifactId>fastjson2</artifactId>
  22.             <version>2.0.51</version>
  23.         </dependency>
  24.         <dependency>
  25.             <groupId>org.springframework.amqp</groupId>
  26.             <artifactId>spring-rabbit</artifactId>
  27.         </dependency>
  28.         <dependency>
  29.             <groupId>org.springframework.amqp</groupId>
  30.             <artifactId>spring-rabbit</artifactId>
  31.         </dependency>
  32.     </dependencies>
复制代码
springboot配置

  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.88.130
  4.     port: 5672
  5.     username: itcast
  6.     password: 123321
  7.     virtual-host: /   #虚拟主机,默认是/,RabbitMQ 使用虚拟主机来隔离不同的消息环境,虚拟主机用于将消息、交换器、队列和绑定隔离开来
  8.     publisher-confirm-type: correlated #发布者消息确认功能(异步)
  9.     listener:
  10.       simple:
  11.         retry:
  12.           enabled: true  #开启消费者失败重试
  13.           max-attempts: 5 #最大重试次数
  14.           initial-interval: 1000ms #初始失败等待时长为1秒
  15.           multiplier: 1 #下次失败的等待时长倍数,下次等待时长=multipiler*last-interval
  16.         acknowledge-mode: manual #开启手动ack机制 auto是自动 none是发送后直接ack(这个不会用上的)
  17.     publisher-returns: true #发布者返回消息功能
  18. logging:
  19.   level:
  20.     com.atguigu.mq.config.MQProducerAckConfig: info
复制代码
交换机和队列配置

  1. package com.example.rabbitmq.Configuration;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.amqp.rabbit.retry.ImmediateRequeueMessageRecoverer;
  5. import org.springframework.amqp.rabbit.retry.MessageRecoverer;
  6. import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
  7. import org.springframework.beans.factory.annotation.Qualifier;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. @Configuration
  11. public class QueueConfig {
  12.     @Bean(name = "confirmTestQueue")
  13.     public Queue confirmTestQueue() {
  14.         return new Queue("confirm_test_queue", true, false, false);
  15.     }
  16.     @Bean(name = "confirmTestExchange")
  17.     public FanoutExchange confirmTestExchange() {
  18.         return new FanoutExchange("confirmTestExchange");
  19.     }
  20.     @Bean
  21.     public Binding confirmTestFanoutExchangeAndQueue(
  22.             @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
  23.             @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
  24.         return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
  25.     }
  26. }
复制代码

消费者

我设置了最多重试三次
有一个小细节
就是他重试的时候是在队头重试的
所以他重试的时候会阻塞一段时间,此时后面的消息是不能消费的
  1. package com.example.rabbitmq.Listener;
  2. import com.alibaba.fastjson2.JSON;
  3. import com.rabbitmq.client.AMQP;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.MessageProperties;
  6. import com.rabbitmq.tools.json.JSONUtil;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.amqp.core.Message;
  9. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  10. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  11. import org.springframework.stereotype.Component;
  12. import java.io.IOException;
  13. import java.util.HashMap;
  14. import java.util.Map;
  15. import java.util.concurrent.ConcurrentHashMap;
  16. import java.util.concurrent.ConcurrentMap;
  17. @Slf4j
  18. @Component
  19. public class ReceiverMessage1 {
  20.     //用来存放消息唯一标识的map,设置一定的重试次数
  21.     public static final ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();
  22.     @RabbitListener(queues = "confirm_test_queue")
  23.     public void getMessage3(String msg, Channel channel, Message message) throws IOException {
  24.         //得到当前信息的唯一标识
  25.         String messageId=message.getMessageProperties().getMessageId();
  26.         try {
  27.             System.out.println("成功接收到消息:" + msg);
  28.        int i=1/0;
  29.             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  30. log.info("确认成功");
  31.         } catch (Exception e) {
  32.             map.put(messageId,map.getOrDefault(messageId, 0)+1);
  33.             log.error("接收消息失败");
  34.           log.info("开始重试");
  35.           log.info(messageId);
  36.           //重复处理失败
  37.           if(map.get(messageId)<=3) {
  38.               log.info("确认失败,重新入队");
  39.              channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
  40.    map.put(messageId, map.getOrDefault(messageId, 0)+1);
  41.           }
  42. else {
  43.     log.info("重试仍然失败,所以我们决定丢弃这个消息");
  44.               channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
  45.           }
  46.         }
  47.     }
  48.     }
复制代码

生产者(测试类)

  1. package com.example.rabbitmq;
  2. import org.apache.catalina.Executor;
  3. import org.junit.jupiter.api.Test;
  4. import org.springframework.amqp.AmqpException;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.boot.test.context.SpringBootTest;
  8. import java.util.UUID;
  9. import java.util.concurrent.Executors;
  10. import java.util.concurrent.ThreadPoolExecutor;
  11. import static org.junit.jupiter.api.Assertions.assertThrows;
  12. import static org.mockito.Mockito.doThrow;
  13. @SpringBootTest
  14. class RabbitmqApplicationTests {
  15.     @Test
  16.     void contextLoads() {
  17.     }
  18.     @Autowired
  19.     private RabbitTemplate rabbitTemplate;
  20.     @Test
  21.     void sendMessage() {
  22.        rabbitTemplate.convertAndSend("confirmTestExchange",
  23.                "confirm_test_queue", "这是一个测试消息",message -> {
  24.            message.getMessageProperties().setMessageId(UUID.randomUUID().toString());  //把消息的唯一标识设置为UUID
  25.            return message;
  26.                });
  27.     }
  28. }
复制代码

修改了messageId

因为messageId是基于交换机,内容,队列来生成的
相同的消息可能messageId是一样的
所以我发送消息的时候把底层改成了UUID




Return和Confirm机制





Confirm机制是消息发送到交换机乐成或失败时的处理机制


Retrun机制是消息发送到队列失败时的处理机制

配置(加了日志输出)

  1. package com.example.rabbitmq.Configuration;
  2. import jakarta.annotation.PostConstruct;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.ReturnedMessage;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.context.annotation.Configuration;
  9. @Configuration
  10. @Slf4j
  11. public class RabbitConfig implements RabbitTemplate.ReturnsCallback,RabbitTemplate.ConfirmCallback {
  12.     @Autowired
  13.     private RabbitTemplate rabbitTemplate;
  14. //在Bean注入前就要执行的方法
  15.     @PostConstruct
  16.     private  void initRabbitTemplate(){
  17.         rabbitTemplate.setReturnsCallback(this);
  18.         rabbitTemplate.setConfirmCallback(this);
  19.     }
  20.     //消息发送到交换机成功或失败时调用这个方法
  21.     @Override
  22.     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  23. log.info("confirm()回调函数大打印correlationData:"+correlationData);
  24. log.info("confirm()回调函数大打印ack:"+ack);
  25. log.info("confirm()回调函数大打印cause:"+cause);
  26.     }
  27.     //发送到队列失败的时候调用这个方法
  28.     @Override
  29.     public void returnedMessage(ReturnedMessage returned) {
  30.         log.info("消息主体: " + new String(returned.getMessage().getBody()));
  31.         log.info("应答码: " + returned.getReplyCode());
  32.         log.info("描述:" + returned.getReplyText());
  33.         log.info("消息使用的交换器 exchange : " + returned.getExchange());
  34.         log.info("消息使用的路由键 routing : " + returned.getRoutingKey());
  35.     }
  36. }
复制代码

遇到的小问题


它会有个报错
2024-07-28T14:00:10.866+08:00 ERROR 45104 --- [168.88.130:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
这个是因为我们的多次ack,导致的错误
例如我们可能多次ac或者reject了,就会发生这种错误
这是因为我们的配置默认是自动ack
假如我们不开启手动ack,我们的本身写的手动ack代码就算是再次ack了
所以会出现这个通道错误
这个会让我们的mq通道断连,然后再重连,这样子会导致部门消息丢失,所以记得配置开启手动ack


发送消息回队尾

这个我处理失败了,我也不知道为什么,可能我本身修改了messageId吧
反正这个重新发送消息,我得到的和之前的不同,有部门属性缺失了
所以我就没弄这个
假如想具体相识发起看参考文章



参考文章
Springboot + RabbitMQ 用了消息确认机制,感觉掉坑里了!-腾讯云开辟者社区-腾讯云 (tencent.com)











免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

梦应逍遥

高级会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表