RabbitMQ底子——work queues、三种交换机、声明队列与交换机以及消息转换器
目次work queues
Fanout交换机(广播)
Direct交换机(定向)
Topic交换机(话题)
声明队列与交换机
方式一(基于Bean)
方式二(基于注解)
消息转换器
work queues
work queues,即任务模型,即让多个消费者绑定到同一个队列,共同消费队列中的消息。
https://i-blog.csdnimg.cn/direct/3fee7ae37ae442f9a51fdf82a768ceb6.png
一般不额外设置的情况下,MQ会将消息依次轮询投递给绑定在队列上的每一个消费者,此时并未考虑消费者是否已经处理惩罚完成消息,容易造成消息的堆积。
为了减少由于性能差异造成的运行效果的区别,实现“能者多劳”的局面,可以在消费者的application.yml设置文件中设置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 prefetch设置为1表示同一时刻最多投递给消费者1条消息,当消费者处理惩罚完成之后,MQ才能投递下一条消息。
总结work模型的使用:
[*]多个消费者绑定到一个队列,可以加快消息处理惩罚速率;
[*]同一条消息只会被一个消费者使用;
[*]可以通过设置prefetch参数来控制消费者预取的消息数量,实现能者多劳。
现实中的生产情况需要通过exchange来发送消息,而不是直接发送到队列。
假如没有交换机,就无法实现一个消息被多个消费者消费的场景(好比一个下单成功的消息需要被购物车服务和消费系统服务消费)。
Fanout交换机(广播)
Fanout交换时机将接收到的消息广播到每一个与其绑定的queue,所以也叫广播模式(一条消息大家都能收到)。
https://i-blog.csdnimg.cn/direct/92651f3c130c4ca28a729a473972d331.png
每一个微服务一般需要有自己的队列,可以在一个队列上绑定多个相同功能的微服务模块。
在生产者阶段,发送消息到交换机需要指定RoutingKey(作用接下来会说),可以填写null:
@Test
void testSendFanout() {
String exchangeName = "hmall.fanout";
String msg = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, null, msg);
}
Direct交换机(定向)
场景:不同的微服务需要接收不同种类的消息。(好比支付成功需要发送短信、失败则不需要)
Direct交换时机将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
[*]每一个Queue都与Exchange设置一个大概多个BindingKey;
[*]生产者发布消息时,会指定消息的RoutingKey;
[*]Exchange将消息路由到BindingKey与消息RoutingKey一致的队列。
由此可见,特殊情况下,Direct交换机也可以成为Fanout交换机。
@Test
void testSendDirect() {
String exchangeName = "hmall.direct";
String msg = "RoutingKey为blue的通知~";
rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
}
Topic交换机(话题)
Topic交换机与Direct交换机类似,区别在于RoutingKey可以是多个单词的列表,并且以“.”分割。
https://i-blog.csdnimg.cn/direct/e081b7a3dd4b4c2388486f82fe858f78.png
声明队列与交换机
方式一(基于Bean)
SpringAMQP提供了几个类,用于声明队列、交换机及其绑定关系。
[*]Queue:用于声明队列,可以用工厂类QueueBuilder构建;
[*]Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建;
[*]Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建。
示例代码:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfiguration {
@Bean
public FanoutExchange fanoutExchange(){
// ExchangeBuilder.fanoutExchange("").build();
return new FanoutExchange("hmall.fanout2");
}
@Bean
public Queue fanoutQueue3(){
// QueueBuilder.durable("ff").build();
return new Queue("fanout.queue3");
}
@Bean
public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange){
//绑定方法1
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
}
@Bean
public Queue fanoutQueue4(){
return new Queue("fanout.queue4");
}
@Bean
public Binding fanoutBinding4(){
//绑定方法2
//此处的fanoutQueue4()不是在调用本类中的fanoutQueue4()方法
//在Spring中,凡是加了@Bean注解的都会被动态代理,当调用fanoutQueue4()时,Spring首先会检查Spring容器中有没有这个Bean
//即名为fanoutQueue4的Bean,如果有,就不会执行fanoutQueue4()方法中的代码而是会直接返回这个Bean,如果没有,才会去执行这个方法生成Bean。
return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
}
}
该方法的缺点:每一个绑定都需要重写一个方法,较为繁琐。
方式二(基于注解)
SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:
示例(在消费者的listener文件中声明):
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1", durable = "true"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg) throws InterruptedException {
System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg +"】");
}
消息转换器
所有的数据传输都是转成字节的形式传递的,当传递map大概类对象的时间就会造成占用更多内存的情况。
内部是什么原理?简朴查看一下convertAndSend方法的源码:
public void convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) throws AmqpException {
this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);
} 继续跟进其中的convertMessageIfNecessary()方法:
protected Message convertMessageIfNecessary(Object object) {
return object instanceof Message ? (Message)object : this.getRequiredMessageConverter().toMessage(object, new MessageProperties());
} 可以看到这段代码是在判断当前对象是否为Message类,假如是,就不做处理惩罚。
此时,对于Map类型,不属于Message类型 ,会进行getRequiredMessageConverter(),即获取消息转换器,可以看到,默认的消息转换器是SimpleMessageConverter()。
https://i-blog.csdnimg.cn/direct/a2f1317fa9604eb380d79ab7cbc04972.png
在SimpleMessageConverter.class的源码为(已添加解释):
/**
* 根据传入的对象创建消息(Message),并根据对象类型配置消息属性(MessageProperties)。
*
* @param object 要转换的消息内容对象
* @param messageProperties 消息属性配置对象
* @return 创建的消息对象(Message)
* @throws MessageConversionException 对象转换失败时抛出
* @throws IllegalArgumentException 对象类型不支持时抛出
*/
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
// 初始化字节数组,用于存储对象转换后的二进制数据
byte[] bytes = null;
// 根据对象类型处理不同的序列化逻辑
if (object instanceof byte[]) {
// 情况1:对象本身已经是字节数组
bytes = (byte[]) object; // 直接赋值
messageProperties.setContentType("application/octet-stream"); // 设置内容类型为二进制流
} else if (object instanceof String) {
// 情况2:对象是字符串类型
try {
// 使用默认字符集将字符串转换为字节数组
bytes = ((String) object).getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException var6) {
// 捕获不支持的字符集异常,包装后抛出
UnsupportedEncodingException e = var6;
throw new MessageConversionException("failed to convert to Message content", e);
}
// 设置内容类型为纯文本,并指定字符集编码
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding(this.defaultCharset);
} else if (object instanceof Serializable) {
// 情况3:对象实现了Serializable接口(可序列化对象)
try {
// 使用序列化工具将对象序列化为字节数组
bytes = SerializationUtils.serialize(object);
} catch (IllegalArgumentException var5) {
// 捕获序列化异常(如对象不可序列化),包装后抛出
IllegalArgumentException e = var5;
throw new MessageConversionException("failed to convert to serialized Message content", e);
}
// 设置内容类型为Java序列化对象
messageProperties.setContentType("application/x-java-serialized-object");
}
// 最终处理逻辑
if (bytes != null) {
// 若成功转换出字节数组:
messageProperties.setContentLength((long) bytes.length); // 设置消息内容的长度
return new Message(bytes, messageProperties); // 创建并返回消息对象
} else {
// 若未处理任何类型(对象类型不受支持),抛出异常
throw new IllegalArgumentException(
this.getClass().getSimpleName() +
" only supports String, byte[] and Serializable payloads, received: " +
object.getClass().getName()
);
}
} 从这段源码中可以看出,对几种对象有以下几种处理惩罚方式:
[*] byte[]:直接作为二进制数据,标志内容类型为 application/octet-stream;
[*] String:按默认字符集转为字节数组,标志为 text/plain 并指定编码;
[*] Serializable对象:序列化为字节省,标志为 application/x-java-serialized-object;
[*] 其他类型:直接抛出 IllegalArgumentException,拒绝处理惩罚。
即所有有用类型最终均以字节数组存储,并在消息属性中明确数据类型和长度。
这种JDK的序列化的问题在于:
[*]转为字节数组的形式之后,体积太大了;
[*]代码可读性变低;
[*]有安全毛病,由于字节码文件是可变的,容易受到篡改。
为相识决这种缺陷,保举使用JSON序列化代替默认的JDK序列化,需要完成以下设置:
(1)在生产者和消费者中都要引入jackson依赖
<!--Jackson-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency> (2)在生产者和消费者中都要设置MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class PublisherApplication {
public static void main(String[] args) {
SpringApplication.run(PublisherApplication.class);
}
@Bean
public MessageConverter jacksonMessageConvertor(){
return new Jackson2JsonMessageConverter();
}
} 生产者同样在启动类中设置。
这样就解决了原先的问题。
本文内容源自于黑马程序员的视频学习。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]