死信队列详解

打印 上一主题 下一主题

主题 666|帖子 666|积分 1998

什么是死信队列?

在消息队列中,执行异步使命时,通常是将消息生产者发布的消息存储在队列中,由消费者从队列中获取并处理这些消息。但是,在某些情况下,消息大概无法正常地被处理和斲丧,比方:格式错误、设备故障等,这些未成功处理的消息就被称为“死信”。
为了避免这些未成功处理的消息导致步伐异常或对系统造成影响,我们必要使用死信队列(Dead Letter Queue)。当我们设置死信队列后,全部无法成功处理的消息将被捕获并重定向到指定的死信交换机中。消费者可以从该交换机中读取并处理这些“死信”。
死信队列的长处

使用死信队列有以下长处:


  • 提高系统可靠性:避免因未处理的死信而导致步伐异常,提高系统的可靠性。
  • 实现延迟消息:可以通过设置TTL时间,将超时未消费的消息转移到死信队列中,实现延迟消息的功能。
  • 防止滥用:当某些生产者恶意发送低质量的消息或进行滥用时,可以通过丢弃或重定向死信消息来防止滥用和恶意攻击。
死信队列的应用场景

死信队列在以下场景下黑白常有用的:


  • 消息格式错误:当消息格式错误时,大概会导致消费者无法正确地解析或处理该消息,这个问题通常与生产者的代码有关。为了避免消息失效,并提高系统可靠性,我们可以使用死信队列。
  • 消费者故障:另一个常见的场景是消息处理者无法正确地处理或响应到推入到队列中的消息,比方消费者创建了一个协程并在逻辑执行完成后未正确地关闭该协程。由于该协程始终处于打开状态,它将不停制止该消费者对其他消息进行正确消费。为了避免这种消息挂起并影响其他消息的正常处理,可以将其参加死信中心。
死信队列的实现方式

下面通过RabbitMQ和Spring Boot,演示如何实现死信队列。
RabbitMQ实现

创建交换机和队列

  1. import pika
  2. def main():
  3.     credentials = pika.PlainCredentials('guest', 'guest')
  4.     parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
  5.     # 创建死信交换机
  6.     dlx_exchnage_name = 'my-dlx-exchange'
  7.     with pika.BlockingConnection(parameters) as connection:
  8.         channel = connection.channel()
  9.         channel.exchange_declare(exchange=dlx_exchnage_name, exchange_type='fanout', durable=True)
  10.     # 创建死信队列和交换机
  11.     dead_letter_queue_name = 'dead-letter-queue'
  12.     with pika.BlockingConnection(parameters) as connection:
  13.         channel = connection.channel()
  14.         channel.queue_declare(queue=dead_letter_queue_name, durable=True)
  15.         channel.queue_bind(queue=dead_letter_queue_name, exchange=dlx_exchnage_name)
  16.     # 创建消息队列,并将其绑定到死信队列上  
  17.     queue_name = "job-queue"
  18.     arguments = {"x-dead-letter-exchange": dlx_exchnage_name}
  19.     with pika.BlockingConnection(parameters) as connection:
  20.         channel = connection.channel()
  21.         channel.queue_declare(queue=queue_name, durable=True,
  22.                               arguments=arguments)
  23.         channel.queue_bind(exchange='', queue=queue_name, routing_key=queue_name)
  24.                               
  25.     print("Queue is created")
  26.                               
  27. if __name__ == '__main__':
  28.     main()
复制代码
以上代码创建了两个队列,一个是my-dlx-exchange,一个是dead-letter-queue。同时创建另外一个名为job-queue的队列,它绑定了dead-letter-exchange这个交换机。
在发送消息时,必要提供一些属性来指定该队列应采取哪些步骤来防止该类丢失的消息。 这里我们可以使用x-dead-letter-exchange和x-message-ttl两个特殊属性,告诉RabbitMQ,如果消息在某段时间内无法正确处理,则将其放入死信队列中。
发送和吸收消息

  1. import pika
  2. def send_message():
  3.     credentials = pika.PlainCredentials('guest', 'guest')
  4.     parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
  5.     connection = pika.BlockingConnection(parameters)
  6.     channel = connection.channel()
  7.     # 发送一个消息5秒后过期,并且未被消费端确认
  8.     queue_name = "job-queue"
  9.     properties = pika.BasicProperties(delivery_mode=2,
  10.                                       expiration="5000")
  11.     channel.basic_publish(exchange='', routing_key=queue_name, body='Hello World!', properties=properties)
  12.     channel.close()
  13.     connection.close()
  14. def receive_message():
  15.     credentials = pika.PlainCredentials('guest', 'guest')
  16.     parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
  17.     connection = pika.BlockingConnection(parameters)
  18.     channel = connection.channel()
  19.     dlx_exchnage_name = 'my-dlx-exchange'
  20.     def callback(ch, method, properties, body):
  21.         print("Receivedmessage: %r" % body)
  22.         ch.basic_ack(delivery_tag=method.delivery_tag)
  23.     # 消费来自job-queue的消息
  24.     queue_name = "job-queue"
  25.     channel.basic_consume(queue_name, callback)
  26.     channel.start_consuming()
  27. if __name__ == '__main__':
  28.     send_message()
  29.     receive_message()
复制代码
以上代码是一个简朴的生产者和消费者。send_message()函数发送一个消息,并且未被消费端确认;receive_message()函数从job-queue中吸收消息。
Spring Boot实现

在Spring Boot中,我们可以使用RabbitMQ来实现死信队列。
添加依赖

  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
复制代码
设置文件

  1. spring:
  2.   rabbitmq:
  3.     host: localhost
  4.     port: 5672
  5.     username: guest
  6.     password: guest
复制代码
创建死信交换机、队列和绑定

  1. @Configuration
  2. public class RabbitConfig {
  3.     // 死信交换机
  4.     public static final String DLX_EXCHANGE_NAME = "my-dlx-exchange";
  5.     // 死信队列
  6.     public static final String DEAD_LETTER_QUEUE_NAME = "dead-letter-queue";
  7.     // 业务处理队列
  8.     public static final String PROCESS_QUEUE_NAME = "process-queue";
  9.     @Bean
  10.     public TopicExchange dlxExchange() {
  11.         return new TopicExchange(DLX_EXCHANGE_NAME);
  12.     }
  13.     @Bean
  14.     public Queue deadLetterQueue() {
  15.         return QueueBuilder.durable(DEAD_LETTER_QUEUE_NAME)
  16.                 .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)
  17.                 .build();
  18.     }
  19.     @Bean
  20.     public Queue processQueue() {
  21.         return QueueBuilder.durable(PROCESS_QUEUE_NAME)
  22.                 .withArgument("x-message-ttl", 5000)
  23.                 .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)
  24.                 .build();
  25.     }
  26.     @Bean
  27.     public Binding dlxBinding(Queue deadLetterQueue, TopicExchange dlxExchange) {
  28.         return BindingBuilder.bind(deadLetterQueue).to(dlxExchange).with("#");
  29.     }
  30. }
复制代码
以上代码创建了my-dlx-exchange死信交换机和dead-letter-queue死信队列。同时创建一个process-queue业务处理队列,该队列设置了消息的生存时间为5s,并在该时间内未被消费者消费,则将该消息转移到死信队列中。
发送和吸收消息

  1. @Component
  2. public class MessageSender {
  3.     @Autowired
  4.     private RabbitTemplate rabbitTemplate;
  5.     public void send(String message) {
  6.         rabbitTemplate.convertAndSend("", RabbitConfig.PROCESS_QUEUE_NAME, message);
  7.     }
  8. }
  9. @Component
  10. public class MessageReceiver {
  11.     @RabbitListener(queues = "process-queue")
  12.     public void receive(String message) throws Exception {
  13.         System.out.println("Received message: " + message);
  14.         Thread.sleep(10000);
  15.     }
  16. }
复制代码
以上代码是一个简朴的生产者和消费者。生产者使用RabbitTemplate来发送消息到process-queue队列中;消费者通过使用注解@RabbitListener监听process-queue队列的消息并进行处理。
结语

通过本篇文章,我们具体先容了什么是死信队列、死信队列的长处、应用场景以及如何实现死信队列。通过RabbitMQ和Spring Boot的实现方式,不难看出死信队列在项目中的重要性和实际价值。在工程实践中,我们可以根据具体业务需求,结合技能选型,机动运用死信队列来提高系统的可靠性和稳定性。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

九天猎人

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表