【Rabbitmq篇】高级特性----事务,消息分发

打印 上一主题 下一主题

主题 910|帖子 910|积分 2730

目录
事务
消息分发 
应用场景
 1. 限流
2.负载均衡 

事务

   RabbitMQ是基于AMQP协议实现的,该协议实现了事务机制,因此RabbitMQ也支持事务机制.SpringAMQP也提供了对事务相关的操作.RabbitMQ事务允许开发者确保消息的发送和接收是原子性的,要么全部乐成,要么全部失败.
    何为原子性(面试重点)?
  例如: 当A向B转账1000元,会履历俩个步调
  1.A 向 B 转账 1000元 A的账号将会减去1000元
  2.B将会收到1000元 B的账号将会增长1000元
  但是,如果遇到极端环境,当A向B转账1000元时,A-1000元已完成,这个时间体系出现故障,导致A-1000 但是B却没有接收到 那么1000元将无缘无故丢失了 ,肯定不会允许这种事情发生,不然谁还敢转账。
  此时就是将1操作和2操作绑定在一起,要么同时完成,要么一个都不执行
  当出现1执行失败的时间,将1操作举行“回滚”,回到原来的状态,就当一切都没发生过
  接下来实现rabbitmq的事务
声明队列:
  1.     //事务
  2.     public static final String TRANS_QUEUE = "trans_queue";
  3.     @Bean("transQueue")
  4.     public Queue transQueue() {
  5.         return QueueBuilder.durable(Constants.TRANS_QUEUE).build();
  6.     }
复制代码
 配置事务管理器:
  1.     @Bean
  2.     public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
  3.         return new RabbitTransactionManager(connectionFactory);
  4.     }
  5.     @Bean("transRabbitTemple")
  6.     public RabbitTemplate transRabbitTemple(ConnectionFactory connectionFactory) {
  7.         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  8.         //开启事务
  9.         rabbitTemplate.setChannelTransacted(true);
  10.         return  rabbitTemplate;
  11.     }
复制代码
生产者代码编写:
  1.     @RequestMapping("/trans")
  2.     public String trans() {
  3.         System.out.println("trans test...");
  4.         transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 1...");
  5.         int num = 5/0;
  6.         transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 2...");
  7.         return "消息发送成功";
  8.     }
复制代码
测试:
1)不带 @Transactional 带异常的发送 看看会发生什么?
 

   此时只有发送的第一条消息,紧接着发生了异常导致第二条消息未发送乐成  
   
2) 带 @Transactional 带异常的发送 看看会发生什么? 
  1.     @Transactional    @RequestMapping("/trans")
  2.     public String trans() {
  3.         System.out.println("trans test...");
  4.         transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 1...");
  5.         int num = 5/0;
  6.         transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 2...");
  7.         return "消息发送成功";
  8.     }
复制代码

 

   此时发生异常 原来发送了一条消息 但有异常,举行了回滚,当做没发生
  也证明了我们事务的可靠性 
   3)带 @Transactional 不带异常的发送 看看会发生什么?
  1.     @Transactional
  2.     @RequestMapping("/trans")
  3.     public String trans() {
  4.         System.out.println("trans test...");
  5.         transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 1...");
  6. //        int num = 5/0;
  7.         transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 2...");
  8.         return "消息发送成功";
  9.     }
复制代码

   
 此结果一切正常
  
消息分发 

   RabbitMQ队列拥有多个消耗者时,队列会把收到的消息分派给不同的消耗者.每条消息只会发送给订阅列表里的⼀个消耗者.这种方式⾮常适合扩展,如果如今负载加重,那么只需要创建更多的消耗者来消耗处理消息即可.
  
  
默认环境下,RabbitMQ是以轮询的方法举行分发的,而不管消耗者是否已经消耗并已经确认了消息.这种方式是不太公道的,试想⼀下,如果某些消耗者消耗速度慢,而某些消耗者消耗速度快,就可能会导致某些消耗者消息积压,某些消耗者空闲,进而应用整体的吞吐量降落.

  这样A都做完了10个使命,B还在写第一个使命,这样将会大大影响服从,从而导致整个的服从降落
  
  怎样处理呢我们可以利用前面章节讲到的channel.basicQos(intprefetchCount)方法,来限制当前信道上的消耗者所能保持的最大未确认消息的数量
  比如:消耗端调用了channelbasicQos(1),
  此时A接收1条信息,并且消耗1条 B同时也接收1条信息 但是它服从比较慢 全部它还在消耗 而A处理完1条消息又接着处理第二条消息,属于多劳多得,并不会因为B影响整体的服从
  应用场景

 1. 限流

   如下利用场景:
订单体系每秒最多处理5000哀求,正常环境下,订单体系可以正常满足需求
但是在秒杀时间点,哀求刹时增多,每秒1万个哀求,如果这些哀责备部通过MQ发送到订单体系,无疑会把订单体系压垮.

  RabbitMQ提供了限流机制,可以控制消耗端⼀次只拉取N个哀求
通过设置prefetchCount参数,同时也必须要设置消息应答方式为手动应答
prefetchCount:控制消耗者从队列中预取(prefetch)消息的数量,以此来实现流控制和负载均衡.

  1) 配置prefetch参数,设置应答方式为手动应答 

 2) 配置交换机,队列
  1. package com.bite.extensions.config;
  2. import com.bite.extensions.constant.Constants;
  3. import org.springframework.amqp.core.*;
  4. import org.springframework.beans.factory.annotation.Qualifier;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class QosConfig {
  9.     @Bean("qosQueue")
  10.     public Queue qosQueue() {
  11.         return QueueBuilder.durable(Constants.QOS_QUEUE).build();
  12.     }
  13.     @Bean("qosExchange")
  14.     public DirectExchange qosExchange() {
  15.         return  ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();
  16.     }
  17.     @Bean("qosBinding")
  18.     public Binding qosBinding(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") DirectExchange directExchange) {
  19.         return BindingBuilder.bind(queue).to(directExchange).with("qos");
  20.     }
  21. }
复制代码
3) 生产者
  1.     @RequestMapping("/qos")
  2.     public String qos() {
  3.         System.out.println("qos test...");
  4.         for (int i = 0; i < 15; i++) {
  5.             rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "qos test i..."+i);
  6.         }
  7.         return "消息发送成功";
  8.     }
复制代码
4)消耗者
  1. package com.bite.extensions.listener;
  2. import com.bite.extensions.constant.Constants;
  3. import com.rabbitmq.client.Channel;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class QosListener {
  9.     @RabbitListener(queues = Constants.QOS_QUEUE)
  10.     public void handleMessage(Message message, Channel channel) throws Exception {
  11.         //消费者逻辑
  12.         long deliverTag = message.getMessageProperties().getDeliveryTag();
  13.         try {
  14.             System.out.printf("[qos.queue]接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
  15. /*            //业务逻辑处理
  16.             System.out.println("业务逻辑处理!");
  17.             //肯定确认
  18.             channel.basicAck(deliverTag,false);*/
  19.         } catch (Exception e) {
  20.             //否定确认
  21.             channel.basicNack(deliverTag,false,true);//requeue为false,则变成死信队列
  22.         }
  23.     }
  24. }
复制代码
 5)测试1 未设置肯定确认环境





   此时将会只接收到5条,并且会壅闭住,达到一个限流的状态  测试2
把 prefetch: 5 注掉 再观看结果

   此时将会一次性把队列的消息全部发送,并且全部消耗
  2.负载均衡 

   如下图,在有两个消耗者的环境下,⼀个消耗者处理使命非常快,另⼀个非常慢,就会造成⼀个消耗者会⼀直很忙,而另⼀个消耗者很闲.这是因为RabbitMQ只是在消息进入队列时分派消息.它不考虑消耗者未确认消息的数量.
  

   我们可以利用设置prefetch=1的⽅式,告诉RabbitMQ⼀次只给⼀个消耗者⼀条消息,也就是说,在处理并确认前⼀条消息之前,不要向该消耗者发送新消息.相反,它会将它分派给下⼀个不忙的消耗者. 

  消耗者: 
  1. package com.bite.extensions.listener;
  2. import com.bite.extensions.constant.Constants;
  3. import com.rabbitmq.client.Channel;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class QosListener {
  9.     @RabbitListener(queues = Constants.QOS_QUEUE)
  10.     public void handleMessage(Message message, Channel channel) throws Exception {
  11.         //消费者逻辑
  12.         long deliverTag = message.getMessageProperties().getDeliveryTag();
  13.         try {
  14.             System.out.printf("第一个消费者 接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
  15.             Thread.sleep(3000);
  16.             channel.basicAck(deliverTag,false);
  17.         } catch (Exception e) {
  18.             //否定确认
  19.             channel.basicNack(deliverTag,false,true);//requeue为false,则变成死信队列
  20.         }
  21.     }
  22.     @RabbitListener(queues = Constants.QOS_QUEUE)
  23.     public void handleMessage2(Message message, Channel channel) throws Exception {
  24.         //消费者逻辑
  25.         long deliverTag = message.getMessageProperties().getDeliveryTag();
  26.         try {
  27.             System.out.printf("第二个消费者 接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
  28.             Thread.sleep(1000);
  29.             channel.basicAck(deliverTag,false);
  30.         } catch (Exception e) {
  31.             //否定确认
  32.             channel.basicNack(deliverTag,false,true);//requeue为false,则变成死信队列
  33.         }
  34.     }
  35. }
复制代码
 

 结果:

   这里可以看出每个消耗者以不同的速度完成某项使命 以防止一个消耗者未完成等很久的环境
  
结语: 写博客不仅仅是为了分享学习履历,同时这也有利于我巩固知识点,总结该知识点,由于作者水平有限,对文章有任何题目标还请指出,接受大家的品评,让我改进。同时也希望读者们不吝啬你们的点赞+收藏+关注,你们的鼓励是我创作的最大动力!  

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

莫张周刘王

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表