梦见你的名字 发表于 2025-4-15 05:23:19

RabbitMQ底子——work queues、三种交换机、声明队列与交换机以及消息转换器

目次
work queues
Fanout交换机(广播)
Direct交换机(定向)
Topic交换机(话题)
声明队列与交换机
方式一(基于Bean)
方式二(基于注解)
消息转换器

work queues

work queues,即任务模型,即让多个消费者绑定到同一个队列,共同消费队列中的消息。
https://i-blog.csdnimg.cn/direct/3fee7ae37ae442f9a51fdf82a768ceb6.png
一般不额外设置的情况下,MQ会将消息依次轮询投递给绑定在队列上的每一个消费者,此时并未考虑消费者是否已经处理惩罚完成消息,容易造成消息的堆积。
为了减少由于性能差异造成的运行效果的区别,实现“能者多劳”的局面,可以在消费者的application.yml设置文件中设置:
spring:
rabbitmq:
    listener:
      simple:
      prefetch: 1 prefetch设置为1表示同一时刻最多投递给消费者1条消息,当消费者处理惩罚完成之后,MQ才能投递下一条消息。
总结work模型的使用:

[*]多个消费者绑定到一个队列,可以加快消息处理惩罚速率;
[*]同一条消息只会被一个消费者使用;
[*]可以通过设置prefetch参数来控制消费者预取的消息数量,实现能者多劳。

现实中的生产情况需要通过exchange来发送消息,而不是直接发送到队列。
假如没有交换机,就无法实现一个消息被多个消费者消费的场景(好比一个下单成功的消息需要被购物车服务和消费系统服务消费)。
Fanout交换机(广播)


Fanout交换时机将接收到的消息广播到每一个与其绑定的queue,所以也叫广播模式(一条消息大家都能收到)。
https://i-blog.csdnimg.cn/direct/92651f3c130c4ca28a729a473972d331.png
每一个微服务一般需要有自己的队列,可以在一个队列上绑定多个相同功能的微服务模块。
在生产者阶段,发送消息到交换机需要指定RoutingKey(作用接下来会说),可以填写null:
@Test
    void testSendFanout() {
      String exchangeName = "hmall.fanout";
      String msg = "hello, everyone!";
      rabbitTemplate.convertAndSend(exchangeName, null, msg);
    }
Direct交换机(定向)

场景:不同的微服务需要接收不同种类的消息。(好比支付成功需要发送短信、失败则不需要)
Direct交换时机将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。


[*]每一个Queue都与Exchange设置一个大概多个BindingKey;
[*]生产者发布消息时,会指定消息的RoutingKey;
[*]Exchange将消息路由到BindingKey与消息RoutingKey一致的队列。
由此可见,特殊情况下,Direct交换机也可以成为Fanout交换机。
@Test
    void testSendDirect() {
      String exchangeName = "hmall.direct";
      String msg = "RoutingKey为blue的通知~";
      rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
    }
Topic交换机(话题)

Topic交换机与Direct交换机类似,区别在于RoutingKey可以是多个单词的列表,并且以“.”分割。
https://i-blog.csdnimg.cn/direct/e081b7a3dd4b4c2388486f82fe858f78.png

声明队列与交换机

方式一(基于Bean)

SpringAMQP提供了几个类,用于声明队列、交换机及其绑定关系。

[*]Queue:用于声明队列,可以用工厂类QueueBuilder构建;
[*]Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建;
[*]Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建。
示例代码:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfiguration {

    @Bean
    public FanoutExchange fanoutExchange(){
      // ExchangeBuilder.fanoutExchange("").build();
      return new FanoutExchange("hmall.fanout2");
    }

    @Bean
    public Queue fanoutQueue3(){
      // QueueBuilder.durable("ff").build();
      return new Queue("fanout.queue3");
    }

    @Bean
    public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange){
      //绑定方法1
      return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
    }

    @Bean
    public Queue fanoutQueue4(){
      return new Queue("fanout.queue4");
    }

    @Bean
    public Binding fanoutBinding4(){
      //绑定方法2
      //此处的fanoutQueue4()不是在调用本类中的fanoutQueue4()方法
      //在Spring中,凡是加了@Bean注解的都会被动态代理,当调用fanoutQueue4()时,Spring首先会检查Spring容器中有没有这个Bean
      //即名为fanoutQueue4的Bean,如果有,就不会执行fanoutQueue4()方法中的代码而是会直接返回这个Bean,如果没有,才会去执行这个方法生成Bean。
      return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
    }
}
该方法的缺点:每一个绑定都需要重写一个方法,较为繁琐。

方式二(基于注解)

SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:
示例(在消费者的listener文件中声明):
@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1", durable = "true"),
            exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg) throws InterruptedException {
      System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg +"】");
    }
消息转换器

所有的数据传输都是转成字节的形式传递的,当传递map大概类对象的时间就会造成占用更多内存的情况。
内部是什么原理?简朴查看一下convertAndSend方法的源码:
public void convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) throws AmqpException {
      this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);
    } 继续跟进其中的convertMessageIfNecessary()方法:
protected Message convertMessageIfNecessary(Object object) {
      return object instanceof Message ? (Message)object : this.getRequiredMessageConverter().toMessage(object, new MessageProperties());
    } 可以看到这段代码是在判断当前对象是否为Message类,假如是,就不做处理惩罚。
此时,对于Map类型,不属于Message类型 ,会进行getRequiredMessageConverter(),即获取消息转换器,可以看到,默认的消息转换器是SimpleMessageConverter()。
https://i-blog.csdnimg.cn/direct/a2f1317fa9604eb380d79ab7cbc04972.png
在SimpleMessageConverter.class的源码为(已添加解释):
/**
* 根据传入的对象创建消息(Message),并根据对象类型配置消息属性(MessageProperties)。
*
* @param object 要转换的消息内容对象
* @param messageProperties 消息属性配置对象
* @return 创建的消息对象(Message)
* @throws MessageConversionException 对象转换失败时抛出
* @throws IllegalArgumentException 对象类型不支持时抛出
*/
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
    // 初始化字节数组,用于存储对象转换后的二进制数据
    byte[] bytes = null;

    // 根据对象类型处理不同的序列化逻辑
    if (object instanceof byte[]) {
      // 情况1:对象本身已经是字节数组
      bytes = (byte[]) object; // 直接赋值
      messageProperties.setContentType("application/octet-stream"); // 设置内容类型为二进制流
    } else if (object instanceof String) {
      // 情况2:对象是字符串类型
      try {
            // 使用默认字符集将字符串转换为字节数组
            bytes = ((String) object).getBytes(this.defaultCharset);
      } catch (UnsupportedEncodingException var6) {
            // 捕获不支持的字符集异常,包装后抛出
            UnsupportedEncodingException e = var6;
            throw new MessageConversionException("failed to convert to Message content", e);
      }
      // 设置内容类型为纯文本,并指定字符集编码
      messageProperties.setContentType("text/plain");
      messageProperties.setContentEncoding(this.defaultCharset);
    } else if (object instanceof Serializable) {
      // 情况3:对象实现了Serializable接口(可序列化对象)
      try {
            // 使用序列化工具将对象序列化为字节数组
            bytes = SerializationUtils.serialize(object);
      } catch (IllegalArgumentException var5) {
            // 捕获序列化异常(如对象不可序列化),包装后抛出
            IllegalArgumentException e = var5;
            throw new MessageConversionException("failed to convert to serialized Message content", e);
      }
      // 设置内容类型为Java序列化对象
      messageProperties.setContentType("application/x-java-serialized-object");
    }

    // 最终处理逻辑
    if (bytes != null) {
      // 若成功转换出字节数组:
      messageProperties.setContentLength((long) bytes.length); // 设置消息内容的长度
      return new Message(bytes, messageProperties); // 创建并返回消息对象
    } else {
      // 若未处理任何类型(对象类型不受支持),抛出异常
      throw new IllegalArgumentException(
            this.getClass().getSimpleName() +
            " only supports String, byte[] and Serializable payloads, received: " +
            object.getClass().getName()
      );
    }
} 从这段源码中可以看出,对几种对象有以下几种处理惩罚方式:

[*] byte[]:直接作为二进制数据,标志内容类型为 application/octet-stream;
[*] String:按默认字符集转为字节数组,标志为 text/plain 并指定编码;
[*] Serializable对象:序列化为字节省,标志为 application/x-java-serialized-object;
[*] 其他类型:直接抛出 IllegalArgumentException,拒绝处理惩罚。
即所有有用类型最终均以字节数组存储,并在消息属性中明确数据类型和长度。
这种JDK的序列化的问题在于:


[*]转为字节数组的形式之后,体积太大了;
[*]代码可读性变低;
[*]有安全毛病,由于字节码文件是可变的,容易受到篡改。
为相识决这种缺陷,保举使用JSON序列化代替默认的JDK序列化,需要完成以下设置:
(1)在生产者和消费者中都要引入jackson依赖
<!--Jackson-->
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
</dependency> (2)在生产者和消费者中都要设置MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class PublisherApplication {
    public static void main(String[] args) {
      SpringApplication.run(PublisherApplication.class);
    }

    @Bean
    public MessageConverter jacksonMessageConvertor(){
      return new Jackson2JsonMessageConverter();
    }
} 生产者同样在启动类中设置。
这样就解决了原先的问题。

本文内容源自于黑马程序员的视频学习。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: RabbitMQ底子——work queues、三种交换机、声明队列与交换机以及消息转换器