一给 发表于 2025-11-5 22:38:10

RabbitMQ---事件及消息分发

(一)事件

   RabbitMQ是基于AMQP协议实现的,该协议实现了事件机制,以是RabbitMQ也支持事件机制,他的事件答应开辟者确保消息的发送和吸取时原子性的,要么全部乐成,要么全部失败
 我们设置事件有三步,起首就是开启事件
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9kaXJlY3QvNGM0MzcwYzZjOWNkNDNkMTg3ZjE1YjBiM2RlOTVjZTEucG5n由于我们是针对rabbitTemplate来作设置的,以是会影响此rabbitTemplate的全部消息,这里我们新开了一个
然后我们利用时要加一个注解
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9kaXJlY3QvOTk4NjBiMWM0ZjMxNGEwMzk0OTczNTdiZGQxZjYzMjEucG5n
 末了一步我们须要加上事件管理
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9kaXJlY3QvMjllMjY3NGZmNjA4NGMwNDk5NmU3N2Y1YTgxMjVmZmMucG5n
如许我们就乐成开启了事件
这三步是必不可少的,缺少一步,都无法乐成开启事件
(二)消息分发

1.概念

  RabbitMQ队列有多个消耗者时,队列会把消息分给差别的消耗者,每条消息只发给一个消耗者,默认情况下,RabbitMQ是按照轮询的方式分发的,不管消耗者是否已经消耗并确认消息,以是这种方式是很有大概造成消息积存的,由于有的消耗者处理处罚消息的速率快,有的就慢,如许会导致团体吞吐量降落
  那至于我们要如那边理,我们可以通过channelbasic()这个方法来限定信道上的最大消息数目,RabbitMQ每向该消耗者发送消息,就会使消息计数+1,消耗者消耗消息就会使消息计数-1,当到达了上线,RabbitMQ就不会再发送消息了,知道消耗者又消耗了一条消息
2.应用场景

 我们可以将消息分发应用在限流.非公中分发(负载均衡中)
1)限流

 我们来看一个例子,我们有一个订单体系,每秒可以处理处罚5000个哀求,在正常情况下,是不会有题目的,但是在一些特别时间比如11.11,哀求量就会突增,如果这些哀叱责部直接发送到消耗者(订单体系),那么就会把我们订单体系给弄崩了
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9kaXJlY3QvOGZhNmQ2MWEyNDEwNDZlMTlmMjRhODNkN2FhZDgzMTkucG5n
  RabbitMQ提供了限流机制,可以控制消耗者一次最多拉取多少个哀求,通过在设置文件中设置prefetch参数即可 
我们来看代码
由于要限流,限定拉取的哀求,以是我们这里要设置手动确认,如果我们设置成主动确认,那我们收到消息就确认,实在跟没有设置限流差别是不大的
还是先看设置代码

spring:
rabbitmq:
    addresses: amqp://student:student@62.234.46.219:5672/test
    listener:
      simple:
      #      acknowledge-mode: NONE
      #      acknowledge-mode: AUTO
      acknowledge-mode: MANUAL
      prefetch: 5
      retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 5000ms # 初始失败等待时⻓为5秒
          max-attempts: 5
#    publisher-confirm-type: correlated#消息发送确认
这里我们设置一次最多拉取5条 
然后我们生产者一次发送10条消息

@RestController
@RequestMapping("producer")
public class Qos {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @RequestMapping("/qos")
    public String qosConsumer(){
      for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE,"qos","qos "+i);
      }
      return "发送成功";
    }
}
然后消耗者消耗消息,但是这里我们没有举行确认,重要是为了方便观察结果,如果我们确认了,自己我们消息就不多而且代码中没什么复杂逻辑,处理处罚的很快,肉眼上就像一次全部获取了一样
 

@Component
public class QosConsumer {
    @RabbitListener(queues = Constant.QOS_QUEUE)
    public void listenerQueue(Message message, Channel channel) throws UnsupportedEncodingException {
      long deliveryTag = message.getMessageProperties().getDeliveryTag();
      System.out.printf("接收到消息: %s, deliveryTag: %d%n",
                new String(message.getBody(),"UTF-8"), deliveryTag);
    }
}
之后我们来看结果 
 https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9kaXJlY3QvYTZlY2NjMzQxMTI4NDA3MmJlMzQ2YWM5MzQwMWEzZDMucG5n
吸取到了5条消息
然后我们看看控制台,发现确实有5条还没有发送,有5条已经确认了
 https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9kaXJlY3QvMWJjOTdmYjEwNTM3NGFkZmJkN2E5ZjcyMDY5ZjVmYjcucG5n
那我们再把刚刚设置文件的限定取消了,再看看结果
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9kaXJlY3QvYzY4MzI4YTczZWJlNDExYjgxMWJlZDA0MDBiNzQ3YzQucG5n 我们发现消息会直接全部发送到消耗者
2)负载均衡

我们也可以用这个设置来实现负载均衡
 这里我们说的负载均衡,并不是说两个消耗者每个人都处理处罚雷同数目的消息,而是两个人谁干的快谁就多干,谁干的慢谁就少干
 如图,我们有两个消耗者,一个处理处罚的快,一个处理处罚的满,就会有消耗者快的处理处罚完后不知道干什么,而另一个消耗者还不停在处理处罚,这是由于RabbitMQ只是在消息进入队列时分配消息,并不思量消耗者未确认消息的数目 
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9kaXJlY3QvOTUzMGQ5MTExNWYwNGRmY2JjMTEwNTcyNTA4MGQyZDQucG5n  我们可以设置prefetch为1,如许每个消耗者都会只有一条消息,在这条消息处理处罚完之前,就不会有其他消息
   对于消耗者来说就是,干的越快的干的越多
  那实在我个人以为限流和负载均衡是很像的,由于负载均衡就是多个消耗者一起限流,然后谁干的快,谁就再拿,而且上述例子上的prdfetch为1也不愿定,我们也可以是其他值
 接下来我们看代码
这里我们只须要多加一个消耗者然后把设置文件prefetch改为1即可

@Component
public class QosConsumer {
    @RabbitListener(queues = Constant.QOS_QUEUE)
    public void listenerQueue(Message message, Channel channel) throws IOException, InterruptedException {
      long deliveryTag = message.getMessageProperties().getDeliveryTag();
      Thread.sleep(1000);
      System.out.printf("1接收到消息: %s, deliveryTag: %d%n",
                new String(message.getBody(),"UTF-8"), deliveryTag);
      channel.basicAck(deliveryTag,false);
    }
    @RabbitListener(queues = Constant.QOS_QUEUE)
    public void listenerQueue2(Message message, Channel channel) throws IOException, InterruptedException {
      long deliveryTag = message.getMessageProperties().getDeliveryTag();
      Thread.sleep(2000);
      System.out.printf("2接收到消息: %s, deliveryTag: %d%n",
                new String(message.getBody(),"UTF-8"), deliveryTag);
      channel.basicAck(deliveryTag,false);
    }
}然后我们看征象,我们发现只有我们确认了之后,消息才会在再发送给这个消耗者,来到达负载均衡的结果 
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9kaXJlY3QvMGNkYzVkNjA0ZWI2NDVlNmIxOTRkNTI0OTU5YTM0NmYucG5n
deliveryTag有重复是由于两个消耗者使⽤的是差别的Channel,每个Channel上的 deliveryTag 是独⽴计数的

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: RabbitMQ---事件及消息分发