RabbitMQ之五种消息模型

打印 上一主题 下一主题

主题 892|帖子 892|积分 2676

1、 情况准备

创建Virtual Hosts

虚拟主机:雷同于mysql中的database。他们都是以“/”开头

设置权限




2. 五种消息模型

RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。
但是其实3、4、5这三种都属于订阅模型,只不外举行路由的方式不同。

依赖:
  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2.         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3.         <modelVersion>4.0.0</modelVersion>
  4.         <groupId>cn.atguigu.rabbitmq</groupId>
  5.         <artifactId>atguigu-rabbitmq</artifactId>
  6.         <version>0.0.1-SNAPSHOT</version>
  7.         <parent>
  8.                 <groupId>org.springframework.boot</groupId>
  9.                 <artifactId>spring-boot-starter-parent</artifactId>
  10.                 <version>2.3.0.RELEASE</version>
  11.         </parent>
  12.         <properties>
  13.                 <java.version>1.8</java.version>
  14.         </properties>
  15.         <dependencies>
  16.                 <dependency>
  17.                         <groupId>org.apache.commons</groupId>
  18.                         <artifactId> /artifactId>
  19.                         <version>3.3.2</version>
  20.                 </dependency>
  21.                 <dependency>
  22.                         <groupId>org.springframework.boot</groupId>
  23.                         <artifactId>spring-boot-starter-amqp</artifactId>
  24.                 </dependency>
  25.                 <dependency>
  26.                         <groupId>org.springframework.boot</groupId>
  27.                         <artifactId>spring-boot-starter-test</artifactId>
  28.                 </dependency>
  29.         </dependencies>
  30. </project>
复制代码
我们抽取一个建立RabbitMQ连接的工具类,方便其他程序获取连接:
  1. import com.rabbitmq.client.Connection;
  2. import com.rabbitmq.client.ConnectionFactory;
  3. public class ConnectionUtil {
  4.     public static Connection getConnection() throws Exception {
  5.         //定义连接工厂
  6.         ConnectionFactory factory = new ConnectionFactory();
  7.         //设置服务地址
  8.         factory.setHost("192.168.1.129");
  9.         //端口
  10.         factory.setPort(5672);
  11.         //设置账号信息,用户名、密码、vhost
  12.         factory.setVirtualHost("/zhenguo");
  13.         factory.setUsername("anni");
  14.         factory.setPassword("123456");
  15.         // 通过工程获取连接
  16.         Connection connection = factory.newConnection();
  17.         return connection;
  18.     }
  19. }
复制代码
2.1. 根本消息模型-simple

特点:一个生产者,一个消费者,一个队列
2.1.1. 生产者发送消息

  1. public class Send {
  2.     private final static String QUEUE_NAME = "simple_queue";
  3.     public static void main(String[] argv) throws Exception {
  4.         // 获取到连接以及mq通道
  5.         Connection connection = ConnectionUtil.getConnection();
  6.         // 从连接中创建通道,这是完成大部分API的地方。
  7.         Channel channel = connection.createChannel();
  8.         // 声明(创建)队列,必须声明队列才能够发送消息,我们可以把消息发送到队列中。
  9.         /**
  10.          * 参数1:queue,队列名称
  11.          * 参数2:durable,是否持久化
  12.          * 参数3:exclusive,队列是否为专用队列,如果是专用队列断开连接后会自动删除
  13.          * 参数4:autoDelete,队列长时间闲置时是否需要删除
  14.          * 参数5:arguments,队列的其他参数
  15.          */
  16.         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  17.         // 消息内容
  18.         String message = "Hello World!";
  19.         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  20.         System.out.println(" [x] Sent '" + message + "'");
  21.         //关闭通道和连接
  22.         channel.close();
  23.         connection.close();
  24.     }
  25. }
复制代码
控制台:

2.1.2. 管理工具 中检察消息

进入队列页面,可以看到新建了一个队列:simple_queue

点击队列名称,进入详情页,可以检察消息:

在控制台检察消息并不会将消息消费,所以消息还在。
2.1.3. 消费者获取消息

  1. public class Recv {
  2.     private final static String QUEUE_NAME = "simple_queue";
  3.     public static void main(String[] argv) throws Exception {
  4.         // 获取到连接
  5.         Connection connection = ConnectionUtil.getConnection();
  6.         // 创建通道
  7.         Channel channel = connection.createChannel();
  8.         // 声明队列
  9.         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  10.         // 定义队列的消费者
  11.         DefaultConsumer consumer = new DefaultConsumer(channel) {
  12.             // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  13.             @Override
  14.             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  15.                     byte[] body) throws IOException {
  16.                 // body 即消息体
  17.                 String msg = new String(body);
  18.                 System.out.println(" [x] received : " + msg + "!");
  19.             }
  20.         };
  21.         // 监听队列,第二个参数:是否自动进行消息确认。阻塞等待获取队列中的消息
  22.         channel.basicConsume(QUEUE_NAME, true, consumer);
  23.     }
  24. }
复制代码
控制台:

这个时候,队列中的消息就没了:

我们发现,消费者已经获取了消息,但是程序没有制止,一直在监听队列中是否有新的消息。一旦有新的消息进入队列,就会立刻打印.
2.1.4. 消息确认机制(ACK)

通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。
那么题目来了:RabbitMQ怎么知道消息被接收了呢?
如果消费者领取消息后,还没实行操纵就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!
因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK(Acknowledge character:确认字符),告知消息已经被接收。不外这种回执ACK分两种情况:


  • 自动ACK:消息一旦被接收,消费者自动发送ACK
  • 手动ACK:消息接收后,不会发送ACK,需要手动调用
大家以为哪种更好呢?
这需要看消息的重要性:


  • 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
  • 如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
我们之前的测试都是自动ACK的,如果要手动ACK,需要改动我们的代码:
  1. public class Recv2 {
  2.     private final static String QUEUE_NAME = "simple_queue";
  3.     public static void main(String[] argv) throws Exception {
  4.         // 获取到连接
  5.         Connection connection = ConnectionUtil.getConnection();
  6.         // 创建通道
  7.         final Channel channel = connection.createChannel();
  8.         // 声明队列
  9.         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  10.         // 定义队列的消费者
  11.         DefaultConsumer consumer = new DefaultConsumer(channel) {
  12.             // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  13.             @Override
  14.             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  15.                     byte[] body) throws IOException {
  16.                 // body 即消息体
  17.                 String msg = new String(body);
  18.                 System.out.println(" [x] received : " + msg + "!");
  19.                 // 手动进行ACK
  20.                 channel.basicAck(envelope.getDeliveryTag(), false);
  21.             }
  22.         };
  23.         // 监听队列,第二个参数false,手动进行ACK
  24.         channel.basicConsume(QUEUE_NAME, false, consumer);
  25.     }
  26. }
复制代码
注意到末了一行代码:
  1. channel.basicConsume(QUEUE_NAME, false, consumer);
复制代码
如果第二个参数为true,则会自动举行ACK;如果为false,则需要手动ACK。方法的声明:

2.1.4.1. 自动ACK存在的题目

修改消费者,添加异常,如下:

生产者不做任何修改,直接运行,消息发送成功:

运行消费者,程序抛出异常。但是消息依然被消费:

管理界面:

2.1.4.2. 演示手动ACK

修改消费者,把自动改成手动(去掉之前制造的异常)

生产者稳定,再次运行:

运行消费者

但是,检察守理界面,发现:

停掉消费者的程序,发现:

这是因为固然我们设置了手动ACK,但是代码中并没有举行消息确认!所以消息并未被真正消费掉。
当我们关掉这个消费者,消息的状态再次称为Ready
修改代码手动ACK:

实行:

消息消费成功!
2.2. work消息模型

工作队列或者竞争消费者模式
特点:多个消费者,一个生产者,一个队列

在第一篇教程中,我们编写了一个程序,从一个命名队列中发送并接受消息。在这里,我们将创建一个工作队列,在多个工作者之间分配耗时任务。
工作队列,又称任务队列。主要思想就是避免实行资源密集型任务时,必须等待它实行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并终极实行作业。当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取
这个概念在Web应用程序中特殊有用,因为在短的HTTP哀求窗口中无法处置惩罚复杂的任务。
接下来我们来模拟这个流程:
  1. P:生产者:任务的发布者
  2. C1:消费者,领取任务并且完成任务,假设完成速度较快
  3. C2:消费者2:领取任务并完成任务,假设完成速度慢
复制代码
面试题:避免消息堆积?
1)采用workqueue,多个消费者监听同一队列。
2)接收到消息以后,而是通过线程池,异步消费。
2.2.1. 生产者

生产者与案例1中的几乎一样:
  1. public class Send {
  2.     private final static String QUEUE_NAME = "test_work_queue";
  3.     public static void main(String[] argv) throws Exception {
  4.         // 获取到连接
  5.         Connection connection = ConnectionUtil.getConnection();
  6.         // 获取通道
  7.         Channel channel = connection.createChannel();
  8.         // 声明队列
  9.         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  10.         // 循环发布任务
  11.         for (int i = 0; i < 50; i++) {
  12.             // 消息内容
  13.             String message = "task .. " + i;
  14.             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  15.             System.out.println(" [x] Sent '" + message + "'");
  16.             Thread.sleep(i * 2);
  17.         }
  18.         // 关闭通道和连接
  19.         channel.close();
  20.         connection.close();
  21.     }
  22. }
复制代码
不外这里我们是循环发送50条消息。
2.2.2. 消费者1

  1. public class Recv1{
  2.     private final static String QUEUE_NAME = "test_work_queue";
  3. public static void main(String[] argv) throws Exception {
  4.     // 获取到连接
  5.     Connection connection = ConnectionUtil.getConnection();
  6.     // 创建通道
  7.     Channel channel = connection.createChannel();
  8.     // 声明队列
  9.     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  10.     // 定义队列的消费者
  11.     DefaultConsumer consumer = new DefaultConsumer(channel) {
  12.         @Override
  13.         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  14.             String msg = new String(body);
  15.             System.out.println(" [消费者1] received : " + msg + "!");
  16.             channel.basicAck(envelope.getDeliveryTag(),false);
  17.         }
  18.     };
  19.     channel.basicConsume(QUEUE_NAME, false ,consumer);
  20. }
  21. }
复制代码
2.2.3. 消费者2

  1. public class Recv2{
  2.     private final static String QUEUE_NAME = "test_work_queue";
  3.     public static void main(String[] argv) throws Exception {
  4.         // 获取到连接
  5.         Connection connection = ConnectionUtil.getConnection();
  6.         // 创建通道
  7.         Channel channel = connection.createChannel();
  8.         // 声明队列
  9.         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  10.         // 定义队列的消费者
  11.         DefaultConsumer consumer = new DefaultConsumer(channel) {
  12.             @Override
  13.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  14.                 String msg = new String(body);
  15.                 System.out.println(" [消费者2] received : " + msg + "!");
  16.                 //模拟消耗时间
  17.                 try {
  18.                     Thread.sleep(1000);
  19.                 } catch (InterruptedException e) {
  20.                     e.printStackTrace();
  21.                 }
  22.                 channel.basicAck(envelope.getDeliveryTag(),false);
  23.             }
  24.         };
  25.         channel.basicConsume(QUEUE_NAME, false ,consumer);
  26.     }
  27. }
复制代码
与消费者1根本雷同,就是没有设置消费耗时时间。
这里是模拟有些消费者快,有些比较慢。
接下来,两个消费者一同启动,然后发送50条消息:

可以发现,两个消费者各自消费了25条消息,而且各不相同,这就实现了任务的分发。
2.2.4. 能者多劳

刚才的实现有题目吗?


  • 消费者2比消费者1的服从要低,一次任务的耗时较长
  • 然而两人终极消费的消息数量是一样的
  • 消费者1大量时间处于空闲状态,消费者2一直忙碌
现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
怎么实现呢?
在比较慢的消费者创建队列后我们可以使用basicQos方法参数prefetchCount = 1设置。 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。 或者换句话说,不要向工作人员发送新消息,直到它处置惩罚并确认了前一个消息。 相反,它会将其分派给空闲的下一个工作人员。(注意:必须手动ack)

再次测试:

2.3. 订阅模型分类

在之前的模式中,我们创建了一个工作队列。 工作队列背后的假设是:每个任务只被传递给一个工作人员。 在这一部分,我们将做一些完全不同的事变 - 我们将会传递一个信息给多个消费者。 这种模式被称为“发布/订阅”。
订阅模型示意图:

解读:
1、1个生产者,多个消费者
2、每一个消费者都有本身的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息,颠末交换机到达队列,实现一个消息被多个消费者获取的目标
X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处置惩罚消息,比方递交给某个特殊队列、递交给全部队列、或是将消息抛弃。到底如何操纵,取决于Exchange的范例。
Exchange范例有以下几种:
  1. Fanout:广播,将消息交给所有绑定到交换机的队列
  2. Direct:定向,把消息交给符合指定routing key 的队列
  3. Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
复制代码
我们这里先学习
  1. Fanout:广播模式
复制代码
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
2.4. 订阅模型-Fanout

Fanout,也称为广播。
流程图:

在广播模式下,消息发送流程是这样的:


  • 1) 可以有多个消费者
  • 2) 每个消费者有本身的queue(队列)
  • 3) 每个队列都要绑定到Exchange(交换机)
  • 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 5) 交换机把消息发送给绑定过的全部队列
  • 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
2.4.1. 生产者

两个变化:


  • 1) 声明Exchange,不再声明Queue
  • 2) 发送消息到Exchange,不再发送到Queue
  1. public class Send {
  2.     private final static String EXCHANGE_NAME = "fanout_exchange_test";
  3.     public static void main(String[] argv) throws Exception {
  4.         // 获取到连接
  5.         Connection connection = ConnectionUtil.getConnection();
  6.         // 获取通道
  7.         Channel channel = connection.createChannel();
  8.         
  9.         // 声明exchange,指定类型为fanout
  10.         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  11.         
  12.         // 消息内容
  13.         String message = "Hello everyone";
  14.         // 发布消息到Exchange
  15.         channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
  16.         System.out.println(" [生产者] Sent '" + message + "'");
  17.         channel.close();
  18.         connection.close();
  19.     }
  20. }
复制代码
2.4.2. 消费者1

  1. public class Recv1 {
  2.     private final static String QUEUE_NAME = "fanout_exchange_queue_1";
  3.     private final static String EXCHANGE_NAME = "fanout_exchange_test";
  4.     public static void main(String[] argv) throws Exception {
  5.         // 获取到连接
  6.         Connection connection = ConnectionUtil.getConnection();
  7.         // 获取通道
  8.         Channel channel = connection.createChannel();
  9.         // 声明队列
  10.         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  11.         // 绑定队列到交换机
  12.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
  13.         // 定义队列的消费者
  14.         DefaultConsumer consumer = new DefaultConsumer(channel) {
  15.             // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  16.             @Override
  17.             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  18.                     byte[] body) throws IOException {
  19.                 // body 即消息体
  20.                 String msg = new String(body);
  21.                 System.out.println(" [消费者1] received : " + msg + "!");
  22.             }
  23.         };
  24.         // 监听队列,自动返回完成
  25.         channel.basicConsume(QUEUE_NAME, true, consumer);
  26.     }
  27. }
复制代码
要注意代码中:队列需要和交换机绑定
2.4.3. 消费者2

  1. public class Recv2 {
  2.     private final static String QUEUE_NAME = "fanout_exchange_queue_2";
  3.     private final static String EXCHANGE_NAME = "fanout_exchange_test";
  4.     public static void main(String[] argv) throws Exception {
  5.         // 获取到连接
  6.         Connection connection = ConnectionUtil.getConnection();
  7.         // 获取通道
  8.         Channel channel = connection.createChannel();
  9.         // 声明队列
  10.         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  11.         // 绑定队列到交换机
  12.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
  13.         
  14.         // 定义队列的消费者
  15.         DefaultConsumer consumer = new DefaultConsumer(channel) {
  16.             // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  17.             @Override
  18.             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  19.                     byte[] body) throws IOException {
  20.                 // body 即消息体
  21.                 String msg = new String(body);
  22.                 System.out.println(" [消费者2] received : " + msg + "!");
  23.             }
  24.         };
  25.         // 监听队列,手动返回完成
  26.         channel.basicConsume(QUEUE_NAME, true, consumer);
  27.     }
  28. }
复制代码
2.4.4. 测试

我们运行两个消费者,然后发送1条消息:


应用场景:笔墨直播
2.5. 订阅模型-Direct

有选择性的接收消息
在订阅模式中,生产者发布消息,全部消费者都可以获取全部消息。
在路由模式中,我们将添加一个功能 - 我们将只能订阅一部分消息。 比方,我们只能将重要的错误消息引导到日志文件(以节流磁盘空间),同时仍旧可以或许在控制台上打印全部日志消息。
但是,在某些场景下,我们盼望不同的消息被不同的队列消费。这时就要用到Direct范例的Exchange。
在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。

P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其地点队列指定了需要routing key 为 error 的消息
C2:消费者,其地点队列指定了需要routing key 为 info、error、warning 的消息
2.5.1. 生产者

此处我们模拟商品的增删改,发送消息的RoutingKey分别是:insert、update、delete
  1. public class Send {
  2.     private final static String EXCHANGE_NAME = "direct_exchange_test";
  3.     public static void main(String[] argv) throws Exception {
  4.         // 获取到连接
  5.         Connection connection = ConnectionUtil.getConnection();
  6.         // 获取通道
  7.         Channel channel = connection.createChannel();
  8.         // 声明exchange,指定类型为direct
  9.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  10.         // 消息内容
  11.         String message = "商品新增了, id = 1001";
  12.         // 发送消息,并且指定routing key 为:insert ,代表新增商品
  13.         channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
  14.         System.out.println(" [商品服务:] Sent '" + message + "'");
  15.         channel.close();
  16.         connection.close();
  17.     }
  18. }
复制代码
2.5.2. 消费者1

我们此处假设消费者1只接收两种范例的消息:更新商品和删除商品。
  1. public class Recv {
  2.     private final static String QUEUE_NAME = "direct_exchange_queue_1";
  3.     private final static String EXCHANGE_NAME = "direct_exchange_test";
  4.     public static void main(String[] argv) throws Exception {
  5.         // 获取到连接
  6.         Connection connection = ConnectionUtil.getConnection();
  7.         // 获取通道
  8.         Channel channel = connection.createChannel();
  9.         // 声明队列
  10.         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  11.         
  12.         // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息
  13.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
  14.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
  15.         // 定义队列的消费者
  16.         DefaultConsumer consumer = new DefaultConsumer(channel) {
  17.             // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  18.             @Override
  19.             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  20.                     byte[] body) throws IOException {
  21.                 // body 即消息体
  22.                 String msg = new String(body);
  23.                 System.out.println(" [消费者1] received : " + msg + "!");
  24.             }
  25.         };
  26.         // 监听队列,自动ACK
  27.         channel.basicConsume(QUEUE_NAME, true, consumer);
  28.     }
  29. }
复制代码
2.5.3. 消费者2

我们此处假设消费者2接收全部范例的消息:新增商品,更新商品和删除商品。
  1. public class Recv2 {
  2.     private final static String QUEUE_NAME = "direct_exchange_queue_2";
  3.     private final static String EXCHANGE_NAME = "direct_exchange_test";
  4.     public static void main(String[] argv) throws Exception {
  5.         // 获取到连接
  6.         Connection connection = ConnectionUtil.getConnection();
  7.         // 获取通道
  8.         Channel channel = connection.createChannel();
  9.         // 声明队列
  10.         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  11.         
  12.         // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
  13.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
  14.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
  15.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
  16.         // 定义队列的消费者
  17.         DefaultConsumer consumer = new DefaultConsumer(channel) {
  18.             // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  19.             @Override
  20.             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  21.                     byte[] body) throws IOException {
  22.                 // body 即消息体
  23.                 String msg = new String(body);
  24.                 System.out.println(" [消费者2] received : " + msg + "!");
  25.             }
  26.         };
  27.         // 监听队列,自动ACK
  28.         channel.basicConsume(QUEUE_NAME, true, consumer);
  29.     }
  30. }
复制代码
2.5.4. 测试

我们分别发送增、删、改的RoutingKey,发现结果:

2.6. 订阅模型-Topic

Topic范例的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不外Topic范例Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词构成,多个单词之间以”.”分割,比方: item.insert
通配符规则:
  1. `#`:匹配一个或多个词
  2. `*`:匹配恰好1个词
复制代码
举例:
  1. `audit.#`:能够匹配`audit.irs.corporate` 或者 `audit.irs`
  2. `audit.*`:只能匹配`audit.irs`
复制代码
2.6.1. 生产者

使用topic范例的Exchange,发送消息的routing key有3种: item.isnert、item.update、item.delete:
  1. public class Send {
  2.     private final static String EXCHANGE_NAME = "topic_exchange_test";
  3.     public static void main(String[] argv) throws Exception {
  4.         // 获取到连接
  5.         Connection connection = ConnectionUtil.getConnection();
  6.         // 获取通道
  7.         Channel channel = connection.createChannel();
  8.         // 声明exchange,指定类型为topic
  9.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  10.         // 消息内容
  11.         String message = "新增商品 : id = 1001";
  12.         // 发送消息,并且指定routing key 为:insert ,代表新增商品
  13.         channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());
  14.         System.out.println(" [商品服务:] Sent '" + message + "'");
  15.         channel.close();
  16.         connection.close();
  17.     }
  18. }
复制代码
2.6.2. 消费者1

我们此处假设消费者1只接收两种范例的消息:更新商品和删除商品
  1. public class Recv {
  2.     private final static String QUEUE_NAME = "topic_exchange_queue_1";
  3.     private final static String EXCHANGE_NAME = "topic_exchange_test";
  4.     public static void main(String[] argv) throws Exception {
  5.         // 获取到连接
  6.         Connection connection = ConnectionUtil.getConnection();
  7.         // 获取通道
  8.         Channel channel = connection.createChannel();
  9.         // 声明队列
  10.         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  11.         
  12.         // 绑定队列到交换机,同时指定需要订阅的routing key。需要 update、delete
  13.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
  14.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
  15.         // 定义队列的消费者
  16.         DefaultConsumer consumer = new DefaultConsumer(channel) {
  17.             // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  18.             @Override
  19.             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  20.                     byte[] body) throws IOException {
  21.                 // body 即消息体
  22.                 String msg = new String(body);
  23.                 System.out.println(" [消费者1] received : " + msg + "!");
  24.             }
  25.         };
  26.         // 监听队列,自动ACK
  27.         channel.basicConsume(QUEUE_NAME, true, consumer);
  28.     }
  29. }
复制代码
2.6.3. 消费者2

我们此处假设消费者2接收全部范例的消息:新增商品,更新商品和删除商品。
  1. /**
  2. * 消费者2
  3. */
  4. public class Recv2 {
  5.     private final static String QUEUE_NAME = "topic_exchange_queue_2";
  6.     private final static String EXCHANGE_NAME = "topic_exchange_test";
  7.     public static void main(String[] argv) throws Exception {
  8.         // 获取到连接
  9.         Connection connection = ConnectionUtil.getConnection();
  10.         // 获取通道
  11.         Channel channel = connection.createChannel();
  12.         // 声明队列
  13.         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  14.         
  15.         // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
  16.         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");
  17.         // 定义队列的消费者
  18.         DefaultConsumer consumer = new DefaultConsumer(channel) {
  19.             // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  20.             @Override
  21.             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  22.                     byte[] body) throws IOException {
  23.                 // body 即消息体
  24.                 String msg = new String(body);
  25.                 System.out.println(" [消费者2] received : " + msg + "!");
  26.             }
  27.         };
  28.         // 监听队列,自动ACK
  29.         channel.basicConsume(QUEUE_NAME, true, consumer);
  30.     }
  31. }
复制代码
2.7. 持久化

如何避免消息丢失?
1) 消费者的ACK机制。可以防止消费者丢失消息。
2) 但是,如果在消费者消费之前,MQ就宕机了,消息就没了。
是可以将消息举行持久化呢?
要将消息持久化,前提是:队列、Exchange都持久化
2.7.1. 交换机持久化


2.7.2. 队列持久化


2.7.3. 消息持久化

  1. MessageProperties:使用rabbitmq包下的类
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

羊蹓狼

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表