RabbitMQ基础知识

打印 上一主题 下一主题

主题 547|帖子 547|积分 1641

RabbitMQ 是一个实现了高级消息队列协议(AMQP)的开源消息署理软件,主要使用 Erlang 编程语言编写。
Erlang 语言具有高并发、分布式、可靠性强等特点,非常适合用于构建像 RabbitMQ 这样的分布式消息中心件。它能够有效地处理大量的并发毗连和消息传递,确保体系的稳固性和可靠性。
以下是对 RabbitMQ 的具体先容:
一、主要特点


  • 可靠性高

    • 确保消息能够可靠地传输,即使在网络故障或服务器故障的环境下也能包管消息不丢失。它通过持久化机制、确认机制和事务机制等多种方式来实现消息的可靠传递。
    • 持久化机制可以将消息存储在磁盘上,即使服务器重启,消息也不会丢失。确认机制要求接收者在接收到消息后发送确认信号,确保消息被正确处理。事务机制则提供了一种原子性的操作方式,包管一组消息的发送和处理要么全部乐成,要么全部失败。

  • 机动的路由

    • 支持多种消息路由模式,可以根据不同的需求将消息发送到不同的队列。RabbitMQ 提供了四种交换机类型,分别是直连交换机(Direct Exchange)、扇形交换机(Fanout Exchange)、主题交换机(Topic Exchange)和头交换机(Headers Exchange)。
    • 直连交换机将消息路由到那些 binding key 与 routing key 完全匹配的队列中。扇形交换机将消息广播到全部绑定到它的队列中,忽略 routing key。主题交换机通过通配符的方式进行消息路由,routing key 由多个单词构成,以点号分隔。头交换机根据消息的头部属性进行路由,队列通过指定消息头的键值对来绑定到交换机。

  • 高吞吐量

    • 能够处理大量的消息,实用于高并发的应用场景。RabbitMQ 接纳了高效的消息存储和传输机制,可以快速地处理大量的消息。它还支持集群摆设,可以通过增长服务器节点来进步体系的吞吐量和可用性。

  • 多种编程语言支持

    • 可以使用多种编程语言进行开辟,如 Java、Python、C# 等。RabbitMQ 提供了丰富的客户端库,方便不同编程语言的开辟者使用。这些客户端库提供了与 RabbitMQ 服务器进行通信的接口,使得开辟者可以轻松地发送和接收消息。

  • 分布式架构

    • 可以在分布式环境中摆设,实现高可用性和可扩展性。RabbitMQ 支持集群摆设,可以将多个服务器节点构成一个集群,进步体系的可用性和吞吐量。在集群中,消息可以在不同的节点之间进行复制和路由,确保消息的可靠传递。此外,RabbitMQ 还支持 Federation 和 Shovel 插件,可以实现跨数据中心的消息传递和复制。

二、工作原理


  • 消息生产者(Producer)将消息发送到 RabbitMQ 服务器。

    • 生产者通过毗连到 RabbitMQ 服务器,创建一个通道(Channel),并使用该通道将消息发送到指定的交换机(Exchange)。在发送消息时,生产者须要指定消息的 routing key,以便 RabbitMQ 根据 routing key 将消息路由到相应的队列(Queue)。

  • 交换机根据路由规则将消息路由到一个或多个队列。

    • RabbitMQ 服务器中的交换机根据不同的类型和路由规则,将接收到的消息路由到一个或多个队列中。交换机的类型包括直连交换机、扇形交换机、主题交换机和头交换机,每种类型的交换机都有不同的路由规则。

  • 消息消耗者(Consumer)从队列中获取消息并进行处理。

    • 消耗者通过毗连到 RabbitMQ 服务器,创建一个通道,并使用该通道从指定的队列中获取消息。消耗者可以接纳推模式(Push)或拉模式(Pull)获取消息。在推模式下,RabbitMQ 服务器会主动将消息推送给消耗者;在拉模式下,消耗者须要主动从队列中拉取消息。

三、应用场景


  • 异步通信

    • 在分布式体系中,不同的组件之间大概须要进行异步通信。比方,在一个电商体系中,当用户下单后,订单体系可以将订单信息发送到消息队列中,然后由库存体系、支付体系等其他组件从消息队列中获取订单信息并进行处理。这样可以克制订单体系等待其他体系的响应,进步体系的响应速度和吞吐量。

  • 解耦

    • 在复杂的体系中,不同的模块之间大概存在精密的耦合关系。通过使用消息队列,可以将不同模块之间的通信解耦,使得各个模块可以独立地进行开辟、测试和摆设。比方,在一个物流体系中,订单体系、运输体系和仓储体系之间可以通过消息队列进行通信,当订单状态发生厘革时,订单体系将消息发送到消息队列中,运输体系和仓储体系从消息队列中获取消息并进行相应的处理。

  • 流量削峰

    • 在高并发的体系中,大概会出现瞬间的流量高峰,导致体系压力过大。通过使用消息队列,可以将瞬间的流量高峰缓存起来,然后由体系逐步处理,从而克制体系因瞬间的流量高峰而崩溃。比方,在一个秒杀体系中,当用户发起秒杀哀求时,体系可以将哀求发送到消息队列中,然后由背景体系逐步处理这些哀求,克制因瞬间的流量高峰而导致体系崩溃。

  • 数据同步

    • 在分布式体系中,不同的数据存储之间大概须要进行数据同步。通过使用消息队列,可以将数据变更变乱发送到消息队列中,然后由其他数据存储从消息队列中获取变乱并进行相应的处理,从而实现数据的同步。比方,在一个分布式数据库体系中,当一个数据库节点的数据发生厘革时,该节点可以将数据变更变乱发送到消息队列中,然后由其他数据库节点从消息队列中获取变乱并进行相应的处理,从而实现数据的同步。

总之,RabbitMQ 是一个功能强大、机动可靠的消息署理软件,广泛应用于分布式体系中的异步通信、解耦、流量削峰和数据同步等场景。其使用 Erlang 编程语言编写,充分发挥了 Erlang 语言在高并发、分布式和可靠性方面的上风。
Direct Exchange(直连交换机)案例

配置类
  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.DirectExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class RabbitMQConfig {
  9.     public static final String DIRECT_EXCHANGE_NAME = "directExchange";
  10.     public static final String DIRECT_QUEUE_NAME = "directQueue";
  11.     public static final String DIRECT_ROUTING_KEY = "directKey";
  12.     @Bean
  13.     public DirectExchange directExchange() {
  14.         return new DirectExchange(DIRECT_EXCHANGE_NAME);
  15.     }
  16.     @Bean
  17.     public Queue directQueue() {
  18.         return new Queue(DIRECT_QUEUE_NAME);
  19.     }
  20.     @Bean
  21.     public Binding directBinding() {
  22.         return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTING_KEY);
  23.     }
  24. }
复制代码
生产者服务类
  1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class DirectMessageProducer {
  6.     @Autowired
  7.     private RabbitTemplate rabbitTemplate;
  8.     public void sendDirectMessage(String message) {
  9.         rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE_NAME, RabbitMQConfig.DIRECT_ROUTING_KEY, message);
  10.     }
  11. }
复制代码
消耗者服务类
  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class DirectMessageConsumer {
  5.     @RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE_NAME)
  6.     public void receiveDirectMessage(String message) {
  7.         System.out.println("Received direct message: " + message);
  8.     }
  9. }
复制代码
Fanout Exchange(扇形交换机)案例

配置类
  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.FanoutExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class FanoutConfig {
  9.     public static final String FANOUT_EXCHANGE_NAME = "fanoutExchange";
  10.     public static final String FANOUT_QUEUE_1_NAME = "fanoutQueue1";
  11.     public static final String FANOUT_QUEUE_2_NAME = "fanoutQueue2";
  12.     @Bean
  13.     public FanoutExchange fanoutExchange() {
  14.         return new FanoutExchange(FANOUT_EXCHANGE_NAME);
  15.     }
  16.     @Bean
  17.     public Queue fanoutQueue1() {
  18.         return new Queue(FANOUT_QUEUE_1_NAME);
  19.     }
  20.     @Bean
  21.     public Queue fanoutQueue2() {
  22.         return new Queue(FANOUT_QUEUE_2_NAME);
  23.     }
  24.     @Bean
  25.     public Binding fanoutBinding1() {
  26.         return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
  27.     }
  28.     @Bean
  29.     public Binding fanoutBinding2() {
  30.         return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
  31.     }
  32. }
复制代码
生产者服务类
  1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class FanoutMessageProducer {
  6.     @Autowired
  7.     private RabbitTemplate rabbitTemplate;
  8.     public void sendFanoutMessage(String message) {
  9.         rabbitTemplate.convertAndSend(FanoutConfig.FANOUT_EXCHANGE_NAME, "", message);
  10.     }
  11. }
复制代码
消耗者服务类 1
  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class FanoutMessageConsumer1 {
  5.     @RabbitListener(queues = FanoutConfig.FANOUT_QUEUE_1_NAME)
  6.     public void receiveFanoutMessage1(String message) {
  7.         System.out.println("Received fanout message 1: " + message);
  8.     }
  9. }
复制代码
消耗者服务类 2
  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class FanoutMessageConsumer2 {
  5.     @RabbitListener(queues = FanoutConfig.FANOUT_QUEUE_2_NAME)
  6.     public void receiveFanoutMessage2(String message) {
  7.         System.out.println("Received fanout message 2: " + message);
  8.     }
  9. }
复制代码
Topic Exchange(主题交换机)案例

配置类
  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.TopicExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class TopicConfig {
  9.     public static final String TOPIC_EXCHANGE_NAME = "topicExchange";
  10.     public static final String TOPIC_QUEUE_1_NAME = "topicQueue1";
  11.     public static final String TOPIC_QUEUE_2_NAME = "topicQueue2";
  12.     public static final String TOPIC_ROUTING_KEY_1 = "topic.key1";
  13.     public static final String TOPIC_ROUTING_KEY_2 = "topic.key2.*";
  14.     @Bean
  15.     public TopicExchange topicExchange() {
  16.         return new TopicExchange(TOPIC_EXCHANGE_NAME);
  17.     }
  18.     @Bean
  19.     public Queue topicQueue1() {
  20.         return new Queue(TOPIC_QUEUE_1_NAME);
  21.     }
  22.     @Bean
  23.     public Queue topicQueue2() {
  24.         return new Queue(TOPIC_QUEUE_2_NAME);
  25.     }
  26.     @Bean
  27.     public Binding topicBinding1() {
  28.         return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_ROUTING_KEY_1);
  29.     }
  30.     @Bean
  31.     public Binding topicBinding2() {
  32.         return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_ROUTING_KEY_2);
  33.     }
  34. }
复制代码
生产者服务类
  1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class TopicMessageProducer {
  6.     @Autowired
  7.     private RabbitTemplate rabbitTemplate;
  8.     public void sendTopicMessage(String routingKey, String message) {
  9.         rabbitTemplate.convertAndSend(TopicConfig.TOPIC_EXCHANGE_NAME, routingKey, message);
  10.     }
  11. }
复制代码
消耗者服务类 1
  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class TopicMessageConsumer1 {
  5.     @RabbitListener(queues = TopicConfig.TOPIC_QUEUE_1_NAME)
  6.     public void receiveTopicMessage1(String message) {
  7.         System.out.println("Received topic message 1: " + message);
  8.     }
  9. }
复制代码
消耗者服务类 2
  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class TopicMessageConsumer2 {
  5.     @RabbitListener(queues = TopicConfig.TOPIC_QUEUE_2_NAME)
  6.     public void receiveTopicMessage2(String message) {
  7.         System.out.println("Received topic message 2: " + message);
  8.     }
  9. }
复制代码
Headers Exchange(头交换机)案例

配置类
  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.HeadersExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. @Configuration
  10. public class HeadersConfig {
  11.     public static final String HEADERS_EXCHANGE_NAME = "headersExchange";
  12.     public static final String HEADERS_QUEUE_1_NAME = "headersQueue1";
  13.     public static final String HEADERS_QUEUE_2_NAME = "headersQueue2";
  14.     @Bean
  15.     public HeadersExchange headersExchange() {
  16.         return new HeadersExchange(HEADERS_EXCHANGE_NAME);
  17.     }
  18.     @Bean
  19.     public Queue headersQueue1() {
  20.         return new Queue(HEADERS_QUEUE_1_NAME);
  21.     }
  22.     @Bean
  23.     public Queue headersQueue2() {
  24.         return new Queue(HEADERS_QUEUE_2_NAME);
  25.     }
  26.     @Bean
  27.     public Binding headersBinding1() {
  28.         Map<String, Object> headers1 = new HashMap<>();
  29.         headers1.put("type", "important");
  30.         return BindingBuilder.bind(headersQueue1()).to(headersExchange()).whereAll(headers1).match();
  31.     }
  32.     @Bean
  33.     public Binding headersBinding2() {
  34.         Map<String, Object> headers2 = new HashMap<>();
  35.         headers2.put("type", "normal");
  36.         return BindingBuilder.bind(headersQueue2()).to(headersExchange()).whereAll(headers2).match();
  37.     }
  38. }
复制代码
生产者服务类
  1. import org.springframework.amqp.core.Message;
  2. import org.springframework.amqp.core.MessageProperties;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. @Service
  7. public class HeadersMessageProducer {
  8.     @Autowired
  9.     private RabbitTemplate rabbitTemplate;
  10.     public void sendHeadersMessage(String type, String message) {
  11.         MessageProperties properties = new MessageProperties();
  12.         properties.setHeader("type", type);
  13.         Message amqpMessage = new Message(message.getBytes(), properties);
  14.         rabbitTemplate.send(HeadersConfig.HEADERS_EXCHANGE_NAME, "", amqpMessage);
  15.     }
  16. }
复制代码
消耗者服务类 1
  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class HeadersMessageConsumer1 {
  5.     @RabbitListener(queues = HeadersConfig.HEADERS_QUEUE_1_NAME)
  6.     public void receiveHeadersMessage1(String message) {
  7.         System.out.println("Received important headers message: " + message);
  8.     }
  9. }
复制代码
消耗者服务类 2
  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class HeadersMessageConsumer2 {
  5.     @RabbitListener(queues = HeadersConfig.HEADERS_QUEUE_2_NAME)
  6.     public void receiveHeadersMessage2(String message) {
  7.         System.out.println("Received normal headers message: " + message);
  8.     }
  9. }
复制代码
死信队列实现步骤


  • 添加依赖

    • 在项目的pom.xml文件中添加 RabbitMQ 的依赖:

  1.      <dependency>
  2.          <groupId>org.springframework.boot</groupId>
  3.          <artifactId>spring-boot-starter-amqp</artifactId>
  4.      </dependency>
复制代码

  • 配置 RabbitMQ

    • 在application.properties或application.yml文件中配置 RabbitMQ 的毗连信息:

  1.      spring.rabbitmq.host=localhost
  2.      spring.rabbitmq.port=5672
  3.      spring.rabbitmq.username=guest
  4.      spring.rabbitmq.password=guest
复制代码

  • 界说普通队列、死信交换器和死信队列

    • 使用@Bean注解在配置类中界说普通队列、死信交换器和死信队列:

  1.      import org.springframework.amqp.core.Binding;
  2.      import org.springframework.amqp.core.BindingBuilder;
  3.      import org.springframework.amqp.core.DirectExchange;
  4.      import org.springframework.amqp.core.Queue;
  5.      import org.springframework.context.annotation.Bean;
  6.      import org.springframework.context.annotation.Configuration;
  7.      @Configuration
  8.      public class RabbitMQConfig {
  9.          // 普通队列名称
  10.          public static final String NORMAL_QUEUE_NAME = "normalQueue";
  11.          // 死信队列名称
  12.          public static final String DEAD_LETTER_QUEUE_NAME = "deadLetterQueue";
  13.          // 普通交换器名称
  14.          public static final String NORMAL_EXCHANGE_NAME = "normalExchange";
  15.          // 死信交换器名称
  16.          public static final String DEAD_LETTER_EXCHANGE_NAME = "deadLetterExchange";
  17.          @Bean
  18.          public Queue normalQueue() {
  19.              return new Queue(NORMAL_QUEUE_NAME, true, false, false);
  20.          }
  21.          @Bean
  22.          public Queue deadLetterQueue() {
  23.              return new Queue(DEAD_LETTER_QUEUE_NAME, true, false, false);
  24.          }
  25.          @Bean
  26.          public DirectExchange normalExchange() {
  27.              return new DirectExchange(NORMAL_EXCHANGE_NAME);
  28.          }
  29.          @Bean
  30.          public DirectExchange deadLetterExchange() {
  31.              return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
  32.          }
  33.          @Bean
  34.          public Binding normalQueueBinding() {
  35.              return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normalRoutingKey");
  36.          }
  37.          @Bean
  38.          public Binding deadLetterQueueBinding() {
  39.              return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("deadLetterRoutingKey");
  40.          }
  41.      }
复制代码

  • 设置死信参数

    • 在界说普通队列时,设置死信交换器和死信路由键等参数:

  1.      @Bean
  2.      public Queue normalQueue() {
  3.          return QueueBuilder.durable(NORMAL_QUEUE_NAME)
  4.                 .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
  5.                 .withArgument("x-dead-letter-routing-key", "deadLetterRoutingKey")
  6.                 .build();
  7.      }
复制代码

  • 消耗者和生产者

    • 生产者发送消息到普通队列:

  1.      import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2.      import org.springframework.stereotype.Component;
  3.      @Component
  4.      public class MessageProducer {
  5.          private final RabbitTemplate rabbitTemplate;
  6.          public MessageProducer(RabbitTemplate rabbitTemplate) {
  7.              this.rabbitTemplate = rabbitTemplate;
  8.          }
  9.          public void sendMessage(String message) {
  10.              rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE_NAME, "normalRoutingKey", message);
  11.          }
  12.      }
复制代码


  • 消耗者从死信队列中获取消息进行处理:
  1.      import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2.      import org.springframework.stereotype.Component;
  3.      @Component
  4.      public class DeadLetterQueueConsumer {
  5.          @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE_NAME)
  6.          public void consumeDeadLetterMessage(String message) {
  7.              System.out.println("Received dead letter message: " + message);
  8.          }
  9.      }
复制代码
延时消息队列实现步骤


  • 安装 RabbitMQ Delayed Message Exchange 插件

    • RabbitMQ 自己不支持原生的延时消息队列,须要安装 Delayed Message Exchange 插件来实现。可以从 RabbitMQ 官网下载插件并按照阐明进行安装。

  • 配置延时交换器

    • 在配置类中界说延时交换器:

  1.      import org.springframework.amqp.core.Binding;
  2.      import org.springframework.amqp.core.BindingBuilder;
  3.      import org.springframework.amqp.core.CustomExchange;
  4.      import org.springframework.context.annotation.Bean;
  5.      import org.springframework.context.annotation.Configuration;
  6.      @Configuration
  7.      public class DelayedQueueConfig {
  8.          // 延时交换器名称
  9.          public static final String DELAYED_EXCHANGE_NAME = "delayedExchange";
  10.          // 延时队列名称
  11.          public static final String DELAYED_QUEUE_NAME = "delayedQueue";
  12.          @Bean
  13.          public CustomExchange delayedExchange() {
  14.              return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false);
  15.          }
  16.          @Bean
  17.          public Binding delayedQueueBinding() {
  18.              return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayedRoutingKey").noargs();
  19.          }
  20.      }
复制代码

  • 发送延时消息

    • 生产者发送延时消息到延时交换器:

  1.      import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2.      import org.springframework.stereotype.Component;
  3.      @Component
  4.      public class DelayedMessageProducer {
  5.          private final RabbitTemplate rabbitTemplate;
  6.          public DelayedMessageProducer(RabbitTemplate rabbitTemplate) {
  7.              this.rabbitTemplate = rabbitTemplate;
  8.          }
  9.          public void sendDelayedMessage(String message, long delayInMilliseconds) {
  10.              rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, "delayedRoutingKey", message, msg -> {
  11.                  msg.getMessageProperties().setDelay(delayInMilliseconds);
  12.                  return msg;
  13.              });
  14.          }
  15.      }
复制代码

  • 消耗者处理延时消息

    • 消耗者从延时队列中获取消息进行处理:

  1.      import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2.      import org.springframework.stereotype.Component;
  3.      @Component
  4.      public class DelayedQueueConsumer {
  5.          @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
  6.          public void consumeDelayedMessage(String message) {
  7.              System.out.println("Received delayed message: " + message);
  8.          }
  9.      }
复制代码
通过以上步骤,就可以在 Spring Boot 3 中实现 RabbitMQ 的死信队列和延时消息队列。在实际应用中,可以根据具体的业务需求进行调解和扩展。
RabbitMQ 的常晤面试题及答案

一、基础概念类


  • 请简要先容 RabbitMQ 的作用和特点。

    • 作用:RabbitMQ 是一个实现了高级消息队列协议(AMQP)的开源消息署理软件,主要用于在分布式体系中存储和转发消息,实现应用步伐之间的异步通信。
    • 特点:高可靠、高可用、可扩展性强、支持多种消息协议、机动的路由机制、提供多种交换器类型等。

  • RabbitMQ 中有哪些主要的组件?

    • 队列(Queue):用于存储消息,消耗者从队列中获取消息进行处理。
    • 交换器(Exchange):接收生产者发送的消息,并根据肯定的规则将消息路由到一个或多个队列。
    • 绑定(Binding):界说了交换器和队列之间的关系,通过路由键将两者关联起来。
    • 通道(Channel):在客户端和 RabbitMQ 服务器之间建立的通信通道,用于发送和接收消息。

  • 表明一下 RabbitMQ 的工作流程。

    • 生产者将消息发送到交换器。
    • 交换器根据预先配置的路由规则将消息路由到一个或多个队列。
    • 消耗者从队列中获取消息并进行处理。

二、技术实现类


  • 如何在 Java 项目中使用 RabbitMQ?

    • 添加 RabbitMQ 的客户端库依赖,如com.rabbitmq:amqp-client。
    • 创建毗连工厂,设置毗连参数(如主机名、端口、用户名、密码等)。
    • 通过毗连工厂创建毗连,再从毗连中创建通道。
    • 使用通道声明队列、交换器和绑定关系。
    • 生产者使用通道发送消息,消耗者使用通道从队列中获取消息并处理。

  • 如何声明一个队列和交换器,并将它们绑定起来?

    • 使用通道的queueDeclare方法声明队列,可以设置队列名称、是否持久化、是否独占、是否自动删除等参数。
    • 使用通道的exchangeDeclare方法声明交换器,可以设置交换器名称、类型、是否持久化等参数。
    • 使用通道的queueBind方法将队列和交换器绑定起来,指定路由键。

  • 在 Java 中如何发送和接收消息?

    • 发送消息:生产者使用通道的basicPublish方法将消息发送到指定的交换器和路由键。
    • 接收消息:消耗者使用通道的basicConsume方法订阅一个队列,当有消息到达队列时,会自动调用注册的回调方法进行处理。

三、高级特性类


  • RabbitMQ 的消息确认机制是怎样的?

    • RabbitMQ 提供了两种消息确认方式:自动确认和手动确认。
    • 自动确认:消耗者在接收到消息后立即自动确认,不管消息是否被乐成处理。这种方式大概会导致消息丢失,如果消耗者在处理消息过程中出现非常崩溃,而消息已经被确认,那么该消息就不会被重新投递。
    • 手动确认:消耗者在处理完消息后,显式地调用channel.basicAck方法进行确认。如果处理过程中出现非常,可以调用channel.basicNack或channel.basicReject方法拒绝消息,让 RabbitMQ 重新投递该消息。

  • 如何实现 RabbitMQ 的消息持久化?

    • 对于队列,可以在声明队列时设置durable=true,这样队列在 RabbitMQ 服务器重启后不会丢失。
    • 对于消息,可以在发送消息时设置消息的deliveryMode为 2,表示持久化消息。这样即使 RabbitMQ 服务器重启,持久化的消息也不会丢失。

  • 如那边理 RabbitMQ 中的死信队列?

    • 当消息被拒绝(basic.reject/basic.nack)且requeue=false,大概消息过期、队列到达最大长度等环境时,消息会被放入死信队列。
    • 可以创建一个专门的死信队列,并在正常队列的声明中指定死信交换器和路由键,当满足死信条件时,消息会被路由到死信队列,然后可以由专门的消耗者来处理死信消息。

  • RabbitMQ 如何实现消息的负载均衡?

    • 可以启动多个消耗者,每个消耗者订阅同一个队列。RabbitMQ 会自动将队列中的消息均衡地分配给各个消耗者,实现负载均衡。

四、性能优化类


  • 如何进步 RabbitMQ 的性能?

    • 公道设置队列和交换器的参数,如队列的大小、持久化选项等。
    • 使用批量发送和接收消息的方式,淘汰网络开销。
    • 优化消耗者的处理逻辑,进步消息处理速度。
    • 调解 RabbitMQ 的内存和磁盘配置,以适应不同的负载环境。

  • 在高并发场景下,如何包管 RabbitMQ 的稳固性?

    • 增长 RabbitMQ 服务器的硬件资源,如内存、CPU 等。
    • 公道设置毗连数、通道数等参数,克制资源耗尽。
    • 监控 RabbitMQ 的运行状态,及时发现和处理问题。
    • 使用集群模式摆设 RabbitMQ,进步体系的可用性和可扩展性。

在 RabbitMQ 中,如何包管消息的次序性?

一、单线程消耗
一个简单的方法是确保同一个队列的消息始终由同一个消耗者以单线程的方式进行处理。这样可以包管消息按照在队列中的次序被依次处理。
二、拆分队列

  • 业务分析:

    • 如果一个业务场景中有多个不同类型的消息混合在一个队列中,大概会导致消息处理次序混乱。可以根据业务类型将消息拆分成不同的队列,每个队列由独立的消耗者进行处理。
    • 比方,在一个电商体系中,订单创建、订单支付和订单发货的消息如果混在一个队列中,大概由于不同类型消息处理时间的差别而导致次序混乱。可以将这三种类型的消息分别放入不同的队列进行处理。

  • 实现方式:

    • 生产者在发送消息时,根据消息的类型将其发送到对应的队列中。
    • 每个队列都有一个专门的消耗者进行处理,确保同一类型的消息按照次序处理。

三、克制重试机制影响次序性

  • 问题分析:

    • 当消息处理失败须要重试时,如果不加以控制,重试的消息大概会被插入到队列的中心位置,从而破坏消息的次序性。
    • 比方,消息 M1、M2、M3 依次进入队列,M2 处理失败进行重试。如果不做特殊处理,重试的 M2 大概会在 M3 被处理之前再次进入队列并被处理,导致次序混乱。

  • 办理方案:

    • 可以将重试的消息放入一个专门的重试队列,而不是直接插入到原始队列中。当消息处理失败时,将其发送到重试队列,并设置一个延迟时间,等延迟时间过后再从重试队列中取出消息进行处理。
    • 大概使用消息版本号的方式,每次重试时增长消息的版本号,消耗者在处理消息时,根据版本号判定是否是重试的消息,并按照次序进行处理。

须要留意的是,在分布式环境中,完全包管消息的次序性黑白常困难的,因为大概会出现网络延迟、节点故障等各种不可预测的环境。但通过以上方法,可以在肯定程度上进步消息处理的次序性。
在 RabbitMQ 中,如何包管消息的幂等性?

一、数据库唯一束缚

  • 实用场景:

    • 当消息的处理结果会影响数据库中的数据时,可以使用数据库的唯一束缚来包管幂等性。
    • 比方,一个订单处理体系,接收到订单创建的消息后,会在数据库中插入订单记录。如果重复收到雷同的订单创建消息,须要确保不会重复插入订单记录。

  • 实现方式:

    • 在数据库表中,为关键字段设置唯一束缚,比方订单号。
    • 当消耗者接收到消息并进行处理时,尝试将数据插入数据库。如果插入操作因为唯一束缚失败,阐明该消息已经被处理过,直接忽略即可。

二、使用唯一标识符和缓存

  • 实用场景:

    • 对于一些不直接操作数据库,但须要进行复杂业务处理的场景,可以使用唯一标识符和缓存来实现幂等性。
    • 比如,一个消息处理过程中须要调用多个外部服务,而且处理结果不直接存储在数据库中。

  • 实现方式:

    • 生产者在发送消息时,为每条消息生成一个唯一标识符(如 UUID),并将其包罗在消息中。
    • 消耗者接收到消息后,提取唯一标识符,并检查缓存中是否已经存在该标识符。如果存在,阐明该消息已经被处理过,直接忽略;如果不存在,将标识符存入缓存,并进行消息的处理。

三、记录消息处理状态

  • 实用场景:

    • 当消息的处理过程比较复杂,大概涉及多个步骤或状态厘革时,可以通过记录消息的处理状态来包管幂等性。
    • 比方,一个审批流程体系,消息表示一个审批使命,审批过程大概经过多个阶段。须要确保每个审批使命在每个阶段只被处理一次。

  • 实现方式:

    • 创建一个消息处理状态表,记录每个消息的处理状态和进度。
    • 消耗者接收到消息后,根据消息的唯一标识查询状态表。如果消息已经处于已处理状态,直接忽略;如果消息处于未处理状态,进行处理,并更新状态表中的状态信息。

总之,包管消息的幂等性须要根据具体的业务场景选择合适的方法。在实际应用中,可以结合多种方法来进步幂等性的包管程度。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

络腮胡菲菲

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表