目录
事务
消息分发
应用场景
1. 限流
2.负载均衡
事务
RabbitMQ是基于AMQP协议实现的,该协议实现了事务机制,因此RabbitMQ也支持事务机制.SpringAMQP也提供了对事务相关的操作.RabbitMQ事务允许开发者确保消息的发送和接收是原子性的,要么全部乐成,要么全部失败.
何为原子性(面试重点)?
例如: 当A向B转账1000元,会履历俩个步调
1.A 向 B 转账 1000元 A的账号将会减去1000元
2.B将会收到1000元 B的账号将会增长1000元
但是,如果遇到极端环境,当A向B转账1000元时,A-1000元已完成,这个时间体系出现故障,导致A-1000 但是B却没有接收到 那么1000元将无缘无故丢失了 ,肯定不会允许这种事情发生,不然谁还敢转账。
此时就是将1操作和2操作绑定在一起,要么同时完成,要么一个都不执行
当出现1执行失败的时间,将1操作举行“回滚”,回到原来的状态,就当一切都没发生过
接下来实现rabbitmq的事务
声明队列:
- //事务
- public static final String TRANS_QUEUE = "trans_queue";
- @Bean("transQueue")
- public Queue transQueue() {
- return QueueBuilder.durable(Constants.TRANS_QUEUE).build();
- }
复制代码 配置事务管理器:
- @Bean
- public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
- return new RabbitTransactionManager(connectionFactory);
- }
- @Bean("transRabbitTemple")
- public RabbitTemplate transRabbitTemple(ConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- //开启事务
- rabbitTemplate.setChannelTransacted(true);
- return rabbitTemplate;
- }
复制代码 生产者代码编写:
- @RequestMapping("/trans")
- public String trans() {
- System.out.println("trans test...");
- transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 1...");
- int num = 5/0;
- transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 2...");
- return "消息发送成功";
- }
复制代码 测试:
1)不带 @Transactional 带异常的发送 看看会发生什么?
此时只有发送的第一条消息,紧接着发生了异常导致第二条消息未发送乐成
2) 带 @Transactional 带异常的发送 看看会发生什么?
- @Transactional @RequestMapping("/trans")
- public String trans() {
- System.out.println("trans test...");
- transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 1...");
- int num = 5/0;
- transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 2...");
- return "消息发送成功";
- }
复制代码
此时发生异常 原来发送了一条消息 但有异常,举行了回滚,当做没发生
也证明了我们事务的可靠性
3)带 @Transactional 不带异常的发送 看看会发生什么?
- @Transactional
- @RequestMapping("/trans")
- public String trans() {
- System.out.println("trans test...");
- transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 1...");
- // int num = 5/0;
- transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 2...");
- return "消息发送成功";
- }
复制代码
此结果一切正常
消息分发
RabbitMQ队列拥有多个消耗者时,队列会把收到的消息分派给不同的消耗者.每条消息只会发送给订阅列表里的⼀个消耗者.这种方式⾮常适合扩展,如果如今负载加重,那么只需要创建更多的消耗者来消耗处理消息即可.
默认环境下,RabbitMQ是以轮询的方法举行分发的,而不管消耗者是否已经消耗并已经确认了消息.这种方式是不太公道的,试想⼀下,如果某些消耗者消耗速度慢,而某些消耗者消耗速度快,就可能会导致某些消耗者消息积压,某些消耗者空闲,进而应用整体的吞吐量降落.
这样A都做完了10个使命,B还在写第一个使命,这样将会大大影响服从,从而导致整个的服从降落
怎样处理呢我们可以利用前面章节讲到的channel.basicQos(intprefetchCount)方法,来限制当前信道上的消耗者所能保持的最大未确认消息的数量
比如:消耗端调用了channelbasicQos(1),
此时A接收1条信息,并且消耗1条 B同时也接收1条信息 但是它服从比较慢 全部它还在消耗 而A处理完1条消息又接着处理第二条消息,属于多劳多得,并不会因为B影响整体的服从
应用场景
1. 限流
如下利用场景:
订单体系每秒最多处理5000哀求,正常环境下,订单体系可以正常满足需求
但是在秒杀时间点,哀求刹时增多,每秒1万个哀求,如果这些哀责备部通过MQ发送到订单体系,无疑会把订单体系压垮.
RabbitMQ提供了限流机制,可以控制消耗端⼀次只拉取N个哀求
通过设置prefetchCount参数,同时也必须要设置消息应答方式为手动应答
prefetchCount:控制消耗者从队列中预取(prefetch)消息的数量,以此来实现流控制和负载均衡.
1) 配置prefetch参数,设置应答方式为手动应答
2) 配置交换机,队列
- package com.bite.extensions.config;
- import com.bite.extensions.constant.Constants;
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class QosConfig {
- @Bean("qosQueue")
- public Queue qosQueue() {
- return QueueBuilder.durable(Constants.QOS_QUEUE).build();
- }
- @Bean("qosExchange")
- public DirectExchange qosExchange() {
- return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();
- }
- @Bean("qosBinding")
- public Binding qosBinding(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") DirectExchange directExchange) {
- return BindingBuilder.bind(queue).to(directExchange).with("qos");
- }
- }
复制代码 3) 生产者
- @RequestMapping("/qos")
- public String qos() {
- System.out.println("qos test...");
- for (int i = 0; i < 15; i++) {
- rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "qos test i..."+i);
- }
- return "消息发送成功";
- }
复制代码 4)消耗者
- package com.bite.extensions.listener;
- import com.bite.extensions.constant.Constants;
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- @Component
- public class QosListener {
- @RabbitListener(queues = Constants.QOS_QUEUE)
- public void handleMessage(Message message, Channel channel) throws Exception {
- //消费者逻辑
- long deliverTag = message.getMessageProperties().getDeliveryTag();
- try {
- System.out.printf("[qos.queue]接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
- /* //业务逻辑处理
- System.out.println("业务逻辑处理!");
- //肯定确认
- channel.basicAck(deliverTag,false);*/
- } catch (Exception e) {
- //否定确认
- channel.basicNack(deliverTag,false,true);//requeue为false,则变成死信队列
- }
- }
- }
复制代码 5)测试1 未设置肯定确认环境
此时将会只接收到5条,并且会壅闭住,达到一个限流的状态 测试2
把 prefetch: 5 注掉 再观看结果
此时将会一次性把队列的消息全部发送,并且全部消耗
2.负载均衡
如下图,在有两个消耗者的环境下,⼀个消耗者处理使命非常快,另⼀个非常慢,就会造成⼀个消耗者会⼀直很忙,而另⼀个消耗者很闲.这是因为RabbitMQ只是在消息进入队列时分派消息.它不考虑消耗者未确认消息的数量.
我们可以利用设置prefetch=1的⽅式,告诉RabbitMQ⼀次只给⼀个消耗者⼀条消息,也就是说,在处理并确认前⼀条消息之前,不要向该消耗者发送新消息.相反,它会将它分派给下⼀个不忙的消耗者.
消耗者:
- package com.bite.extensions.listener;
- import com.bite.extensions.constant.Constants;
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- @Component
- public class QosListener {
- @RabbitListener(queues = Constants.QOS_QUEUE)
- public void handleMessage(Message message, Channel channel) throws Exception {
- //消费者逻辑
- long deliverTag = message.getMessageProperties().getDeliveryTag();
- try {
- System.out.printf("第一个消费者 接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
- Thread.sleep(3000);
- channel.basicAck(deliverTag,false);
- } catch (Exception e) {
- //否定确认
- channel.basicNack(deliverTag,false,true);//requeue为false,则变成死信队列
- }
- }
- @RabbitListener(queues = Constants.QOS_QUEUE)
- public void handleMessage2(Message message, Channel channel) throws Exception {
- //消费者逻辑
- long deliverTag = message.getMessageProperties().getDeliveryTag();
- try {
- System.out.printf("第二个消费者 接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
- Thread.sleep(1000);
- channel.basicAck(deliverTag,false);
- } catch (Exception e) {
- //否定确认
- channel.basicNack(deliverTag,false,true);//requeue为false,则变成死信队列
- }
- }
- }
复制代码 
结果:
这里可以看出每个消耗者以不同的速度完成某项使命 以防止一个消耗者未完成等很久的环境
结语: 写博客不仅仅是为了分享学习履历,同时这也有利于我巩固知识点,总结该知识点,由于作者水平有限,对文章有任何题目标还请指出,接受大家的品评,让我改进。同时也希望读者们不吝啬你们的点赞+收藏+关注,你们的鼓励是我创作的最大动力!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |