小秦哥 发表于 2024-12-12 23:22:19

第五章 RabbitMQ高级

1. 生产者消息确认机制

https://i-blog.csdnimg.cn/direct/cc9b5435a93f457bac759ce700f6cb36.png
1.1 设置文件

spring:
rabbitmq:
    host: 192.168.137.110 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: itcast
    password: 123456
    virtual-host: /
    publisher-confirm-type: correlated #异步
    publisher-returns: true
    template:
      mandatory: true
1.2 编写设置类

package cn.itcast.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @ClassName CommonConfig
* @Description rabbitMQ生产者消息确认配置类
* @Author 孙克旭
* @Date 2024/11/24 14:47
*/
@Configuration
@Slf4j
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
      //获取RabbitTemplate对象
      RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
      //配置ReturnCallback
      rabbitTemplate.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) -> {
            //记录日志
            log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",
                  replyCode, replyText, exchange, routingKey, message.toString());
      }));
    }
}

1.3 发送消息时携带关联数据

package cn.itcast.mq.spring;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.UUID;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
      String routingKey = "simple.test";
      String message = "hello, spring amqp!";
      CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
      correlationData.getFuture().addCallback(result -> {
            //判断结果
            if (result.isAck()) {
                //ACK
                log.debug("消息成功投递到交换机,消息ID:{}", correlationData.getId());
            } else {
                //NACK
                log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
            }
      }, ex -> {
            //记录日志
            log.error("消息发送失败!", ex);
      });
      rabbitTemplate.convertAndSend("amq.topic", routingKey, message, correlationData);
    }
}

1.4 SpringAMQP中处置处罚消息确认的几种情况:

(1)publisher-comfirm:
消息成功发送到exchange,返回ack
消息发送失败,没有到达交换机,返回nack
消息发送过程中出现非常,没有收到回执
(2)消息成功发送到exchange,但没有路由到queue,调用ReturnCallback
2. 消息持久化

直接创建交换机或队列、发送消息时,数据都是持久的。
2.1 交换机和队列持久化

package cn.itcast.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @ClassName CommonConfig
* @Description 消息持久化配置类
* @Author 孙克旭
* @Date 2024/11/24 15:34
*/
@Configuration
public class CommonConfig {
    /**
   * 交换机持久化
   *
   * @return
   */
    @Bean
    public DirectExchange simpleDirect() {
      return new DirectExchange("simple.direct", true, false);
    }

    /**
   * 队列持久化
   *
   * @return
   */
    @Bean
    public Queue simpleQueue() {
      return QueueBuilder.durable("simple.queue").build();
    }
}

2.2 数据持久化

发送消息是指定数据发布的模式:
   @Test
    public void testDurableMessage() {
      //1.准备消息
      Message message = MessageBuilder.withBody("hello RabbitMQ".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //消息持久化
                .build();
      //2.发送消息
      rabbitTemplate.convertAndSend("simple.queue", message); //没有指定交换机名称,会使用默认交换机
    }
3. 消息重发机制

RabbitMQ的消息没有被消费者读取后,会自动返回队列,再次发送至消费者,并一直循环,直到该消息被消费。如许会极大的浪费mq的性能,所以需要手动设置消息重发机制。
spring:
rabbitmq:
    host: 192.168.137.110 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: itcast
    password: 123456
    virtual-host: /
    listener:
      simple:
      prefetch: 1 #消费者每次只能读取一个消息
      acknowledge-mode: auto #消费者自动确认消息,表明消息已经被成功处理,然后RabbitMQ会从队列中移除这条消息
      retry:
          enabled: true #开启消费者失败重试
          initial-interval: 1000 #初始的失败等待时长为1秒
          multiplier: 3 #下次失败的等待时长倍数,下次等待时长=multiplier * last-interval
          max-attempts: 4 #最大重试次数
          stateless: true #ture 无状态;false 有状态。如果业务中包含事务,这里改为false
3.1 重发失败消息应对策略

https://i-blog.csdnimg.cn/direct/9555f4b05112476b9271a202a4dbd92a.png
消费者将错误消息发送至指定的交换机。
package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @ClassName ErrorMessageConfig
* @Description 重发失败消息应对策略配置类
* @Author 孙克旭
* @Date 2024/11/24 16:28
*/
@Configuration
public class ErrorMessageConfig {
    /**
   * 接受错误消息的交换机
   *
   * @return
   */
    @Bean
    public DirectExchange errorMessageExchange() {
      return new DirectExchange("error.direct");
    }

    /**
   * 接收错误消息的队列
   *
   * @return
   */
    @Bean
    public Queue errorQueue() {
      return new Queue("error.queue");
    }

    /**
   * 将队列绑定到交换机
   *
   * @return
   */
    @Bean
    public Binding errorMessageBinding() {
      return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }

    /**
   * 错误消息应对策略
   * 在消息处理失败时重新发布消息到指定的交换机和队列
   *
   * @param rabbitTemplate
   * @return
   */
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
      return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}



3.2 如何确保RabbitMQ消息的可靠性?

1.开启生产者确认机制,确保生产者的消息能到达队列
2.开启持久化功能,确保消息未消费前在队列中不会丢失
3.开启消费者确认机制为auto,由spring确认消息处置处罚成功后完成ack
4.开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到非常交换机,交由人工处置处罚
4. 死信交换机

死信会被队列发送给死信交换机
4.1 什么样的消息会成为死信?

(1)消息被消费者reject或者返回nack
(2)消息超时未消费
(3)队列满了,早期的消息被抛弃
4.2 如何给队列绑定死信交换机?

(1)给队列设置dead-letter-exchange属性,指定一个交换机
(2)给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey
https://i-blog.csdnimg.cn/direct/cbcc2fe998904c79912d63c5fdf25da3.png
4.2.1 代码实现

package cn.itcast.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @ClassName TtlMessageConfig
* @Description 死信消息路由规则配置类
* @Author 孙克旭
* @Date 2024/11/25 10:02
*/
@Configuration
public class TtlMessageConfig {
    /**
   * 交换机
   *
   * @return
   */
    @Bean
    public DirectExchange ttlDirectExchange() {
      return new DirectExchange("ttl.direct");
    }

    /**
   * 定义队列,指定死信交换机和routing key
   * 并设置消息存活时间
   *
   * @return
   */
    @Bean
    public Queue ttlQueue() {
      return QueueBuilder.durable("ttl.queue")
                .ttl(10000)
                .deadLetterExchange("dl.direct")
                .deadLetterRoutingKey("dl")
                .build();
    }

    /**
   * 定义绑定规则
   *
   * @return
   */
    @Bean
    public Binding ttlBinding() {
      return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
    }

}

监听器:
package cn.itcast.mq.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @ClassName SpringRabbitListener
* @Description 队列监听器
* @Author 孙克旭
* @Date 2024/11/25 9:53
*/
@Component
@Slf4j
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
      System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
    }

    /**
   * 监听死信队列消息
   *
   * @param msg
   */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dl.queue", durable = "true"),
            exchange = @Exchange(name = "dl.direct"),
            key = "dl"
    ))
    public void listenDlQueue(String msg) {
      log.info("消费者接收到了dl.queue的延迟消息:{}", msg);
    }


}

测试代码:
   @Test
    public void testTTLMessage() {
      //1.准备消息
      Message message = MessageBuilder.withBody("hello RabbitMQ".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //消息持久化
                .setExpiration("5000") //定义消息存活时间
                .build();
      //2.发送消息
      rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
    }
4.3 消息超时的两种方式是?

(1)给队列设置ttl属性
(2)给消息设置ttl属性
(3)两者共存时,以时间短的ttl为准
5. 延迟消息

5.1 安装rabbitmq插件

https://i-blog.csdnimg.cn/direct/21cc1b72fea44cdd97700a1ebb78d2d6.png
进入容器内部后,执行下面命令开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
5.2 代码实现

消费者:
    /**
   * 监听延迟消息
   * 创建队列
   * 创建延迟交换机,指定交换机类型为direct
   * 指定routing key
   *
   * @param msg
   */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct", type = "direct", delayed = "true"),
            key = "delay"))
    public void listenDelayExchange(String msg) {
      log.info("消费者接收到了delay.queue的延迟消息:{}", msg);
    }
生产者测试代码:
    /**
   * 测试延迟消息
   */
    @Test
    public void testDelayMessage() {
      //1.准备消息
      Message message = MessageBuilder
                .withBody("hello RabbitMQ".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //消息持久化
                .setHeader("x-delay", 5000) //设置延迟时间
                .build();
      //2.发送消息
      rabbitTemplate.convertAndSend("delay.direct", "delay", message);
    }
5.3 延迟插件的使用步调有哪些?

(1)声明一个交换机,添加delayed属性为true
(2)发送消息时,添加x-delay头,值为延时时间
6. 惰性队列

6.1 消息堆积问题

https://i-blog.csdnimg.cn/direct/68c9ca3085214d1db61dcbf16dcd3338.png
6.1.1 解决方案

(1)增加更多的消费者,进步消费速度
(2)在消费者内开启线程池加速消息处置处罚速度
(3)扩大队列容积,进步堆积上限
6.2 惰性队列特征

(1)接收到消息直接存入磁盘而非内存
(2)消费者要消费消息时才会从磁盘读取并加载到内存
(3)支持数百万条的消息存储
6.2.1 长处

(1)基于磁盘存储,消息上限高
(2)没有间歇性的page-out(页面置换),性能比较稳固
这句话的意思是,在传统的队列模式下,RabbitMQ 会将消息存储在内存中,当内存不足时,会将内存中的消息换页至磁盘中,
这个操作会耗费较长的时间,并且可能会阻塞队列的操作,导致无法接收新的消息。
这种由于内存不足而导致的消息换页操作就是所谓的间歇性page-out。

而惰性队列由于一开始就将消息存储在磁盘上,避免了在内存和磁盘之间频繁地进行页面置换,因此减少了因页面置换导致的性能波动和
延迟,使得性能更加稳定。这意味着,即使在高负载或者消费者处理能力不足导致消息积压的情况下,惰性队列也能保持较好的性能表现,
不会因为内存压力而导致处理能力下降。
6.2.2 缺点

(1)基于磁盘存储,消息时效性会降低
(2)性能受限于磁盘IO
6.3 代码实现

在消费者中定义惰性队列:
package cn.itcast.mq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
* @ClassName LazyConfig
* @Description 惰性队列配置类
* @Author 孙克旭
* @Date 2024/11/25 13:23
*/
@Configuration
public class LazyConfig {

    /**
   * 声明一个惰性队列
   *
   * @return
   */
    @Bean
    public Queue LazyQueue() {
      return QueueBuilder.durable("lazy.queue")
                .lazy().build();
    }
}

7. RabbitMQ集群

在设置文件中添加集群地址:
https://i-blog.csdnimg.cn/direct/9567f68a93ff4355af8ce094bc1464bf.png
7.1 平凡集群

平凡分布式集群,将队列分散到集群的各个节点,从而进步整个集群的并发能力。每个节点有其他节点的元数据信息,但是没有其他节点的数据;当消费者读取一个节点时,若哀求的队列不在该节点,该节点会自动根据元数据查找指定的队列,并返回给消费者。
7.2 镜像集群

镜像集群:是一种主从集群,平凡集群的基础上,添加了主从备份功能,进步集群的数据可用性。
https://i-blog.csdnimg.cn/direct/ef93503a3a4c42008c53a29a85b53792.png
7.3 仲裁队列

https://i-blog.csdnimg.cn/direct/c4b3ca43736d44beb7485fcc45439429.png
Raft协议是一种分布式一致性协议,它用于在分布式体系中的多个节点之间告竣一致性。
https://i-blog.csdnimg.cn/direct/0c29345b7c764bd99c585e521829cf05.png
8. 踩坑记录

1.在学习死信交换机时,在设置类中手动定义了ttl.exchange和ttl.queue,但是没有定义死信交换机和队列,测试也能正常通过。为啥死信交换机和队列能被自动创建?
原来是定义监听器时,该交换机和队列可以被持久化
   /**
   * 监听死信队列消息
   *
   * @param msg
   */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dl.queue", durable = "true"),
            exchange = @Exchange(name = "dl.direct"),
            key = "dl"
    ))
    public void listenDlQueue(String msg) {
      log.info("消费者接收到了dl.queue的延迟消息:{}", msg);
    }
当你使用 @Queue 和 @Exchange 注解定义队列和交换机时,Spring AMQP 会自动声明这些队列和交换机。这意味着,假如这些队列和交换机在 RabbitMQ 中不存在,Spring AMQP 会在应用启动时自动创建它们。这大大简化了设置,由于你不需要手动创建这些资源。
2.docker创建mq容器时指定了目次,但是不能运行容器。
后来发现是缺少参数:
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123456 \
-v $PWD/mq-plugins:/plugins \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
--privileged=true
-d \
rabbitmq:3.8-management
--privileged=true 表示开放权限,即当运行这个实例以后,完成容器内部和宿主机中指定的绝对路径实现了信息的共享。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 第五章 RabbitMQ高级