1. 生产者消息确认机制
1.1 设置文件
- spring:
- rabbitmq:
- host: 192.168.137.110 # rabbitMQ的ip地址
- port: 5672 # 端口
- username: itcast
- password: 123456
- virtual-host: /
- publisher-confirm-type: correlated #异步
- publisher-returns: true
- template:
- mandatory: true
复制代码 1.2 编写设置类
- package cn.itcast.mq.config;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
- import org.springframework.amqp.support.converter.MessageConverter;
- import org.springframework.beans.BeansException;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.ApplicationContextAware;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- /**
- * @ClassName CommonConfig
- * @Description rabbitMQ生产者消息确认配置类
- * @Author 孙克旭
- * @Date 2024/11/24 14:47
- */
- @Configuration
- @Slf4j
- public class CommonConfig implements ApplicationContextAware {
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- //获取RabbitTemplate对象
- RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
- //配置ReturnCallback
- rabbitTemplate.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) -> {
- //记录日志
- log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",
- replyCode, replyText, exchange, routingKey, message.toString());
- }));
- }
- }
复制代码 1.3 发送消息时携带关联数据
- package cn.itcast.mq.spring;
- import lombok.extern.slf4j.Slf4j;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringRunner;
- import java.util.UUID;
- @Slf4j
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class SpringAmqpTest {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Test
- public void testSendMessage2SimpleQueue() throws InterruptedException {
- String routingKey = "simple.test";
- String message = "hello, spring amqp!";
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- correlationData.getFuture().addCallback(result -> {
- //判断结果
- if (result.isAck()) {
- //ACK
- log.debug("消息成功投递到交换机,消息ID:{}", correlationData.getId());
- } else {
- //NACK
- log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
- }
- }, ex -> {
- //记录日志
- log.error("消息发送失败!", ex);
- });
- rabbitTemplate.convertAndSend("amq.topic", routingKey, message, correlationData);
- }
- }
复制代码 1.4 SpringAMQP中处置处罚消息确认的几种情况:
(1)publisher-comfirm:
消息成功发送到exchange,返回ack
消息发送失败,没有到达交换机,返回nack
消息发送过程中出现非常,没有收到回执
(2)消息成功发送到exchange,但没有路由到queue,调用ReturnCallback
2. 消息持久化
直接创建交换机或队列、发送消息时,数据都是持久的。
2.1 交换机和队列持久化
- package cn.itcast.mq.config;
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
- import org.springframework.amqp.support.converter.MessageConverter;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- /**
- * @ClassName CommonConfig
- * @Description 消息持久化配置类
- * @Author 孙克旭
- * @Date 2024/11/24 15:34
- */
- @Configuration
- public class CommonConfig {
- /**
- * 交换机持久化
- *
- * @return
- */
- @Bean
- public DirectExchange simpleDirect() {
- return new DirectExchange("simple.direct", true, false);
- }
- /**
- * 队列持久化
- *
- * @return
- */
- @Bean
- public Queue simpleQueue() {
- return QueueBuilder.durable("simple.queue").build();
- }
- }
复制代码 2.2 数据持久化
发送消息是指定数据发布的模式:
- @Test
- public void testDurableMessage() {
- //1.准备消息
- Message message = MessageBuilder.withBody("hello RabbitMQ".getBytes(StandardCharsets.UTF_8))
- .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //消息持久化
- .build();
- //2.发送消息
- rabbitTemplate.convertAndSend("simple.queue", message); //没有指定交换机名称,会使用默认交换机
- }
复制代码 3. 消息重发机制
RabbitMQ的消息没有被消费者读取后,会自动返回队列,再次发送至消费者,并一直循环,直到该消息被消费。如许会极大的浪费mq的性能,所以需要手动设置消息重发机制。
- spring:
- rabbitmq:
- host: 192.168.137.110 # rabbitMQ的ip地址
- port: 5672 # 端口
- username: itcast
- password: 123456
- virtual-host: /
- listener:
- simple:
- prefetch: 1 #消费者每次只能读取一个消息
- acknowledge-mode: auto #消费者自动确认消息,表明消息已经被成功处理,然后RabbitMQ会从队列中移除这条消息
- retry:
- enabled: true #开启消费者失败重试
- initial-interval: 1000 #初始的失败等待时长为1秒
- multiplier: 3 #下次失败的等待时长倍数,下次等待时长=multiplier * last-interval
- max-attempts: 4 #最大重试次数
- stateless: true #ture 无状态;false 有状态。如果业务中包含事务,这里改为false
复制代码 3.1 重发失败消息应对策略
消费者将错误消息发送至指定的交换机。
- package cn.itcast.mq.config;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.retry.MessageRecoverer;
- import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- /**
- * @ClassName ErrorMessageConfig
- * @Description 重发失败消息应对策略配置类
- * @Author 孙克旭
- * @Date 2024/11/24 16:28
- */
- @Configuration
- public class ErrorMessageConfig {
- /**
- * 接受错误消息的交换机
- *
- * @return
- */
- @Bean
- public DirectExchange errorMessageExchange() {
- return new DirectExchange("error.direct");
- }
- /**
- * 接收错误消息的队列
- *
- * @return
- */
- @Bean
- public Queue errorQueue() {
- return new Queue("error.queue");
- }
- /**
- * 将队列绑定到交换机
- *
- * @return
- */
- @Bean
- public Binding errorMessageBinding() {
- return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
- }
- /**
- * 错误消息应对策略
- * 在消息处理失败时重新发布消息到指定的交换机和队列
- *
- * @param rabbitTemplate
- * @return
- */
- @Bean
- public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
- return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
- }
- }
复制代码 3.2 如何确保RabbitMQ消息的可靠性?
1.开启生产者确认机制,确保生产者的消息能到达队列
2.开启持久化功能,确保消息未消费前在队列中不会丢失
3.开启消费者确认机制为auto,由spring确认消息处置处罚成功后完成ack
4.开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到非常交换机,交由人工处置处罚
4. 死信交换机
死信会被队列发送给死信交换机
4.1 什么样的消息会成为死信?
(1)消息被消费者reject或者返回nack
(2)消息超时未消费
(3)队列满了,早期的消息被抛弃
4.2 如何给队列绑定死信交换机?
(1)给队列设置dead-letter-exchange属性,指定一个交换机
(2)给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey
4.2.1 代码实现
- package cn.itcast.mq.config;
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- /**
- * @ClassName TtlMessageConfig
- * @Description 死信消息路由规则配置类
- * @Author 孙克旭
- * @Date 2024/11/25 10:02
- */
- @Configuration
- public class TtlMessageConfig {
- /**
- * 交换机
- *
- * @return
- */
- @Bean
- public DirectExchange ttlDirectExchange() {
- return new DirectExchange("ttl.direct");
- }
- /**
- * 定义队列,指定死信交换机和routing key
- * 并设置消息存活时间
- *
- * @return
- */
- @Bean
- public Queue ttlQueue() {
- return QueueBuilder.durable("ttl.queue")
- .ttl(10000)
- .deadLetterExchange("dl.direct")
- .deadLetterRoutingKey("dl")
- .build();
- }
- /**
- * 定义绑定规则
- *
- * @return
- */
- @Bean
- public Binding ttlBinding() {
- return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
- }
- }
复制代码 监听器:
- package cn.itcast.mq.listener;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.amqp.rabbit.annotation.Queue;
- import org.springframework.amqp.rabbit.annotation.QueueBinding;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- /**
- * @ClassName SpringRabbitListener
- * @Description 队列监听器
- * @Author 孙克旭
- * @Date 2024/11/25 9:53
- */
- @Component
- @Slf4j
- public class SpringRabbitListener {
- @RabbitListener(queues = "simple.queue")
- public void listenSimpleQueue(String msg) {
- System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
- }
- /**
- * 监听死信队列消息
- *
- * @param msg
- */
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "dl.queue", durable = "true"),
- exchange = @Exchange(name = "dl.direct"),
- key = "dl"
- ))
- public void listenDlQueue(String msg) {
- log.info("消费者接收到了dl.queue的延迟消息:{}", msg);
- }
- }
复制代码 测试代码:
- @Test
- public void testTTLMessage() {
- //1.准备消息
- Message message = MessageBuilder.withBody("hello RabbitMQ".getBytes(StandardCharsets.UTF_8))
- .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //消息持久化
- .setExpiration("5000") //定义消息存活时间
- .build();
- //2.发送消息
- rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
- }
复制代码 4.3 消息超时的两种方式是?
(1)给队列设置ttl属性
(2)给消息设置ttl属性
(3)两者共存时,以时间短的ttl为准
5. 延迟消息
5.1 安装rabbitmq插件
进入容器内部后,执行下面命令开启插件:
- rabbitmq-plugins enable rabbitmq_delayed_message_exchange
复制代码 5.2 代码实现
消费者:
- /**
- * 监听延迟消息
- * 创建队列
- * 创建延迟交换机,指定交换机类型为direct
- * 指定routing key
- *
- * @param msg
- */
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "delay.queue", durable = "true"),
- exchange = @Exchange(name = "delay.direct", type = "direct", delayed = "true"),
- key = "delay"))
- public void listenDelayExchange(String msg) {
- log.info("消费者接收到了delay.queue的延迟消息:{}", msg);
- }
复制代码 生产者测试代码:
- /**
- * 测试延迟消息
- */
- @Test
- public void testDelayMessage() {
- //1.准备消息
- Message message = MessageBuilder
- .withBody("hello RabbitMQ".getBytes(StandardCharsets.UTF_8))
- .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //消息持久化
- .setHeader("x-delay", 5000) //设置延迟时间
- .build();
- //2.发送消息
- rabbitTemplate.convertAndSend("delay.direct", "delay", message);
- }
复制代码 5.3 延迟插件的使用步调有哪些?
(1)声明一个交换机,添加delayed属性为true
(2)发送消息时,添加x-delay头,值为延时时间
6. 惰性队列
6.1 消息堆积问题
6.1.1 解决方案
(1)增加更多的消费者,进步消费速度
(2)在消费者内开启线程池加速消息处置处罚速度
(3)扩大队列容积,进步堆积上限
6.2 惰性队列特征
(1)接收到消息直接存入磁盘而非内存
(2)消费者要消费消息时才会从磁盘读取并加载到内存
(3)支持数百万条的消息存储
6.2.1 长处
(1)基于磁盘存储,消息上限高
(2)没有间歇性的page-out(页面置换),性能比较稳固
- 这句话的意思是,在传统的队列模式下,RabbitMQ 会将消息存储在内存中,当内存不足时,会将内存中的消息换页至磁盘中,
- 这个操作会耗费较长的时间,并且可能会阻塞队列的操作,导致无法接收新的消息。
- 这种由于内存不足而导致的消息换页操作就是所谓的间歇性page-out。
- 而惰性队列由于一开始就将消息存储在磁盘上,避免了在内存和磁盘之间频繁地进行页面置换,因此减少了因页面置换导致的性能波动和
- 延迟,使得性能更加稳定。这意味着,即使在高负载或者消费者处理能力不足导致消息积压的情况下,惰性队列也能保持较好的性能表现,
- 不会因为内存压力而导致处理能力下降。
复制代码 6.2.2 缺点
(1)基于磁盘存储,消息时效性会降低
(2)性能受限于磁盘IO
6.3 代码实现
在消费者中定义惰性队列:
- package cn.itcast.mq.config;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.core.QueueBuilder;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- /**
- * @ClassName LazyConfig
- * @Description 惰性队列配置类
- * @Author 孙克旭
- * @Date 2024/11/25 13:23
- */
- @Configuration
- public class LazyConfig {
- /**
- * 声明一个惰性队列
- *
- * @return
- */
- @Bean
- public Queue LazyQueue() {
- return QueueBuilder.durable("lazy.queue")
- .lazy().build();
- }
- }
复制代码 7. RabbitMQ集群
在设置文件中添加集群地址:
7.1 平凡集群
平凡分布式集群,将队列分散到集群的各个节点,从而进步整个集群的并发能力。每个节点有其他节点的元数据信息,但是没有其他节点的数据;当消费者读取一个节点时,若哀求的队列不在该节点,该节点会自动根据元数据查找指定的队列,并返回给消费者。
7.2 镜像集群
镜像集群:是一种主从集群,平凡集群的基础上,添加了主从备份功能,进步集群的数据可用性。
7.3 仲裁队列
Raft协议是一种分布式一致性协议,它用于在分布式体系中的多个节点之间告竣一致性。
8. 踩坑记录
1.在学习死信交换机时,在设置类中手动定义了ttl.exchange和ttl.queue,但是没有定义死信交换机和队列,测试也能正常通过。为啥死信交换机和队列能被自动创建?
原来是定义监听器时,该交换机和队列可以被持久化
- /**
- * 监听死信队列消息
- *
- * @param msg
- */
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "dl.queue", durable = "true"),
- exchange = @Exchange(name = "dl.direct"),
- key = "dl"
- ))
- public void listenDlQueue(String msg) {
- log.info("消费者接收到了dl.queue的延迟消息:{}", msg);
- }
复制代码 当你使用 @Queue 和 @Exchange 注解定义队列和交换机时,Spring AMQP 会自动声明这些队列和交换机。这意味着,假如这些队列和交换机在 RabbitMQ 中不存在,Spring AMQP 会在应用启动时自动创建它们。这大大简化了设置,由于你不需要手动创建这些资源。
2.docker创建mq容器时指定了目次,但是不能运行容器。
后来发现是缺少参数:
- docker run \
- -e RABBITMQ_DEFAULT_USER=itcast \
- -e RABBITMQ_DEFAULT_PASS=123456 \
- -v $PWD/mq-plugins:/plugins \
- --name mq \
- --hostname mq1 \
- -p 15672:15672 \
- -p 5672:5672 \
- --privileged=true
- -d \
- rabbitmq:3.8-management
复制代码 --privileged=true 表示开放权限,即当运行这个实例以后,完成容器内部和宿主机中指定的绝对路径实现了信息的共享。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |