ToB企服应用市场:ToB评测及商务社交产业平台

标题: RabbitMQ的五种消息模子 [打印本页]

作者: 大连全瓷种植牙齿制作中心    时间: 2024-10-29 16:57
标题: RabbitMQ的五种消息模子
相干RabbitMQ学习链接:
  1.初学RabbitMQ-CSDN博客
  2.安装RabbitMQ-CSDN博客
  3.SpringBoot整合RabbitMQ-CSDN博客
  RabbitMQ提供了多种消息模子,官网上第6种是RPC不属于常规的消息队列。 属于消息模子的是前5种:
 一对一模子

最根本的队列模子: 一个生产者发送消息到一个队列,一个消费者从队列中取消息。
操作步骤

1)启动Rabbitmq,在管理页面中创建用户admin 2)使用admin登录,然后创建假造主机myhost
创建队列,设置如下

案例代码

导入依赖
  1. <dependency>
  2.   <groupId>com.rabbitmq</groupId>
  3.   <artifactId>amqp-client</artifactId>
  4.   <version>3.4.1</version>
  5. </dependency>
复制代码
开辟工具类
  1. public class MQUtils {
  2.     public static final String QUEUE_NAME = "myqueue01";
  3.     public static final String QUEUE_NAME2 = "myqueue02";
  4.     public static final String EXCHANGE_NAME = "myexchange01";
  5.     public static final String EXCHANGE_NAME2 = "myexchange02";
  6.     public static final String EXCHANGE_NAME3 = "myexchange03";
  7.     /**
  8.      * 获得MQ的连接
  9.      * @return
  10.      * @throws IOException
  11.      */
  12.     public static Connection getConnection() throws IOException {
  13.         ConnectionFactory connectionFactory = new ConnectionFactory();
  14.         //配置服务器名、端口、虚拟主机名、登录账号和密码
  15.         connectionFactory.setHost("localhost");
  16.         connectionFactory.setPort(5672);
  17.         connectionFactory.setVirtualHost("myhost");
  18.         connectionFactory.setUsername("admin");
  19.         connectionFactory.setPassword("123456");
  20.         return connectionFactory.newConnection();
  21.     }
  22. }
复制代码
开辟生产者
  1. /**
  2. * 生产者,发送简单的消息到队列中
  3. */
  4. public class SimpleProducer {
  5.     public static void main(String[] args) throws IOException {
  6.         Connection connection = MQUtils.getConnection();
  7.         //创建通道
  8.         Channel channel = connection.createChannel();
  9.         //定义队列
  10.         channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
  11.         String msg = "Hello World!";
  12.         //发布消息到队列
  13.         channel.basicPublish("",MQUtils.QUEUE_NAME,null,msg.getBytes());
  14.         channel.close();
  15.         connection.close();
  16.     }
  17. }
复制代码
运行生产者代码,管理页面点进myqueue01,在GetMessages中可以看到消息

开辟消费者
  1. /**
  2. * 消费者,从队列中读取简单的消息
  3. */
  4. public class SimpleConsumer {
  5.     public static void main(String[] args) throws IOException, InterruptedException {
  6.         Connection connection = MQUtils.getConnection();
  7.         Channel channel = connection.createChannel();
  8.         //定义队列
  9.         channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
  10.         //创建消费者
  11.         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  12.         //消费者消费通道中的消息
  13.         channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
  14.         //读取消息
  15.         while(true){
  16.             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
  17.             System.out.println(new String(delivery.getBody()));
  18.         }
  19.     }
  20. }
复制代码
工作队列模子


工作队列,生产者将消息分发给多个消费者,假如生产者生产了100条消息,消费者1消费50条,消费者2消费50条。
案例代码

开辟生产者
  1. /**
  2.   多对多模式的生产者,会发送多条消息到队列中
  3. */
  4. public class WorkProductor {
  5.     public static void main(String[] args) throws IOException, InterruptedException {
  6.         Connection connection = MQUtils.getConnection();
  7.         Channel channel = connection.createChannel();
  8.         channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
  9.         for(int i = 0;i < 100;i++){
  10.             String msg = "Hello-->" + i;
  11.             channel.basicPublish("",MQUtils.QUEUE_NAME,null, msg.getBytes());
  12.             System.out.println("send:" + msg);
  13.             Thread.sleep(10);
  14.         }
  15.         channel.close();
  16.         connection.close();
  17.     }
  18. }
复制代码
开辟消费者1
  1. /**
  2. * 多对多模式的消费者1
  3. */
  4. public class WorkConsumer01 {
  5.     public static void main(String[] args) throws IOException, InterruptedException {
  6.         Connection connection = MQUtils.getConnection();
  7.         Channel channel = connection.createChannel();
  8.         channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
  9.         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  10.         //消费者消费通道中的消息
  11.         channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
  12.         while(true){
  13.             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
  14.             System.out.println("WorkConsumer1 receive :" + new String(delivery.getBody()));
  15.             Thread.sleep(10);
  16.         }
  17.     }
  18. }
复制代码
开辟消费者2
  1. /**
  2. * 多对多模式的消费者2
  3. */
  4. public class WorkConsumer02 {
  5.     public static void main(String[] args) throws IOException, InterruptedException {
  6.         Connection connection = MQUtils.getConnection();
  7.         Channel channel = connection.createChannel();
  8.         channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
  9.         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  10.         //消费者消费通道中的消息
  11.         channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
  12.         while(true){
  13.             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
  14.             System.out.println("WorkConsumer2 receive :" + new String(delivery.getBody()));
  15.             Thread.sleep(1000);
  16.         }
  17.     }
  18. }
复制代码
生产者发送100个消息,两个消费者分别读取了50条。 看消息内容,发现队列发送消息采用的是轮询方式,也就是先发给消费者1,再发给消费者2,依次往复。
能者多劳

上面案例中有一个问题:消费者处理消息的速度是不一样的,消费者1处理后睡眠10毫秒(Thread.sleep(10)),消费者2是1000毫秒,速度相差100倍,但是末了处理的消息数还是一样的。这样就存在服从问题:处理能力强的消费者得不到更多的消息。
因为队列默认采用是主动确认机制,消息发过去后就主动确认,队列不清楚每个消息详细什么时间处理完,以是平均分配消息数量。
实现能者多劳:
  1. /**
  2. 多对多模式的消费者1
  3. */
  4. public class WorkConsumer1 {
  5.    public static void main(String[] args) throws IOException, InterruptedException {
  6.        Connection connection = MQUtils.getConnection();
  7.        Channel channel = connection.createChannel();
  8.        channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
  9.        //同一时刻服务器只发送一条消息给消费者
  10.        channel.basicQos(1);
  11.        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  12.        //true是自动返回完成状态,false表示手动
  13.        channel.basicConsume(MQUtils.QUEUE_NAME,false,queueingConsumer);
  14.        while(true){
  15.            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
  16.            System.out.println("WorkConsumer1 receive :" + new String(delivery.getBody()));
  17.            Thread.sleep(10);
  18.            //手动确定返回状态,不写就是自动确认
  19.            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
  20.        }
  21.    }
  22. }
  23.    
  24. /**
  25. * 多对多模式的消费者2
  26. */
  27. public class WorkConsumer2 {
  28.    public static void main(String[] args) throws IOException, InterruptedException {
  29.        Connection connection = MQUtils.getConnection();
  30.        Channel channel = connection.createChannel();
  31.        channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
  32.        //同一时刻服务器只发送一条消息给消费者
  33.        channel.basicQos(1);
  34.        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  35.        //true是自动返回完成状态,false表示手动
  36.        channel.basicConsume(MQUtils.QUEUE_NAME,false,queueingConsumer);
  37.        while(true){
  38.            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
  39.            System.out.println("WorkConsumer2 receive :" + new String(delivery.getBody()));
  40.            Thread.sleep(1000);
  41.            //手动确定返回状态,不写就是自动确认
  42.            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
  43.        }
  44.    }
  45. }
复制代码
发布/订阅模子

发布/订阅模式和Work模式的区别是:Work模式只存在一个队列,多个消费者共同消费一个队列中的消息;而发布订阅模式存在多个队列,不同的消费者可以从各自的队列中处理完全相同的消息。
 操作步骤

实现步骤: 1) 创建交换机(Exchange)范例是fanout(扇出) 2) 交换机必要绑定不同的队列 3) 不同的消费者从不同的队列中得到消息 4) 生产者发送消息到交换机 5) 再由交换机将消息分发到多个队列
新建队列
新建交换机
点击交换机,在bindings里面绑定两个队列

案例代码

生产者
  1. /**
  2. * 发布和订阅模式的生产者,消息会通过交换机发到队列
  3. */
  4. public class PublishProductor {
  5.     public static void main(String[] args) throws IOException {
  6.         Connection connection = MQUtils.getConnection();
  7.         Channel channel = connection.createChannel();
  8.         //声明fanout exchange
  9.         channel.exchangeDeclare(MQUtils.EXCHANGE_NAME,"fanout");
  10.         String msg = "Hello Fanout";
  11.         //发布消息到交换机
  12.         channel.basicPublish(MQUtils.EXCHANGE_NAME,"",null,msg.getBytes());
  13.         System.out.println("send:" + msg);
  14.         channel.close();
  15.         connection.close();
  16.     }
  17. }
复制代码
消费者1
  1. [/code] [code]/**
  2. * 发布订阅模式的消费者1
  3. * 两个消费者绑定的消息队列不同
  4. * 通过交换机一个消息能被不同队列的两个消费者同时获取
  5. * 一个队列可以有多个消费者,队列中的消息只能被一个消费者获取
  6. */
  7. public class SubscribeConsumer1 {
  8.     public static void main(String[] args) throws IOException, InterruptedException {
  9.         Connection connection = MQUtils.getConnection();
  10.         Channel channel = connection.createChannel();
  11.         channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
  12.         //绑定队列1到交换机
  13.         channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME,"");
  14.         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  15.         channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
  16.         while(true){
  17.             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
  18.             System.out.println("Consumer1 receive :" + new String(delivery.getBody()));
  19.         }
  20.     }
  21. }
复制代码
消费者2
  1. public class SubscribeConsumer2 {
  2.     public static void main(String[] args) throws IOException, InterruptedException {
  3.         Connection connection = MQUtils.getConnection();
  4.         Channel channel = connection.createChannel();
  5.         channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);
  6.         //绑定队列2到交换机
  7.         channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME,"");
  8.         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  9.         channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);
  10.         while(true){
  11.             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
  12.             System.out.println("Consumer2 receive :" + new String(delivery.getBody()));
  13.         }
  14.     }
  15. }
复制代码
路由模子

路由模式的消息队列可以给队列绑定不同的key,生产者发送消息时,给消息设置不同的key,这样交换机在分发消息时,可以让消息路由到key匹配的队列中。 可以想象上图是一个日记处理系统,C1可以处理error日记消息,C2可以处理info\error\warining范例的日记消息,使用路由模式就很轻易实现了。
操作步骤

新建direct范例的交换机

案例代码

生产者,给myqueue01绑定了key:error,myqueue02绑定了key:debug,然后发送了key:error的消息
  1. /**
  2.   路由模式的生产者,发布消息会有特定的Key,消息会被绑定特定Key的消费者获取
  3. */
  4. public class RouteProductor {
  5.     public static void main(String[] args) throws IOException, InterruptedException {
  6.         Connection connection = MQUtils.getConnection();
  7.         Channel channel = connection.createChannel();
  8.         //声明交换机类型为direct
  9.         channel.exchangeDeclare(MQUtils.EXCHANGE_NAME2,"direct");
  10.         String msg = "Hello-->Route";
  11.         //绑定队列1到交换机,指定了Key为error
  12.         channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME2,"error");
  13.         //绑定队列2到交换机,指定了Key为debug
  14.         channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME2,"debug");
  15.         //error是一个指定的Key
  16.         channel.basicPublish(MQUtils.EXCHANGE_NAME2,"error",null,msg.getBytes());
  17.         System.out.println("send:" + msg);
  18.         channel.close();
  19.         connection.close();
  20.     }
  21. }
复制代码
消费者1
  1. /**
  2. * 路由模式的消费者1
  3. * 可以指定Key,消费特定的消息
  4. */
  5. public class RouteConsumer1 {
  6.     public static void main(String[] args) throws IOException, InterruptedException {
  7.         Connection connection = MQUtils.getConnection();
  8.         Channel channel = connection.createChannel();
  9.         channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
  10.         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  11.         channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
  12.         while(true){
  13.             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
  14.             System.out.println("RouteConsumer1 receive :" + new String(delivery.getBody()));
  15.         }
  16.     }
  17. }
复制代码
消费者2
  1. /**
  2. * 路由模式的消费者2
  3. * 可以指定Key,消费特定的消息
  4. */
  5. public class RouteConsumer2 {
  6.     public static void main(String[] args) throws IOException, InterruptedException {
  7.         Connection connection = MQUtils.getConnection();
  8.         Channel channel = connection.createChannel();
  9.         channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);
  10.         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  11.         channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);
  12.         while(true){
  13.             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
  14.             System.out.println("RouteConsumer2 receive :" + new String(delivery.getBody()));
  15.         }
  16.     }
  17. }
复制代码
主题模子

主题模式和路由模式差不多,在key中可以参加通配符:

案例代码

生产者代码
  1. /**
  2.   主题模式的生产者
  3. */
  4. public class TopicProductor {
  5.     public static void main(String[] args) throws IOException, InterruptedException {
  6.         Connection connection = MQUtils.getConnection();
  7.         Channel channel = connection.createChannel();
  8.         //声明交换机类型为topic
  9.         channel.exchangeDeclare(MQUtils.EXCHANGE_NAME3,"topic");
  10.         //绑定队列到交换机,最后指定了Key
  11.         channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME3,"xray.#");
  12.         //绑定队列到交换机,最后指定了Key
  13.         channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME3,"*.*.cn");
  14.         String msg = "Hello-->Topic";
  15.         channel.basicPublish(MQUtils.EXCHANGE_NAME3,"rabbit.com.cn",null,msg.getBytes());
  16.         System.out.println("send:" + msg);
  17.         channel.close();
  18.         connection.close();
  19.     }
  20. }
复制代码
消费者1
  1. /**
  2. * 主题模式的消费者1 ,类似路由模式,可以使用通配符对Key进行筛选
  3. *   #匹配1个或多个单词,*匹配一个单词
  4. */
  5. public class TopicConsumer1 {
  6.     public static void main(String[] args) throws IOException, InterruptedException {
  7.         Connection connection = MQUtils.getConnection();
  8.         Channel channel = connection.createChannel();
  9.         channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);
  10.         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  11.         channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);
  12.         while(true){
  13.             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
  14.             System.out.println("TopicConsumer1 receive :" + new String(delivery.getBody()));
  15.         }
  16.     }
  17. }
复制代码
消费者2
  1. /**
  2. * 主题模式的消费者2
  3. */
  4. public class TopicConsumer2 {
  5.     public static void main(String[] args) throws IOException, InterruptedException {
  6.         Connection connection = MQUtils.getConnection();
  7.         Channel channel = connection.createChannel();
  8.         channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);
  9.         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  10.         channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);
  11.         while(true){
  12.             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
  13.             System.out.println("TopicConsumer2 receive :" + new String(delivery.getBody()));
  14.         }
  15.     }
  16. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4