ToB企服应用市场:ToB评测及商务社交产业平台

标题: Springboot利用Rabbitmq的延时队列+死信队列实现消息延期消费 [打印本页]

作者: 冬雨财经    时间: 2025-1-4 23:22
标题: Springboot利用Rabbitmq的延时队列+死信队列实现消息延期消费
简介

RabbitMQ 的延时队列(Delayed Queue)是指消息在发送到队列后,会在一定的时间内被耽误处理处罚,直到预设的耽误时间结束,消息才会被消费者消费。RabbitMQ 自己并不原生支持延时队列,但我们可以通过一些插件或特定设置来实现这种功能。
延时队列的应用场景


死信队列(Dead Letter Queue,简称 DLQ)是消息队列中的一个特别队列,用于存放那些因某些原因无法被正常消费的消息。死信队列在很多消息队列体系(如 RabbitMQ、Kafka 等)中都有应用。通过将无法消费的消息转移到死信队列中,可以资助开发者发现和处理处罚消息消费过程中的异常或错误。
死信队列的主要用途是:

本文联合延时队列和死信队列,延时队列的消息设置逾期时间,在时间逾期之后,将消息内容推送到死信队列中举行处理处罚。
整合逻辑

依赖

  1. <dependency>
  2.             <groupId>org.springframework.boot</groupId>
  3.             <artifactId>spring-boot-starter</artifactId>
  4.         </dependency>
  5.         <dependency>
  6.             <groupId>org.springframework.boot</groupId>
  7.             <artifactId>spring-boot-starter-web</artifactId>
  8.         </dependency>
  9.         <dependency>
  10.             <groupId>org.projectlombok</groupId>
  11.             <artifactId>lombok</artifactId>
  12.             <version>1.18.30</version>
  13.         </dependency>
  14.         <dependency>
  15.             <groupId>cn.hutool</groupId>
  16.             <artifactId>hutool-all</artifactId>
  17.             <version>5.8.27</version>
  18.         </dependency>
  19.         <dependency>
  20.             <groupId>org.springframework.boot</groupId>
  21.             <artifactId>spring-boot-starter-amqp</artifactId>
  22.             <version>2.7.7</version>
  23.         </dependency>
复制代码
设置

  1. server:
  2.   port: 8081
  3. spring:
  4.   application:
  5.     name: walker-rabbitmq
  6.   rabbitmq:
  7.     host: 127.0.0.1
  8.     port: 5672
  9.     username: guest_walker
  10.     password: guest
复制代码


创建交换机|队列

延时队列

  1. package com.walker.rabbitmq.delay.config;
  2. import com.walker.rabbitmq.delay.constants.BaseConstant;
  3. import com.walker.rabbitmq.delay.enums.RabbitMqConfigEnum;
  4. import org.springframework.amqp.core.*;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. @Configuration
  10. public class OrderDelayRabbitConfig {
  11. //    交换机
  12.     @Bean
  13.     public DirectExchange orderDelayExchange(){
  14.         return new DirectExchange(RabbitMqConfigEnum.ORDER_DELAY.getExchange());
  15.     }
  16. //    队列
  17.     @Bean
  18.     public Queue orderDelayQueue(){
  19.         Map<String,Object> args = new HashMap<>();
  20. //        延时队列,需要绑定死信队列,在ttl到期之后,将队列转到死信队列
  21.         args.put(BaseConstant.xDeadLetterExchange,RabbitMqConfigEnum.ORDER_DEAD.getExchange());
  22.         args.put(BaseConstant.xDeadLetterRoutingKey,RabbitMqConfigEnum.ORDER_DEAD.getRoutingKey());
  23.         return QueueBuilder.durable(RabbitMqConfigEnum.ORDER_DELAY.getQueue()).autoDelete().withArguments(args).build();
  24.     }
  25. //
  26.     @Bean
  27.     public Binding orderDelayQueueBinding(){
  28.         return BindingBuilder.bind(orderDelayQueue()).to(orderDelayExchange()).with(RabbitMqConfigEnum.ORDER_DELAY.getRoutingKey());
  29.     }
  30. }
复制代码
死信队列

  1. package com.walker.rabbitmq.delay.config;
  2. import com.walker.rabbitmq.delay.enums.RabbitMqConfigEnum;
  3. import org.springframework.amqp.core.*;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class OrderDeadRabbitConfig {
  8. //    交换机
  9.     @Bean
  10.     public DirectExchange orderDeadExchange(){
  11.         return new DirectExchange(RabbitMqConfigEnum.ORDER_DEAD.getExchange());
  12.     }
  13. //    队列
  14.     @Bean
  15.     public Queue orderDeadQueue(){
  16.         return QueueBuilder.durable(RabbitMqConfigEnum.ORDER_DEAD.getQueue()).autoDelete().build();
  17.     }
  18. //   交换机、队列、路由键绑定
  19.     @Bean
  20.     public Binding orderDeadQueueBinding(){
  21.         return BindingBuilder.bind(orderDeadQueue()).to(orderDeadExchange())
  22.                 .with(RabbitMqConfigEnum.ORDER_DEAD.getRoutingKey());
  23.     }
  24. }
复制代码
常量

  1. package com.walker.rabbitmq.delay.constants;
  2. import com.walker.rabbitmq.delay.enums.RabbitMqConfigEnum;
  3. public interface BaseConstant {
  4.     String xDeadLetterExchange = "x-dead-letter-exchange";
  5.     String xDeadLetterRoutingKey = "x-dead-letter-routing-key";
  6. }
复制代码
干系枚举

  1. package com.walker.rabbitmq.delay.enums;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Getter;
  4. @AllArgsConstructor
  5. @Getter
  6. public enum RabbitMqConfigEnum {
  7.     ORDER_DELAY("direct","订单延时队列","order_delay_exchange", "order_delay_queue", "order_delay_routing_key",true),
  8.     ORDER_DEAD("direct","订单死信队列","order_dead_exchange", "order_dead_queue", "order_dead_routing_key",true);
  9.     private final String type;
  10.     private final String title;
  11.     private final String exchange;
  12.     private final String queue;
  13.     private final String routingKey;
  14.     private final Boolean durable;
  15. }
复制代码
rabbitmq工具类封装

  1. package com.walker.rabbitmq.delay.service;
  2. import cn.hutool.core.util.BooleanUtil;
  3. import cn.hutool.json.JSONUtil;
  4. import com.rabbitmq.client.Channel;
  5. import com.walker.rabbitmq.delay.enums.RabbitMqConfigEnum;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.springframework.amqp.AmqpException;
  8. import org.springframework.amqp.core.Message;
  9. import org.springframework.amqp.core.MessagePostProcessor;
  10. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  11. import org.springframework.stereotype.Component;
  12. import javax.annotation.Resource;
  13. import java.io.IOException;
  14. import java.util.function.Consumer;
  15. import java.util.function.Function;
  16. @Component
  17. @Slf4j
  18. public class RabbitmqService {
  19.     @Resource
  20.     private RabbitTemplate rabbitTemplate;
  21.     /**
  22.     * author:walker
  23.     * time: 2024/12/30
  24.     * description:  无返回结果的手动处理方式
  25.     */
  26.     public void doWithManualAck(String event, Message message, Channel channel, Consumer<String> method) {
  27.         String data = new String(message.getBody());
  28.         try {
  29.             method.accept(data);
  30.             sendAck(message, channel);
  31.         } catch (Exception e) {
  32.             log.error("处理事件[{}]异常数据为:{},原因如下:",event,data, e);
  33.             sendNack(message, channel);
  34.         }
  35.     }
  36.     /**
  37.      * author:walker
  38.      * time: 2024/12/30
  39.      * description:  根据返回的布尔类型,进行确认和拒绝
  40.      */
  41.     public void doWithManualAck(String event, Message message, Channel channel, Function<String,Boolean> method) {
  42.         String data = new String(message.getBody());
  43.         try {
  44.             Boolean res = method.apply(data);
  45.             if(BooleanUtil.isTrue(res)){
  46.                 sendAck(message, channel);
  47.             }else{
  48.                 sendNack(message, channel);
  49.             }
  50.         } catch (Exception e) {
  51.             log.error("处理事件[{}]异常数据为:{},原因如下:",event,data, e);
  52.             sendNack(message, channel);
  53.         }
  54.     }
  55.     //    确认消息
  56.     public void sendAck(Message message, Channel channel) throws IOException {
  57.         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  58.     }
  59.     //    拒绝消息
  60.     public void sendNack(Message message, Channel channel) {
  61.         try {
  62.             channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  63.         } catch (IOException e) {
  64.             log.error("消息消费失败,消息ID:{}", message.getMessageProperties().getMessageId(), e);
  65.         }
  66.     }
  67.     public <T> void sendDelayMsg(RabbitMqConfigEnum configEnum, T data, Integer delayTime) {
  68.         rabbitTemplate.convertAndSend(configEnum.getExchange(), configEnum.getRoutingKey(),
  69.                 JSONUtil.toJsonStr(data), new MessagePostProcessor() {
  70.                     @Override
  71.                     public Message postProcessMessage(Message message) throws AmqpException {
  72.                         message.getMessageProperties().setExpiration(String.valueOf(delayTime));
  73.                         return message;
  74.                     }
  75.                 });
  76.     }
  77. }
复制代码
案例

实体类

  1. package com.walker.rabbitmq.delay.entity;
  2. import lombok.Data;
  3. @Data
  4. public class OrderInfo {
  5.     private String productName;
  6.     private Integer num;
  7.     private String userName;
  8.     private String orderTime;
  9. }
复制代码
controller 创建订单

  1. package com.walker.rabbitmq.delay.controller;
  2. import com.walker.rabbitmq.delay.enums.RabbitMqConfigEnum;
  3. import com.walker.rabbitmq.delay.entity.OrderInfo;
  4. import com.walker.rabbitmq.delay.service.RabbitmqService;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import javax.annotation.Resource;
  10. import java.util.Date;
  11. @Slf4j
  12. @RestController
  13. @RequestMapping("/order")
  14. public class OrderController {
  15.     @Resource
  16.     private RabbitmqService rabbitmqService;
  17.     /**
  18.     * author:walker
  19.     * time: 2025/1/2
  20.     * description:  下单
  21.     */
  22.     @GetMapping("/create")
  23.     public void createOrder() {
  24.         OrderInfo orderInfo = new OrderInfo();
  25.         orderInfo.setProductName("羽毛球");
  26.         orderInfo.setUserName("张三");
  27.         orderInfo.setOrderTime("2025-01-02 12:00:00");
  28.         orderInfo.setNum(1);
  29.         log.info("发送消息时间{}",new Date());
  30.         rabbitmqService.sendDelayMsg(RabbitMqConfigEnum.ORDER_DELAY,orderInfo,10000);
  31.     }
  32. }
复制代码
消费者监听消息

消费者接口

  1. package com.walker.rabbitmq.delay.service;
  2. public interface IRabbitCosumerHandler {
  3.     Boolean handle(String message);
  4. }
复制代码
消费者监听方法

  1. package com.walker.rabbitmq.delay.listener;
  2. import com.rabbitmq.client.Channel;
  3. import com.walker.rabbitmq.delay.enums.RabbitMqConfigEnum;
  4. import com.walker.rabbitmq.delay.constants.BaseConstant;
  5. import com.walker.rabbitmq.delay.service.IRabbitCosumerHandler;
  6. import com.walker.rabbitmq.delay.service.RabbitmqService;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.amqp.core.Message;
  9. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  10. import org.springframework.stereotype.Component;
  11. import javax.annotation.Resource;
  12. import java.io.IOException;
  13. import java.util.Date;
  14. @Slf4j
  15. @Component
  16. public class OrderDeadLetterQueueConsumer implements IRabbitCosumerHandler {
  17.     @Resource
  18.     private RabbitmqService rabbitmqService;
  19.     @RabbitListener(queues = "#{T(com.walker.rabbitmq.delay.enums.RabbitMqConfigEnum).ORDER_DEAD.getQueue()}",ackMode = "MANUAL")
  20.     public void receive(Message message, Channel channel) throws IOException {
  21.         log.info("接受消息时间 {}",new Date());
  22.         String msg = new String(message.getBody());
  23.         log.info("当前时间:{},队列{}收到消息:{}", new Date().toString(),RabbitMqConfigEnum.ORDER_DEAD.getQueue(), msg);
  24.         rabbitmqService.doWithManualAck(RabbitMqConfigEnum.ORDER_DEAD.getTitle(), message, channel, this::handle);
  25.     }
  26.     @Override
  27.     public Boolean handle(String message) {
  28. //        处理逻辑
  29.         log.info("进行订单取消的操作");
  30.         return true;
  31.     }
  32. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4