目次
work queues
Fanout交换机(广播)
Direct交换机(定向)
Topic交换机(话题)
声明队列与交换机
方式一(基于Bean)
方式二(基于注解)
消息转换器
work queues
work queues,即任务模型,即让多个消费者绑定到同一个队列,共同消费队列中的消息。
一般不额外设置的情况下,MQ会将消息依次轮询投递给绑定在队列上的每一个消费者,此时并未考虑消费者是否已经处理惩罚完成消息,容易造成消息的堆积。
为了减少由于性能差异造成的运行效果的区别,实现“能者多劳”的局面,可以在消费者的application.yml设置文件中设置:
- spring:
- rabbitmq:
- listener:
- simple:
- prefetch: 1
复制代码 prefetch设置为1表示同一时刻最多投递给消费者1条消息,当消费者处理惩罚完成之后,MQ才能投递下一条消息。
总结work模型的使用:
- 多个消费者绑定到一个队列,可以加快消息处理惩罚速率;
- 同一条消息只会被一个消费者使用;
- 可以通过设置prefetch参数来控制消费者预取的消息数量,实现能者多劳。
现实中的生产情况需要通过exchange来发送消息,而不是直接发送到队列。
假如没有交换机,就无法实现一个消息被多个消费者消费的场景(好比一个下单成功的消息需要被购物车服务和消费系统服务消费)。
Fanout交换机(广播)
Fanout交换时机将接收到的消息广播到每一个与其绑定的queue,所以也叫广播模式(一条消息大家都能收到)。
每一个微服务一般需要有自己的队列,可以在一个队列上绑定多个相同功能的微服务模块。
在生产者阶段,发送消息到交换机需要指定RoutingKey(作用接下来会说),可以填写null:
- @Test
- void testSendFanout() {
- String exchangeName = "hmall.fanout";
- String msg = "hello, everyone!";
- rabbitTemplate.convertAndSend(exchangeName, null, msg);
- }
复制代码
Direct交换机(定向)
场景:不同的微服务需要接收不同种类的消息。(好比支付成功需要发送短信、失败则不需要)
Direct交换时机将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
- 每一个Queue都与Exchange设置一个大概多个BindingKey;
- 生产者发布消息时,会指定消息的RoutingKey;
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列。
由此可见,特殊情况下,Direct交换机也可以成为Fanout交换机。
- @Test
- void testSendDirect() {
- String exchangeName = "hmall.direct";
- String msg = "RoutingKey为blue的通知~";
- rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
- }
复制代码
Topic交换机(话题)
Topic交换机与Direct交换机类似,区别在于RoutingKey可以是多个单词的列表,并且以“.”分割。
声明队列与交换机
方式一(基于Bean)
SpringAMQP提供了几个类,用于声明队列、交换机及其绑定关系。
- Queue:用于声明队列,可以用工厂类QueueBuilder构建;
- Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建;
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建。
示例代码:
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.FanoutExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- public class FanoutConfiguration {
- @Bean
- public FanoutExchange fanoutExchange(){
- // ExchangeBuilder.fanoutExchange("").build();
- return new FanoutExchange("hmall.fanout2");
- }
- @Bean
- public Queue fanoutQueue3(){
- // QueueBuilder.durable("ff").build();
- return new Queue("fanout.queue3");
- }
- @Bean
- public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange){
- //绑定方法1
- return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
- }
- @Bean
- public Queue fanoutQueue4(){
- return new Queue("fanout.queue4");
- }
- @Bean
- public Binding fanoutBinding4(){
- //绑定方法2
- //此处的fanoutQueue4()不是在调用本类中的fanoutQueue4()方法
- //在Spring中,凡是加了@Bean注解的都会被动态代理,当调用fanoutQueue4()时,Spring首先会检查Spring容器中有没有这个Bean
- //即名为fanoutQueue4的Bean,如果有,就不会执行fanoutQueue4()方法中的代码而是会直接返回这个Bean,如果没有,才会去执行这个方法生成Bean。
- return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
- }
- }
复制代码 该方法的缺点:每一个绑定都需要重写一个方法,较为繁琐。
方式二(基于注解)
SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:
示例(在消费者的listener文件中声明):
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue1", durable = "true"),
- exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
- key = {"red", "blue"}
- ))
- public void listenDirectQueue1(String msg) throws InterruptedException {
- System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg +"】");
- }
复制代码
消息转换器
所有的数据传输都是转成字节的形式传递的,当传递map大概类对象的时间就会造成占用更多内存的情况。
内部是什么原理?简朴查看一下convertAndSend方法的源码:
- public void convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) throws AmqpException {
- this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);
- }
复制代码 继续跟进其中的convertMessageIfNecessary()方法:
- protected Message convertMessageIfNecessary(Object object) {
- return object instanceof Message ? (Message)object : this.getRequiredMessageConverter().toMessage(object, new MessageProperties());
- }
复制代码 可以看到这段代码是在判断当前对象是否为Message类,假如是,就不做处理惩罚。
此时,对于Map类型,不属于Message类型 ,会进行getRequiredMessageConverter(),即获取消息转换器,可以看到,默认的消息转换器是SimpleMessageConverter()。

在SimpleMessageConverter.class的源码为(已添加解释):
- /**
- * 根据传入的对象创建消息(Message),并根据对象类型配置消息属性(MessageProperties)。
- *
- * @param object 要转换的消息内容对象
- * @param messageProperties 消息属性配置对象
- * @return 创建的消息对象(Message)
- * @throws MessageConversionException 对象转换失败时抛出
- * @throws IllegalArgumentException 对象类型不支持时抛出
- */
- protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
- // 初始化字节数组,用于存储对象转换后的二进制数据
- byte[] bytes = null;
- // 根据对象类型处理不同的序列化逻辑
- if (object instanceof byte[]) {
- // 情况1:对象本身已经是字节数组
- bytes = (byte[]) object; // 直接赋值
- messageProperties.setContentType("application/octet-stream"); // 设置内容类型为二进制流
- } else if (object instanceof String) {
- // 情况2:对象是字符串类型
- try {
- // 使用默认字符集将字符串转换为字节数组
- bytes = ((String) object).getBytes(this.defaultCharset);
- } catch (UnsupportedEncodingException var6) {
- // 捕获不支持的字符集异常,包装后抛出
- UnsupportedEncodingException e = var6;
- throw new MessageConversionException("failed to convert to Message content", e);
- }
- // 设置内容类型为纯文本,并指定字符集编码
- messageProperties.setContentType("text/plain");
- messageProperties.setContentEncoding(this.defaultCharset);
- } else if (object instanceof Serializable) {
- // 情况3:对象实现了Serializable接口(可序列化对象)
- try {
- // 使用序列化工具将对象序列化为字节数组
- bytes = SerializationUtils.serialize(object);
- } catch (IllegalArgumentException var5) {
- // 捕获序列化异常(如对象不可序列化),包装后抛出
- IllegalArgumentException e = var5;
- throw new MessageConversionException("failed to convert to serialized Message content", e);
- }
- // 设置内容类型为Java序列化对象
- messageProperties.setContentType("application/x-java-serialized-object");
- }
- // 最终处理逻辑
- if (bytes != null) {
- // 若成功转换出字节数组:
- messageProperties.setContentLength((long) bytes.length); // 设置消息内容的长度
- return new Message(bytes, messageProperties); // 创建并返回消息对象
- } else {
- // 若未处理任何类型(对象类型不受支持),抛出异常
- throw new IllegalArgumentException(
- this.getClass().getSimpleName() +
- " only supports String, byte[] and Serializable payloads, received: " +
- object.getClass().getName()
- );
- }
- }
复制代码 从这段源码中可以看出,对几种对象有以下几种处理惩罚方式:
- byte[]:直接作为二进制数据,标志内容类型为 application/octet-stream;
- String:按默认字符集转为字节数组,标志为 text/plain 并指定编码;
- Serializable对象:序列化为字节省,标志为 application/x-java-serialized-object;
- 其他类型:直接抛出 IllegalArgumentException,拒绝处理惩罚。
即所有有用类型最终均以字节数组存储,并在消息属性中明确数据类型和长度。
这种JDK的序列化的问题在于:
- 转为字节数组的形式之后,体积太大了;
- 代码可读性变低;
- 有安全毛病,由于字节码文件是可变的,容易受到篡改。
为相识决这种缺陷,保举使用JSON序列化代替默认的JDK序列化,需要完成以下设置:
(1)在生产者和消费者中都要引入jackson依赖
- <!--Jackson-->
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-xml</artifactId>
- </dependency>
复制代码 (2)在生产者和消费者中都要设置MessageConverter
- import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
- import org.springframework.amqp.support.converter.MessageConverter;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.context.annotation.Bean;
- @SpringBootApplication
- public class PublisherApplication {
- public static void main(String[] args) {
- SpringApplication.run(PublisherApplication.class);
- }
- @Bean
- public MessageConverter jacksonMessageConvertor(){
- return new Jackson2JsonMessageConverter();
- }
- }
复制代码 生产者同样在启动类中设置。
这样就解决了原先的问题。
本文内容源自于黑马程序员的视频学习。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |