rabbitmq交换机

打印 上一主题 下一主题

主题 786|帖子 786|积分 2358

交换机

Fanout交换机(广播)



创建队列

创建fanout.queue01和fanout.queue02


创建交换机



创建绑定关系





测试



两个队列都收到了消息


总结

交换机的作用


  • 吸收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列
Direct交换机



创建队列

创建direct.queue01和direct.queue02


创建交换机



创建绑定关系










测试

key=red 发送消息


可以看到,两个队列都收到了


key=blue发送消息


可以看到,只有direct.queue01收到消息了(因为它绑定的key是red和blue)


总结

在direct模型下


  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key举行判断,只有队列的Routingkey与消息的 Routing key完全划一,才会吸收到消息
Topic交换机

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到差别的队列。
只不外Topic类型Exchange可以让队列在绑定BindingKey 的时间使用通配符!
BindingKey 一样平常都是有一个或多个单词组成,多个单词之间以.分割,比方: item.insert
通配符规则:


  • #:匹配0个或多个词(包括1个)
  • *:匹配不多不少恰好1个词
举例:


  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu



假如此时publisher发送的消息使用的RoutingKey共有四种:


  • china.news 代表有中国的新闻消息;
  • china.weather 代表中国的天气消息;
  • japan.news 则代表日本新闻
  • japan.weather 代表日本的天气消息;
解释:


  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:




    • china.news
    • china.weather



  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:




    • china.news
    • japan.news


创建队列



创建交换机



创建绑定关系





测试

发送消息,routingkey=china.news


可以看到,两个队列都收到消息了



发送消息,routingkey=china.fujian.news


两个队列都收到了,因为#是匹配0个或多个

发送消息,routingkey=china.


只有topic.queue01收到,符合预期


总结

形貌下Direct交换机与Topic交换机的差异?


  • Topic交换机吸收的消息RoutingKey必须是多个单词,以 . 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

用java代码声明队列和交换机

基本api

SpringAMQP提供了一个Queue类,用来创建队列


SpringAMQP还提供了一个Exchange接口,来表示全部差别类型的交换机:


我们可以自己创建队列和交换机,不外SpringAMQP还提供了ExchangeBuilder来简化这个过程


而在绑定队列和交换机时,则必要使用BindingBuilder来创建Binding对象



案例

创建一个springboot项目,导入web rabbitmq依赖
rabbitmq控制台新建一个虚拟主机,名为/test
  1. # 应用服务 WEB 访问端口
  2. server.port=8080
  3. # rabbitmq配置
  4. # 主机ip
  5. spring.rabbitmq.host=192.168.168.168
  6. # rabbitmq的编程端口,默认5672
  7. spring.rabbitmq.port=5672
  8. # 账号和密码
  9. spring.rabbitmq.username=chen
  10. spring.rabbitmq.password=123456
  11. # 虚拟主机
  12. spring.rabbitmq.virtual-host=/test
  13. # 通过设置prefetch来控制消费者预取的消息数量。这条配置告诉RabbitMQ的消费者一次只从队列中拉取一条消息进行处理。
  14. spring.rabbitmq.listener.simple.prefetch=1
复制代码
只声明队列和交换机,没有声明队列的消费者,队列是不会被创建的
fanout

  1. package com.gmgx.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Binding;
  4. import org.springframework.amqp.core.BindingBuilder;
  5. import org.springframework.amqp.core.FanoutExchange;
  6. import org.springframework.amqp.core.Queue;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. @Slf4j
  10. @Configuration
  11. public class FanoutConfig {
  12.     //声明队列
  13.     @Bean
  14.     public Queue fanoutQueue01() {
  15.         return new Queue("fanout.queue1");
  16.     }
  17.     @Bean
  18.     public Queue fanoutQueue02() {
  19.         return new Queue("fanout.queue2");
  20.     }
  21.     //声明交换机
  22.     @Bean
  23.     public FanoutExchange fanoutExchange() {
  24.         return new FanoutExchange("fanout.exchange");
  25.     }
  26.     //声明绑定关系   bind 队列 到 交换机
  27.     @Bean
  28.     public Binding binding01() {
  29.         return BindingBuilder.bind(fanoutQueue01()).to(fanoutExchange());
  30.     }
  31.     @Bean
  32.     public Binding binding02() {
  33.         return BindingBuilder.bind(fanoutQueue02()).to(fanoutExchange());
  34.     }
  35. }
复制代码

  1. package com.gmgx.listener;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class FanoutListener {
  6.     @RabbitListener(queues = "fanout.queue1")
  7.     public void listen01(String message) {
  8.         System.out.println("队列1 Received message: " + message);
  9.     }
  10.     @RabbitListener(queues = "fanout.queue2")
  11.     public void listen02(String message) {
  12.         System.out.println("队列2 Received message: " + message);
  13.     }
  14. }
复制代码

  1. @Test
  2. void testFanout() {
  3.     String msg = "hello 二爷人用额!";
  4.     for (int i = 0; i < 10; i++) {
  5.         rabbitTemplate.convertAndSend("fanout.exchange", "", msg + i);
  6.     }
  7. }
复制代码
direct

  1. package com.gmgx.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class DirectConfig {
  10.     //声明队列
  11.     @Bean
  12.     public Queue queue1() {
  13.         return new Queue("direct.queue1");
  14.     }
  15.     @Bean
  16.     public Queue queue2() {
  17.         return new Queue("direct.queue2");
  18.     }
  19.     //声明交换机
  20.     @Bean
  21.     public DirectExchange exchange() {
  22.         return new DirectExchange("direct.exchange");
  23.     }
  24.     //声明绑定关系
  25.     @Bean
  26.     public Binding binding1() {
  27.         return BindingBuilder.bind(queue1()).to(exchange()).with("red");
  28.     }
  29.     @Bean
  30.     public Binding binding2() {
  31.         return BindingBuilder.bind(queue2()).to(exchange()).with("green");
  32.     }
  33. }
复制代码

  1. package com.gmgx.listener;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class DirectListener {
  6.     @RabbitListener(queues = "direct.queue1")
  7.     public void listen01(String msg) {
  8.         System.out.println("队列1 收到消息 : " + msg);
  9.     }
  10.     @RabbitListener(queues = "direct.queue2")
  11.     public void listen02(String msg) {
  12.         System.out.println("队列2 收到消息 : " + msg);
  13.     }
  14. }
复制代码

  1. @Test
  2. void testDirect() {
  3.     rabbitTemplate.convertAndSend("direct.exchange", "red", "this is a red msg!!");
  4.     rabbitTemplate.convertAndSend("direct.exchange", "green", "this is a green msg!!");
  5.     rabbitTemplate.convertAndSend("direct.exchange", "green", "this is a green msg!!");
  6. }
复制代码
topic

  1. package com.gmgx.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.core.TopicExchange;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class TopicConfig {
  10.     //声明队列
  11.     @Bean
  12.     public Queue topicQueue1() {
  13.         return new Queue("topic.queue1");
  14.     }
  15.     @Bean
  16.     public Queue topicQueue2() {
  17.         return new Queue("topic.queue2");
  18.     }
  19.     //声明交换机
  20.     @Bean
  21.     public TopicExchange topicExchange() {
  22.         return new TopicExchange("topic.exchange");
  23.     }
  24.     //声明绑定关系
  25.     @Bean
  26.     public Binding topicBinding1() {
  27.         return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("china.#");
  28.     }
  29.     @Bean
  30.     public Binding topicBinding2() {
  31.         return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("#.news");
  32.     }
  33. }
复制代码

  1. package com.gmgx.listener;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class TopicListener {
  6.     @RabbitListener(queues = "topic.queue1")
  7.     public void listen1(String msg) {
  8.         System.out.println("队列1 routingKey=china.# " + msg);
  9.     }
  10.     @RabbitListener(queues = "topic.queue2")
  11.     public void listen2(String msg) {
  12.         System.out.println("队列2 routingKey=#.news " + msg);
  13.     }
  14. }
复制代码

  1. @Test
  2. void testTopic() {
  3.     rabbitTemplate.convertAndSend("topic.exchange", "china.news", "key=china.news");
  4.     rabbitTemplate.convertAndSend("topic.exchange", "china.fujian.news", "key=china.fujian.news");
  5.     rabbitTemplate.convertAndSend("topic.exchange", "japan.news", "key=japan.news");
  6.     rabbitTemplate.convertAndSend("topic.exchange", "china.weather", "key=china.weather");
  7. }
复制代码
基于注解声明交换机、队列、消费者

  1. package com.gmgx.listener;
  2. import org.springframework.amqp.core.ExchangeTypes;
  3. import org.springframework.amqp.rabbit.annotation.Exchange;
  4. import org.springframework.amqp.rabbit.annotation.Queue;
  5. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class TopicListener {
  10.     @RabbitListener(bindings = @QueueBinding(
  11.             value = @Queue(name = "topic.queue1"),
  12.             exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),//不指定默认为direct类型
  13.             key = "china.#"
  14.     ))
  15.     public void listen1(String msg) {
  16.         System.out.println("topic.queue1接收到消息 : " + msg);
  17.     }
  18.     @RabbitListener(bindings = @QueueBinding(
  19.             value = @Queue(name = "topic.queue2"),
  20.             exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),
  21.             key = "#.news"
  22.     ))
  23.     public void listen2(String msg) {
  24.         System.out.println("topic.queue2接收到消息 : " + msg);
  25.     }
  26. }
复制代码

  1. @Test
  2. void testTopic() {
  3.     rabbitTemplate.convertAndSend("topic.exchange", "china.news", "key=china.news");
  4.     rabbitTemplate.convertAndSend("topic.exchange", "china.fujian.news", "key=china.fujian.news");
  5.     rabbitTemplate.convertAndSend("topic.exchange", "japan.news", "key=japan.news");
  6.     rabbitTemplate.convertAndSend("topic.exchange", "china.weather", "key=china.weather");
  7. }
复制代码
消息转换器

使用转换器发送对象数据到队列

新建一个obj.queue 往内里发一个Student对象的消息

  1. package com.gmgx.entity;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import java.io.Serializable;
  6. @Data
  7. @NoArgsConstructor
  8. @AllArgsConstructor
  9. public class Student implements Serializable {
  10.     private static final long serialVersionUID = 1L;
  11.     private int id;
  12.     private String name;
  13.     private int age;
  14. }
复制代码

  1. @Test
  2. void testObject(){
  3.     Student stu = new Student(1, "张三", 22);
  4.     rabbitTemplate.convertAndSend("obj.queue", stu);
  5. }
复制代码

取出来的数据变成了这样


这是因为:
默认情况下Spring采用的序列化方式是JDK序列化。Student对象被序列化后传给队列。
众所周知,JDK序列化存在下列问题:


  • 数据体积过大
  • 有安全毛病
  • 可读性差
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

使用json转换器

  1. <dependency>
  2.     <groupId>com.fasterxml.jackson.dataformat</groupId>
  3.     <artifactId>jackson-dataformat-xml</artifactId>
  4.     <version>2.9.10</version>
  5. </dependency>
复制代码
注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。
  1. package com.gmgx.config;
  2. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  3. import org.springframework.amqp.support.converter.MessageConverter;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class RabbitMqConfig {
  8.     @Bean
  9.     public MessageConverter messageConverter(){
  10.         // 1.定义消息转换器
  11.         Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
  12.         // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
  13.         jackson2JsonMessageConverter.setCreateMessageIds(true);
  14.         return jackson2JsonMessageConverter;
  15.     }
  16. }
复制代码
现在重新发送消息到obj.queue
可以看到是json格式,只占33个字节



现在写一个消费者来监听队列
  1. package com.gmgx.listener;
  2. import com.gmgx.entity.Student;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class ObjListener {
  7.     @RabbitListener(queues = "obj.queue")
  8.     public void objListener(Student stu) {
  9.         System.out.println("obj.queue 接收到"+stu);
  10.     }
  11. }
复制代码
重新执行测试,得到如下结果


符合预期

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

我爱普洱茶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表