rabbitmq五种模式的总结
完整项目地点:https://github.com/9lucifer/rabbitmq4j-learning
一、简单模式
(一)简单模式概述
RabbitMQ 的简单模式是最根本的消息队列模式,包含以下两个角色:
- 生产者:负责发送消息到队列。
- 斲丧者:负责从队列中接收并处置处罚消息。
在简单模式中,消息的转达是单向的,生产者将消息发送到队列,斲丧者从队列中接收消息。
(二)生产者代码解析
代码
生产者负责创建消息并将其发送到指定的队列中。
- package top.miqiu._01_hello;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1. 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
- connectionFactory.setHost("ip(要换成真实的ip哦)");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("admin");
- connectionFactory.setPassword("admin");
- // 3. 创建连接对象
- Connection connection = connectionFactory.newConnection();
- // 4. 创建 Channel
- Channel channel = connection.createChannel();
- // 5. 声明队列
- /**
- * 参数说明:
- * 1. 队列名称:01-hello2
- * 2. 是否持久化:true(重启后队列仍然存在)
- * 3. 是否独占队列:false(允许多个消费者连接)
- * 4. 是否自动删除:false(队列不会自动删除)
- * 5. 额外参数:null
- */
- channel.queueDeclare("01-hello2", true, false, false, null);
- // 6. 发送消息
- /**
- * 参数说明:
- * 1. 交换机名称:空字符串(使用默认交换机)
- * 2. 路由键:队列名称(01-hello2)
- * 3. 额外属性:null
- * 4. 消息内容:字节数组
- */
- channel.basicPublish("", "01-hello2", null, "hello rabbitmq2".getBytes());
- System.out.println("消息发送成功");
- // 7. 关闭资源
- channel.close();
- connection.close();
- }
- }
复制代码 结果
(三)斲丧者代码解析
代码
斲丧者负责从队列中接收并处置处罚消息。
- package top.miqiu._01_hello_c;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1. 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
- connectionFactory.setHost("ip(要换成真实的ip哦");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("admin");
- connectionFactory.setPassword("admin");
- // 3. 创建连接对象
- Connection connection = connectionFactory.newConnection();
- // 4. 创建 Channel
- Channel channel = connection.createChannel();
- // 5. 声明队列(需与生产者保持一致)
- channel.queueDeclare("01-hello2", false, false, false, null);
- // 6. 接收消息
- /**
- * 参数说明:
- * 1. 队列名称:01-hello2
- * 2. 是否自动确认:true(消息被消费后自动确认)
- * 3. 消息处理回调:DeliverCallback
- * 4. 消息取消回调:CancelCallback
- */
- channel.basicConsume("01-hello2", true, new DeliverCallback() {
- @Override
- public void handle(String consumerTag, Delivery delivery) throws IOException {
- System.out.println("接收到消息:" + new String(delivery.getBody()));
- }
- }, new CancelCallback() {
- @Override
- public void handle(String consumerTag) throws IOException {
- System.out.println("消息被取消");
- }
- });
- }
- }
复制代码 结果
在mq中查看
(四)总结
- 简单模式:实用于一对一的简单消息转达场景。
- 生产者:负责创建队列并发送消息。
- 斲丧者:负责从队列中接收并处置处罚消息。
- 留意事项:
- 队列名称需保持同等,不然肯定会报错!
- 消息确认机制需根据业务需求选择自动或手动确认。
- 使用完资源后需显式关闭 Channel 和 Connection。
二、工作模式
(一)工作模式概述
工作模式是 RabbitMQ 的一种常见模式,用于将任务分发给多个斲丧者。它的特点是:
- 一个生产者:负责发送消息到队列。
- 多个斲丧者:共同斲丧同一个队列中的消息。
- 消息分发机制:默认情况下,RabbitMQ 会以轮询(Round-Robin)的方式将消息分发给斲丧者。
工作模式实用于任务分发场景,例如将耗时的任务分发给多个 Worker 处置处罚。
(二)生产者代码解析
生产者负责创建消息并将其发送到指定的队列中。
- package top.miqiu._02_work;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1. 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
- connectionFactory.setHost("你的ip!别忘了改");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("admin");
- connectionFactory.setPassword("admin");
- // 3. 创建连接对象
- Connection connection = connectionFactory.newConnection();
- // 4. 创建 Channel
- Channel channel = connection.createChannel();
- // 5. 声明队列
- /**
- * 参数说明:
- * 1. 队列名称:02-work1
- * 2. 是否持久化:true(重启后队列仍然存在)
- * 3. 是否独占队列:false(允许多个消费者连接)
- * 4. 是否自动删除:false(队列不会自动删除)
- * 5. 额外参数:null
- */
- channel.queueDeclare("02-work1", true, false, false, null);
- // 6. 发送消息
- for (int i = 0; i < 20; i++) {
- String message = "hello work:" + i;
- channel.basicPublish("", "02-work1", null, message.getBytes());
- }
- System.out.println("消息发送成功");
- // 7. 关闭资源
- channel.close();
- connection.close();
- }
- }
复制代码 关键点:
- 队列声明(queueDeclare):创建队列并设置队列属性。
- 消息发送(basicPublish):通过循环发送多条消息到队列。
- 持久化队列:设置为 true,确保队列在 RabbitMQ 重启后仍然存在。
(三)斲丧者代码解析
代码
斲丧者负责从队列中接收并处置处罚消息。
- package top.miqiu._02_work;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1. 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
- connectionFactory.setHost("你的ip!别忘了改");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("admin");
- connectionFactory.setPassword("admin");
- // 3. 创建连接对象
- Connection connection = connectionFactory.newConnection();
- // 4. 创建 Channel
- Channel channel = connection.createChannel();
- // 5. 声明队列(需与生产者保持一致)
- channel.queueDeclare("02-work1", true, false, false, null);
- // 6. 设置每次只接收一条消息
- channel.basicQos(1);
- // 7. 接收消息
- /**
- * 参数说明:
- * 1. 队列名称:02-work1
- * 2. 是否自动确认:false(手动确认消息)
- * 3. 消息处理回调:DeliverCallback
- * 4. 消息取消回调:CancelCallback
- */
- channel.basicConsume("02-work1", false, new DeliverCallback() {
- @Override
- public void handle(String consumerTag, Delivery delivery) throws IOException {
- try {
- // 模拟消息处理耗时
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("消费者1 接收到消息:" + new String(delivery.getBody()));
- // 手动确认消息
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- }
- }, new CancelCallback() {
- @Override
- public void handle(String consumerTag) throws IOException {
- System.out.println("消息被取消");
- }
- });
- }
- }
复制代码 关键点:
- 队列声明(queueDeclare):确保队列存在,需与生产者保持同等。
- 消息预取(basicQos):设置每次只接收一条消息,制止某个斲丧者处置处罚过多消息。
- 手动确认(basicAck):消息处置处罚完成后手动确认,确保消息不会丢失。
- 消息处置处罚耗时:通过 Thread.sleep(1000) 模拟消息处置处罚耗时。
效果
(四)工作模式的特点
- 消息分发机制:
- 默认情况下,RabbitMQ 会以轮询的方式将消息分发给多个斲丧者。
- 可以通过 basicQos 设置每次只接收一条消息,制止某个斲丧者处置处罚过多消息。
- 消息确认机制:
- 设置为手动确认(autoAck=false),确保消息处置处罚完成后才确认。(防止业务处置处罚失败的情况下丢失消息)
- 如果斲丧者在处置处罚消息时崩溃,未确认的消息会重新分发给其他斲丧者。
- 实用场景:
- 任务分发场景,例如将耗时的任务分发给多个 Worker 处置处罚。
(五)总结
- 工作模式:实用于任务分发场景,多个斲丧者共同斲丧同一个队列中的消息。
- 生产者:负责发送消息到队列。
- 斲丧者:负责接收并处置处罚消息,支持手动确认和消息预取。
- 留意事项:
- 队列名称需保持同等。
- 消息确认机制需根据业务需求选择自动或手动确认。
- 使用 basicQos 控制消息分发,制止某个斲丧者处置处罚过多消息。
三、发布订阅模式
(一)发布订阅模式概述
发布订阅模式(Publish/Subscribe Mode)是 RabbitMQ 的一种模式,用于将消息广播给多个斲丧者。它的特点是:
- 一个生产者:将消息发送到交换机(Exchange)。
- 多个斲丧者:每个斲丧者都有自己的队列,并与交换机绑定。
- 消息广播:交换机将消息广播给全部绑定的队列。
发布订阅模式实用于消息广播场景,例如日志系统、关照系统等。
(二)生产者代码解析
生产者负责创建消息并将其发送到指定的交换机中。
- package top.miqiu._03_pubsub;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1. 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
- connectionFactory.setHost("用自己的ip!!");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("admin");
- connectionFactory.setPassword("admin");
- // 3. 创建连接对象
- Connection connection = connectionFactory.newConnection();
- // 4. 创建 Channel
- Channel channel = connection.createChannel();
- // 5. 声明交换机
- /**
- * 参数说明:
- * 1. 交换机名称:03-pubsub
- * 2. 交换机类型:fanout(广播模式)
- */
- channel.exchangeDeclare("03-pubsub", "fanout");
- // 6. 发送消息
- for (int i = 0; i < 20; i++) {
- String message = "hello work:" + i;
- /**
- * 参数说明:
- * 1. 交换机名称:03-pubsub
- * 2. 路由键:空字符串(fanout 模式忽略路由键)
- * 3. 消息属性:MessageProperties.TEXT_PLAIN
- * 4. 消息内容:字节数组
- */
- channel.basicPublish("03-pubsub", "", MessageProperties.TEXT_PLAIN, message.getBytes());
- }
- System.out.println("消息发送成功");
- // 7. 关闭资源
- channel.close();
- connection.close();
- }
- }
复制代码 关键点:
- 交换机声明(exchangeDeclare):创建交换机并设置范例为 fanout(广播模式)。
- 消息发送(basicPublish):将消息发送到交换机,路由键为空字符串(fanout 模式忽略路由键)。
- 消息广播:消息会被广播到全部绑定到该交换机的队列。
(三)斲丧者代码解析
代码
斲丧者负责从队列中接收并处置处罚消息。
- package top.miqiu._03_pubsub;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer2 {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1. 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
- connectionFactory.setHost("用自己的ip!!");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("admin");
- connectionFactory.setPassword("admin");
- // 3. 创建连接对象
- Connection connection = connectionFactory.newConnection();
- // 4. 创建 Channel
- Channel channel = connection.createChannel();
- // 5. 声明交换机
- channel.exchangeDeclare("03-pubsub", "fanout");
- // 6. 创建临时队列
- String queue = channel.queueDeclare().getQueue();
- // 7. 绑定队列到交换机
- /**
- * 参数说明:
- * 1. 队列名称:queue
- * 2. 交换机名称:03-pubsub
- * 3. 路由键:空字符串(fanout 模式忽略路由键)
- */
- channel.queueBind(queue, "03-pubsub", "");
- // 8. 接收消息
- /**
- * 参数说明:
- * 1. 队列名称:queue
- * 2. 是否自动确认:true(自动确认消息)
- * 3. 消息处理回调:DeliverCallback
- * 4. 消息取消回调:CancelCallback
- */
- channel.basicConsume(queue, true, new DeliverCallback() {
- @Override
- public void handle(String consumerTag, Delivery delivery) throws IOException {
- try {
- // 模拟消息处理耗时
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("消费者2 接收到消息:" + new String(delivery.getBody()));
- }
- }, new CancelCallback() {
- @Override
- public void handle(String consumerTag) throws IOException {
- System.out.println("消息被取消");
- }
- });
- }
- }
复制代码 关键点:
- 交换机声明(exchangeDeclare):确保交换机存在,需与生产者保持同等。
- 暂时队列(queueDeclare):创建一个暂时队列,队列名称由 RabbitMQ 自动生成。
- 队列绑定(queueBind):将队列绑定到交换机,路由键为空字符串(fanout 模式忽略路由键)。
- 消息接收(basicConsume):从队列中接收消息并处置处罚。
结果
可以看到两个consumer都斲丧了相同的消息
(四)发布订阅模式的特点
- 消息广播:交换机将消息广播给全部绑定的队列。
- 暂时队列:斲丧者可以创建暂时队列,队列名称由 RabbitMQ 自动生成。
- 实用场景:
- 日志系统:将日志消息广播给多个斲丧者。
- 关照系统:将关照消息广播给多个用户。
(五)总结
- 发布订阅模式:实用于消息广播场景,多个斲丧者各自接收相同的消息。
- 生产者:负责将消息发送到交换机。
- 斲丧者:负责创建队列并绑定到交换机,接收并处置处罚消息。
- 留意事项:
- 交换机范例需设置为 fanout。
- 队列绑定到交换机时,路由键为空字符串。
- 暂时队列的名称由 RabbitMQ 自动生成。
(六)RabbitMQ 交换机范例总结
交换机范例描述路由举动实用场景Fanout广播模式,将消息发送到全部绑定到该交换机的队列。忽略路由键(Routing Key),消息会被广播到全部绑定的队列。日志系统、关照系统等需要广播消息的场景。Direct直接模式,根据路由键将消息发送到匹配的队列。消息的路由键必须与队列绑定的路由键完全匹配。任务分发、点对点通信等需要精确路由的场景。Topic主题模式,根据路由键的模式匹配将消息发送到符合条件的队列。支持通配符匹配:* 匹配一个单词,# 匹配零个或多个单词。消息分类、多条件路由等需要机动匹配的场景。Headers头部模式,根据消息的头部属性(Headers)进行匹配。不依赖路由键,而是通过消息的头部属性匹配队列绑定的条件。复杂的路由逻辑,例如根据消息的元数据进行路由。 详细说明
1. Fanout 交换机(广播,常用)
- 特点:
- 消息会被广播到全部绑定到该交换机的队列。
- 忽略路由键(Routing Key)。
- 实用场景:
- 日志系统:将日志消息广播给多个斲丧者。
- 关照系统:将关照消息广播给多个用户。
2. Direct 交换机
- 特点:
- 消息的路由键必须与队列绑定的路由键完全匹配。
- 支持一对一或一对多的精确路由。
- 实用场景:
- 任务分发:将特定任务路由到特定的 Worker。
- 点对点通信:将消息发送到特定的接收者。
3. Topic 交换机
- 特点:
- 支持通配符匹配:
- 路由键的格式通常是点分字符串(如 user.create)。
- 实用场景:
- 消息分类:根据消息的主题进行路由。
- 多条件路由:支持机动的路由规则。
4. Headers 交换机
- 特点:
- 不依赖路由键,而是通过消息的头部属性(Headers)进行匹配。
- 支持复杂的匹配规则(如 x-match 参数)。
- 实用场景:
- 复杂的路由逻辑:根据消息的元数据进行路由。
- 需要高度机动性的场景。
对比
场景FanoutDirectTopicHeaders日志广播全部斲丧者接收相同的日志消息。不实用。不实用。不实用。任务分发不实用。将任务路由到特定的 Worker。将任务分类路由到差别的 Worker。根据任务的元数据进行路由。关照系统全部效户接收相同的关照。特定用户接收特定关照。根据关照范例路由到差别用户。根据关照的元数据进行路由。消息分类不实用。不实用。根据消息主题进行路由。根据消息的头部属性进行路由。 总结
- Fanout:实用于广播场景。
- Direct:实用于精确路由场景。
- Topic:实用于机动的路由场景。
- Headers:实用于复杂的路由逻辑。
四、路由模式
(一)路由模式概述
路由模式是 RabbitMQ 的一种模式,使用 Direct 交换机 根据消息的 路由键(Routing Key) 将消息发送到匹配的队列。它的特点是:
- 一个生产者:将消息发送到 Direct 交换机,并指定路由键。
- 多个斲丧者:每个斲丧者可以绑定一个或多个路由键,只有匹配的路由键的消息才会被接收。
- 精确路由:消息的路由键必须与队列绑定的路由键完全匹配。
路由模式实用于需要根据特定条件精确路由消息的场景,例如日志级别分类、任务分发等。
(二)生产者代码解析
生产者负责创建消息并将其发送到 Direct 交换机,同时指定路由键。
- package top.miqiu._04_routing;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1. 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
- connectionFactory.setHost("你的ip!!!");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("admin");
- connectionFactory.setPassword("admin");
- // 3. 创建连接对象
- Connection connection = connectionFactory.newConnection();
- // 4. 创建 Channel
- Channel channel = connection.createChannel();
- // 5. 声明 Direct 交换机
- /**
- * 参数说明:
- * 1. 交换机名称:04-routing
- * 2. 交换机类型:direct
- */
- channel.exchangeDeclare("04-routing", "direct");
- // 6. 发送消息
- for (int i = 0; i < 20; i++) {
- String message = "hello work:" + i;
- /**
- * 参数说明:
- * 1. 交换机名称:04-routing
- * 2. 路由键:err(消息将发送到绑定 err 路由键的队列)
- * 3. 消息属性:MessageProperties.TEXT_PLAIN
- * 4. 消息内容:字节数组
- */
- channel.basicPublish("04-routing", "err", MessageProperties.TEXT_PLAIN, message.getBytes());
- }
- System.out.println("消息发送成功");
- // 7. 关闭资源
- channel.close();
- connection.close();
- }
- }
复制代码 关键点:
- 交换机声明(exchangeDeclare):创建 Direct 交换机,范例为 direct。
- 消息发送(basicPublish):指定路由键(如 err),消息会被发送到绑定该路由键的队列。
- 路由键匹配:只有队列绑定的路由键与消息的路由键完全匹配时,消息才会被路由到该队列。
(三)斲丧者代码解析
代码
斲丧者负责创建队列并绑定到 Direct 交换机,同时指定路由键。
- package top.miqiu._04_routing;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1. 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
- connectionFactory.setHost("你的ip!!!");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("admin");
- connectionFactory.setPassword("admin");
- // 3. 创建连接对象
- Connection connection = connectionFactory.newConnection();
- // 4. 创建 Channel
- Channel channel = connection.createChannel();
- // 5. 声明 Direct 交换机
- channel.exchangeDeclare("04-routing", "direct");
- // 6. 创建临时队列
- String queue = channel.queueDeclare().getQueue();
- // 7. 绑定队列到交换机,并指定路由键
- /**
- * 参数说明:
- * 1. 队列名称:queue
- * 2. 交换机名称:04-routing
- * 3. 路由键:info、err、waring
- */
- channel.queueBind(queue, "04-routing", "info");
- channel.queueBind(queue, "04-routing", "err");
- channel.queueBind(queue, "04-routing", "waring");
- // 8. 接收消息
- /**
- * 参数说明:
- * 1. 队列名称:queue
- * 2. 是否自动确认:true(自动确认消息)
- * 3. 消息处理回调:DeliverCallback
- * 4. 消息取消回调:CancelCallback
- */
- channel.basicConsume(queue, true, new DeliverCallback() {
- @Override
- public void handle(String consumerTag, Delivery delivery) throws IOException {
- try {
- // 模拟消息处理耗时
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("消费者1 接收到消息:" + new String(delivery.getBody()));
- }
- }, new CancelCallback() {
- @Override
- public void handle(String consumerTag) throws IOException {
- System.out.println("消息被取消");
- }
- });
- }
- }
复制代码 关键点:
- 交换机声明(exchangeDeclare):确保 Direct 交换机存在,需与生产者保持同等。
- 暂时队列(queueDeclare):创建一个暂时队列,队列名称由 RabbitMQ 自动生成。
- 队列绑定(queueBind):将队列绑定到交换机,并指定路由键(如 info、err、waring)。
- 消息接收(basicConsume):从队列中接收消息并处置处罚。
效果
consumer1绑定了[info,err,waring],以是在producer绑定了info时发送消息的情况下,consumer1可以接收到信息
由于consumer2绑定的是trace,以是consumer2是接收不到消息的
(四)路由模式的特点
- 精确路由:消息的路由键必须与队列绑定的路由键完全匹配。
- 多路由键支持:一个队列可以绑定多个路由键,接收多种范例的消息。
- 实用场景:
- 日志级别分类:将差别级别的日志(如 info、err)路由到差别的队列。
- 任务分发:将特定任务路由到特定的 Worker。
(五)总结
- 路由模式:实用于需要根据路由键精确路由消息的场景。
- 生产者:负责将消息发送到 Direct 交换机,并指定路由键。
- 斲丧者:负责创建队列并绑定到 Direct 交换机,同时指定路由键。
- 留意事项:
- 路由键必须完全匹配。
- 一个队列可以绑定多个路由键,接收多种范例的消息。
五、Topic 模式
(一)Topic 模式概述
Topic 模式是 RabbitMQ 的一种模式,使用 Topic 交换机 根据消息的 路由键(Routing Key) 进行模式匹配,将消息发送到符合条件的队列。它的特点是:
- 一个生产者:将消息发送到 Topic 交换机,并指定路由键。
- 多个斲丧者:每个斲丧者可以绑定一个或多个路由键模式,只有匹配的路由键的消息才会被接收。
- 机动的路由:支持通配符匹配:
Topic 模式实用于需要根据复杂条件机动路由消息的场景,例如消息分类、多条件路由等。
(二)生产者代码解析
代码
生产者负责创建消息并将其发送到 Topic 交换机,同时指定路由键。
- package top.miqiu._05_topic;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1. 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
- connectionFactory.setHost("用自己的ip!!");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("admin");
- connectionFactory.setPassword("admin");
- // 3. 创建连接对象
- Connection connection = connectionFactory.newConnection();
- // 4. 创建 Channel
- Channel channel = connection.createChannel();
- // 5. 声明 Topic 交换机
- /**
- * 参数说明:
- * 1. 交换机名称:05-topic
- * 2. 交换机类型:topic
- */
- channel.exchangeDeclare("05-topic", "topic");
- // 6. 发送消息
- for (int i = 0; i < 20; i++) {
- String message = "hello work:" + i;
- /**
- * 参数说明:
- * 1. 交换机名称:05-topic
- * 2. 路由键:user.hi(消息将发送到匹配 user.* 或 user.# 的队列)
- * 3. 消息属性:MessageProperties.TEXT_PLAIN
- * 4. 消息内容:字节数组
- */
- channel.basicPublish("05-topic", "user.hi", MessageProperties.TEXT_PLAIN, message.getBytes());
- }
- System.out.println("消息发送成功");
- // 7. 关闭资源
- channel.close();
- connection.close();
- }
- }
复制代码 关键点:
- 交换机声明(exchangeDeclare):创建 Topic 交换机,范例为 topic。
- 消息发送(basicPublish):指定路由键(如 user.hi),消息会被发送到匹配的队列。
- 通配符匹配:
- * 匹配一个单词(如 user.* 匹配 user.hi,但不匹配 user.hi.there)。
- # 匹配零个或多个单词(如 user.# 匹配 user.hi 和 user.hi.there)。
(三)斲丧者代码解析
代码
斲丧者负责创建队列并绑定到 Topic 交换机,同时指定路由键模式。
- package top.miqiu._05_topic;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer2 {
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1. 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
- connectionFactory.setHost("用自己的ip!!");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("admin");
- connectionFactory.setPassword("admin");
- // 3. 创建连接对象
- Connection connection = connectionFactory.newConnection();
- // 4. 创建 Channel
- Channel channel = connection.createChannel();
- // 5. 声明 Topic 交换机
- channel.exchangeDeclare("05-topic", "topic");
- // 6. 创建临时队列
- String queue = channel.queueDeclare().getQueue();
- // 7. 绑定队列到交换机,并指定路由键模式
- /**
- * 参数说明:
- * 1. 队列名称:queue
- * 2. 交换机名称:05-topic
- * 3. 路由键模式:user.*(匹配 user.hi、user.hello 等)
- */
- channel.queueBind(queue, "05-topic", "user.*");
- // 8. 接收消息
- /**
- * 参数说明:
- * 1. 队列名称:queue
- * 2. 是否自动确认:true(自动确认消息)
- * 3. 消息处理回调:DeliverCallback
- * 4. 消息取消回调:CancelCallback
- */
- channel.basicConsume(queue, true, new DeliverCallback() {
- @Override
- public void handle(String consumerTag, Delivery delivery) throws IOException {
- try {
- // 模拟消息处理耗时
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("消费者2 user.* 接收到消息:" + new String(delivery.getBody()));
- }
- }, new CancelCallback() {
- @Override
- public void handle(String consumerTag) throws IOException {
- System.out.println("消息被取消");
- }
- });
- }
- }
复制代码 关键点:
- 交换机声明(exchangeDeclare):确保 Topic 交换机存在,需与生产者保持同等。
- 暂时队列(queueDeclare):创建一个暂时队列,队列名称由 RabbitMQ 自动生成。
- 队列绑定(queueBind):将队列绑定到交换机,并指定路由键模式(如 user.*)。
- 消息接收(basicConsume):从队列中接收消息并处置处罚。
效果
当我在producer使用“employee.hi”作为路由key的时间,绑定了“employee.*”的consumer1可以斲丧这个消息
(四)Topic 模式的特点
- 机动的路由:支持通配符匹配,可以根据复杂的条件路由消息。
- 多路由键支持:一个队列可以绑定多个路由键模式,接收多种范例的消息。
- 实用场景:
- 消息分类:根据消息的主题进行路由。
- 多条件路由:支持机动的路由规则。
(五)总结
- Topic 模式:实用于需要根据复杂条件机动路由消息的场景。
- 生产者:负责将消息发送到 Topic 交换机,并指定路由键。
- 斲丧者:负责创建队列并绑定到 Topic 交换机,同时指定路由键模式。
- 留意事项:
- 路由键模式支持通配符 * 和 #。
- 一个队列可以绑定多个路由键模式,接收多种范例的消息。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |