【中间件】一文搞懂消息队列 - RabbitMQ

打印 上一主题 下一主题

主题 1607|帖子 1607|积分 4821

【RabbitMQ】

MQ

  MQ 就是消息中间件,消息中间件是指使用高效可靠的消息传递机制举行平台无关的数据交换,并基于数据通讯来举行分布式体系的集成。通过提供消息传递和消息列队模型,在分布式环境下扩展历程间的通讯。主要作用为异步通讯、扩展解耦、流量削峰(抢购)
RabbitMQ 简介

  RabbitMQ 由 Erlang 语言开辟的基于 AMQP 协议的消息署理和队列服务器。具有高性能、可靠性投递、集群模式丰富(安装 RabbitMQ 需要先安装 Erlang 环境并配置环境变量)
AMQP 协议

  基于网络线级协议且兼容 JMS(Java 本身的消息协议 API),跨语言、跨平台,消息使用字节数组(二进制)传递。是一个提供同一消息服务的应用层尺度高级消息队列协议。
RabbitMQ 核心组件



  • Message: 消息。消息是不具名的,由消息头和消息体组成。消息体是不透明的,而消息头由可选属性组成:包括 routing-key(路由键)、priority(消息优先权)、delivery-mode(长期性存储)等特性。消息体就是现实数据。
  • Exchange : 交换机。用来接收生产者发送的消息并将这些消息根据规则路由给服务器中的队列。 Exchange 有 4 种类型:direct、fanout、topic、headers。不同类型的交换机转发消息的策略有所区别。具体见下方详解。
  • Queue : 队列。用来保存消息直到被消费者消费。它是消息的容器。一个消息可被投入一个或多个队列。
  • Binding : 绑定。一个绑定就是基于 routing-key(路由键)将交换机和消息队列毗连起来的路由规则,Exchange 和 Queue 的绑定可以是多对多的关系。
  • Virtual Host : 假造主机。逻辑上的概念,表现一批交换机、消息队列和干系对象。假造主机是共享雷同的身份认证和加密环境的独立服务器域。同一个假造主机中的交换机和消息队列名称不能雷同。
  • Channel : 网络信道。在 TCP 中的小管道,真正传递消息的。减少建立 TCP 毗连节省效率。雷同于 JDBC 中的 Session(一个会话)
  • Connection : 网络毗连。应用步伐与 Broker 链接对象
  • Server : 又称 Broker。表现一个消息队列服务器实体,用于接收客户端的链接。就是安装了 RabbitMQ 的计算机。
RabbitMQ 消费流程

  生产者把消息发布到 Exchange 交换机上,消息中的 Routing Key 和交换机与队列的 Binding 规则决定交换机的消息应该发送到哪个队列(一个消息会分配到所有满意规则的队列),消息最终到达队列并被消费者接收。一个消息会被投递到所有符合条件的绑定的队列上,但是一个队列上的消息只会被一个消费者消费,多个消费者同时监听默认接纳轮询机制举行消费。
RabbitMQ 的信道通讯

  Java 步伐与 RabbitMQ 举行 TCP 的创建和销毁开销特别大。创建需要 3 次握手,销毁需要 4 次分手。 如果不消信道,那应用步伐就会频繁创建 TCP 链接 RabbitMQ,高峰时每秒成千上万条链接会造成资源巨大的浪费,而且操纵体系每秒处置惩罚 TCP 链接数也是有限定的,必定造成性能瓶颈。 信道的原理是一条线程一条通道,多条线程多条通道同用一条 TCP 链接。一条 TCP 链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。

【RabbitMQ 部署】

基于 Docker 部署 RabbitMQ

  基于 Docker 部署单节点 RabbitMQ 无需过多配置,下面下令启动即可。然后访问IP:15672,默认账号密码均为 guest
  1. docker run -d
  2.     # 用于客户端应用程序与 RabbitMQ 服务器之间的消息通信端口【AMQP 0-9-1 协议通信】。
  3.     -p 5672:5672 \
  4.     # 用于访问 RabbitMQ 的管理界面【HTTP端口】。
  5.     -p 15672:15672 \
  6.     --name imomei_rabbitmq \
  7.     rabbitmq:management
复制代码

【SpringBoot 整合 RabbitMQ】

SpringBoot 整合 RabbitMQ

  SpringBoot 整合 RabbitMQ 是基于 SpringBoot 提供的 AMQP 协议 Starter 举行整合的,提供了 RabbitTemplate 方便举行调用,发送端直接使用 RabbitTemplate 举行消息的发送,接收端使用@RabbitListener 注解对队列举行监听。启动类上要加@EnableRabbit 注解
  1. <!-- ============================= 依赖 ================================ -->
  2. <!-- RabbitMQ -->
  3. <dependency>
  4.     <groupId>org.springframework.boot</groupId>
  5.     <artifactId>spring-boot-starter-amqp</artifactId>
  6. </dependency>
复制代码
  1. # ============================= 配置文件 ================================
  2. spring:
  3.   rabbitmq:
  4.     host: 0.0.0.0
  5.     port: 5672
  6.     username: guest
  7.     password: guest
  8.     # 虚拟主机
  9.     virtual-host: /
  10.     # RabbitMQ 连接超时时间
  11.     connection-timeout: 10000
复制代码
  1. // ============================= 配置类 ================================
  2. @Configuration
  3. public class RabbitMQConfig {
  4.     @Bean
  5.     public MessageConverter messageConverter() {
  6.         // 自定义Json方式传递数据的Converter(影响消息为对象的情况)
  7.         return new Jackson2JsonMessageConverter();
  8.     }
  9. }
复制代码
  1. // ============================= 常量类 ================================
  2. /**
  3. * RabbitMQ常量类
  4. */
  5. public class RabbitMQConst {
  6.     // 交换机名称
  7.     public static final String IMOMEI_EXCHANGE = "imomei_exchange";
  8.     // 队列名称
  9.     public static final String IMOMEI_QUEUE = "imomei_queue";
  10.     // 路由键名称
  11.     public static final String IMOMEI_ROUTING_KEY = "imomei.test.routing";
  12.     // 绑定规则
  13.     public static final String IMOMEI_BINDING_RULE = "imomei.test.#";
  14. }
复制代码
  1. // ============================= 测试代码 ================================
  2. @Slf4j
  3. @RestController
  4. @RequestMapping("/test/rabbitMQ")
  5. public class RabbitMQController {
  6.     // 注入RabbitTemplate
  7.     @Autowired
  8.     private RabbitTemplate rabbitTemplate;
  9.     @GetMapping("/send")
  10.     // 发送方法
  11.     public void send(String name) {
  12.         User user = new User("1", name, "123", "123@qq.com");
  13.         // 使用RabbitTemplate模板方法发送单播消息。参数:交换机名称,路由键,数据。Fanout填不填路由键无所谓
  14.         rabbitTemplate.convertAndSend(RabbitMQConst.IMOMEI_EXCHANGE, RabbitMQConst.IMOMEI_ROUTING_KEY, user);
  15.     }
  16.     /**
  17.      * 监听方法
  18.      *
  19.      * @param message 消息内容
  20.      * @param headers 消息头
  21.      */
  22.     // 监听队列名称
  23.     @RabbitListener(queues = RabbitMQConst.IMOMEI_QUEUE)
  24.     public void process1(Message message, @Headers Map<String, Object> headers) {
  25.         log.warn("消费【" + RabbitMQConst.IMOMEI_QUEUE + " 】队列消息:" + new String(message.getBody()));
  26.     }
  27. }
复制代码

【Exchange 交换机】

Exchange 交换机

  Exchange 交换机用于分发消息,根据类型的不同分发策略有区别,目前共四种类型

  • Direct : 直连交换机。消息中的 routing key 如果和 baiding key 同等,交换机就将消息发到对应的队列中。路由键与 bangding 需完全匹配。

  • Fanout : 广播交换机。消息会分到所有和当前交换机绑定的队列上去,无需匹配路由键。广播模式,转发消息速度是最快的。

  • Topic : 主题交换机。消息中的 routing key 如果和 baiding key 同等,交换机就将消息发到对应的队列中。baiding key 支持含糊匹配, # 匹配 0 个或多个单词、 * 匹配一个单词。

  • Headers : 通过消息头路由,几乎不使用。
Exchange 交换机常用属性


  • name : 交换机的名称。
  • type : 交换机的类型。分为 direct 直连、fanout 广播、topic 主题、hearders 消息头
  • durability : 是否长期化。Durable 长期化【默认】,Transient 不长期化。
  • auto delete : 当最后一个绑定(队列大概 exchange)被 unbind 之后,该 exchange 主动被删除。【默认 NO】
  • internal : 是否是内部专用 exchange,是的话,就意味着我们不能往该 exchange 里面发消息。【默认 NO】
  • arguments : 用于填写其他参数。

【Queue 队列】

Queue 队列常用属性


  • name: 队列的名称
  • durable: 是否长期化。Durable 长期化【默认】,Transient 不长期化。
  • auto delete: 无消费者,或最后一消费者退订则队列删除【默认 NO】
  • arguments: 用于填写其他参数。
  • 消息过期时间 TTL: 可以通过队列的 x-message-ttl 属性,设置队列中所有消息的过期时间,过期的消息会主动删除(如果有死信队列,会抛到死信队列)。也可以单独设置某一个消息的过期时间。
  1. // ------------------------- 单独设置一条消息过期时间 -------------------------------
  2. public void sendMeg() {
  3.     MessageProperties properties = new MessageProperties();
  4.     // 设置该条消息是具有过期时间的(10S)
  5.     properties.setExpiration("10000");
  6.     Message message = new Message("我是测试数据".getBytes(), properties);
  7.     rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_NAME, "", message);
  8. }
复制代码
优先级队列

  x-max-priority 属性用来设置队列的最大优先级,当投递消息的时间可以附带优先级属性,优先级越大的,越先投递。【并发不高的时间基本没什么用,不具体学习了】
其他队列属性

  包括了设置队列长度、过期时间、消息长度、死信、过期时间等属性。
  1. @Bean
  2. public Queue QUEUE_INFORM_EMAIL(){
  3.     Map<String,Object> args = new HashMap<>();                             // 队列拓展参数数组
  4.     args.put("x-message-ttl", 3000);                                       // 队列中消息最大过期时间:3000毫秒(3秒)
  5.     args.put("x-max-length", 4);                                           // 队列中消息最大个数:4个
  6.     args.put("x-max-length-bytes", 1024);                                  // 队列中单个消息最大长度:1024字节
  7.     args.put("x-max-priority", 10);                                        // 队列的最大优先级设置:10
  8.     args.put("x-dead-letter-exchange", "DLX-EXCHENG");                     // 队列中出现死信消息会进入死信交换机。DLX-EXCHENG
  9.     args.put("x-dead-letter-routing-key", "DLX");                          // 死信消息进入死信交换机的路由键。DLX
  10.    return new Queue("queue_inform_email", true, false, false, args);      // 创建名为 queue_inform_email 的队列
  11. }
复制代码

【RabbitMQ 六种工作模式】

六种工作模式

  RabbitMQ 的六种工作模式现实上就是以现有的交换机、队列、绑定规则等组件延伸出来的业务场景,被叫做工作模式。


  • Simple 简朴模式: 不配置交换机(现实使用了默认的交换机),生产者直接发送给队列,消费者监听队列,队列与消费者是 1 对 1 的关系。
  • work 工作模式: 和简朴模式差不多,同样是不配置交换机,不同的是工作模式多个消费者监听一个队列。

    • 公中分发:在工作模式中,默认情况下多个消费者会依次接收并消费队列中的消息。
    • 不公中分发:在工作模式中,可以在消费者端获取消息时将 channel 的参数 basicQos 设为 1(默认 0),那么就会在消息分发时优先选择空闲的消费者分发。如果不存在空闲队列,那么还是按公中分发。
    • 预取值:可以看作是规定的消费者等待消费队列内部盼望的队列长度。好比消费 C1 是 2,C2 是 3,那么开始的消息会先分配给 C1,直到 C1 中等待消息的消息队列长度为 2 时,下一个消息才会分配给 C2,然后 C2 也积累了 3 个消息后,继承 C1、C2 轮番分配。预期值默以为 0,所以默认情况就是消费者轮番被分配消息。配置方式也是设置消费者端的 channel 对象的 basicQos 参数。

  • publish/subscribe 发布订阅模式: 交换机是 fanout 类型。交换时机将接收的消息发送给所有与其绑定的队列。
  • routing 路由模式: 交换机是 direct 类型。交换时机根据接收消息的 RoutingKey 寻找匹配的 BindingKey,然后发送给对应的队列。BindingKey 是和 RoutingKey 完全匹配的,一对一关系。
  • topic 主题模式: 交换机是 topic 类型。交换时机根据接收消息的 RoutingKey 寻找匹配的 BindingKey,与 routing 模式不同的是,topic 模式消息携带的 BindingKey 可以是一个通配符。交换时机匹配与通配符匹配的 BindingKey 对应的队列。* 表现恣意一个单次,# 表现 0 个或多个单次。如果 RoutingKey 不包括通配符,那么就相当于路由模式,如果 RoutingKey 是 #,那么就相当于发布订阅模式。
  • RPC 模式: 也就是远程调用,RabbitMQ 的 RPC 模式可以实现 RPC 的异步调用。客户端既是发送者也是消费者,在请求发送给队列 rpc_queue 后,服务器会监听这个队列,获取后处置惩罚,处置惩罚完成将返回数据消息发给队列 reply_to,而客户端也会监听这个队列,最终实现得到结果数据。

【死信队列】

死信队列

  消息没有消费者消费,该消息就称为死信消息,该消息会被发送到 DLX 死信队列(RabbitMQ 需要本身建立死信队列,如果没有配置死信队列,消息则会被丢弃)。
死信消息形成场景



  • 消息被拒绝而且设置了不重发。
  • 消息 TTL 过期
  • 队列到达最大长度(开始进入队列的消息会优先进入死信队列)
死信消息特征(消息头修改)

  当一个消息进入死信队列后,消息的 Routing Key 会被修改,消息头中会新增如下几个参数:


  • x-first-death-exchange : 第一次被抛入的交换机的名称
  • x-first-death-reason : 第一次成为死信的原因

    • rejected:消息在重新进入队列时被队列拒绝,由于 default-requeue-rejected 重试参数被设置为 false
    • expired:消息过期:
    • maxlen:队列内消息数目超过队列最大容量

  • x-first-death-queue : 第一次成为死信前地点队列名称
  • x-death : 历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更新。
死信队列代码演示

  1. // ============================== 配置类 ==============================
  2. @Configuration
  3. public class RabbitMQConfig {
  4.     // 死信 - 交换机
  5.     public static final String DLX_EXCHANGE_NAME = "imomei.exchange.dlx";
  6.     // 死信 - 队列
  7.     public static final String DLX_QUEUE_NAME = "imomei.queue.dlx";
  8.     // 死信 - ROUTINGKEY
  9.     public static final String DLX_ROUTING_KEY = "imomei.routingkey.dlx";
  10.     // 声明死信交换机
  11.     @Bean
  12.     public DirectExchange dlxExchange() {return new DirectExchange(DLX_EXCHANGE_NAME);}
  13.     // 声明死信队列
  14.     @Bean
  15.     public Queue queueDlx() {return QueueBuilder.durable(DLX_QUEUE_NAME).build();}
  16.     // 声明绑定关系:死信交换机 - 死信队列
  17.     @Bean
  18.     public Binding dlxExchangeQueue(@Qualifier("queueDlx") Queue queue, @Qualifier("dlxExchange") DirectExchange exchange) {
  19.         return BindingBuilder.bind(queue).to(exchange).with(DLX_ROUTING_KEY);
  20.     }
  21.     // 声明业务队列【这里在原有声明普通队列上新增了两个参数,这样就可以指定当该队列中的消息被清除之后,送到死信交换机/死信队列中去】
  22.     @Bean
  23.     public Queue queue() {
  24.         Map<String, Object> args = new HashMap<>(2);
  25.         args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);       // x-dead-letter-exchange      声明当前队列绑定的死信交换机
  26.         args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);      // x-dead-letter-routing-key   声明当前队列的死信路由key
  27.         return QueueBuilder.durable(B_QUEUE_NAME).withArguments(args).build();        // 这里声明队列携带额外参数使用withArguments方法
  28.     }
  29. }
复制代码
  1. // ============================== 死信监听类 ==============================
  2. // 监听方法:监听死信队列
  3. @RabbitListener(queues = RabbitMQConfig.DLX_QUEUE_NAME)
  4. public void process3(@Payload Message message, @Headers Map<String, Object> headers) {
  5.     // 监听死信队列就会发现消息头中会多了几个参数
  6.     log.warn(headers + "");
  7.     log.warn("消费《 " + RabbitMQConfig.DLX_QUEUE_NAME + " 》队列消息:" + new String(message.getBody()));
  8. }
复制代码
  1. // ============================== 消息过期被投入死信队列的消息头额外信息 ==============================
  2. {
  3.         x - first - death - exchange = imomei.exchange.fanout,
  4.         x - death = [{
  5.           reason = expired,
  6.           original - expiration = 10000,
  7.           count = 1,
  8.           exchange = imomei.exchange.fanout,
  9.           time = Wed Nov 03 16: 43: 07 CST 2021,
  10.           routing - keys = [],
  11.           queue = imomei.queue.b
  12.         }],
  13.         x - first - death - reason = expired,
  14.         x - first - death - queue = imomei.queue.b
  15. }
复制代码

【耽误投递】

耽误投递

  RabbitMQ 本身不支持消息的耽误投递。耽误队列主要是希望在规定时间后举行触发,然后触发相应操纵。相比力于定时使命举行轮询,延时队列更优雅。虽然 RabbitMQ 耽误投递,但是可以通过使用插件大概使用死信队列举行耽误投递。常用的应用场景如下:

  • 订单在非常钟之内未付出则主动取消。
  • 新创建的店铺,如果在十天内都没有上传过商品,则主动发送消息提示。
  • 账单在一周内未付出,则主动结算。
  • 用户注册成功后,如果三天内没有登陆则举行短信提示。
  • 用户发起退款,如果三天内没有得随处置惩罚则关照干系运营人员。
  • 预定会议后,需要在预定的时间点前非常钟关照各个与会人员参加会议。
使用死信队列耽误投递实现思路

  设置一个没有消费者监听的队列而且指定死信队列,将需要耽误的消息设定好过期时间就发送给目标交换机并发送到该队列。由于没有被消费,最终会消息超时而转发到配置的死信队列,死信队列被消费者监听,然后举行消费,就到达了耽误消费的结果。耽误的时间就是消息的过期时间。

【消费积存与限流】

消费端限流

  RabbitMQ 服务器有上万条未处置惩罚的消息,我们打开一个消费端,巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处置惩罚这么多数据。导致服务器崩溃,线上故障。RabbitMQ 提供了 Qos 服务质量保证的功能,即在非主动确认消息情况下,如果肯定数目的消息未被确认,则不消费新消息。
  1. // ====================================== 消费端限流设置 ======================================
  2. // 参数一:prefetchSize:每条消息大小的设置,0是无限制
  3. // 参数二:prefetchCount:标识每次推送多少条消息 一般是一条
  4. // 参数三:global:false表示channel级别的限制、true表示消费者级别的的限制
  5. channel.basicQos(0,1,false);
  6. // 自动确认为false,
  7. channel.basicConsume("队列名称",false,new MyConsumer(channel));
复制代码

【可靠性投递】

消息的可靠性投递

  可靠性投递表现生产端的消息 100%投递成功且被成功消费 ,逻辑上我们总共可以分成四个点。根据前两个点的回调方法,我们能保证消息的最终同等性和部门纠错能力。


  • 一、生产端成功发出消息到交换机。 【解决方案:事务机制、 Confirm 机制】
  • 二、交换机正确路由到盼望队列。 【解决方案:Return 机制】
  • 三、消息在队列中正确储存。【解决方案:集群】
  • 四、消费端正确消费消息。【解决方案:消息确认 ACK】
(一阶段)事务机制

  RabbitMQ 是支持 AMQP 事务机制的,在生产者确认机制之前,事务是确保消息被成功投递到交换机的唯一方法。但是开启事务对性能的影响较大,每次事务的提交都是壅闭式的等待服务器处置惩罚返回结果。除此之外,事务消息需要比普通消息多 4 次与服务器的交互,这就意味着会占用更多的处置惩罚时间,所以如果对消息处置惩罚速度有较高要求时,尽量不要接纳事务机制。
  1. // ================================== 配置类 ==================================
  2. // 配置启用RabbitMQ事务
  3. @Bean
  4. public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
  5.     return new RabbitTransactionManager(connectionFactory);
  6. }
  7. // ================================== 生产者 ==================================
  8. // 开启RabbitMQ事务(如果该方法发生异常,则消息发送不会发送和被消费,否则异常之前的消息均会发送)
  9. @Transactional
  10. public void sendMsg(Object msg) {
  11.     // 设置rabbitTemplate当前会话开启事务
  12.     rabbitTemplate.setChannelTransacted(true);
  13.     rabbitTemplate.convertAndSend(RabbitMQConst.IMOMEI_EXCHANGE_FANOUT, "routingKey", msg);
  14.     // 模拟异常
  15.     int i = 1 / 0;
  16. }
复制代码
(一阶段)Confirm 机制

  生产者确认机制是用于保证生产者是否能正确的将消息发送交换机上的机制。生产者确认机制跟事务是不能一起工作的。由于事务和发布者确认模式都是需要先跟服务器协商,对信道启用的一种模式,不能对同一个信道同时使用两种模式。在生产者确认模式中,消息的确认可以是异步和批量的,所以相比使用事务,性能会更好。而目前只是生产者确认机制的实现,指的是消息成功被交换机接收,但如果找不到能接收该消息的队列,这条消息也会丢失。至于如那边理那些无法被投递到队列的消息,是后面其他机制保障的事情。设置 publisher-confirms 参数可以开启消息发送生产者的确认机制,而且需要实现 ConfirmCallback 接口,用于接收获功失败后回调的提示信息。
(二阶段)Return 机制

  Return 机制保证了消息能够正常的到达队列中,如果没到达,也会有回调处置惩罚方式。设置 publisher-returns 参数标识开启交换机到队列的回调机制,设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。当把 mandotory 参数设置为 true 时,如果交换机无法将消息举行路由时,会将该消息返回给生产者(正常路由的消息不会回调)。而如果该参数设置为 false,如果发现消息无法举行路由,则直接丢弃。
  1. # ================================== 配置文件 ==================================
  2. spring:
  3.   rabbitmq:
  4.     # 开启发送端消息抵达broker的确认(也可以理解为交换机)
  5.     publisher-confirm-type: correlated
  6.     # 开启发送端消息抵达队列的确认
  7.     publisher-returns: true
  8.     # 开启未路由消息返回功能(必须开启,否则ReturnCallback的Message参数接收不到数据)
  9.     template:
  10.       mandatory: true
复制代码
  1. // ================================== 配置类 ==================================
  2. @Slf4j
  3. @Component
  4. public class IMoMeiCallbackConfig {
  5.     @Autowired
  6.     private RabbitTemplate rabbitTemplate;
  7.     /**
  8.      * RabbitMQ ConfirmCallback 与 ReturnCallback 的回调方法配置
  9.      */
  10.     @Autowired
  11.     public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
  12.         // ConfirmCallback回调配置
  13.         rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
  14.             if (ack) {
  15.                 log.warn("消息成功发送到RabbitMQ: ", correlationData.getId());
  16.             } else {
  17.                 log.warn("消息发送失败: " + correlationData + ", 原因: " + cause);
  18.             }
  19.         });
  20.         // ReturnCallback回调配置
  21.         rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
  22.             // 发送消息的时候需要设置消息头,否则这里获取不到消息ID
  23.             log.warn("消息被RabbitMQ拒绝: " + message.getMessageProperties().getMessageId());
  24.             log.warn("回复代码: " + replyCode);
  25.             log.warn("回复文本: " + replyText);
  26.             log.warn("交换机: " + exchange);
  27.             log.warn("路由键: " + routingKey);
  28.         });
  29.     }
  30. }
  31. // ================================== 生产者 ==================================
  32.     public void send(String name) {
  33.         User user = new User("1", name, "123", "123@qq.com");
  34.         // 创建消息ID
  35.         CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  36.         // 创建消息头(消息头中也要携带消息ID,方便Returnback能够获取到消息ID)
  37.         MessagePostProcessor messagePostProcessor = message -> {
  38.             message.getMessageProperties().setMessageId(correlationData.getId());
  39.             return message;
  40.         };
  41.         // 发送消息,后面两个参数就是消息头和消息ID
  42.         rabbitTemplate.convertAndSend(RabbitMQConst.IMOMEI_EXCHANGE, RabbitMQConst.IMOMEI_ROUTING_KEY, user, messagePostProcessor, correlationData);
  43.     }
复制代码
(二阶段)备份交换机

  上面通过设置 mandatory 参数可以让我们从 Returnback 中了解到哪个消息没有被路由,但是需要优雅的保存这些未被路由的消息,而不是日志输出。所以可以使用备份交换机。当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会将这条消息转发到备份交换机中,由备份交换机来举行转发和处置惩罚,通常备份交换机的类型为 Fanout 广播,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以多建立一个报警队列,用独立的消费者来举行监测和报警。(如果设置了备份交换机,就不会收到 mandatory 失败的回调函数了,由于被投递到了备份队列中)。
  1. @Configuration
  2. public class RabbitMQConfig {
  3.     // 备份 - 交换机
  4.     public static final String BAK_EXCHANGE_NAME = "imomei.exchange.bak";
  5.     // 备份 - 队列
  6.     public static final String BAK_QUEUE_NAME = "imomei.queue.bak";
  7.     // 声明DRIECT交换机(并且通过参数设置备份交换机)
  8.     @Bean
  9.     public DirectExchange directExchange() {
  10.         return (DirectExchange) ExchangeBuilder.directExchange(DRIECT_EXCHANGE_NAME)
  11.                 .durable(true)
  12.                 // 这里指定备份交换机的名称,用于绑定备份交换机。BAK_EXCHANGE_NAME备份交换机的名称
  13.                 .withArgument("alternate-exchange", BAK_EXCHANGE_NAME)
  14.                 .build();
  15.     }
  16.     // =========================================== 备份交换机 ==========================================
  17.     // 声明备份交换机
  18.     @Bean
  19.     public FanoutExchange bakExchange() {
  20.         return new FanoutExchange(BAK_EXCHANGE_NAME);
  21.     }
  22.     // 声明备份队列
  23.     @Bean
  24.     public Queue queueBak() {
  25.         return QueueBuilder.durable(BAK_QUEUE_NAME).build();
  26.     }
  27.     // 声明绑定关系:备份交换机 - 备份队列
  28.     @Bean
  29.     public Binding bakExchangeQueue(@Qualifier("queueBak") Queue queue, @Qualifier("bakExchange") FanoutExchange exchange) {
  30.         return BindingBuilder.bind(queue).to(exchange);
  31.     }
  32. }
复制代码
(四阶段)消息确认 ACK 机制

  默认情况 MQ 是收到消息立即主动 ACK,然后慢慢处置惩罚,处置惩罚过程中消费者宕机大概异常就会造成消息的丢失。而 ACK 手动确认机制用于保证消费者消费消息之后的确认方式。手动 ACK 时,消费者接收消息后,消息状态为 Unacked,如果消费的时间没有手动 ACK,则 MQ 中的消息总量不会减少。ACK 核心方法有三个:


  • basicAck():确认消息收到。
  • basicNack():把本次及之前的所有未确认消息全部拒绝。
  • basicReject():拒绝消息。
  1. # ================================ 配置文件 ===================================
  2. spring:
  3.   rabbitmq:
  4.     listener:
  5.       simple:
  6.         # 消费端手动确认模式
  7.         acknowledge-mode: manual
复制代码
  1. // ================================ 消费者 ======================================
  2. @RabbitListener(queues = RabbitMQConst.IMOMEI_QUEUE)
  3. public void process1(Message message, Channel channel) throws InterruptedException, IOException {
  4.     // 确认消息
  5.     channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  6.     // 拒绝消息
  7.     channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  8.     // deliveryTag:代表消息重回队列的次数,
  9.     // multiple:第一个false代表是否批量,如果是true的话将一次性拒绝所有小于deliveryTag的消息
  10.     // requeue:第二个false代表该消息没有被成功消费,并且不会将该消息重新入队列。配置成true会重新进入队列并且deliveryTag++
  11. }
复制代码

【RabbitMQ 集群】

RabbitMQ 集群

  RabbitMQ 集群有普通集群、镜像集群两种模式。
普通集群

  普通集群模式,就是将 RabbitMQ 部署到多台服务器上,每台服务器启动一个 RabbitMQ 实例,多个实例之间举行消息通讯。此时我们创建的队列 Queue,它的元数据(主要就是 Queue 的一些配置信息)会在所有的 RabbitMQ 实例中举行同步,但是队列中的消息只会存在于一个 RabbitMQ 实例上,而不会同步到其他队列。当我们消费消息的时间,如果毗连到了另外一个实例,那么那个实例会通过元数据定位到 Queue 地点的位置,然后访问 Queue 地点的实例,拉取数据过来发送给消费者。这种集群可以提高 RabbitMQ 的消息吞吐能力,但是无法保证高可用,由于一旦一个 RabbitMQ 实例挂了,消息就没法访问了,如果消息队列做了长期化,那么等 RabbitMQ 实例规复后,就可以继承访问了;如果消息没做长期化,那么消息就丢了。
镜像集群

  它和普通集群最大的区别在于 Queue 数据和元数据不再是单独存储在一台机器上,而是同时存储在多台机器上。也就是说每个 RabbitMQ 实例都有一份镜像数据(副本数据)。每次写入消息的时间都会主动把数据同步到多台实例上去,这样一旦其中一台机器发生故障,其他机器另有一份副本数据可以继承提供服务,也就实现了高可用。
节点类型

  磁盘节点就是将元数据存储在磁盘中,单节点体系只允许磁盘类型的节点,防止重启 RabbitMQ 的时间,丢失体系的配置信息。。节点类型分为磁盘节点和内存节点。内存节点将所有的队列、交换机、绑定、用户、权限和 vhost 的元数据定义存储在内存中,好处是可以使得交换机和队列声明等操纵速度更快。
  RabbitMQ 要求在集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点参加大概离开集群时,必须要将该变更关照到至少一个磁盘节点。如果集群中唯一的一个磁盘节点崩溃的话,集群仍然可以保持运行,但是无法举行其他操纵(增删改查),知道节点规复。为了确保集群信息的可靠性,大概在不确定使用磁盘节点还是内存节点的时间,建议直接使用磁盘节点。具体搭建步调参考
   RabbitMQ 集群搭建:https://blog.csdn.net/dingd1234/article/details/125247546

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

羊蹓狼

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表