rabbitmq五种模式的总结——附java-se实现(详细)

打印 上一主题 下一主题

主题 986|帖子 986|积分 2958

rabbitmq五种模式的总结

   完整项目地点:https://github.com/9lucifer/rabbitmq4j-learning
  

一、简单模式

(一)简单模式概述

RabbitMQ 的简单模式是最根本的消息队列模式,包含以下两个角色:

  • 生产者:负责发送消息到队列。
  • 斲丧者:负责从队列中接收并处置处罚消息。
在简单模式中,消息的转达是单向的,生产者将消息发送到队列,斲丧者从队列中接收消息。


(二)生产者代码解析

代码

生产者负责创建消息并将其发送到指定的队列中。
  1. package top.miqiu._01_hello;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. public class Producer {
  8.     public static void main(String[] args) throws IOException, TimeoutException {
  9.         // 1. 创建连接工厂
  10.         ConnectionFactory connectionFactory = new ConnectionFactory();
  11.         // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
  12.         connectionFactory.setHost("ip(要换成真实的ip哦)");
  13.         connectionFactory.setPort(5672);
  14.         connectionFactory.setUsername("admin");
  15.         connectionFactory.setPassword("admin");
  16.         // 3. 创建连接对象
  17.         Connection connection = connectionFactory.newConnection();
  18.         // 4. 创建 Channel
  19.         Channel channel = connection.createChannel();
  20.         // 5. 声明队列
  21.         /**
  22.          * 参数说明:
  23.          * 1. 队列名称:01-hello2
  24.          * 2. 是否持久化:true(重启后队列仍然存在)
  25.          * 3. 是否独占队列:false(允许多个消费者连接)
  26.          * 4. 是否自动删除:false(队列不会自动删除)
  27.          * 5. 额外参数:null
  28.          */
  29.         channel.queueDeclare("01-hello2", true, false, false, null);
  30.         // 6. 发送消息
  31.         /**
  32.          * 参数说明:
  33.          * 1. 交换机名称:空字符串(使用默认交换机)
  34.          * 2. 路由键:队列名称(01-hello2)
  35.          * 3. 额外属性:null
  36.          * 4. 消息内容:字节数组
  37.          */
  38.         channel.basicPublish("", "01-hello2", null, "hello rabbitmq2".getBytes());
  39.         System.out.println("消息发送成功");
  40.         // 7. 关闭资源
  41.         channel.close();
  42.         connection.close();
  43.     }
  44. }
复制代码
结果



(三)斲丧者代码解析

代码

斲丧者负责从队列中接收并处置处罚消息。
  1. package top.miqiu._01_hello_c;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class Consumer {
  6.     public static void main(String[] args) throws IOException, TimeoutException {
  7.         // 1. 创建连接工厂
  8.         ConnectionFactory connectionFactory = new ConnectionFactory();
  9.         // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
  10.         connectionFactory.setHost("ip(要换成真实的ip哦");
  11.         connectionFactory.setPort(5672);
  12.         connectionFactory.setUsername("admin");
  13.         connectionFactory.setPassword("admin");
  14.         // 3. 创建连接对象
  15.         Connection connection = connectionFactory.newConnection();
  16.         // 4. 创建 Channel
  17.         Channel channel = connection.createChannel();
  18.         // 5. 声明队列(需与生产者保持一致)
  19.         channel.queueDeclare("01-hello2", false, false, false, null);
  20.         // 6. 接收消息
  21.         /**
  22.          * 参数说明:
  23.          * 1. 队列名称:01-hello2
  24.          * 2. 是否自动确认:true(消息被消费后自动确认)
  25.          * 3. 消息处理回调:DeliverCallback
  26.          * 4. 消息取消回调:CancelCallback
  27.          */
  28.         channel.basicConsume("01-hello2", true, new DeliverCallback() {
  29.             @Override
  30.             public void handle(String consumerTag, Delivery delivery) throws IOException {
  31.                 System.out.println("接收到消息:" + new String(delivery.getBody()));
  32.             }
  33.         }, new CancelCallback() {
  34.             @Override
  35.             public void handle(String consumerTag) throws IOException {
  36.                 System.out.println("消息被取消");
  37.             }
  38.         });
  39.     }
  40. }
复制代码
结果


在mq中查看



(四)总结


  • 简单模式:实用于一对一的简单消息转达场景。
  • 生产者:负责创建队列并发送消息。
  • 斲丧者:负责从队列中接收并处置处罚消息。
  • 留意事项

    • 队列名称需保持同等,不然肯定会报错!
    • 消息确认机制需根据业务需求选择自动或手动确认。
    • 使用完资源后需显式关闭 Channel 和 Connection。

二、工作模式

(一)工作模式概述

工作模式是 RabbitMQ 的一种常见模式,用于将任务分发给多个斲丧者。它的特点是:

  • 一个生产者:负责发送消息到队列。
  • 多个斲丧者:共同斲丧同一个队列中的消息。
  • 消息分发机制:默认情况下,RabbitMQ 会以轮询(Round-Robin)的方式将消息分发给斲丧者。
工作模式实用于任务分发场景,例如将耗时的任务分发给多个 Worker 处置处罚。


(二)生产者代码解析

生产者负责创建消息并将其发送到指定的队列中。
  1. package top.miqiu._02_work;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. public class Producer {
  8.     public static void main(String[] args) throws IOException, TimeoutException {
  9.         // 1. 创建连接工厂
  10.         ConnectionFactory connectionFactory = new ConnectionFactory();
  11.         // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
  12.         connectionFactory.setHost("你的ip!别忘了改");
  13.         connectionFactory.setPort(5672);
  14.         connectionFactory.setUsername("admin");
  15.         connectionFactory.setPassword("admin");
  16.         // 3. 创建连接对象
  17.         Connection connection = connectionFactory.newConnection();
  18.         // 4. 创建 Channel
  19.         Channel channel = connection.createChannel();
  20.         // 5. 声明队列
  21.         /**
  22.          * 参数说明:
  23.          * 1. 队列名称:02-work1
  24.          * 2. 是否持久化:true(重启后队列仍然存在)
  25.          * 3. 是否独占队列:false(允许多个消费者连接)
  26.          * 4. 是否自动删除:false(队列不会自动删除)
  27.          * 5. 额外参数:null
  28.          */
  29.         channel.queueDeclare("02-work1", true, false, false, null);
  30.         // 6. 发送消息
  31.         for (int i = 0; i < 20; i++) {
  32.             String message = "hello work:" + i;
  33.             channel.basicPublish("", "02-work1", null, message.getBytes());
  34.         }
  35.         System.out.println("消息发送成功");
  36.         // 7. 关闭资源
  37.         channel.close();
  38.         connection.close();
  39.     }
  40. }
复制代码
关键点:


  • 队列声明(queueDeclare):创建队列并设置队列属性。
  • 消息发送(basicPublish):通过循环发送多条消息到队列。
  • 持久化队列:设置为 true,确保队列在 RabbitMQ 重启后仍然存在。

(三)斲丧者代码解析

代码

斲丧者负责从队列中接收并处置处罚消息。
  1. package top.miqiu._02_work;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class Consumer {
  6.     public static void main(String[] args) throws IOException, TimeoutException {
  7.         // 1. 创建连接工厂
  8.         ConnectionFactory connectionFactory = new ConnectionFactory();
  9.         // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
  10.         connectionFactory.setHost("你的ip!别忘了改");
  11.         connectionFactory.setPort(5672);
  12.         connectionFactory.setUsername("admin");
  13.         connectionFactory.setPassword("admin");
  14.         // 3. 创建连接对象
  15.         Connection connection = connectionFactory.newConnection();
  16.         // 4. 创建 Channel
  17.         Channel channel = connection.createChannel();
  18.         // 5. 声明队列(需与生产者保持一致)
  19.         channel.queueDeclare("02-work1", true, false, false, null);
  20.         // 6. 设置每次只接收一条消息
  21.         channel.basicQos(1);
  22.         // 7. 接收消息
  23.         /**
  24.          * 参数说明:
  25.          * 1. 队列名称:02-work1
  26.          * 2. 是否自动确认:false(手动确认消息)
  27.          * 3. 消息处理回调:DeliverCallback
  28.          * 4. 消息取消回调:CancelCallback
  29.          */
  30.         channel.basicConsume("02-work1", false, new DeliverCallback() {
  31.             @Override
  32.             public void handle(String consumerTag, Delivery delivery) throws IOException {
  33.                 try {
  34.                     // 模拟消息处理耗时
  35.                     Thread.sleep(1000);
  36.                 } catch (InterruptedException e) {
  37.                     e.printStackTrace();
  38.                 }
  39.                 System.out.println("消费者1 接收到消息:" + new String(delivery.getBody()));
  40.                 // 手动确认消息
  41.                 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  42.             }
  43.         }, new CancelCallback() {
  44.             @Override
  45.             public void handle(String consumerTag) throws IOException {
  46.                 System.out.println("消息被取消");
  47.             }
  48.         });
  49.     }
  50. }
复制代码
关键点:


  • 队列声明(queueDeclare):确保队列存在,需与生产者保持同等。
  • 消息预取(basicQos):设置每次只接收一条消息,制止某个斲丧者处置处罚过多消息。
  • 手动确认(basicAck):消息处置处罚完成后手动确认,确保消息不会丢失。
  • 消息处置处罚耗时:通过 Thread.sleep(1000) 模拟消息处置处罚耗时。
效果




(四)工作模式的特点


  • 消息分发机制

    • 默认情况下,RabbitMQ 会以轮询的方式将消息分发给多个斲丧者。
    • 可以通过 basicQos 设置每次只接收一条消息,制止某个斲丧者处置处罚过多消息。

  • 消息确认机制

    • 设置为手动确认(autoAck=false),确保消息处置处罚完成后才确认。(防止业务处置处罚失败的情况下丢失消息)
    • 如果斲丧者在处置处罚消息时崩溃,未确认的消息会重新分发给其他斲丧者。

  • 实用场景

    • 任务分发场景,例如将耗时的任务分发给多个 Worker 处置处罚。


(五)总结


  • 工作模式:实用于任务分发场景,多个斲丧者共同斲丧同一个队列中的消息。
  • 生产者:负责发送消息到队列。
  • 斲丧者:负责接收并处置处罚消息,支持手动确认和消息预取。
  • 留意事项

    • 队列名称需保持同等。
    • 消息确认机制需根据业务需求选择自动或手动确认。
    • 使用 basicQos 控制消息分发,制止某个斲丧者处置处罚过多消息。

三、发布订阅模式

(一)发布订阅模式概述

发布订阅模式(Publish/Subscribe Mode)是 RabbitMQ 的一种模式,用于将消息广播给多个斲丧者。它的特点是:

  • 一个生产者:将消息发送到交换机(Exchange)。
  • 多个斲丧者:每个斲丧者都有自己的队列,并与交换机绑定。
  • 消息广播:交换机将消息广播给全部绑定的队列。
发布订阅模式实用于消息广播场景,例如日志系统、关照系统等。


(二)生产者代码解析

生产者负责创建消息并将其发送到指定的交换机中。
  1. package top.miqiu._03_pubsub;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.MessageProperties;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. public class Producer {
  9.     public static void main(String[] args) throws IOException, TimeoutException {
  10.         // 1. 创建连接工厂
  11.         ConnectionFactory connectionFactory = new ConnectionFactory();
  12.         // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
  13.         connectionFactory.setHost("用自己的ip!!");
  14.         connectionFactory.setPort(5672);
  15.         connectionFactory.setUsername("admin");
  16.         connectionFactory.setPassword("admin");
  17.         // 3. 创建连接对象
  18.         Connection connection = connectionFactory.newConnection();
  19.         // 4. 创建 Channel
  20.         Channel channel = connection.createChannel();
  21.         // 5. 声明交换机
  22.         /**
  23.          * 参数说明:
  24.          * 1. 交换机名称:03-pubsub
  25.          * 2. 交换机类型:fanout(广播模式)
  26.          */
  27.         channel.exchangeDeclare("03-pubsub", "fanout");
  28.         // 6. 发送消息
  29.         for (int i = 0; i < 20; i++) {
  30.             String message = "hello work:" + i;
  31.             /**
  32.              * 参数说明:
  33.              * 1. 交换机名称:03-pubsub
  34.              * 2. 路由键:空字符串(fanout 模式忽略路由键)
  35.              * 3. 消息属性:MessageProperties.TEXT_PLAIN
  36.              * 4. 消息内容:字节数组
  37.              */
  38.             channel.basicPublish("03-pubsub", "", MessageProperties.TEXT_PLAIN, message.getBytes());
  39.         }
  40.         System.out.println("消息发送成功");
  41.         // 7. 关闭资源
  42.         channel.close();
  43.         connection.close();
  44.     }
  45. }
复制代码
关键点:


  • 交换机声明(exchangeDeclare):创建交换机并设置范例为 fanout(广播模式)。
  • 消息发送(basicPublish):将消息发送到交换机,路由键为空字符串(fanout 模式忽略路由键)。
  • 消息广播:消息会被广播到全部绑定到该交换机的队列。

(三)斲丧者代码解析

代码

斲丧者负责从队列中接收并处置处罚消息。
  1. package top.miqiu._03_pubsub;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class Consumer2 {
  6.     public static void main(String[] args) throws IOException, TimeoutException {
  7.         // 1. 创建连接工厂
  8.         ConnectionFactory connectionFactory = new ConnectionFactory();
  9.         // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
  10.         connectionFactory.setHost("用自己的ip!!");
  11.         connectionFactory.setPort(5672);
  12.         connectionFactory.setUsername("admin");
  13.         connectionFactory.setPassword("admin");
  14.         // 3. 创建连接对象
  15.         Connection connection = connectionFactory.newConnection();
  16.         // 4. 创建 Channel
  17.         Channel channel = connection.createChannel();
  18.         // 5. 声明交换机
  19.         channel.exchangeDeclare("03-pubsub", "fanout");
  20.         // 6. 创建临时队列
  21.         String queue = channel.queueDeclare().getQueue();
  22.         // 7. 绑定队列到交换机
  23.         /**
  24.          * 参数说明:
  25.          * 1. 队列名称:queue
  26.          * 2. 交换机名称:03-pubsub
  27.          * 3. 路由键:空字符串(fanout 模式忽略路由键)
  28.          */
  29.         channel.queueBind(queue, "03-pubsub", "");
  30.         // 8. 接收消息
  31.         /**
  32.          * 参数说明:
  33.          * 1. 队列名称:queue
  34.          * 2. 是否自动确认:true(自动确认消息)
  35.          * 3. 消息处理回调:DeliverCallback
  36.          * 4. 消息取消回调:CancelCallback
  37.          */
  38.         channel.basicConsume(queue, true, new DeliverCallback() {
  39.             @Override
  40.             public void handle(String consumerTag, Delivery delivery) throws IOException {
  41.                 try {
  42.                     // 模拟消息处理耗时
  43.                     Thread.sleep(1000);
  44.                 } catch (InterruptedException e) {
  45.                     e.printStackTrace();
  46.                 }
  47.                 System.out.println("消费者2 接收到消息:" + new String(delivery.getBody()));
  48.             }
  49.         }, new CancelCallback() {
  50.             @Override
  51.             public void handle(String consumerTag) throws IOException {
  52.                 System.out.println("消息被取消");
  53.             }
  54.         });
  55.     }
  56. }
复制代码
关键点:


  • 交换机声明(exchangeDeclare):确保交换机存在,需与生产者保持同等。
  • 暂时队列(queueDeclare):创建一个暂时队列,队列名称由 RabbitMQ 自动生成。
  • 队列绑定(queueBind):将队列绑定到交换机,路由键为空字符串(fanout 模式忽略路由键)。
  • 消息接收(basicConsume):从队列中接收消息并处置处罚。
结果



   可以看到两个consumer都斲丧了相同的消息
  
(四)发布订阅模式的特点


  • 消息广播:交换机将消息广播给全部绑定的队列。
  • 暂时队列:斲丧者可以创建暂时队列,队列名称由 RabbitMQ 自动生成。
  • 实用场景

    • 日志系统:将日志消息广播给多个斲丧者。
    • 关照系统:将关照消息广播给多个用户。


(五)总结


  • 发布订阅模式:实用于消息广播场景,多个斲丧者各自接收相同的消息。
  • 生产者:负责将消息发送到交换机。
  • 斲丧者:负责创建队列并绑定到交换机,接收并处置处罚消息。
  • 留意事项

    • 交换机范例需设置为 fanout。
    • 队列绑定到交换机时,路由键为空字符串。
    • 暂时队列的名称由 RabbitMQ 自动生成。


(六)RabbitMQ 交换机范例总结

交换机范例描述路由举动实用场景Fanout广播模式,将消息发送到全部绑定到该交换机的队列。忽略路由键(Routing Key),消息会被广播到全部绑定的队列。日志系统、关照系统等需要广播消息的场景。Direct直接模式,根据路由键将消息发送到匹配的队列。消息的路由键必须与队列绑定的路由键完全匹配。任务分发、点对点通信等需要精确路由的场景。Topic主题模式,根据路由键的模式匹配将消息发送到符合条件的队列。支持通配符匹配:* 匹配一个单词,# 匹配零个或多个单词。消息分类、多条件路由等需要机动匹配的场景。Headers头部模式,根据消息的头部属性(Headers)进行匹配。不依赖路由键,而是通过消息的头部属性匹配队列绑定的条件。复杂的路由逻辑,例如根据消息的元数据进行路由。
详细说明

1. Fanout 交换机(广播,常用)



  • 特点

    • 消息会被广播到全部绑定到该交换机的队列。
    • 忽略路由键(Routing Key)。

  • 实用场景

    • 日志系统:将日志消息广播给多个斲丧者。
    • 关照系统:将关照消息广播给多个用户。

2. Direct 交换机



  • 特点

    • 消息的路由键必须与队列绑定的路由键完全匹配。
    • 支持一对一或一对多的精确路由。

  • 实用场景

    • 任务分发:将特定任务路由到特定的 Worker。
    • 点对点通信:将消息发送到特定的接收者。

3. Topic 交换机



  • 特点

    • 支持通配符匹配:

      • * 匹配一个单词。
      • # 匹配零个或多个单词。

    • 路由键的格式通常是点分字符串(如 user.create)。

  • 实用场景

    • 消息分类:根据消息的主题进行路由。
    • 多条件路由:支持机动的路由规则。

4. Headers 交换机



  • 特点

    • 不依赖路由键,而是通过消息的头部属性(Headers)进行匹配。
    • 支持复杂的匹配规则(如 x-match 参数)。

  • 实用场景

    • 复杂的路由逻辑:根据消息的元数据进行路由。
    • 需要高度机动性的场景。


对比

场景FanoutDirectTopicHeaders日志广播全部斲丧者接收相同的日志消息。不实用。不实用。不实用。任务分发不实用。将任务路由到特定的 Worker。将任务分类路由到差别的 Worker。根据任务的元数据进行路由。关照系统全部效户接收相同的关照。特定用户接收特定关照。根据关照范例路由到差别用户。根据关照的元数据进行路由。消息分类不实用。不实用。根据消息主题进行路由。根据消息的头部属性进行路由。
总结



  • Fanout:实用于广播场景。
  • Direct:实用于精确路由场景。
  • Topic:实用于机动的路由场景。
  • Headers:实用于复杂的路由逻辑。
四、路由模式

(一)路由模式概述

路由模式是 RabbitMQ 的一种模式,使用 Direct 交换机 根据消息的 路由键(Routing Key) 将消息发送到匹配的队列。它的特点是:

  • 一个生产者:将消息发送到 Direct 交换机,并指定路由键。
  • 多个斲丧者:每个斲丧者可以绑定一个或多个路由键,只有匹配的路由键的消息才会被接收。
  • 精确路由:消息的路由键必须与队列绑定的路由键完全匹配。
路由模式实用于需要根据特定条件精确路由消息的场景,例如日志级别分类、任务分发等。


(二)生产者代码解析

生产者负责创建消息并将其发送到 Direct 交换机,同时指定路由键。
  1. package top.miqiu._04_routing;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.MessageProperties;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. public class Producer {
  9.     public static void main(String[] args) throws IOException, TimeoutException {
  10.         // 1. 创建连接工厂
  11.         ConnectionFactory connectionFactory = new ConnectionFactory();
  12.         // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
  13.         connectionFactory.setHost("你的ip!!!");
  14.         connectionFactory.setPort(5672);
  15.         connectionFactory.setUsername("admin");
  16.         connectionFactory.setPassword("admin");
  17.         // 3. 创建连接对象
  18.         Connection connection = connectionFactory.newConnection();
  19.         // 4. 创建 Channel
  20.         Channel channel = connection.createChannel();
  21.         // 5. 声明 Direct 交换机
  22.         /**
  23.          * 参数说明:
  24.          * 1. 交换机名称:04-routing
  25.          * 2. 交换机类型:direct
  26.          */
  27.         channel.exchangeDeclare("04-routing", "direct");
  28.         // 6. 发送消息
  29.         for (int i = 0; i < 20; i++) {
  30.             String message = "hello work:" + i;
  31.             /**
  32.              * 参数说明:
  33.              * 1. 交换机名称:04-routing
  34.              * 2. 路由键:err(消息将发送到绑定 err 路由键的队列)
  35.              * 3. 消息属性:MessageProperties.TEXT_PLAIN
  36.              * 4. 消息内容:字节数组
  37.              */
  38.             channel.basicPublish("04-routing", "err", MessageProperties.TEXT_PLAIN, message.getBytes());
  39.         }
  40.         System.out.println("消息发送成功");
  41.         // 7. 关闭资源
  42.         channel.close();
  43.         connection.close();
  44.     }
  45. }
复制代码
关键点:


  • 交换机声明(exchangeDeclare):创建 Direct 交换机,范例为 direct。
  • 消息发送(basicPublish):指定路由键(如 err),消息会被发送到绑定该路由键的队列。
  • 路由键匹配:只有队列绑定的路由键与消息的路由键完全匹配时,消息才会被路由到该队列。

(三)斲丧者代码解析

代码

斲丧者负责创建队列并绑定到 Direct 交换机,同时指定路由键。
  1. package top.miqiu._04_routing;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class Consumer {
  6.     public static void main(String[] args) throws IOException, TimeoutException {
  7.         // 1. 创建连接工厂
  8.         ConnectionFactory connectionFactory = new ConnectionFactory();
  9.         // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
  10.         connectionFactory.setHost("你的ip!!!");
  11.         connectionFactory.setPort(5672);
  12.         connectionFactory.setUsername("admin");
  13.         connectionFactory.setPassword("admin");
  14.         // 3. 创建连接对象
  15.         Connection connection = connectionFactory.newConnection();
  16.         // 4. 创建 Channel
  17.         Channel channel = connection.createChannel();
  18.         // 5. 声明 Direct 交换机
  19.         channel.exchangeDeclare("04-routing", "direct");
  20.         // 6. 创建临时队列
  21.         String queue = channel.queueDeclare().getQueue();
  22.         // 7. 绑定队列到交换机,并指定路由键
  23.         /**
  24.          * 参数说明:
  25.          * 1. 队列名称:queue
  26.          * 2. 交换机名称:04-routing
  27.          * 3. 路由键:info、err、waring
  28.          */
  29.         channel.queueBind(queue, "04-routing", "info");
  30.         channel.queueBind(queue, "04-routing", "err");
  31.         channel.queueBind(queue, "04-routing", "waring");
  32.         // 8. 接收消息
  33.         /**
  34.          * 参数说明:
  35.          * 1. 队列名称:queue
  36.          * 2. 是否自动确认:true(自动确认消息)
  37.          * 3. 消息处理回调:DeliverCallback
  38.          * 4. 消息取消回调:CancelCallback
  39.          */
  40.         channel.basicConsume(queue, true, new DeliverCallback() {
  41.             @Override
  42.             public void handle(String consumerTag, Delivery delivery) throws IOException {
  43.                 try {
  44.                     // 模拟消息处理耗时
  45.                     Thread.sleep(1000);
  46.                 } catch (InterruptedException e) {
  47.                     e.printStackTrace();
  48.                 }
  49.                 System.out.println("消费者1 接收到消息:" + new String(delivery.getBody()));
  50.             }
  51.         }, new CancelCallback() {
  52.             @Override
  53.             public void handle(String consumerTag) throws IOException {
  54.                 System.out.println("消息被取消");
  55.             }
  56.         });
  57.     }
  58. }
复制代码
关键点:


  • 交换机声明(exchangeDeclare):确保 Direct 交换机存在,需与生产者保持同等。
  • 暂时队列(queueDeclare):创建一个暂时队列,队列名称由 RabbitMQ 自动生成。
  • 队列绑定(queueBind):将队列绑定到交换机,并指定路由键(如 info、err、waring)。
  • 消息接收(basicConsume):从队列中接收消息并处置处罚。
效果

consumer1绑定了[info,err,waring],以是在producer绑定了info时发送消息的情况下,consumer1可以接收到信息

由于consumer2绑定的是trace,以是consumer2是接收不到消息的


(四)路由模式的特点


  • 精确路由:消息的路由键必须与队列绑定的路由键完全匹配。
  • 多路由键支持:一个队列可以绑定多个路由键,接收多种范例的消息。
  • 实用场景

    • 日志级别分类:将差别级别的日志(如 info、err)路由到差别的队列。
    • 任务分发:将特定任务路由到特定的 Worker。


(五)总结


  • 路由模式:实用于需要根据路由键精确路由消息的场景。
  • 生产者:负责将消息发送到 Direct 交换机,并指定路由键。
  • 斲丧者:负责创建队列并绑定到 Direct 交换机,同时指定路由键。
  • 留意事项

    • 路由键必须完全匹配。
    • 一个队列可以绑定多个路由键,接收多种范例的消息。

五、Topic 模式

(一)Topic 模式概述

Topic 模式是 RabbitMQ 的一种模式,使用 Topic 交换机 根据消息的 路由键(Routing Key) 进行模式匹配,将消息发送到符合条件的队列。它的特点是:

  • 一个生产者:将消息发送到 Topic 交换机,并指定路由键。
  • 多个斲丧者:每个斲丧者可以绑定一个或多个路由键模式,只有匹配的路由键的消息才会被接收。
  • 机动的路由:支持通配符匹配:

    • * 匹配一个单词。
    • # 匹配零个或多个单词。

Topic 模式实用于需要根据复杂条件机动路由消息的场景,例如消息分类、多条件路由等。


(二)生产者代码解析

代码

生产者负责创建消息并将其发送到 Topic 交换机,同时指定路由键。
  1. package top.miqiu._05_topic;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.MessageProperties;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. public class Producer {
  9.     public static void main(String[] args) throws IOException, TimeoutException {
  10.         // 1. 创建连接工厂
  11.         ConnectionFactory connectionFactory = new ConnectionFactory();
  12.         // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
  13.         connectionFactory.setHost("用自己的ip!!");
  14.         connectionFactory.setPort(5672);
  15.         connectionFactory.setUsername("admin");
  16.         connectionFactory.setPassword("admin");
  17.         // 3. 创建连接对象
  18.         Connection connection = connectionFactory.newConnection();
  19.         // 4. 创建 Channel
  20.         Channel channel = connection.createChannel();
  21.         // 5. 声明 Topic 交换机
  22.         /**
  23.          * 参数说明:
  24.          * 1. 交换机名称:05-topic
  25.          * 2. 交换机类型:topic
  26.          */
  27.         channel.exchangeDeclare("05-topic", "topic");
  28.         // 6. 发送消息
  29.         for (int i = 0; i < 20; i++) {
  30.             String message = "hello work:" + i;
  31.             /**
  32.              * 参数说明:
  33.              * 1. 交换机名称:05-topic
  34.              * 2. 路由键:user.hi(消息将发送到匹配 user.* 或 user.# 的队列)
  35.              * 3. 消息属性:MessageProperties.TEXT_PLAIN
  36.              * 4. 消息内容:字节数组
  37.              */
  38.             channel.basicPublish("05-topic", "user.hi", MessageProperties.TEXT_PLAIN, message.getBytes());
  39.         }
  40.         System.out.println("消息发送成功");
  41.         // 7. 关闭资源
  42.         channel.close();
  43.         connection.close();
  44.     }
  45. }
复制代码
关键点:


  • 交换机声明(exchangeDeclare):创建 Topic 交换机,范例为 topic。
  • 消息发送(basicPublish):指定路由键(如 user.hi),消息会被发送到匹配的队列。
  • 通配符匹配

    • * 匹配一个单词(如 user.* 匹配 user.hi,但不匹配 user.hi.there)。
    • # 匹配零个或多个单词(如 user.# 匹配 user.hi 和 user.hi.there)。


(三)斲丧者代码解析

代码

斲丧者负责创建队列并绑定到 Topic 交换机,同时指定路由键模式。
  1. package top.miqiu._05_topic;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class Consumer2 {
  6.     public static void main(String[] args) throws IOException, TimeoutException {
  7.         // 1. 创建连接工厂
  8.         ConnectionFactory connectionFactory = new ConnectionFactory();
  9.         // 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码
  10.         connectionFactory.setHost("用自己的ip!!");
  11.         connectionFactory.setPort(5672);
  12.         connectionFactory.setUsername("admin");
  13.         connectionFactory.setPassword("admin");
  14.         // 3. 创建连接对象
  15.         Connection connection = connectionFactory.newConnection();
  16.         // 4. 创建 Channel
  17.         Channel channel = connection.createChannel();
  18.         // 5. 声明 Topic 交换机
  19.         channel.exchangeDeclare("05-topic", "topic");
  20.         // 6. 创建临时队列
  21.         String queue = channel.queueDeclare().getQueue();
  22.         // 7. 绑定队列到交换机,并指定路由键模式
  23.         /**
  24.          * 参数说明:
  25.          * 1. 队列名称:queue
  26.          * 2. 交换机名称:05-topic
  27.          * 3. 路由键模式:user.*(匹配 user.hi、user.hello 等)
  28.          */
  29.         channel.queueBind(queue, "05-topic", "user.*");
  30.         // 8. 接收消息
  31.         /**
  32.          * 参数说明:
  33.          * 1. 队列名称:queue
  34.          * 2. 是否自动确认:true(自动确认消息)
  35.          * 3. 消息处理回调:DeliverCallback
  36.          * 4. 消息取消回调:CancelCallback
  37.          */
  38.         channel.basicConsume(queue, true, new DeliverCallback() {
  39.             @Override
  40.             public void handle(String consumerTag, Delivery delivery) throws IOException {
  41.                 try {
  42.                     // 模拟消息处理耗时
  43.                     Thread.sleep(1000);
  44.                 } catch (InterruptedException e) {
  45.                     e.printStackTrace();
  46.                 }
  47.                 System.out.println("消费者2 user.* 接收到消息:" + new String(delivery.getBody()));
  48.             }
  49.         }, new CancelCallback() {
  50.             @Override
  51.             public void handle(String consumerTag) throws IOException {
  52.                 System.out.println("消息被取消");
  53.             }
  54.         });
  55.     }
  56. }
复制代码
关键点:


  • 交换机声明(exchangeDeclare):确保 Topic 交换机存在,需与生产者保持同等。
  • 暂时队列(queueDeclare):创建一个暂时队列,队列名称由 RabbitMQ 自动生成。
  • 队列绑定(queueBind):将队列绑定到交换机,并指定路由键模式(如 user.*)。
  • 消息接收(basicConsume):从队列中接收消息并处置处罚。
效果

当我在producer使用“employee.hi”作为路由key的时间,绑定了“employee.*”的consumer1可以斲丧这个消息


(四)Topic 模式的特点


  • 机动的路由:支持通配符匹配,可以根据复杂的条件路由消息。
  • 多路由键支持:一个队列可以绑定多个路由键模式,接收多种范例的消息。
  • 实用场景

    • 消息分类:根据消息的主题进行路由。
    • 多条件路由:支持机动的路由规则。


(五)总结


  • Topic 模式:实用于需要根据复杂条件机动路由消息的场景。
  • 生产者:负责将消息发送到 Topic 交换机,并指定路由键。
  • 斲丧者:负责创建队列并绑定到 Topic 交换机,同时指定路由键模式。
  • 留意事项

    • 路由键模式支持通配符 * 和 #。
    • 一个队列可以绑定多个路由键模式,接收多种范例的消息。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用户国营

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表