IT评测·应用市场-qidao123.com

标题: RabbitMQ---交换机-Fanout-Direct [打印本页]

作者: 金歌    时间: 2024-7-22 23:55
标题: RabbitMQ---交换机-Fanout-Direct


Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的范例有四种:

Fanout交换机


简朴点来说,就是生产者把消息发给交换机,交换机根据路由(绑定规则)来转发消息给队列,斲丧者订阅队列,获得消息。
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。
在广播模式下,消息发送流程是这样的:


01使用官方文档的

消息发送:

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.util.Scanner;
  5. public class EmitLog {
  6.     //定义交换机
  7.     private static final String EXCHANGE_NAME = "fanout-exchange";
  8.     public static void main(String[] argv) throws Exception {
  9.         //创建连接工厂
  10.         ConnectionFactory factory = new ConnectionFactory();
  11.         factory.setHost("localhost");
  12.         //建立连接和通道
  13.         try (Connection connection = factory.newConnection();
  14.              Channel channel = connection.createChannel()) {
  15.             //通过channel.exchangeDeclare方法声明了一个名为EXCHANGE_NAME(即"logs")的交换机
  16.             // 并指定了其类型为fanout。fanout类型的交换机会将消息广播到所有与之绑定的队列中。
  17.             channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  18.            //发送消息
  19.             Scanner scanner=new Scanner(System.in);
  20.             while(scanner.hasNext()){
  21.                 String message = scanner.nextLine();
  22.                 //channel.basicPublish方法将消息发布到前面声明的交换机中。
  23.                 // 注意,这里的routingKey(即第二个参数)为空字符串"",因为对于fanout类型的交换机来说,routingKey是不起作用的。
  24.                 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
  25.                 System.out.println(" [x] Sent '" + message + "'");
  26.             }
  27.             }
  28.     }
  29. }
复制代码
接收:

注意肯定要创建队列,不然只有交换机没用
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import com.rabbitmq.client.DeliverCallback;
  5. public class ReceiveLogs {
  6.     private static final String EXCHANGE_NAME = "fanout-exchange";
  7.     public static void main(String[] argv) throws Exception {
  8.         ConnectionFactory factory = new ConnectionFactory();
  9.         factory.setHost("localhost");
  10.         Connection connection = factory.newConnection();
  11.         //创建通道
  12.         Channel channel1 = connection.createChannel();
  13.         Channel channel2 = connection.createChannel();
  14.         //声明交换机
  15.         channel1.exchangeDeclare(EXCHANGE_NAME, "fanout");
  16.         channel2.exchangeDeclare(EXCHANGE_NAME, "fanout");
  17.         //队列的名字
  18.         String queueName = "星星";
  19.         //创建队列
  20.         channel1.queueDeclare(queueName, true, false, false, null);
  21.         channel1.queueBind(queueName, EXCHANGE_NAME, "");
  22.         String queueName1 = "晨晨";
  23.         //创建队列
  24.         channel2.queueDeclare(queueName1, true, false, false, null);
  25.         channel2.queueBind(queueName1, EXCHANGE_NAME, "");
  26.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  27.         DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  28.             String message = new String(delivery.getBody(), "UTF-8");
  29.             System.out.println(" [1] Received '" + message + "'");
  30.         };
  31.         DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
  32.             String message = new String(delivery.getBody(), "UTF-8");
  33.             System.out.println(" [2] Received '" + message + "'");
  34.         };
  35.         channel1.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  36.         channel2.basicConsume(queueName1, true, deliverCallback2, consumerTag -> { });
  37.     }
  38. }
复制代码



可以看到fanout范例,生产者发送一个消息,所有的斲丧者都能接收到,这个范例不消设置路由
02注解形式的:

消息发送

先在配置类中声明交换机,队列,以及绑定关系
  1.    public static final String FANOUT_QUEUE_1 = "fanout.queue.1";
  2.     public static final String FANOUT_QUEUE_2 = "fanout.queue.2";
  3.     public static final String FANOUT_EXCHANGE = "fanout.exchange";
  4.     @Bean
  5.     public Queue fanoutQueue1() {
  6.         return new Queue(FANOUT_QUEUE_1);
  7.     }
  8.     @Bean
  9.     public Queue fanoutQueue2() {
  10.         return new Queue(FANOUT_QUEUE_2);
  11.     }
  12.     @Bean
  13.     public FanoutExchange fanoutExchange() {
  14.         return new FanoutExchange(FANOUT_EXCHANGE);
  15.     }
  16.     @Bean
  17.     public Binding binding1() {
  18.         return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
  19.     }
  20.     @Bean
  21.     public Binding binding2() {
  22.         return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
  23.     }
复制代码
Test类中添加测试方法:
  1. @Test
  2. public void testFanoutExchange() {
  3.     // 交换机名称
  4.     String exchangeName = "fanout.exchange";
  5.     // 消息
  6.     String message = "hello, everyone!";
  7.     rabbitTemplate.convertAndSend(exchangeName, "", message);
  8. }
复制代码
消息接收

在添加两个方法,作为斲丧者:
  1. @RabbitListener(queues = "fanout.queue.1")
  2. public void listenFanoutQueue1(String msg) {
  3.     System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
  4. }
  5. @RabbitListener(queues = "fanout.queue.2")
  6. public void listenFanoutQueue2(String msg) {
  7.     System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
  8. }
复制代码

总结

交换机的作用是什么?

Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都斲丧。但是,在某些场景下,我们希望不同的消息被不同的队列斲丧。这时就要用到Direct范例的Exchange。

在Direct模型下:

此处我省略了官方文档那种,直接springboot注解那种的,而且不再bean。而是全注解那种的,也是对黑马代码的进一步优化,不手动操作添加
下面是黑马的案例
案例需求如图

先在配置类加入下面的然后启动一下:
  1.     /** 基于注解的来声明交换机和队列及其绑定关系 */
  2.     @RabbitListener( bindings = @QueueBinding(
  3.             exchange = @Exchange(name = "heima.direct", type = ExchangeTypes.DIRECT),
  4.             value = @org.springframework.amqp.rabbit.annotation.Queue(name = "direct.queue1"),
  5.             key = {"red", "blue"}
  6.     ))
  7.     public void rabbitListener5(String message) {
  8.         System.out.println("红蓝: " + message);
  9.     }
  10.     @RabbitListener( bindings = @QueueBinding(
  11.             exchange = @Exchange(name = "heima.direct", type = ExchangeTypes.DIRECT),
  12.             value = @org.springframework.amqp.rabbit.annotation.Queue(name = "direct.queue2"),
  13.             key = {"yellow","red"}
  14.     ))
  15.     public void rabbitListener6(String message) {
  16.         System.out.println("黄红: " + message);
  17.     }
复制代码
消息发送

在Test类中添加测试方法:
  1.     @Test
  2.     public void testSendDirectExchange() {
  3.         // 交换机名称
  4.         String exchangeName = "heima.direct";
  5.         // 消息
  6.         String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
  7.         // 发送消息
  8.         rabbitTemplate.convertAndSend(exchangeName, "red", message);
  9.     }
  10.     @Test
  11.     public void testSendDirectExchange01() {
  12.         // 交换机名称
  13.         String exchangeName = "heima.direct";
  14.         // 消息
  15.         String message = "最新报道,哥斯拉是居民自治巨型气球,虚惊一场!";
  16.         // 发送消息
  17.         rabbitTemplate.convertAndSend(exchangeName, "blue", message);
  18.     }
复制代码
消息接收:

  1.     @RabbitListener(queues = "direct.queue1")
  2.     public void listenDirectQueue1(String msg) {
  3.         System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
  4.     }
  5.     @RabbitListener(queues = "direct.queue2")
  6.     public void listenDirectQueue2(String msg) {
  7.         System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
  8.     }
复制代码
先点击测试的谁人red的运行一下,在启动项目:

我们再切换为blue这个key:
你会发现,只有斲丧者1收到了消息:

###总结
描述下Direct交换机与Fanout交换机的差别?


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4