ToB企服应用市场:ToB评测及商务社交产业平台

标题: RabbitMQ工作模式详解,以及Java实现 [打印本页]

作者: 立山    时间: 2024-9-6 05:28
标题: RabbitMQ工作模式详解,以及Java实现
RabbitMQ共有六种工作模式:简单模式(Simple)、工作队列模式(Work Queue)、发布订阅模式(Publish/Subscribe)、路由模式(Routing)、通配符模式(Topics)、长途调用模式(RPC,不常用,不做讲讲授明)
   一、简单模式(Simple)

特点:①一个生产者对应一个消耗者,通过队列举行消息传递。
           ②该模式使用direct交换机,direct交换机是RabbitMQ默认交换机。
图解:

二、工作队列模式(Work Queue)

与简单模式相比,工作队列模式(Work Queue)多了一些消耗者,该模式也使用direct交换机,应用于处置惩罚消息较多的环境。特点如下:
①一个队列对应多个消耗者。
②一条消息只会被一个消耗者消耗。
③消息队列默认采用轮询的方式将消息均匀发送给消耗者。
图解:

三、发布订阅模式(Publish/Subscribe)

在开发过程中,有一些消息需要不同消耗者举行不同的处置惩罚
特点:
①生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。
②工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换性能将消息发送给多个队列。发布订阅模式使用fanout交换机。
图解

四、路由模式(Routing)

使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时间,不是所有消息都无差异的发布到所有队列中。特点:
①每个队列绑定路由关键字RoutingKey
②生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模式使用direct交换机。
图解:

五、 通配符模式(Topics)


通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机。
通配符规则:
①消息设置RoutingKey时,RoutingKey由多个单词构成,中央以.分割。
②队列设置RoutingKey时,#可以匹配任意多个单词,*可以匹配任意一个单词。
六、Java实现五种模式


   简单模式生产者
  1. package com.tmh.mq.simple;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * rabbitmq工作模式:简单模式,消息生产者
  9. */
  10. public class Producer {
  11.     public static void main(String[] args) throws IOException, TimeoutException {
  12.         //1、创建连接工厂 指定IP地址、端口、连接的用户名密码、连接的虚拟主机
  13.         ConnectionFactory connectionFactory = new ConnectionFactory();
  14.         connectionFactory.setHost("192.168.66.8");
  15.         connectionFactory.setPort(5672);
  16.         connectionFactory.setUsername("tangminghao");
  17.         connectionFactory.setPassword("tangminghao");
  18.         connectionFactory.setVirtualHost("/");
  19.         //2、创建连接
  20.          Connection connection = connectionFactory.newConnection();
  21.         //3、创建信道
  22.          Channel channel = connection.createChannel();
  23.         //4、创建队列
  24.         /**
  25.          * 参数一:队列名称
  26.          * 参数二:是否持久化,true表示当MQ重启后队列还在
  27.          * 参数三:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
  28.          * 参数四:是否自动删除
  29.          * 参数五:其他参数
  30.          */
  31.         channel.queueDeclare("simplqueue", false,false,false,null);
  32.         //5、发送消息
  33.         String messges="hello simple queue";
  34.         /**
  35.          * 参数1:交换机名,""表示默认交换机
  36.          * 参数2:路由键,简单模式就是队列名
  37.          * 参数3:其他额外参数
  38.          * 参数4:要传递的消息字节数组
  39.          */
  40.         channel.basicPublish("","simplqueue",null,messges.getBytes());
  41.         //6、关闭信道和连接
  42.         channel.close();
  43.         connection.close();
  44.         System.out.println("消息发送成功");
  45.     }
  46. }
复制代码
简单模式消耗者
  1. package com.tmh.mq.simple;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. //simple队列消息消费者
  6. public class Consumer {
  7.     public static void main(String[] args) throws IOException, TimeoutException {
  8.         //1、创建连接工厂 指定IP地址、端口、连接的用户名密码、连接的虚拟主机
  9.         ConnectionFactory connectionFactory = new ConnectionFactory();
  10.         connectionFactory.setHost("192.168.66.8");
  11.         connectionFactory.setPort(5672);
  12.         connectionFactory.setUsername("tangminghao");
  13.         connectionFactory.setPassword("tangminghao");
  14.         connectionFactory.setVirtualHost("/");
  15.         //2、创建连接
  16.         Connection connection = connectionFactory.newConnection();
  17.         //3、创建信道
  18.         Channel channel = connection.createChannel();
  19.         //4、监听队列
  20.         /**
  21.          * 参数1:监听的队列名
  22.          * 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
  23.          * 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
  24.          */
  25.         channel.basicConsume("simplqueue",true,new DefaultConsumer(channel){
  26.             @Override
  27.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  28.                  String messge = new String(body, "UTF-8");
  29.                 System.out.println("接收消息,消息为:"+messge);
  30.             }
  31.         });
  32.     }
  33. }
复制代码

   工作队列模式生产者
  1. package com.tmh.mq.work;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.MessageProperties;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. //工作队列模式
  9. public class Producer {
  10.     public static void main(String[] args) throws IOException, TimeoutException {
  11.         //创建工厂连接
  12.         ConnectionFactory connectionFactory=new ConnectionFactory();
  13.         connectionFactory.setHost("192.168.66.8");
  14.         connectionFactory.setPort(5672);
  15.         connectionFactory.setUsername("tangminghao");
  16.         connectionFactory.setPassword("tangminghao");
  17.         connectionFactory.setVirtualHost("/");
  18.         //建立连接
  19.          Connection connection = connectionFactory.newConnection();
  20.          //创建信道
  21.          Channel channel = connection.createChannel();
  22.          //创建队列,如果队列已经存在,则使用该队列
  23.         channel.queueDeclare("workqueue",true,false,false,null);
  24.         //发送大量消息
  25.         for (int i = 0; i <100 ; i++) {
  26.             channel.basicPublish("","workqueue",
  27.                     MessageProperties.PERSISTENT_TEXT_PLAIN,
  28.                     ("hello 这是今天的第"+(i+1)+"条消息").getBytes());
  29.         }
  30.         //关闭资源
  31.         channel.close();
  32.         connection.close();
  33.     }
  34. }
复制代码
 工作模式消耗者
  1. package com.tmh.mq.work;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class Consumer1 {
  6.     public static void main(String[] args) throws IOException, TimeoutException {
  7.         //创建工厂连接
  8.         ConnectionFactory connectionFactory=new ConnectionFactory();
  9.         connectionFactory.setHost("192.168.66.8");
  10.         connectionFactory.setPort(5672);
  11.         connectionFactory.setUsername("tangminghao");
  12.         connectionFactory.setPassword("tangminghao");
  13.         connectionFactory.setVirtualHost("/");
  14.         //建立连接
  15.         Connection connection = connectionFactory.newConnection();
  16.         //创建信道
  17.         Channel channel = connection.createChannel();
  18.         //监听队列
  19.         channel.basicConsume("workqueue",true,new DefaultConsumer(channel){
  20.             @Override
  21.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  22.                  String s = new String(body, "UTF-8");
  23.                 System.out.println("消费者1消费消息:"+s);
  24.             }
  25.         });
  26.     }
  27. }
  28. -----------------------------------------------------------------------------------------
  29. public class Consumer2 {
  30.     public static void main(String[] args) throws IOException, TimeoutException {
  31.         //创建工厂连接
  32.         ConnectionFactory connectionFactory=new ConnectionFactory();
  33.         connectionFactory.setHost("192.168.66.8");
  34.         connectionFactory.setPort(5672);
  35.         connectionFactory.setUsername("tangminghao");
  36.         connectionFactory.setPassword("tangminghao");
  37.         connectionFactory.setVirtualHost("/");
  38.         //建立连接
  39.         Connection connection = connectionFactory.newConnection();
  40.         //创建信道
  41.         Channel channel = connection.createChannel();
  42.         //监听队列
  43.         channel.basicConsume("workqueue",true,new DefaultConsumer(channel){
  44.             @Override
  45.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  46.                  String s = new String(body, "UTF-8");
  47.                 System.out.println("消费者2消费消息:"+s);
  48.             }
  49.         });
  50.     }
  51. }
  52. -----------------------------------------------------------------------------------------
  53. public class Consumer3 {
  54.     public static void main(String[] args) throws IOException, TimeoutException {
  55.         //创建工厂连接
  56.         ConnectionFactory connectionFactory=new ConnectionFactory();
  57.         connectionFactory.setHost("192.168.66.8");
  58.         connectionFactory.setPort(5672);
  59.         connectionFactory.setUsername("tangminghao");
  60.         connectionFactory.setPassword("tangminghao");
  61.         connectionFactory.setVirtualHost("/");
  62.         //建立连接
  63.         Connection connection = connectionFactory.newConnection();
  64.         //创建信道
  65.         Channel channel = connection.createChannel();
  66.         //监听队列
  67.         channel.basicConsume("workqueue",true,new DefaultConsumer(channel){
  68.             @Override
  69.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  70.                  String s = new String(body, "UTF-8");
  71.                 System.out.println("消费者1消费消息:"+s);
  72.             }
  73.         });
  74.     }
  75. }
复制代码

   发布订阅模式生产者
  1. package com.tmh.mq.publish;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.nio.charset.StandardCharsets;
  8. import java.util.concurrent.TimeoutException;
  9. public class Producer {
  10.     public static void main(String[] args) throws IOException, TimeoutException {
  11.         //1、创建连接工厂
  12.         final ConnectionFactory connectionFactory = new ConnectionFactory();
  13.         connectionFactory.setHost("192.168.66.8");
  14.         connectionFactory.setPort(5672);
  15.         connectionFactory.setUsername("tangminghao");
  16.         connectionFactory.setPassword("tangminghao");
  17.         connectionFactory.setVirtualHost("/");
  18.         //2、建立连接
  19.         final Connection connection = connectionFactory.newConnection();
  20.         //3、建立信道
  21.         final Channel channel = connection.createChannel();
  22.         //4、创建交换机
  23.         /**
  24.          * 参数一:交换机名
  25.          * 参数二:交换机类型
  26.          * 参数三:交换机持久化
  27.          */
  28.         channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);
  29.         //5、创建队列
  30.         channel.queueDeclare("MAIL_QUEUE", true,false,false,null);
  31.         channel.queueDeclare("MESSAGE_QUEUE", true,false,false,null);
  32.         channel.queueDeclare("STATION_QUEUE", true,false,false,null);
  33.         //6、队列绑定交换机
  34.         /**
  35.          * 参数一:队列名
  36.          * 参数二:交换机名字
  37.          * 参数三:路由关键字,发布订阅模式写“”即可
  38.          */
  39.         channel.queueBind("MAIL_QUEUE","exchange_fanout","");
  40.         channel.queueBind("MESSAGE_QUEUE","exchange_fanout","");
  41.         channel.queueBind("STATION_QUEUE","exchange_fanout","");
  42.         //7、发送消息
  43.         for (int i = 1; i <=10 ; i++) {
  44.             channel.basicPublish("exchange_fanout","",null,
  45.                     ("你好,发布订阅模式"+i).getBytes(StandardCharsets.UTF_8));
  46.         }
  47.         //8、关闭资源
  48.         channel.close();
  49.         connection.close();
  50.     }
  51. }
复制代码
 发布订阅模式消耗者
  1. package com.tmh.mq.publish;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class ConsumerMail {
  6.     public static void main(String[] args) throws IOException, TimeoutException {
  7.         //1、创建连接工厂
  8.         final ConnectionFactory connectionFactory = new ConnectionFactory();
  9.         connectionFactory.setHost("192.168.66.8");
  10.         connectionFactory.setPort(5672);
  11.         connectionFactory.setUsername("tangminghao");
  12.         connectionFactory.setPassword("tangminghao");
  13.         connectionFactory.setVirtualHost("/");
  14.         //2、建立连接
  15.         final Connection connection = connectionFactory.newConnection();
  16.         //3、建立信道
  17.         final Channel channel = connection.createChannel();
  18.         //4、监听队列
  19.         channel.basicConsume("MAIL_QUEUE",true,new DefaultConsumer(channel){
  20.             @Override
  21.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  22.                 final String s = new String(body, "UTF-8");
  23.                 System.out.println("邮件消息:"+s);
  24.             }
  25.         });
  26.     }
  27. }
  28. -----------------------------------------------------------------------------------------
  29. public class ConsumerMessage {
  30.     public static void main(String[] args) throws IOException, TimeoutException {
  31.         //1、创建连接工厂
  32.         final ConnectionFactory connectionFactory = new ConnectionFactory();
  33.         connectionFactory.setHost("192.168.66.8");
  34.         connectionFactory.setPort(5672);
  35.         connectionFactory.setUsername("tangminghao");
  36.         connectionFactory.setPassword("tangminghao");
  37.         connectionFactory.setVirtualHost("/");
  38.         //2、建立连接
  39.         final Connection connection = connectionFactory.newConnection();
  40.         //3、建立信道
  41.         final Channel channel = connection.createChannel();
  42.         //4、监听队列
  43.         channel.basicConsume("MESSAGE_QUEUE",true,new DefaultConsumer(channel){
  44.             @Override
  45.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  46.                 final String s = new String(body, "UTF-8");
  47.                 System.out.println("短信消息:"+s);
  48.             }
  49.         });
  50.     }
  51. }
  52. -----------------------------------------------------------------------------------------
  53. public class ConsumerStation {
  54.     public static void main(String[] args) throws IOException, TimeoutException {
  55.         //1、创建连接工厂
  56.         final ConnectionFactory connectionFactory = new ConnectionFactory();
  57.         connectionFactory.setHost("192.168.66.8");
  58.         connectionFactory.setPort(5672);
  59.         connectionFactory.setUsername("tangminghao");
  60.         connectionFactory.setPassword("tangminghao");
  61.         connectionFactory.setVirtualHost("/");
  62.         //2、建立连接
  63.         final Connection connection = connectionFactory.newConnection();
  64.         //3、建立信道
  65.         final Channel channel = connection.createChannel();
  66.         //4、监听队列
  67.         channel.basicConsume("STATION_QUEUE",true,new DefaultConsumer(channel){
  68.             @Override
  69.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  70.                 final String s = new String(body, "UTF-8");
  71.                 System.out.println("站内消息:"+s);
  72.             }
  73.         });
  74.     }
  75. }
复制代码


   路由模式生产者
  1. package com.tmh.mq.routing;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.nio.charset.StandardCharsets;
  8. import java.util.concurrent.TimeoutException;
  9. public class Producer {
  10.     public static void main(String[] args) throws IOException, TimeoutException {
  11.         //1、创建连接工厂
  12.         final ConnectionFactory connectionFactory = new ConnectionFactory();
  13.         connectionFactory.setHost("192.168.66.8");
  14.         connectionFactory.setPort(5672);
  15.         connectionFactory.setUsername("tangminghao");
  16.         connectionFactory.setPassword("tangminghao");
  17.         connectionFactory.setVirtualHost("/");
  18.         //2、建立连接
  19.         final Connection connection = connectionFactory.newConnection();
  20.         //3、建立信道
  21.         final Channel channel = connection.createChannel();
  22.         //4、创建交换机
  23.         /**
  24.          * 参数一:交换机名
  25.          * 参数二:交换机类型
  26.          * 参数三:交换机持久化
  27.          */
  28.         channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);
  29.         //5、创建队列
  30.         channel.queueDeclare("MAIL_QUEUE2", true,false,false,null);
  31.         channel.queueDeclare("MESSAGE_QUEUE2", true,false,false,null);
  32.         channel.queueDeclare("STATION_QUEUE2", true,false,false,null);
  33.         //6、队列绑定交换机
  34.         /**
  35.          * 参数一:队列名
  36.          * 参数二:交换机名字
  37.          * 参数三:路由关键字,发布订阅模式写“”即可
  38.          */
  39.         channel.queueBind("MAIL_QUEUE2","exchange_routing","important");
  40.         channel.queueBind("MESSAGE_QUEUE2","exchange_routing","important");
  41.         channel.queueBind("STATION_QUEUE2","exchange_routing","important");
  42.         channel.queueBind("STATION_QUEUE2","exchange_routing","normal");
  43.         //7、发送消息
  44.             channel.basicPublish("exchange_routing","important",null,
  45.                     ("你好,路由模式,这是重要消息").getBytes(StandardCharsets.UTF_8));
  46.             channel.basicPublish("exchange_routing","normal",null,
  47.                     ("你好,路由模式,这是不太重要的消息").getBytes(StandardCharsets.UTF_8));
  48.         //8、关闭资源
  49.         channel.close();
  50.         connection.close();
  51.     }
  52. }
复制代码
路由模式消耗者
  1. package com.tmh.mq.routing;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class ConsumerMail {
  6.     public static void main(String[] args) throws IOException, TimeoutException {
  7.         //1、创建连接工厂
  8.         final ConnectionFactory connectionFactory = new ConnectionFactory();
  9.         connectionFactory.setHost("192.168.66.8");
  10.         connectionFactory.setPort(5672);
  11.         connectionFactory.setUsername("tangminghao");
  12.         connectionFactory.setPassword("tangminghao");
  13.         connectionFactory.setVirtualHost("/");
  14.         //2、建立连接
  15.         final Connection connection = connectionFactory.newConnection();
  16.         //3、建立信道
  17.         final Channel channel = connection.createChannel();
  18.         //4、监听队列
  19.         channel.basicConsume("MAIL_QUEUE2",true,new DefaultConsumer(channel){
  20.             @Override
  21.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  22.                 final String s = new String(body, "UTF-8");
  23.                 System.out.println("邮件消息:"+s);
  24.             }
  25.         });
  26.     }
  27. }
  28. -----------------------------------------------------------------------------------------
  29. public class ConsumerMessage {
  30.     public static void main(String[] args) throws IOException, TimeoutException {
  31.         //1、创建连接工厂
  32.         final ConnectionFactory connectionFactory = new ConnectionFactory();
  33.         connectionFactory.setHost("192.168.66.8");
  34.         connectionFactory.setPort(5672);
  35.         connectionFactory.setUsername("tangminghao");
  36.         connectionFactory.setPassword("tangminghao");
  37.         connectionFactory.setVirtualHost("/");
  38.         //2、建立连接
  39.         final Connection connection = connectionFactory.newConnection();
  40.         //3、建立信道
  41.         final Channel channel = connection.createChannel();
  42.         //4、监听队列
  43.         channel.basicConsume("MESSAGE_QUEUE2",true,new DefaultConsumer(channel){
  44.             @Override
  45.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  46.                 final String s = new String(body, "UTF-8");
  47.                 System.out.println("短信消息:"+s);
  48.             }
  49.         });
  50.     }
  51. }
  52. -----------------------------------------------------------------------------------------
  53. public class ConsumerStation {
  54.     public static void main(String[] args) throws IOException, TimeoutException {
  55.         //1、创建连接工厂
  56.         final ConnectionFactory connectionFactory = new ConnectionFactory();
  57.         connectionFactory.setHost("192.168.66.8");
  58.         connectionFactory.setPort(5672);
  59.         connectionFactory.setUsername("tangminghao");
  60.         connectionFactory.setPassword("tangminghao");
  61.         connectionFactory.setVirtualHost("/");
  62.         //2、建立连接
  63.         final Connection connection = connectionFactory.newConnection();
  64.         //3、建立信道
  65.         final Channel channel = connection.createChannel();
  66.         //4、监听队列
  67.         channel.basicConsume("STATION_QUEUE2",true,new DefaultConsumer(channel){
  68.             @Override
  69.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  70.                 final String s = new String(body, "UTF-8");
  71.                 System.out.println("站内消息:"+s);
  72.             }
  73.         });
  74.     }
  75. }
复制代码

   通配符模式 生产者
  1. package com.tmh.mq.topic;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.nio.charset.StandardCharsets;
  8. import java.util.concurrent.TimeoutException;
  9. public class Producer {
  10.     public static void main(String[] args) throws IOException, TimeoutException {
  11.         //1、创建连接工厂
  12.         final ConnectionFactory connectionFactory = new ConnectionFactory();
  13.         connectionFactory.setHost("192.168.66.8");
  14.         connectionFactory.setPort(5672);
  15.         connectionFactory.setUsername("tangminghao");
  16.         connectionFactory.setPassword("tangminghao");
  17.         connectionFactory.setVirtualHost("/");
  18.         //2、建立连接
  19.         final Connection connection = connectionFactory.newConnection();
  20.         //3、建立信道
  21.         final Channel channel = connection.createChannel();
  22.         //4、创建交换机
  23.         /**
  24.          * 参数一:交换机名
  25.          * 参数二:交换机类型
  26.          * 参数三:交换机持久化
  27.          */
  28.         channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);
  29.         //5、创建队列
  30.         channel.queueDeclare("MAIL_QUEUE3", true,false,false,null);
  31.         channel.queueDeclare("MESSAGE_QUEUE3", true,false,false,null);
  32.         channel.queueDeclare("STATION_QUEUE3", true,false,false,null);
  33.         //6、队列绑定交换机
  34.         /**
  35.          * 参数一:队列名
  36.          * 参数二:交换机名字
  37.          * 参数三:路由关键字,发布订阅模式写“”即可
  38.          */
  39.         channel.queueBind("MAIL_QUEUE3","exchange_topic","#.mail.#");
  40.         channel.queueBind("MESSAGE_QUEUE3","exchange_topic","#.message.#");
  41.         channel.queueBind("STATION_QUEUE3","exchange_topic","#.station.#");
  42.         //7、发送消息
  43.             channel.basicPublish("exchange_topic","mail.message.station",null,
  44.                     ("你好,通配符模式,这是重要消息").getBytes(StandardCharsets.UTF_8));
  45.             channel.basicPublish("exchange_topic","station",null,
  46.                     ("你好,通配符模式,这是不太重要的消息").getBytes(StandardCharsets.UTF_8));
  47.     }
  48. }
复制代码
通配符模式消耗者
  1. package com.tmh.mq.topic;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class ConsumerMail {
  6.     public static void main(String[] args) throws IOException, TimeoutException {
  7.         //1、创建连接工厂
  8.         final ConnectionFactory connectionFactory = new ConnectionFactory();
  9.         connectionFactory.setHost("192.168.66.8");
  10.         connectionFactory.setPort(5672);
  11.         connectionFactory.setUsername("tangminghao");
  12.         connectionFactory.setPassword("tangminghao");
  13.         connectionFactory.setVirtualHost("/");
  14.         //2、建立连接
  15.         final Connection connection = connectionFactory.newConnection();
  16.         //3、建立信道
  17.         final Channel channel = connection.createChannel();
  18.         //4、监听队列
  19.         channel.basicConsume("MAIL_QUEUE3",true,new DefaultConsumer(channel){
  20.             @Override
  21.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  22.                 final String s = new String(body, "UTF-8");
  23.                 System.out.println("邮件消息:"+s);
  24.             }
  25.         });
  26.     }
  27. }
  28. -----------------------------------------------------------------------------------------
  29. public class ConsumerMessage {
  30.     public static void main(String[] args) throws IOException, TimeoutException {
  31.         //1、创建连接工厂
  32.         final ConnectionFactory connectionFactory = new ConnectionFactory();
  33.         connectionFactory.setHost("192.168.66.8");
  34.         connectionFactory.setPort(5672);
  35.         connectionFactory.setUsername("tangminghao");
  36.         connectionFactory.setPassword("tangminghao");
  37.         connectionFactory.setVirtualHost("/");
  38.         //2、建立连接
  39.         final Connection connection = connectionFactory.newConnection();
  40.         //3、建立信道
  41.         final Channel channel = connection.createChannel();
  42.         //4、监听队列
  43.         channel.basicConsume("MESSAGE_QUEUE3",true,new DefaultConsumer(channel){
  44.             @Override
  45.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  46.                 final String s = new String(body, "UTF-8");
  47.                 System.out.println("短信消息:"+s);
  48.             }
  49.         });
  50.     }
  51. }
  52. -----------------------------------------------------------------------------------------
  53. public class ConsumerStation {
  54.     public static void main(String[] args) throws IOException, TimeoutException {
  55.         //1、创建连接工厂
  56.         final ConnectionFactory connectionFactory = new ConnectionFactory();
  57.         connectionFactory.setHost("192.168.66.8");
  58.         connectionFactory.setPort(5672);
  59.         connectionFactory.setUsername("tangminghao");
  60.         connectionFactory.setPassword("tangminghao");
  61.         connectionFactory.setVirtualHost("/");
  62.         //2、建立连接
  63.         final Connection connection = connectionFactory.newConnection();
  64.         //3、建立信道
  65.         final Channel channel = connection.createChannel();
  66.         //4、监听队列
  67.         channel.basicConsume("STATION_QUEUE3",true,new DefaultConsumer(channel){
  68.             @Override
  69.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  70.                 final String s = new String(body, "UTF-8");
  71.                 System.out.println("站内消息:"+s);
  72.             }
  73.         });
  74.     }
  75. }
复制代码
在Java实现中有很多注释没有改过来,由于很多模式之间变化不大,所以就复制粘贴了




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4