RabbitMQ个人实践

打印 上一主题 下一主题

主题 667|帖子 667|积分 2001

前言

MQ(Message Queue)就是消息队列,其有点有很多:解耦、异步、削峰等等,本文来聊一下RabbitMQ的一些概念以及使用。
RabbitMq

案例

Springboot整合RabbitMQ简单案例
基本概念



  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  • Producer:消息生产者,就是投递消息的程序。
  • Consumer:消息消费者,就是接受消息的程序。
发布消息到RabbitMQ需要经过两步:

  • producer → exchange
  • exchange 根据 exchange 的类型和 routing key 确定将消息投递到哪个队列
工作流程

了解了RabbitMQ的一些概念,我们来捋捋使用RabbitMQ的流程:

  • 创建Exchange
  • 创建Queue
  • 将Queue绑定进Exchange中(此处会设置routing key)
  • 生产者发布消息
  • 消费者订阅消息
交换机(Exchange)

交换机可以绑定队列,绑定时可以给队列指定路由(Routing key)和参数(Arguments)
所有的消息发送都是经过交换机转发到队列的,而不是直接到队列中
交换机类型:

  • direct
    根据确定的路由(routing key)转发消息到队列中(一条消息可以发到多个队列,只要路由相同)
  • fanout
    路由无效,只要和该交换机绑定的队列,都能接收到消息
  • topic
    允许路由使用*和#来进行模糊匹配
    *表示一个单词
    表示任意数量(零个或多个)单词

    例如:如果队列的路由为com.#   那么往交换机发消息是,路由填com.ccc   队列就可以收到消息
  • headers
    忽略路由,由参数(Arguments)来确定转发的队列
消息过期时间TTL

有两种方式设置TTL,创建队列时设置整个队列的TTL或者在发送消息时单独设置每条消息的TTL,消息存活时间取两者的最小值。

  • 创建队列时设置
    是消息的存活时间,不是队列的存活时间,别搞混了。
    1. @Bean
    2. public Queue queue(){
    3.     Map<String, Object> args = new HashMap<>();
    4.     args.put("x-message-ttl", 5000); // 设置队列中的消息5秒过期
    5.     return new Queue("queueName",true, false, false, args);
    6. }
    复制代码
  • 发送消息时设置
    1. public void makeOrder(String userid,String productid,int num){
    2.     String exchangeName = "ttl_exchange";
    3.     String routingKey = "ttlmessage";
    4.     //给消息设置过期时间
    5.     MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
    6.         public Message postProcessMessage(Message message){
    7.             // 设置消息5秒过期
    8.             message.getMessageProperties().setExpiration("5000");
    9.             return message;
    10.         }
    11.     }
    12.     rabbitTemplate.convertAndSend(exchangeName,routingKey,"message",messagePostProcessor);
    13. }
    复制代码
死信队列

死信队列也是一个正常队列,只是当绑定了死信队列的队列满足相应条件,就会将满足条件的消息转移到死信队列中。
进入死信队列的条件:

  • 消息被拒绝
  • 消息过期(超时)
  • 队列达到最大长度
死信队列的配置:

  • 按照正常步骤定义一个队列(交换机、队列、绑定)
  • 给需要绑定死信队列的队列添加x-dead-letter-exchange(死信队列的交换机)和x-dead-letter-routing-key(死信队列的路由)参数
    1. @Bean
    2. public Queue queue(){
    3.     Map<String, Object> args = new HashMap<>();
    4.                 args.put("x-dead-letter-exchange", "死信队列交换机名称");
    5.                 args.put("x-dead-letter-routing-key", "死信队列路由");
    6.     return new Queue("queueName",true, false, false, args);
    7. }
    复制代码
如何保证MQ消息正确送达与消费

可靠性生产和推送

步骤:

  • 发送消息前数据库保存MQ消息发送日志
  • MQ消息发送后使用回调更新日志状态
实现:
上面我们讲了,发布消息到RabbitMQ需要经过两步:
producer → exchange
exchange 根据 exchange 的类型和 routing key 确定将消息投递到哪个队列
所以,发布消息的确认也分两个部分,以下是确认步骤:

  • 修改MQ应答机制(yml)
    1. spring:
    2.   rabbitmq:
    3.     username: rmq
    4.     password: 123456
    5.     virtual-host: /
    6.     host: localhost
    7.     port: 5672
    8.     # 发送消息确认,producer -> exchange
    9.     publisher-confirm-type: correlated
    10.     # 发送消息确认,exchange -> queue
    11.     publisher-returns: true
    复制代码
  • 新增mq的回调方法
    1. /**
    2. * PostConstruct注解好多人以为是Spring提供的。其实是Java自己的注解。
    3. * Java中该注解的说明:@PostConstruct该注解被用来修饰一个非静态的void()方法。
    4. * 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。
    5. * PostConstruct在构造函数之后执行,init()方法之前执行。
    6. * Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)
    7. */
    8. @PostConstruct
    9. private void regCallBack() {
    10.     // producer -> exchange 成功或失败都会触发此回调
    11.     rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    12.         @Override
    13.         public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    14.             // 这个id是在消息发送的时候传入的
    15.             String id = correlationData.getId();
    16.             // 如果ack为true代表消息被mq成功接收
    17.             if (!ack) {
    18.                 // 应答失败,修改日志状态
    19.                 System.out.println("exchange 应答失败,做失败处理!");
    20.             } else {
    21.                 // 应答成功,修改日志状态
    22.                 System.out.println("exchange 成功处理");
    23.             }
    24.         }
    25.     });
    26.     // 这个回调只有exchange -> queue 失败时才会触发
    27.     rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    28.         @Override
    29.         public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    30.             System.out.println("exchange -> queue 发送失败");
    31.         }
    32.     });
    33. }
    复制代码
  • 修改MQ发送消息的方法,增加日志id的传递
    1. String correlationId = "这是日志id";
    2. rabbitTemplate.convertAndSend(exchange, routeKey, message, new MessagePostProcessor() {
    3.     @Override
    4.     public Message postProcessMessage(Message message) throws AmqpException {
    5.         // 消费者需要correlationId才做这个处理
    6.         message.getMessageProperties().setCorrelationId(correlationId);
    7.         return message;
    8.     }
    9. }, new CorrelationData(correlationId));
    10. // 如果消费者不需要获取correlationId,则用下面这种即可
    11. rabbitTemplate.convertAndSend(exchange, routeKey, msg, new CorrelationData(correlationId));
    复制代码
可靠性消费

步骤:

  • 开启手动应答
  • 监听器增加手动应答逻辑
实现:

  • 开启手动应答
    1. spring:
    2.   rabbitmq:
    3.     username: rmq
    4.     password: 123456
    5.     virtual-host: /
    6.     host: localhost
    7.     port: 5672
    8.     listener:
    9.       simple:
    10.         acknowledge-mode: manual # 将自动应答ack模式改成手动应答
    复制代码
    acknowledge-mode有三种类型:

    • nome:不进行ack,rabbitmq默认消费者正确处理所有请求
    • munual:手动确认
    • auto:自动确认消息(默认类型)。如果消费者抛出异常,则消息重回队列。

  • 监听器增加手动应答逻辑
    1. @RabbitListener(queues = {"队列名字"})
    2. public void messageConsumer(String orderMsg, Channel channel, @Headers Map<String,Object> headers) throws Exception{
    3.     // 需要producer做相应处理,consumer才能拿到correlationId
    4.     String correlationId = messages.getMessageProperties().getCorrelationId();
    5.     System.out.println("消息为:" + orderMsg);
    6.     long tag = Long.parseLong(headers.get(AmqpHeaders.DELIVERY_TAG).toString());
    7.     try {
    8.         // 消费成功,进行确认
    9.         channel.basicAck(tag, false);
    10.     } catch (IOException e) {
    11.         // 消费失败,重发
    12.         // requeue代表是否重发,为false则直接将消息丢弃,有死信就进入死信队列
    13.         channel.basicNack(tag, false, true);
    14.     }
    15. }
    复制代码
总结

本文介绍了RabbitMQ的一些概念和简单使用,有不少东西其实是没有讲清楚的,比如publisher-confirm-type和acknowledge-mode的几种类型的区别等等。主要是在官方文档找不到相关的细致描述,查文档的能力还是有待提高。。。
参考资料
RabbitMq 技术文档 - 腾讯云开发者社区-腾讯云 (tencent.com)
Spring AMQP

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

伤心客

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表