ToB企服应用市场:ToB评测及商务社交产业平台

标题: Spring RabbitMQ 解决 生产者丢失消息、消息列表丢失消息、消耗者丢失消息 [打印本页]

作者: 自由的羽毛    时间: 2024-8-20 09:54
标题: Spring RabbitMQ 解决 生产者丢失消息、消息列表丢失消息、消耗者丢失消息
SpringBoot整合RabbitMQ

生产环境中使用RabbitMQ时需要包管消息的可靠传输,消息不可靠的环境大概是消息丢失,挟制等原因;

1.简单使用

1.1生产者

  1.         <dependency>
  2.             <groupId>org.springframework.boot</groupId>
  3.             <artifactId>spring-boot-starter-amqp</artifactId>
  4.         </dependency>
复制代码
  1. @Configuration
  2. public class MyRabbitConfig {
  3.     public static final String SECKILL_ROUTING_KEY = "seckill";
  4.     public static final String DIRECT_QUEUE_SECKILL = "queue_seckill";
  5.     public static final String DIRECT_EXCHANGE_SECKILL = "exchange_seckill";
  6.     // 将发送的数据转为JSON
  7.     @Bean
  8.     public MessageConverter messageConverter() {
  9.         return new Jackson2JsonMessageConverter();
  10.     }
  11.     //队列 起名:DirectQueue
  12.     @Bean
  13.     public Queue DirectQueue() {
  14.         return new Queue(DIRECT_QUEUE_SECKILL,true);
  15.     }
  16.     //Direct交换机 起名:DirectExchange
  17.     @Bean
  18.     DirectExchange DirectExchange() {
  19.         return new DirectExchange(DIRECT_EXCHANGE_SECKILL);
  20.     }
  21.     //绑定  将队列和交换机绑定, 并设置用于匹配键:SECKILL_ROUTING_KEY
  22.     @Bean
  23.     Binding bindingDirect() {
  24.         return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with(SECKILL_ROUTING_KEY);
  25.     }
  26. }
复制代码
  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.79.129
  4.     port: 5672
  5.     virtual-host: /
复制代码
  1. RabbitTemplate 发送消息
复制代码
1.2消耗者

  1. @Service
  2. @RabbitListener(queues = RabbitConfig.DIRECT_QUEUE_SECKILL)
  3. public class RabbitConsumer {
  4.     AtomicInteger count = new AtomicInteger(0);
  5.     @Resource
  6.     GoodsService goodsService;
  7.     // 单线程处理
  8.     @RabbitHandler
  9.     public void handle(String seckillDTOJson) {
  10.         Gson gson = new Gson();
  11.         SeckillDTO seckillDTO = gson.fromJson(seckillDTOJson, SeckillDTO.class);
  12.         goodsService.payOrder(seckillDTO.getUserId(), seckillDTO.getGoodsId());
  13.         System.out.println("处理:" + seckillDTO + count.incrementAndGet());
  14.    
  15. }
复制代码
2.生产者发布确认

修改RabbitTemplate 配置

  1. @Service
  2. public class RabbitService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
  3.     // todo 如何知道消费者正确消费了消息?
  4.     @Resource
  5.     RabbitTemplate rabbitTemplate;
  6.     @PostConstruct
  7.     public void init() {
  8.         //  交换机发布确认
  9.         rabbitTemplate.setConfirmCallback(this);
  10.         // 消息回退
  11.         rabbitTemplate.setMandatory(true);
  12.         // 消息回退处理
  13.         rabbitTemplate.setReturnCallback(this);
  14.     }
  15.     public String sendSeckillMessage(SeckillDTO seckillDTO) {
  16.         Gson gson = new Gson();
  17.         String toJson = gson.toJson(seckillDTO);
  18.         rabbitTemplate.convertAndSend(
  19.                 MyRabbitConfig.DIRECT_EXCHANGE_SECKILL, MyRabbitConfig.SECKILL_ROUTING_KEY, toJson);
  20.         return "ok";
  21.     }
  22.     /**
  23.      * @param correlationData 发送消息时我们所绑定的相关数据【rabbitTemplate.convertAndSend中发送】
  24.      * @param ack             是否成功投递到交换机
  25.      * @param cause           原因
  26.      */
  27.     @Override
  28.     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  29.     }
  30.     @Override
  31.     public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  32.     }
  33. }
复制代码
3.设置消息持久化

3.1 队列持久化

队列持久化在创建(new)队列时指定
参数:

  1. public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete,
  2.              Map<String, Object> arguments) {
  3. }
复制代码
3.2 消息持久化

消息持久化在消息创建时指定
  1. // 指定消息属性的对象
  2. MessageProperties messageProperties = new MessageProperties();
  3. // 设置消息交付模式
  4. messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  5. // 创建消息
  6. Message message = new Message("消息体".getBytes(StandardCharsets.UTF_8), messageProperties);
  7. // ... 发送消息
复制代码
4.@RabbitHandler的传入参数

@RabbitHandler注解标记的方法可以接收多种类型的参数,详细取决于消息的内容以及配置的MessageConverter
以下是一些常见的参数类型:(重要是Message,其他的内容可以中Message中本身取)
1. Message对象:这是最原始的情势,你可以直接接收整个org.springframework.amqp.core.Message对象,它包含了消息的所有元数据(如消息ID、路由键、交换机等)以及消息体。通过Message.getBody()方法可以得到消息的实际内容,内容通常是字节数组
- message.getBody():获取消息体
- message.getMessageProperties():获取消息元数据
2. Channel对象:获取该消耗者与对应队列的通道并进行一系列重要操作,详情见 [Channel详情](# 5. Channel对象)
  1. @Component
  2. public class MyMessageReceiver {
  3.    
  4.    
  5.     @RabbitHandler
  6.     public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
  7.         try {
  8.             // 处理消息的逻辑
  9.             System.out.println("Received message: " + new String(message.getBody());
  10.             // 成功处理完消息后,手动发送确认
  11.             channel.basicAck(tag, false); // 第二个参数false表示只确认当前消息
  12.         } catch (Exception e) {
  13.             // 如果消息处理失败,可以选择拒绝消息(requeue参数决定是否重新入队)
  14.             channel.basicNack(tag, false, true); // 第三个参数true表示重新入队列
  15.             // 或者也可以使用basicReject方法,效果类似但不支持批量确认
  16.             // channel.basicReject(tag, true);
  17.         }
  18.     }
  19.     @RabbitHandler
  20.     public void handleUserMessage(User user) {
  21.         // 直接处理User对象
  22.         System.out.println("Received User: " + user);
  23.     }
  24.     @RabbitHandler
  25.     public void handleMessageWithDetails(Message message) {
  26.         // 处理原始Message对象
  27.         byte[] body = message.getBody();
  28.         String routingKey = message.getMessageProperties().getReceivedRoutingKey();
  29.         System.out.println("Received Message with routingKey: " + routingKey);
  30.     }
  31.     @RabbitHandler
  32.     public void handleMessageWithHeader(@Payload String content, @Headers Map<String, Object> headers) {
  33.         // 获取消息体和消息头
  34.         System.out.println("Content: " + content);
  35.         System.out.println("Headers: " + headers);
  36.     }
  37. }
复制代码
5. Channel对象

在RabbitMQ和Spring AMQP的上下文中,Channel对象是一个非常重要的概念,它提供了与RabbitMQ服务器交互的基础功能。在你的@RabbitHandler方法签名中,Channel参数允许你直接执行一些低级别的AMQP操作,如消息确认、拒绝或 nack。以下是Channel对象的一些关键功能:
:long deliveryTag 可以在发送时手动指定,也可以自动生成
6.消耗者手动确认

在RabbitMQ中,消息的删除通常是自动进行的,并且这一举动依靠于消息的确认模式。RabbitMQ提供了两种消息确认模式来确保消息被精确处理并决定何时可以从队列中删除消息:

  1. @Component
  2. @RabbitListener(queues = "your_queue_name")
  3. public class YourConsumer {
  4.     @RabbitHandler
  5.     public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
  6.         try {
  7.             // 处理消息的逻辑
  8.             System.out.println("Received message: " + new String(message.getBody());
  9.             // 成功处理完消息后,手动发送确认
  10.             channel.basicAck(tag, false); // 第二个参数false表示只确认当前消息
  11.         } catch (Exception e) {
  12.             // 如果消息处理失败,可以选择拒绝消息(requeue参数决定是否重新入队)
  13.             channel.basicNack(tag, false, true); // 第三个参数true表示重新入队列
  14.             // 或者也可以使用basicReject方法,效果类似但不支持批量确认
  15.             // channel.basicReject(tag, true);
  16.         }
  17.     }
  18. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4