RabbitMQ如何实现延时队列?

打印 上一主题 下一主题

主题 790|帖子 790|积分 2370

1.什么是RabbitMQ?
RabbitMQ 是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP)。它允许不同的应用步伐通过消息举行通信,并且能够可靠地通报、路由和存储消息。
2.利用RabbitMQ实现延时队列的方法
2.1利用死信交换机(Dead - Letter - Exchange,DLX)来实现
原理:
当消息在队列中逾期后,会被发送到死信交换机,消费者从毗连死信交换机的队列中获取逾期后的消息举行处理。
步调示例:
声明交换机和队列
起首需要声明一个平凡交换机(比如direct类型的交换机)和一个死信交换机(同样可以是direct类型),以及两个队列,一个是平凡队列用于接收消息,另一个是毗连死信交换机的队列用于接收逾期消息。
假设利用 Java 和 RabbitMQ 的 Java 客户端amqp - client来实现,代码如下:
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. import java.util.concurrent.TimeoutException;
  8. public class DelayedQueueExample {
  9.     private static final String NORMAL_EXCHANGE = "normal_exchange";
  10.     private static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
  11.     private static final String NORMAL_QUEUE = "normal_queue";
  12.     private static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
  13.     public static void main(String[] args) throws IOException, TimeoutException {
  14.         ConnectionFactory factory = new ConnectionFactory();
  15.         factory.setHost("localhost");
  16.         Connection connection = factory.newConnection();
  17.         Channel channel = connection.createChannel();
  18.         // 声明死信交换机
  19.         channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, "direct");
  20.         // 声明普通交换机
  21.         channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");
  22.         // 声明死信队列
  23.         channel.queueDeclare(DEAD_LETTER_QUEUE, false, false, false, null);
  24.         // 声明普通队列,并设置死信交换机和路由键
  25.         Map<String, Object> args = new HashMap<>();
  26.         args.put("x - dead - letter - exchange", DEAD_LETTER_EXCHANGE);
  27.         args.put("x - dead - letter - routing - key", "dead_letter_key");
  28.         channel.queueDeclare(NORMAL_QUEUE, false, false, false, args);
  29.         // 将普通队列绑定到普通交换机
  30.         channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal_key");
  31.         // 将死信队列绑定到死信交换机
  32.         channel.queueBind(DEAD_LETTER_QUEUE, DEAD_LETTER_EXCHANGE, "dead_letter_key");
  33.     }
  34. }
复制代码
发送消息并设置逾期时间
当队列和交换机声明完成后,可以通过平凡交换机发送消息到平凡队列,并设置消息的逾期时间(expiration属性)。例如,发送一条文本消息并设置逾期时间为 5000 毫秒(5 秒):
  1. String message = "Delayed Message";
  2. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  3.        .expiration("5000")
  4.        .build();
  5. channel.basicPublish(NORMAL_EXCHANGE, "normal_key", properties, message.getBytes());
复制代码
消费者从死信队列中接收消息举行处理。可以利用basicConsume方法来设置消费者从死信队列接收消息。例如:
  1. Consumer consumer = new DefaultConsumer(channel) {
  2.     @Override
  3.     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  4.         String receivedMessage = new String(body, "UTF - 8");
  5.         System.out.println("Received delayed message: " + receivedMessage);
  6.     }
  7. };
  8. channel.basicConsume(DEAD_LETTER_QUEUE, true, consumer);
复制代码
2.2 利用rabbitmq_delayed_message_exchange 插件
原理:RabbitMQ 提供了一个名为rabbitmq_delayed_message_exchange的插件来实现延时队列。这个插件允许创建一种特别的交换机,称为耽误交换机(Delayed Message Exchange)。当消息发送到这种交换机时,可以指定一个耽误时间,在耽误时间事后,消息才会被路由到相应的队列,然后消费者就可以从队列中获取并处理消息。
步调示例:
安装插件:
起首需要安装rabbitmq_delayed_message_exchange插件。对于不同的操作系统和 RabbitMQ 安装方式,安装步调会有所不同。在基于 Debian 或 Ubuntu 的系统中,如果是通过包管理方式安装的 RabbitMQ,可以利用以下下令安装插件:
sudo rabbitmq - plugins enable rabbitmq_delayed_message_exchange
安装完成后,需要重启 RabbitMQ 服务使插件生效。
声明耽误交换机和队列:
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class DelayedQueuePluginExample {
  7.     private static final String DELAYED_EXCHANGE = "delayed_exchange";
  8.     private static final String QUEUE = "delayed_queue";
  9.     public static void main(String[] args) throws IOException, TimeoutException {
  10.         ConnectionFactory factory = new ConnectionFactory();
  11.         factory.setHost("localhost");
  12.         Connection connection = factory.newConnection();
  13.         Channel channel = connection.createChannel();
  14.         // 声明延迟交换机,设置类型为x - delayed - message
  15.         channel.exchangeDeclare(DELAYED_EXCHANGE, "x - delayed - message");
  16.         channel.queueDeclare(QUEUE, false, false, false, null);
  17.         // 将队列绑定到延迟交换机
  18.         channel.queueBind(QUEUE, DELAYED_EXCHANGE, "");
  19.     }
  20. }
复制代码
发送耽误消息:
  1. String message = "Delayed Message via Plugin";
  2. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  3.        .headers(new HashMap<String, Object>() {
  4.             {
  5.                 put("x - delayed - type", "direct");
  6.                 put("x - delay", 10000);
  7.             }
  8.         })
  9.        .build();
  10. channel.basicPublish(DELAYED_EXCHANGE, "", properties, message.getBytes());
复制代码
后续消费者接收消息就宁静凡交换机一样。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

温锦文欧普厨电及净水器总代理

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表