ToB企服应用市场:ToB评测及商务社交产业平台

标题: RabbitMQ基础知识 [打印本页]

作者: 络腮胡菲菲    时间: 2024-9-2 00:54
标题: RabbitMQ基础知识
RabbitMQ 是一个实现了高级消息队列协议(AMQP)的开源消息代理软件,主要使用 Erlang 编程语言编写。
Erlang 语言具有高并发、分布式、可靠性强等特点,非常适合用于构建像 RabbitMQ 这样的分布式消息中间件。它能够有效地处理大量的并发连接和消息传递,确保系统的稳定性和可靠性。
以下是对 RabbitMQ 的详细介绍:
一、主要特点

二、工作原理

三、应用场景

总之,RabbitMQ 是一个功能强大、灵活可靠的消息代理软件,广泛应用于分布式系统中的异步通信、解耦、流量削峰和数据同步等场景。其使用 Erlang 编程语言编写,充分发挥了 Erlang 语言在高并发、分布式和可靠性方面的优势。
Direct Exchange(直连交换机)案例

配置类
  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.DirectExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class RabbitMQConfig {
  9.     public static final String DIRECT_EXCHANGE_NAME = "directExchange";
  10.     public static final String DIRECT_QUEUE_NAME = "directQueue";
  11.     public static final String DIRECT_ROUTING_KEY = "directKey";
  12.     @Bean
  13.     public DirectExchange directExchange() {
  14.         return new DirectExchange(DIRECT_EXCHANGE_NAME);
  15.     }
  16.     @Bean
  17.     public Queue directQueue() {
  18.         return new Queue(DIRECT_QUEUE_NAME);
  19.     }
  20.     @Bean
  21.     public Binding directBinding() {
  22.         return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTING_KEY);
  23.     }
  24. }
复制代码
生产者服务类
  1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class DirectMessageProducer {
  6.     @Autowired
  7.     private RabbitTemplate rabbitTemplate;
  8.     public void sendDirectMessage(String message) {
  9.         rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE_NAME, RabbitMQConfig.DIRECT_ROUTING_KEY, message);
  10.     }
  11. }
复制代码
消费者服务类
  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class DirectMessageConsumer {
  5.     @RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE_NAME)
  6.     public void receiveDirectMessage(String message) {
  7.         System.out.println("Received direct message: " + message);
  8.     }
  9. }
复制代码
Fanout Exchange(扇形交换机)案例

配置类
  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.FanoutExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class FanoutConfig {
  9.     public static final String FANOUT_EXCHANGE_NAME = "fanoutExchange";
  10.     public static final String FANOUT_QUEUE_1_NAME = "fanoutQueue1";
  11.     public static final String FANOUT_QUEUE_2_NAME = "fanoutQueue2";
  12.     @Bean
  13.     public FanoutExchange fanoutExchange() {
  14.         return new FanoutExchange(FANOUT_EXCHANGE_NAME);
  15.     }
  16.     @Bean
  17.     public Queue fanoutQueue1() {
  18.         return new Queue(FANOUT_QUEUE_1_NAME);
  19.     }
  20.     @Bean
  21.     public Queue fanoutQueue2() {
  22.         return new Queue(FANOUT_QUEUE_2_NAME);
  23.     }
  24.     @Bean
  25.     public Binding fanoutBinding1() {
  26.         return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
  27.     }
  28.     @Bean
  29.     public Binding fanoutBinding2() {
  30.         return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
  31.     }
  32. }
复制代码
生产者服务类
  1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class FanoutMessageProducer {
  6.     @Autowired
  7.     private RabbitTemplate rabbitTemplate;
  8.     public void sendFanoutMessage(String message) {
  9.         rabbitTemplate.convertAndSend(FanoutConfig.FANOUT_EXCHANGE_NAME, "", message);
  10.     }
  11. }
复制代码
消费者服务类 1
  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class FanoutMessageConsumer1 {
  5.     @RabbitListener(queues = FanoutConfig.FANOUT_QUEUE_1_NAME)
  6.     public void receiveFanoutMessage1(String message) {
  7.         System.out.println("Received fanout message 1: " + message);
  8.     }
  9. }
复制代码
消费者服务类 2
  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class FanoutMessageConsumer2 {
  5.     @RabbitListener(queues = FanoutConfig.FANOUT_QUEUE_2_NAME)
  6.     public void receiveFanoutMessage2(String message) {
  7.         System.out.println("Received fanout message 2: " + message);
  8.     }
  9. }
复制代码
Topic Exchange(主题交换机)案例

配置类
  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.TopicExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class TopicConfig {
  9.     public static final String TOPIC_EXCHANGE_NAME = "topicExchange";
  10.     public static final String TOPIC_QUEUE_1_NAME = "topicQueue1";
  11.     public static final String TOPIC_QUEUE_2_NAME = "topicQueue2";
  12.     public static final String TOPIC_ROUTING_KEY_1 = "topic.key1";
  13.     public static final String TOPIC_ROUTING_KEY_2 = "topic.key2.*";
  14.     @Bean
  15.     public TopicExchange topicExchange() {
  16.         return new TopicExchange(TOPIC_EXCHANGE_NAME);
  17.     }
  18.     @Bean
  19.     public Queue topicQueue1() {
  20.         return new Queue(TOPIC_QUEUE_1_NAME);
  21.     }
  22.     @Bean
  23.     public Queue topicQueue2() {
  24.         return new Queue(TOPIC_QUEUE_2_NAME);
  25.     }
  26.     @Bean
  27.     public Binding topicBinding1() {
  28.         return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_ROUTING_KEY_1);
  29.     }
  30.     @Bean
  31.     public Binding topicBinding2() {
  32.         return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_ROUTING_KEY_2);
  33.     }
  34. }
复制代码
生产者服务类
  1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class TopicMessageProducer {
  6.     @Autowired
  7.     private RabbitTemplate rabbitTemplate;
  8.     public void sendTopicMessage(String routingKey, String message) {
  9.         rabbitTemplate.convertAndSend(TopicConfig.TOPIC_EXCHANGE_NAME, routingKey, message);
  10.     }
  11. }
复制代码
消费者服务类 1
  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class TopicMessageConsumer1 {
  5.     @RabbitListener(queues = TopicConfig.TOPIC_QUEUE_1_NAME)
  6.     public void receiveTopicMessage1(String message) {
  7.         System.out.println("Received topic message 1: " + message);
  8.     }
  9. }
复制代码
消费者服务类 2
  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class TopicMessageConsumer2 {
  5.     @RabbitListener(queues = TopicConfig.TOPIC_QUEUE_2_NAME)
  6.     public void receiveTopicMessage2(String message) {
  7.         System.out.println("Received topic message 2: " + message);
  8.     }
  9. }
复制代码
Headers Exchange(头交换机)案例

配置类
  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.HeadersExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. @Configuration
  10. public class HeadersConfig {
  11.     public static final String HEADERS_EXCHANGE_NAME = "headersExchange";
  12.     public static final String HEADERS_QUEUE_1_NAME = "headersQueue1";
  13.     public static final String HEADERS_QUEUE_2_NAME = "headersQueue2";
  14.     @Bean
  15.     public HeadersExchange headersExchange() {
  16.         return new HeadersExchange(HEADERS_EXCHANGE_NAME);
  17.     }
  18.     @Bean
  19.     public Queue headersQueue1() {
  20.         return new Queue(HEADERS_QUEUE_1_NAME);
  21.     }
  22.     @Bean
  23.     public Queue headersQueue2() {
  24.         return new Queue(HEADERS_QUEUE_2_NAME);
  25.     }
  26.     @Bean
  27.     public Binding headersBinding1() {
  28.         Map<String, Object> headers1 = new HashMap<>();
  29.         headers1.put("type", "important");
  30.         return BindingBuilder.bind(headersQueue1()).to(headersExchange()).whereAll(headers1).match();
  31.     }
  32.     @Bean
  33.     public Binding headersBinding2() {
  34.         Map<String, Object> headers2 = new HashMap<>();
  35.         headers2.put("type", "normal");
  36.         return BindingBuilder.bind(headersQueue2()).to(headersExchange()).whereAll(headers2).match();
  37.     }
  38. }
复制代码
生产者服务类
  1. import org.springframework.amqp.core.Message;
  2. import org.springframework.amqp.core.MessageProperties;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. @Service
  7. public class HeadersMessageProducer {
  8.     @Autowired
  9.     private RabbitTemplate rabbitTemplate;
  10.     public void sendHeadersMessage(String type, String message) {
  11.         MessageProperties properties = new MessageProperties();
  12.         properties.setHeader("type", type);
  13.         Message amqpMessage = new Message(message.getBytes(), properties);
  14.         rabbitTemplate.send(HeadersConfig.HEADERS_EXCHANGE_NAME, "", amqpMessage);
  15.     }
  16. }
复制代码
消费者服务类 1
  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class HeadersMessageConsumer1 {
  5.     @RabbitListener(queues = HeadersConfig.HEADERS_QUEUE_1_NAME)
  6.     public void receiveHeadersMessage1(String message) {
  7.         System.out.println("Received important headers message: " + message);
  8.     }
  9. }
复制代码
消费者服务类 2
  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class HeadersMessageConsumer2 {
  5.     @RabbitListener(queues = HeadersConfig.HEADERS_QUEUE_2_NAME)
  6.     public void receiveHeadersMessage2(String message) {
  7.         System.out.println("Received normal headers message: " + message);
  8.     }
  9. }
复制代码
死信队列实现步骤

  1.      <dependency>
  2.          <groupId>org.springframework.boot</groupId>
  3.          <artifactId>spring-boot-starter-amqp</artifactId>
  4.      </dependency>
复制代码
  1.      spring.rabbitmq.host=localhost
  2.      spring.rabbitmq.port=5672
  3.      spring.rabbitmq.username=guest
  4.      spring.rabbitmq.password=guest
复制代码
  1.      import org.springframework.amqp.core.Binding;
  2.      import org.springframework.amqp.core.BindingBuilder;
  3.      import org.springframework.amqp.core.DirectExchange;
  4.      import org.springframework.amqp.core.Queue;
  5.      import org.springframework.context.annotation.Bean;
  6.      import org.springframework.context.annotation.Configuration;
  7.      @Configuration
  8.      public class RabbitMQConfig {
  9.          // 普通队列名称
  10.          public static final String NORMAL_QUEUE_NAME = "normalQueue";
  11.          // 死信队列名称
  12.          public static final String DEAD_LETTER_QUEUE_NAME = "deadLetterQueue";
  13.          // 普通交换器名称
  14.          public static final String NORMAL_EXCHANGE_NAME = "normalExchange";
  15.          // 死信交换器名称
  16.          public static final String DEAD_LETTER_EXCHANGE_NAME = "deadLetterExchange";
  17.          @Bean
  18.          public Queue normalQueue() {
  19.              return new Queue(NORMAL_QUEUE_NAME, true, false, false);
  20.          }
  21.          @Bean
  22.          public Queue deadLetterQueue() {
  23.              return new Queue(DEAD_LETTER_QUEUE_NAME, true, false, false);
  24.          }
  25.          @Bean
  26.          public DirectExchange normalExchange() {
  27.              return new DirectExchange(NORMAL_EXCHANGE_NAME);
  28.          }
  29.          @Bean
  30.          public DirectExchange deadLetterExchange() {
  31.              return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
  32.          }
  33.          @Bean
  34.          public Binding normalQueueBinding() {
  35.              return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normalRoutingKey");
  36.          }
  37.          @Bean
  38.          public Binding deadLetterQueueBinding() {
  39.              return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("deadLetterRoutingKey");
  40.          }
  41.      }
复制代码
  1.      @Bean
  2.      public Queue normalQueue() {
  3.          return QueueBuilder.durable(NORMAL_QUEUE_NAME)
  4.                 .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
  5.                 .withArgument("x-dead-letter-routing-key", "deadLetterRoutingKey")
  6.                 .build();
  7.      }
复制代码
  1.      import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2.      import org.springframework.stereotype.Component;
  3.      @Component
  4.      public class MessageProducer {
  5.          private final RabbitTemplate rabbitTemplate;
  6.          public MessageProducer(RabbitTemplate rabbitTemplate) {
  7.              this.rabbitTemplate = rabbitTemplate;
  8.          }
  9.          public void sendMessage(String message) {
  10.              rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE_NAME, "normalRoutingKey", message);
  11.          }
  12.      }
复制代码

  1.      import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2.      import org.springframework.stereotype.Component;
  3.      @Component
  4.      public class DeadLetterQueueConsumer {
  5.          @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE_NAME)
  6.          public void consumeDeadLetterMessage(String message) {
  7.              System.out.println("Received dead letter message: " + message);
  8.          }
  9.      }
复制代码
延时消息队列实现步骤

  1.      import org.springframework.amqp.core.Binding;
  2.      import org.springframework.amqp.core.BindingBuilder;
  3.      import org.springframework.amqp.core.CustomExchange;
  4.      import org.springframework.context.annotation.Bean;
  5.      import org.springframework.context.annotation.Configuration;
  6.      @Configuration
  7.      public class DelayedQueueConfig {
  8.          // 延时交换器名称
  9.          public static final String DELAYED_EXCHANGE_NAME = "delayedExchange";
  10.          // 延时队列名称
  11.          public static final String DELAYED_QUEUE_NAME = "delayedQueue";
  12.          @Bean
  13.          public CustomExchange delayedExchange() {
  14.              return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false);
  15.          }
  16.          @Bean
  17.          public Binding delayedQueueBinding() {
  18.              return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayedRoutingKey").noargs();
  19.          }
  20.      }
复制代码
  1.      import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2.      import org.springframework.stereotype.Component;
  3.      @Component
  4.      public class DelayedMessageProducer {
  5.          private final RabbitTemplate rabbitTemplate;
  6.          public DelayedMessageProducer(RabbitTemplate rabbitTemplate) {
  7.              this.rabbitTemplate = rabbitTemplate;
  8.          }
  9.          public void sendDelayedMessage(String message, long delayInMilliseconds) {
  10.              rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, "delayedRoutingKey", message, msg -> {
  11.                  msg.getMessageProperties().setDelay(delayInMilliseconds);
  12.                  return msg;
  13.              });
  14.          }
  15.      }
复制代码
  1.      import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2.      import org.springframework.stereotype.Component;
  3.      @Component
  4.      public class DelayedQueueConsumer {
  5.          @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
  6.          public void consumeDelayedMessage(String message) {
  7.              System.out.println("Received delayed message: " + message);
  8.          }
  9.      }
复制代码
通过以上步骤,就可以在 Spring Boot 3 中实现 RabbitMQ 的死信队列和延时消息队列。在实际应用中,可以根据具体的业务需求进行调整和扩展。
RabbitMQ 的常见面试题及答案

一、基础概念类

二、技术实现类

三、高级特性类

四、性能优化类

在 RabbitMQ 中,如何保证消息的顺序性?

一、单线程消费
一个简单的方法是确保同一个队列的消息始终由同一个消费者以单线程的方式进行处理。这样可以保证消息按照在队列中的顺序被依次处理。
二、拆分队列
三、避免重试机制影响顺序性
需要注意的是,在分布式环境中,完全保证消息的顺序性是非常困难的,因为可能会出现网络延迟、节点故障等各种不可预测的情况。但通过以上方法,可以在一定程度上提高消息处理的顺序性。
在 RabbitMQ 中,如何保证消息的幂等性?

一、数据库唯一约束
二、使用唯一标识符和缓存
三、记录消息处理状态
总之,保证消息的幂等性需要根据具体的业务场景选择合适的方法。在实际应用中,可以结合多种方法来提高幂等性的保证程度。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4