第五章 RabbitMQ高级

打印 上一主题 下一主题

主题 801|帖子 801|积分 2403

1. 生产者消息确认机制


1.1 设置文件

  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.137.110 # rabbitMQ的ip地址
  4.     port: 5672 # 端口
  5.     username: itcast
  6.     password: 123456
  7.     virtual-host: /
  8.     publisher-confirm-type: correlated #异步
  9.     publisher-returns: true
  10.     template:
  11.       mandatory: true
复制代码
1.2 编写设置类

  1. package cn.itcast.mq.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  5. import org.springframework.amqp.support.converter.MessageConverter;
  6. import org.springframework.beans.BeansException;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.context.ApplicationContext;
  9. import org.springframework.context.ApplicationContextAware;
  10. import org.springframework.context.annotation.Bean;
  11. import org.springframework.context.annotation.Configuration;
  12. /**
  13. * @ClassName CommonConfig
  14. * @Description rabbitMQ生产者消息确认配置类
  15. * @Author 孙克旭
  16. * @Date 2024/11/24 14:47
  17. */
  18. @Configuration
  19. @Slf4j
  20. public class CommonConfig implements ApplicationContextAware {
  21.     @Override
  22.     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  23.         //获取RabbitTemplate对象
  24.         RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
  25.         //配置ReturnCallback
  26.         rabbitTemplate.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) -> {
  27.             //记录日志
  28.             log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",
  29.                     replyCode, replyText, exchange, routingKey, message.toString());
  30.         }));
  31.     }
  32. }
复制代码
1.3 发送消息时携带关联数据

  1. package cn.itcast.mq.spring;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.junit.Test;
  4. import org.junit.runner.RunWith;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.boot.test.context.SpringBootTest;
  9. import org.springframework.test.context.junit4.SpringRunner;
  10. import java.util.UUID;
  11. @Slf4j
  12. @RunWith(SpringRunner.class)
  13. @SpringBootTest
  14. public class SpringAmqpTest {
  15.     @Autowired
  16.     private RabbitTemplate rabbitTemplate;
  17.     @Test
  18.     public void testSendMessage2SimpleQueue() throws InterruptedException {
  19.         String routingKey = "simple.test";
  20.         String message = "hello, spring amqp!";
  21.         CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  22.         correlationData.getFuture().addCallback(result -> {
  23.             //判断结果
  24.             if (result.isAck()) {
  25.                 //ACK
  26.                 log.debug("消息成功投递到交换机,消息ID:{}", correlationData.getId());
  27.             } else {
  28.                 //NACK
  29.                 log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
  30.             }
  31.         }, ex -> {
  32.             //记录日志
  33.             log.error("消息发送失败!", ex);
  34.         });
  35.         rabbitTemplate.convertAndSend("amq.topic", routingKey, message, correlationData);
  36.     }
  37. }
复制代码
1.4 SpringAMQP中处置处罚消息确认的几种情况:

(1)publisher-comfirm:
消息成功发送到exchange,返回ack
消息发送失败,没有到达交换机,返回nack
消息发送过程中出现非常,没有收到回执
(2)消息成功发送到exchange,但没有路由到queue,调用ReturnCallback
2. 消息持久化

直接创建交换机或队列、发送消息时,数据都是持久的。
2.1 交换机和队列持久化

  1. package cn.itcast.mq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  4. import org.springframework.amqp.support.converter.MessageConverter;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. /**
  8. * @ClassName CommonConfig
  9. * @Description 消息持久化配置类
  10. * @Author 孙克旭
  11. * @Date 2024/11/24 15:34
  12. */
  13. @Configuration
  14. public class CommonConfig {
  15.     /**
  16.      * 交换机持久化
  17.      *
  18.      * @return
  19.      */
  20.     @Bean
  21.     public DirectExchange simpleDirect() {
  22.         return new DirectExchange("simple.direct", true, false);
  23.     }
  24.     /**
  25.      * 队列持久化
  26.      *
  27.      * @return
  28.      */
  29.     @Bean
  30.     public Queue simpleQueue() {
  31.         return QueueBuilder.durable("simple.queue").build();
  32.     }
  33. }
复制代码
2.2 数据持久化

发送消息是指定数据发布的模式:
  1.    @Test
  2.     public void testDurableMessage() {
  3.         //1.准备消息
  4.         Message message = MessageBuilder.withBody("hello RabbitMQ".getBytes(StandardCharsets.UTF_8))
  5.                 .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //消息持久化
  6.                 .build();
  7.         //2.发送消息
  8.         rabbitTemplate.convertAndSend("simple.queue", message); //没有指定交换机名称,会使用默认交换机
  9.     }
复制代码
3. 消息重发机制

RabbitMQ的消息没有被消费者读取后,会自动返回队列,再次发送至消费者,并一直循环,直到该消息被消费。如许会极大的浪费mq的性能,所以需要手动设置消息重发机制。
  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.137.110 # rabbitMQ的ip地址
  4.     port: 5672 # 端口
  5.     username: itcast
  6.     password: 123456
  7.     virtual-host: /
  8.     listener:
  9.       simple:
  10.         prefetch: 1 #消费者每次只能读取一个消息
  11.         acknowledge-mode: auto #消费者自动确认消息,表明消息已经被成功处理,然后RabbitMQ会从队列中移除这条消息
  12.         retry:
  13.           enabled: true #开启消费者失败重试
  14.           initial-interval: 1000 #初始的失败等待时长为1秒
  15.           multiplier: 3 #下次失败的等待时长倍数,下次等待时长=multiplier * last-interval
  16.           max-attempts: 4 #最大重试次数
  17.           stateless: true #ture 无状态;false 有状态。如果业务中包含事务,这里改为false
复制代码
3.1 重发失败消息应对策略


消费者将错误消息发送至指定的交换机。
  1. package cn.itcast.mq.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.amqp.rabbit.retry.MessageRecoverer;
  8. import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. /**
  12. * @ClassName ErrorMessageConfig
  13. * @Description 重发失败消息应对策略配置类
  14. * @Author 孙克旭
  15. * @Date 2024/11/24 16:28
  16. */
  17. @Configuration
  18. public class ErrorMessageConfig {
  19.     /**
  20.      * 接受错误消息的交换机
  21.      *
  22.      * @return
  23.      */
  24.     @Bean
  25.     public DirectExchange errorMessageExchange() {
  26.         return new DirectExchange("error.direct");
  27.     }
  28.     /**
  29.      * 接收错误消息的队列
  30.      *
  31.      * @return
  32.      */
  33.     @Bean
  34.     public Queue errorQueue() {
  35.         return new Queue("error.queue");
  36.     }
  37.     /**
  38.      * 将队列绑定到交换机
  39.      *
  40.      * @return
  41.      */
  42.     @Bean
  43.     public Binding errorMessageBinding() {
  44.         return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
  45.     }
  46.     /**
  47.      * 错误消息应对策略
  48.      * 在消息处理失败时重新发布消息到指定的交换机和队列
  49.      *
  50.      * @param rabbitTemplate
  51.      * @return
  52.      */
  53.     @Bean
  54.     public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
  55.         return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
  56.     }
  57. }
复制代码
3.2 如何确保RabbitMQ消息的可靠性?

1.开启生产者确认机制,确保生产者的消息能到达队列
2.开启持久化功能,确保消息未消费前在队列中不会丢失
3.开启消费者确认机制为auto,由spring确认消息处置处罚成功后完成ack
4.开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到非常交换机,交由人工处置处罚
4. 死信交换机

死信会被队列发送给死信交换机
4.1 什么样的消息会成为死信?

(1)消息被消费者reject或者返回nack
(2)消息超时未消费
(3)队列满了,早期的消息被抛弃
4.2 如何给队列绑定死信交换机?

(1)给队列设置dead-letter-exchange属性,指定一个交换机
(2)给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey

4.2.1 代码实现

  1. package cn.itcast.mq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * @ClassName TtlMessageConfig
  7. * @Description 死信消息路由规则配置类
  8. * @Author 孙克旭
  9. * @Date 2024/11/25 10:02
  10. */
  11. @Configuration
  12. public class TtlMessageConfig {
  13.     /**
  14.      * 交换机
  15.      *
  16.      * @return
  17.      */
  18.     @Bean
  19.     public DirectExchange ttlDirectExchange() {
  20.         return new DirectExchange("ttl.direct");
  21.     }
  22.     /**
  23.      * 定义队列,指定死信交换机和routing key
  24.      * 并设置消息存活时间
  25.      *
  26.      * @return
  27.      */
  28.     @Bean
  29.     public Queue ttlQueue() {
  30.         return QueueBuilder.durable("ttl.queue")
  31.                 .ttl(10000)
  32.                 .deadLetterExchange("dl.direct")
  33.                 .deadLetterRoutingKey("dl")
  34.                 .build();
  35.     }
  36.     /**
  37.      * 定义绑定规则
  38.      *
  39.      * @return
  40.      */
  41.     @Bean
  42.     public Binding ttlBinding() {
  43.         return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
  44.     }
  45. }
复制代码

监听器:
  1. package cn.itcast.mq.listener;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.annotation.Exchange;
  4. import org.springframework.amqp.rabbit.annotation.Queue;
  5. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.stereotype.Component;
  8. /**
  9. * @ClassName SpringRabbitListener
  10. * @Description 队列监听器
  11. * @Author 孙克旭
  12. * @Date 2024/11/25 9:53
  13. */
  14. @Component
  15. @Slf4j
  16. public class SpringRabbitListener {
  17.     @RabbitListener(queues = "simple.queue")
  18.     public void listenSimpleQueue(String msg) {
  19.         System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
  20.     }
  21.     /**
  22.      * 监听死信队列消息
  23.      *
  24.      * @param msg
  25.      */
  26.     @RabbitListener(bindings = @QueueBinding(
  27.             value = @Queue(name = "dl.queue", durable = "true"),
  28.             exchange = @Exchange(name = "dl.direct"),
  29.             key = "dl"
  30.     ))
  31.     public void listenDlQueue(String msg) {
  32.         log.info("消费者接收到了dl.queue的延迟消息:{}", msg);
  33.     }
  34. }
复制代码

测试代码:
  1.    @Test
  2.     public void testTTLMessage() {
  3.         //1.准备消息
  4.         Message message = MessageBuilder.withBody("hello RabbitMQ".getBytes(StandardCharsets.UTF_8))
  5.                 .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //消息持久化
  6.                 .setExpiration("5000") //定义消息存活时间
  7.                 .build();
  8.         //2.发送消息
  9.         rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
  10.     }
复制代码
4.3 消息超时的两种方式是?

(1)给队列设置ttl属性
(2)给消息设置ttl属性
(3)两者共存时,以时间短的ttl为准
5. 延迟消息

5.1 安装rabbitmq插件


进入容器内部后,执行下面命令开启插件:
  1. rabbitmq-plugins enable rabbitmq_delayed_message_exchange
复制代码
5.2 代码实现

消费者:
  1.     /**
  2.      * 监听延迟消息
  3.      * 创建队列
  4.      * 创建延迟交换机,指定交换机类型为direct
  5.      * 指定routing key
  6.      *
  7.      * @param msg
  8.      */
  9.     @RabbitListener(bindings = @QueueBinding(
  10.             value = @Queue(name = "delay.queue", durable = "true"),
  11.             exchange = @Exchange(name = "delay.direct", type = "direct", delayed = "true"),
  12.             key = "delay"))
  13.     public void listenDelayExchange(String msg) {
  14.         log.info("消费者接收到了delay.queue的延迟消息:{}", msg);
  15.     }
复制代码
生产者测试代码:
  1.     /**
  2.      * 测试延迟消息
  3.      */
  4.     @Test
  5.     public void testDelayMessage() {
  6.         //1.准备消息
  7.         Message message = MessageBuilder
  8.                 .withBody("hello RabbitMQ".getBytes(StandardCharsets.UTF_8))
  9.                 .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //消息持久化
  10.                 .setHeader("x-delay", 5000) //设置延迟时间
  11.                 .build();
  12.         //2.发送消息
  13.         rabbitTemplate.convertAndSend("delay.direct", "delay", message);
  14.     }
复制代码
5.3 延迟插件的使用步调有哪些?

(1)声明一个交换机,添加delayed属性为true
(2)发送消息时,添加x-delay头,值为延时时间
6. 惰性队列

6.1 消息堆积问题


6.1.1 解决方案

(1)增加更多的消费者,进步消费速度
(2)在消费者内开启线程池加速消息处置处罚速度
(3)扩大队列容积,进步堆积上限
6.2 惰性队列特征

(1)接收到消息直接存入磁盘而非内存
(2)消费者要消费消息时才会从磁盘读取并加载到内存
(3)支持数百万条的消息存储
6.2.1 长处

(1)基于磁盘存储,消息上限高
(2)没有间歇性的page-out(页面置换),性能比较稳固
  1. 这句话的意思是,在传统的队列模式下,RabbitMQ 会将消息存储在内存中,当内存不足时,会将内存中的消息换页至磁盘中,
  2. 这个操作会耗费较长的时间,并且可能会阻塞队列的操作,导致无法接收新的消息。
  3. 这种由于内存不足而导致的消息换页操作就是所谓的间歇性page-out。
  4. 而惰性队列由于一开始就将消息存储在磁盘上,避免了在内存和磁盘之间频繁地进行页面置换,因此减少了因页面置换导致的性能波动和
  5. 延迟,使得性能更加稳定。这意味着,即使在高负载或者消费者处理能力不足导致消息积压的情况下,惰性队列也能保持较好的性能表现,
  6. 不会因为内存压力而导致处理能力下降。
复制代码
6.2.2 缺点

(1)基于磁盘存储,消息时效性会降低
(2)性能受限于磁盘IO
6.3 代码实现

在消费者中定义惰性队列:
  1. package cn.itcast.mq.config;
  2. import org.springframework.amqp.core.Queue;
  3. import org.springframework.amqp.core.QueueBuilder;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. /**
  7. * @ClassName LazyConfig
  8. * @Description 惰性队列配置类
  9. * @Author 孙克旭
  10. * @Date 2024/11/25 13:23
  11. */
  12. @Configuration
  13. public class LazyConfig {
  14.     /**
  15.      * 声明一个惰性队列
  16.      *
  17.      * @return
  18.      */
  19.     @Bean
  20.     public Queue LazyQueue() {
  21.         return QueueBuilder.durable("lazy.queue")
  22.                 .lazy().build();
  23.     }
  24. }
复制代码
7. RabbitMQ集群

在设置文件中添加集群地址:

7.1 平凡集群

平凡分布式集群,将队列分散到集群的各个节点,从而进步整个集群的并发能力。每个节点有其他节点的元数据信息,但是没有其他节点的数据;当消费者读取一个节点时,若哀求的队列不在该节点,该节点会自动根据元数据查找指定的队列,并返回给消费者。
7.2 镜像集群

镜像集群:是一种主从集群,平凡集群的基础上,添加了主从备份功能,进步集群的数据可用性。

7.3 仲裁队列


Raft协议是一种分布式一致性协议,它用于在分布式体系中的多个节点之间告竣一致性。

8. 踩坑记录

1.在学习死信交换机时,在设置类中手动定义了ttl.exchange和ttl.queue,但是没有定义死信交换机和队列,测试也能正常通过。为啥死信交换机和队列能被自动创建?
原来是定义监听器时,该交换机和队列可以被持久化
  1.    /**
  2.      * 监听死信队列消息
  3.      *
  4.      * @param msg
  5.      */
  6.     @RabbitListener(bindings = @QueueBinding(
  7.             value = @Queue(name = "dl.queue", durable = "true"),
  8.             exchange = @Exchange(name = "dl.direct"),
  9.             key = "dl"
  10.     ))
  11.     public void listenDlQueue(String msg) {
  12.         log.info("消费者接收到了dl.queue的延迟消息:{}", msg);
  13.     }
复制代码
当你使用 @Queue 和 @Exchange 注解定义队列和交换机时,Spring AMQP 会自动声明这些队列和交换机。这意味着,假如这些队列和交换机在 RabbitMQ 中不存在,Spring AMQP 会在应用启动时自动创建它们。这大大简化了设置,由于你不需要手动创建这些资源。
2.docker创建mq容器时指定了目次,但是不能运行容器。
后来发现是缺少参数:
  1. docker run \
  2. -e RABBITMQ_DEFAULT_USER=itcast \
  3. -e RABBITMQ_DEFAULT_PASS=123456 \
  4. -v $PWD/mq-plugins:/plugins \
  5. --name mq \
  6. --hostname mq1 \
  7. -p 15672:15672 \
  8. -p 5672:5672 \
  9. --privileged=true
  10. -d \
  11. rabbitmq:3.8-management
复制代码
--privileged=true 表示开放权限,即当运行这个实例以后,完成容器内部和宿主机中指定的绝对路径实现了信息的共享。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

小秦哥

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表