RabbitMQ底子——work queues、三种交换机、声明队列与交换机以及消息转换 ...

打印 上一主题 下一主题

主题 1485|帖子 1485|积分 4455

目次
work queues
Fanout交换机(广播)
Direct交换机(定向)
Topic交换机(话题)
声明队列与交换机
方式一(基于Bean)
方式二(基于注解)
消息转换器


work queues

work queues,即任务模型,即让多个消费者绑定到同一个队列,共同消费队列中的消息。

一般不额外设置的情况下,MQ会将消息依次轮询投递给绑定在队列上的每一个消费者,此时并未考虑消费者是否已经处理惩罚完成消息,容易造成消息的堆积。
为了减少由于性能差异造成的运行效果的区别,实现“能者多劳”的局面,可以在消费者的application.yml设置文件中设置:
  1. spring:
  2.   rabbitmq:
  3.     listener:
  4.       simple:
  5.         prefetch: 1
复制代码
prefetch设置为1表示同一时刻最多投递给消费者1条消息,当消费者处理惩罚完成之后,MQ才能投递下一条消息。
总结work模型的使用:

  • 多个消费者绑定到一个队列,可以加快消息处理惩罚速率;
  • 同一条消息只会被一个消费者使用;
  • 可以通过设置prefetch参数来控制消费者预取的消息数量,实现能者多劳。

现实中的生产情况需要通过exchange来发送消息,而不是直接发送到队列。
假如没有交换机,就无法实现一个消息被多个消费者消费的场景(好比一个下单成功的消息需要被购物车服务和消费系统服务消费)。
Fanout交换机(广播)


Fanout交换时机将接收到的消息广播到每一个与其绑定的queue,所以也叫广播模式(一条消息大家都能收到)。

每一个微服务一般需要有自己的队列,可以在一个队列上绑定多个相同功能的微服务模块。
在生产者阶段,发送消息到交换机需要指定RoutingKey(作用接下来会说),可以填写null:
  1. @Test
  2.     void testSendFanout() {
  3.         String exchangeName = "hmall.fanout";
  4.         String msg = "hello, everyone!";
  5.         rabbitTemplate.convertAndSend(exchangeName, null, msg);
  6.     }
复制代码

Direct交换机(定向)

场景:不同的微服务需要接收不同种类的消息。(好比支付成功需要发送短信、失败则不需要)
Direct交换时机将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。


  • 每一个Queue都与Exchange设置一个大概多个BindingKey;
  • 生产者发布消息时,会指定消息的RoutingKey;
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列。
由此可见,特殊情况下,Direct交换机也可以成为Fanout交换机。
  1. @Test
  2.     void testSendDirect() {
  3.         String exchangeName = "hmall.direct";
  4.         String msg = "RoutingKey为blue的通知~";
  5.         rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
  6.     }
复制代码

Topic交换机(话题)

Topic交换机与Direct交换机类似,区别在于RoutingKey可以是多个单词的列表,并且以“.”分割。


声明队列与交换机

方式一(基于Bean)

SpringAMQP提供了几个类,用于声明队列、交换机及其绑定关系。

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建;
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建;
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建。
示例代码:
  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.FanoutExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class FanoutConfiguration {
  9.     @Bean
  10.     public FanoutExchange fanoutExchange(){
  11.         // ExchangeBuilder.fanoutExchange("").build();
  12.         return new FanoutExchange("hmall.fanout2");
  13.     }
  14.     @Bean
  15.     public Queue fanoutQueue3(){
  16.         // QueueBuilder.durable("ff").build();
  17.         return new Queue("fanout.queue3");
  18.     }
  19.     @Bean
  20.     public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange){
  21.         //绑定方法1
  22.         return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
  23.     }
  24.     @Bean
  25.     public Queue fanoutQueue4(){
  26.         return new Queue("fanout.queue4");
  27.     }
  28.     @Bean
  29.     public Binding fanoutBinding4(){
  30.         //绑定方法2
  31.         //此处的fanoutQueue4()不是在调用本类中的fanoutQueue4()方法
  32.         //在Spring中,凡是加了@Bean注解的都会被动态代理,当调用fanoutQueue4()时,Spring首先会检查Spring容器中有没有这个Bean
  33.         //即名为fanoutQueue4的Bean,如果有,就不会执行fanoutQueue4()方法中的代码而是会直接返回这个Bean,如果没有,才会去执行这个方法生成Bean。
  34.         return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
  35.     }
  36. }
复制代码
该方法的缺点:每一个绑定都需要重写一个方法,较为繁琐。

方式二(基于注解)

SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:
示例(在消费者的listener文件中声明):
  1. @RabbitListener(bindings = @QueueBinding(
  2.             value = @Queue(name = "direct.queue1", durable = "true"),
  3.             exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
  4.             key = {"red", "blue"}
  5.     ))
  6.     public void listenDirectQueue1(String msg) throws InterruptedException {
  7.         System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg +"】");
  8.     }
复制代码

消息转换器

所有的数据传输都是转成字节的形式传递的,当传递map大概类对象的时间就会造成占用更多内存的情况。
内部是什么原理?简朴查看一下convertAndSend方法的源码:
  1. public void convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) throws AmqpException {
  2.         this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);
  3.     }
复制代码
继续跟进其中的convertMessageIfNecessary()方法:
  1. protected Message convertMessageIfNecessary(Object object) {
  2.         return object instanceof Message ? (Message)object : this.getRequiredMessageConverter().toMessage(object, new MessageProperties());
  3.     }
复制代码
可以看到这段代码是在判断当前对象是否为Message类,假如是,就不做处理惩罚。
此时,对于Map类型,不属于Message类型 ,会进行getRequiredMessageConverter(),即获取消息转换器,可以看到,默认的消息转换器是SimpleMessageConverter()。

在SimpleMessageConverter.class的源码为(已添加解释):
  1. /**
  2. * 根据传入的对象创建消息(Message),并根据对象类型配置消息属性(MessageProperties)。
  3. *
  4. * @param object 要转换的消息内容对象
  5. * @param messageProperties 消息属性配置对象
  6. * @return 创建的消息对象(Message)
  7. * @throws MessageConversionException 对象转换失败时抛出
  8. * @throws IllegalArgumentException 对象类型不支持时抛出
  9. */
  10. protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
  11.     // 初始化字节数组,用于存储对象转换后的二进制数据
  12.     byte[] bytes = null;
  13.     // 根据对象类型处理不同的序列化逻辑
  14.     if (object instanceof byte[]) {
  15.         // 情况1:对象本身已经是字节数组
  16.         bytes = (byte[]) object; // 直接赋值
  17.         messageProperties.setContentType("application/octet-stream"); // 设置内容类型为二进制流
  18.     } else if (object instanceof String) {
  19.         // 情况2:对象是字符串类型
  20.         try {
  21.             // 使用默认字符集将字符串转换为字节数组
  22.             bytes = ((String) object).getBytes(this.defaultCharset);
  23.         } catch (UnsupportedEncodingException var6) {
  24.             // 捕获不支持的字符集异常,包装后抛出
  25.             UnsupportedEncodingException e = var6;
  26.             throw new MessageConversionException("failed to convert to Message content", e);
  27.         }
  28.         // 设置内容类型为纯文本,并指定字符集编码
  29.         messageProperties.setContentType("text/plain");
  30.         messageProperties.setContentEncoding(this.defaultCharset);
  31.     } else if (object instanceof Serializable) {
  32.         // 情况3:对象实现了Serializable接口(可序列化对象)
  33.         try {
  34.             // 使用序列化工具将对象序列化为字节数组
  35.             bytes = SerializationUtils.serialize(object);
  36.         } catch (IllegalArgumentException var5) {
  37.             // 捕获序列化异常(如对象不可序列化),包装后抛出
  38.             IllegalArgumentException e = var5;
  39.             throw new MessageConversionException("failed to convert to serialized Message content", e);
  40.         }
  41.         // 设置内容类型为Java序列化对象
  42.         messageProperties.setContentType("application/x-java-serialized-object");
  43.     }
  44.     // 最终处理逻辑
  45.     if (bytes != null) {
  46.         // 若成功转换出字节数组:
  47.         messageProperties.setContentLength((long) bytes.length); // 设置消息内容的长度
  48.         return new Message(bytes, messageProperties); // 创建并返回消息对象
  49.     } else {
  50.         // 若未处理任何类型(对象类型不受支持),抛出异常
  51.         throw new IllegalArgumentException(
  52.             this.getClass().getSimpleName() +
  53.             " only supports String, byte[] and Serializable payloads, received: " +
  54.             object.getClass().getName()
  55.         );
  56.     }
  57. }
复制代码
从这段源码中可以看出,对几种对象有以下几种处理惩罚方式:

  • byte[]:直接作为二进制数据,标志内容类型为 application/octet-stream;
  • String:按默认字符集转为字节数组,标志为 text/plain 并指定编码;
  • Serializable对象:序列化为字节省,标志为 application/x-java-serialized-object;
  • 其他类型:直接抛出 IllegalArgumentException,拒绝处理惩罚。
即所有有用类型最终均以字节数组存储,并在消息属性中明确数据类型和长度。
这种JDK的序列化的问题在于:


  • 转为字节数组的形式之后,体积太大了;
  • 代码可读性变低;
  • 有安全毛病,由于字节码文件是可变的,容易受到篡改。
为相识决这种缺陷,保举使用JSON序列化代替默认的JDK序列化,需要完成以下设置:
(1)在生产者和消费者中都要引入jackson依赖
  1. <!--Jackson-->
  2. <dependency>
  3.     <groupId>com.fasterxml.jackson.dataformat</groupId>
  4.     <artifactId>jackson-dataformat-xml</artifactId>
  5. </dependency>
复制代码
(2)在生产者和消费者中都要设置MessageConverter
  1. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  2. import org.springframework.amqp.support.converter.MessageConverter;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. import org.springframework.context.annotation.Bean;
  6. @SpringBootApplication
  7. public class PublisherApplication {
  8.     public static void main(String[] args) {
  9.         SpringApplication.run(PublisherApplication.class);
  10.     }
  11.     @Bean
  12.     public MessageConverter jacksonMessageConvertor(){
  13.         return new Jackson2JsonMessageConverter();
  14.     }
  15. }
复制代码
生产者同样在启动类中设置。
这样就解决了原先的问题。

本文内容源自于黑马程序员的视频学习。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

梦见你的名字

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表