RabbitMQ 高级特性——TTL

打印 上一主题 下一主题

主题 787|帖子 787|积分 2361



  
前言

对于前面讲到的重试机制中,当确认策略为 MANUAL 手动确认的时候,如果消费者出现了程序逻辑错误,那么消息就无法被夺取处理,那么就会执行 basicNack 方法,如果我们的 basicNack 方法的第三个参数的值为 true 的话,在只有这一个消费者的情况下,这个消息就会反复重新进入队列并且返回投递给这个消费者,那么在这个队列中的背面的消息就无法被提递给消费者,这样就导致了消息积存,那么怎样处理这个问题呢?答案就是为消息大概队列设置 TTL(ime-To-Live)生存时间,当消息大概队列存在一段时间后就会被丢弃大概投递到死信队列中。那么这篇文章将介绍 RabbitMQ 中的 TTL。
TTL

TTL(Time-to-Live)过期时间,RabibtMQ 可以对队列和消息设置 TTL。当消息到达存活时间之后,如果该消息还没有被消费,那么就会被主动处理掉。
就是我们平时购物的时候,如果下单超过 24 消失还没有付款的话,订单就会被主动取消。
设置消息的 TTL

RabbitMQ 有两种设置 TTL 的方法,一是设置队列的 TTL,队列中的所有消息都有相同的过期时间,而就是对消息单独设置 TTL,每条消息的 TTL 可以不同。如果两种方式一起利用,则过期时间为两者的较小值。
先来看看怎样对消息设置 TTL。
  1. public static final String TTL_EXCHANGE = "ttl.exchange";
  2. public static final String TTL_QUEUE = "ttl.queue";
复制代码
  1. @Bean("ttlExchange")
  2. public DirectExchange ttlExchange() {
  3.     return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();
  4. }
  5.    
  6. @Bean("ttlQueue")
  7. public Queue ttlQueue() {
  8.     return QueueBuilder.durable(Constants.TTL_QUEUE).build();
  9. }
  10.    
  11. @Bean("ttlBinding")
  12. public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue,@Qualifier("ttlExchange") DirectExchange exchange) {
  13.     return BindingBuilder.bind(queue).to(exchange).with("ttl");
  14. }
复制代码
  1. @RequestMapping("/ttl")
  2. public String ttl() {
  3.         //设置消息的过期时间
  4.     MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
  5.         @Override
  6.         public Message postProcessMessage(Message message) throws AmqpException {
  7.             message.getMessageProperties().setExpiration("10000");
  8.             return message;
  9.         }
  10.     };
  11.     rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","rabbitmq ttl",messagePostProcessor);
  12.     return "消息发送成功";
  13. }
复制代码
这里消费者的代码我们就不写了,就通过 RabbitMQ 的管理页面观察消息的过期:

启动之后,并且让生产者生产消息,可以发现队列中已经存在一条消息了,然后等待一段时间再观察会发现刚刚生产的消息已经不存在了:

这是设置消息的过期时间,那么下面我们来看看怎样设置队列的时间。
设置队列的 TTL

设置队列的 TTL 是在我们声明队列的时候设置的,声明队列的时候参加 x-message-ttl 参数实现的,单位是毫秒。
  1. //队列设置TTL的第一种方法
  2. @Bean("ttlQueue")
  3. public Queue ttlQueue() {
  4.     return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(20*1000).build();
  5. }
  6. //队列设置TTL的第二种方法
  7. @Bean("ttlQueue")
  8. public Queue ttlQueue() {
  9.     Map<String, Object> arguments = new HashMap<>();
  10.     arguments.put("x-message-ttl",20000);
  11.     return QueueBuilder.durable(Constants.TTL_QUEUE).withArguments(arguments).build();
  12. }
  13. //也可以将map平铺出来
  14. @Bean("ttlQueue")
  15. public Queue ttlQueue() {
  16.     Map<String, Object> arguments = new HashMap<>();
  17.     arguments.put("x-message-ttl",20000);
  18.     return QueueBuilder.durable(Constants.TTL_QUEUE).withArgument("x-message-ttl",20000).build();
  19. }
复制代码
这里创建两个队列,一个是设置了 TTL 的队列,一个是没有设置 TTL 的对了,然后看两个队列的差异:
  1. @Bean("ttlExchange")
  2. public DirectExchange ttlExchange() {
  3.     return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();
  4. }
  5. @Bean("ttlQueue")
  6. public Queue ttlQueue() {
  7.     Map<String, Object> arguments = new HashMap<>();
  8.     arguments.put("x-message-ttl",20000);
  9.     return QueueBuilder.durable(Constants.TTL_QUEUE).withArgument("x-message-ttl",20000).build();
  10. }
  11. @Bean("normalQueue")
  12. public Queue normalQueue() {
  13.     return QueueBuilder.durable(Constants.NORMAL_QUEUE).build();
  14. }
  15. @Bean("ttlBinding")
  16. public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue,@Qualifier("ttlExchange") DirectExchange exchange) {
  17.     return BindingBuilder.bind(queue).to(exchange).with("ttl");
  18. }
  19. @Bean("ttlBinding2")
  20. public Binding ttlBinding2(@Qualifier("normalQueue") Queue queue,@Qualifier("ttlExchange") DirectExchange exchange) {
  21.     return BindingBuilder.bind(queue).to(exchange).with("ttl");
  22. }
复制代码


过一段时间之后再观察:

设置队列的过期时间和设置消息的过期时间的区别
设置队列TTL属性的方法,一旦消息过期,就会从队列中删除。
设置消息TTL的方法,即使消息过期,也不会立刻从队列中删除,而是在即将投递到消费者之前举行判定的。
为什么这两种方法处理的方式不一样?
因为设置队列过期时间,队列中已过期的消息肯定在队列头部(假设是按照时间次序入队的),RabbitMQ只要定期从队头开始扫描是否有过期的消息即可。
而设置消息TTL的方式,每条消息的过期时间不同,如果要删除所有过期消息需要扫描整个队列,这样服从较低。因此,RabbitMQ选择等到此消息即将被消费时再判定是否过期,如果过期再举行删除即可。跟 Redis 中的惰性删除是一个原理。这种方式避免了不须要的全队列扫描,提高了服从。
注意:这里的“即将投递到消费者之前”通常指的是在消息被发送到消费者之前,RabbitMQ会检查该消息是否已过期。如果消息已过期,RabbitMQ会将其删除,不会将其发送给消费者。这种机制确保了消费者不会接收到过期的消息。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

我爱普洱茶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表