ToB企服应用市场:ToB评测及商务社交产业平台

标题: RabbitMQ详解与实战(绝对足够惊喜) [打印本页]

作者: 千千梦丶琪    时间: 2024-7-13 15:49
标题: RabbitMQ详解与实战(绝对足够惊喜)

什么是RabbitMQ

   RabbitMQ 是一个开源的消息队列中间件,它实现了高度可靠、灵活和可扩展的消息通报模型。它基于 AMQP(高级消息队列协议)来举行消息的传输和交互。
  以下是 RabbitMQ 的一些重要构成部分和特性的详细先容:
    总结:RabbitMQ 是一个功能强大的消息队列中间件,它提供了高度可靠、灵活和可扩展的消息通报模型。通过使用生产者、交换机、队列和斲丧者,开辟人员可以构建可靠的分布式系统,实现异步通信息争耦应用程序的组件。
  RabbitMQ与Kafka的区别

   RabbitMQ 和 Kafka 都是流行的消息队列系统,它们在计划和用途上有一些区别。以下是 RabbitMQ 和 Kafka 之间的重要区别:
    
    
    
    
  RabbitMQ与Kafka的各自实用场景

   RabbitMQ实用场景:
  
  Kafka实用场景:
  
  总结:
RabbitMQ实用于夸大消息可靠性通报和复杂路由的场景,适合任务队列、工作流和订单处理等应用。Kafka实用于高吞吐量、低耽误、分布式架构和持久性存储,适合大规模数据流处理和及时流处理等应用。选择合适的系统取决于具体的业务需求和使用场景。
  RabbitMQ的安装

Erlang下载安装

  1. wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.4.12-1.el7.x86_64.rpm/download.rpm
  2. yum localinstall erlang-22.3.4.12-1.el7.x86_64.rpm
复制代码
RabbitMQ下载安装

  1. wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.13-1.el7.noarch.rpm/download.rpm
  2. rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
  3. yum localinstall rabbitmq-server-3.8.13-1.el7.noarch.rpm
复制代码
RabbotMQ的启动

  1. systemctl start rabbitmq-server  #启动
  2. systemctl enable rabbitmq-server #设置开机自启
  3. systemctl status rabbitmq-server #查看状态
复制代码
RabbitMQ Web界面管理

默认情况下,是没有安装web端的客户端插件,必要安装插件才可以生效。实行命令:
  1. rabbitmq-plugins enable rabbitmq_management
复制代码
然后必要重启服务
  1. systemctl restart rabbitmq-server
复制代码
由于Web管理界面访问端口为15672,以是防火墙必要放行该端口
对于 Centos 7 上的防火墙,要放行端口 15672(默认 RabbitMQ 管理界面的端口),可以按照以下步骤举行操纵:
如今,CentOS 7 的防火墙应该已经放行了 15672 端口和5672 端口,答应对 RabbitMQ 管理界面举行访问。请留意,为了安全起见,发起仅在必要时才开放必要的端口,并在完成使用后关闭不必要的端口。
Web界面访问管理

RabbitMQ 默认的管理界面账号和密码通常是:

这对默认凭据在 RabbitMQ 安装后可用于访问管理界面(只限于当地)。然而,出于安全考虑,强烈发起在生产环境中修改默认凭据或创建新的管理员帐户,并使用更强大的密码来增强安全性。
要在 RabbitMQ 中添加新用户,您必要使用 RabbitMQ 提供的命令行工具或者管理界面举行操纵。下面是两种方法的简要说明:
方法一:使用 RabbitMQ 命令行工具
方法二:使用 RabbitMQ 管理界面
无论您使用哪种方法,确保为新用户选择一个强大的密码,并在生产环境中遵照安全最佳实践。
访问方式为 主机IP地址共同端标语,比方 192.168.18.14:15672


留意,此处必要举行一次授权,否则在代码中连接RabbitMQ会失败


SpringBoot+RabbitMQ实战

引入依赖

  1.       <!--rabbitmq-->
  2.         <dependency>
  3.             <groupId>org.springframework.boot</groupId>
  4.             <artifactId>spring-boot-starter-amqp</artifactId>
  5.         </dependency>
  6.         <dependency>
  7.             <groupId>org.springframework.boot</groupId>
  8.             <artifactId>spring-boot-starter-web</artifactId>
  9.         </dependency>
复制代码
修改设置文件

  1. server:
  2.     port: 8080
  3. spring:
  4.     application:
  5.         name: RabbitMQExample
  6.     rabbitmq:
  7.         host: 192.168.18.14
  8.         port: 5672
  9.         username: admin
  10.         password: admin
复制代码
  这是一个Spring Boot应用程序的设置文件,它使用RabbitMQ作为消息队列。下面是各个部分的详细解释:
  
  RabbitMQ五种消息模型

   再开始实战之前,我们在先容一下RabbitMQ 重要的消息模式!RabbitMQ 支持多种消息模型,以下是其中五种常见的消息模型,功能计划我们也将围绕如下五种举行开展
    以上是 RabbitMQ 的五种常见消息模型,每种模型都有其实用的场景和特点。您可以根据具体需求选择合适的消息模型来构建应用程序。
  RabbitMQ简单模式(生产者斲丧者模式)

   RabbitMQ简单模式,也称为基本模式(Basic Model),是RabbitMQ的最简单的消息通报模式,仅涉及到一个生产者和一个斲丧者。
  在这个模式中,当我们启动一个程序作为生产者并向RabbitMQ发出消息时,我们盼望它直接进入队列中,然后斲丧者会从队列中获取这个消息并举行处理。
  简单模式在RabbitMQ中是一个单队列单生产者单斲丧者的模式,重要实用于单纯的任务处理,消息的生产者和斲丧者的削峰填谷能力非常高。
  下面树模如何基于Spring Boot实现RabbitMQ的简单模式:
  设置类

  1. import org.springframework.amqp.core.Queue;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. /**
  5. * @Description MQ配置类
  6. * @Author IT小辉同学
  7. * @Date 2023/06/16
  8. */
  9. @Configuration
  10. public class RabbitMQConfig {
  11.     /**
  12.      * @return {@link Queue }
  13.      * @Description 设置队列
  14.      * @Author IT小辉同学
  15.      * @Date 2023/06/16
  16.      */
  17.     @Bean
  18.     public Queue queue(){
  19.         return new Queue("simple.hello");
  20.     }
  21. }
复制代码
  这是一个名为RabbitMQConfig的设置类,用于界说 RabbitMQ 的干系设置。
  该类中包含了一个标有 @Bean 注解的方法 queue(),用于创建并设置队列。方法返回类型为 Queue,并指定队列名称为 "simple.hello"。
  通过使用 @Bean 注解,Spring Boot 将会根据该方法的返回值来创建一个名为 "simple.hello" 的队列,并将其注册到 Spring 上下文中,以供其他组件使用。
  该设置类提供了创建队列的逻辑,通常在应用启动时会自动实行该方法并创建对应的队列实例。可以在其他组件中通过依赖注入(如 @Autowired)或者获取应用上下文(如 ApplicationContext.getBean())的方式来得到该队列实例,以便举行消息发送和吸取的操纵。
  留意:此设置不可省略,否则当队列不存在的时间会报错
  生产者

  1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Service;
  4. /**
  5. * @Description 生产者(发送消息)
  6. * @Author IT小辉同学
  7. * @Date 2023/06/16
  8. */
  9. @Service
  10. public class MessageSender {
  11.     @Autowired
  12.     private RabbitTemplate rabbitTemplate;
  13.     /**
  14.      * @param message 消息
  15.      * @Description 发送消息
  16.      * @Author IT小辉同学
  17.      * @Date 2023/06/16
  18.      */
  19.     public void sendMessage(String message) {
  20.         System.out.println("发送祝福:" + message);
  21.         rabbitTemplate.convertAndSend("simple.hello", message);
  22.     }
  23. }
复制代码
  在MessageSender类中,有一个私有成员变量rabbitTemplate,类型为RabbitTemplate。RabbitTemplate是Spring提供的一个用于操纵RabbitMQ消息队列的模板类。
  该类中界说了一个公共方法sendMessage,吸取一个字符串类型的参数message。该方法的作用是发送一条消息到名为"simple.hello"的队列中。具体实现是通过调用rabbitTemplate的convertAndSend方法来完成的。
  斲丧者

  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Service;
  5. /**
  6. * @Description 消息接收器
  7. * @Author IT小辉同学
  8. * @Date 2023/06/16
  9. */
  10. @Service
  11. public class MessageReceiver {
  12.     @Autowired
  13.     private RabbitTemplate rabbitTemplate;
  14.     /**
  15.      * @param message 消息
  16.      * @Description 处理消息
  17.      * @Author IT小辉同学
  18.      * @Date 2023/06/16
  19.      */
  20.     @RabbitListener(queues = "simple.hello")
  21.     public void handleMessage(String message) {
  22.         System.out.println("我收到了你的祝福: " + message);
  23.     }
  24. }
复制代码
  该类中界说了一个公共方法handleMessage,吸取一个字符串类型的参数message。该方法的作用是处理从名为"simple.hello"的队列中吸取到的消息,负责监听消息!!!
  测试

  1. import com.xiaohui.service.MessageSender;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.test.context.SpringBootTest;
  5. @SpringBootTest
  6. public class MQTestDemo {
  7.     @Autowired
  8.     private MessageSender messageSender;
  9.     @Test
  10.     public void testDemo1(){
  11.             messageSender.sendMessage("我想跟你说:希望你开心快乐!!!");
  12.     }
  13. }
复制代码
留意:不可以省略@SpringBootTest,否则监听不到MQ设置
   @SpringBootTest是Spring框架中的一个测试注解,用于运行Spring Boot应用程序的单元测试。它可以初始化和设置Spring应用程序上下文,使测试应用程序的活动更容易举行测试。
  当使用@SpringBootTest注解时,它可以在测试类或测试方法上使用。当在测试类上使用时,它会自动为该类中的全部测试方法初始化和设置Spring应用程序上下文。当在测试方法上使用时,它会为该特定方法初始化和设置Spring应用程序上下文。
  使用@SpringBootTest的一些长处包罗:
  
  


RabbitMQ工作队列模式(广播模式)

   RabbitMQ工作队列模式,也称为Task Queues或Background Tasks,是一种常见的应用场景,它用于处理大量的任务,将任务举行排队,然后分发给多个斲丧者举行处理。这种模式实用于必要异步处理耗时的、密集型任务而且要求可靠性的情况。
  RabbitMQ工作队列模式的基本原理是,将必要处理的任务投递到RabbitMQ中,生成任务队列(Task Queues),多个斲丧者通过拉取任务队列中的任务举行处理。
  在RabbitMQ的工作队列模式中,队列中的每个消息都会分配给一个斲丧者举行处理。斲丧者可以是不同的进程、线程或服务,从而实现可扩展性和并行性。
  在一个生产者-多个斲丧者的场景下,生产者只必要将消息发送到一个消息队列中,斲丧者会自动从队列中获取消息举行处理。假如存在多个斲丧者,队列中的消息将会被分摊给多个斲丧者举行处理,即实现了任务并行处理的功能。而且假如一个斲丧者挂掉,该斲丧者所占用的任务在肯定的时间内不会被重新分配,即实现了任务可靠性处理的功能。
  一般情况下,RabbitMQ的工作队列模式可以应用于以下场景:
  
  设置类

  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.FanoutExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. /**
  8. * @Description MQ配置类
  9. * @Author IT小辉同学
  10. * @Date 2023/06/16
  11. */
  12. @Configuration
  13. public class RabbitMQConfig {
  14.     //队列1
  15.     private static final String QUEUE01 = "queue01";
  16.     //队列2
  17.     private static final String QUEUE02 = "queue02";
  18.     //交换机
  19.     private static final String EXCHANGE_NAME = "fanout_exchange";
  20.     @Bean
  21.     public Queue queue1() {
  22.         return new Queue(QUEUE01);
  23.     }
  24.     @Bean
  25.     public Queue queue2() {
  26.         return new Queue(QUEUE02);
  27.     }
  28.     @Bean
  29.     public FanoutExchange fanoutExchange() {
  30.         return new FanoutExchange(EXCHANGE_NAME);
  31.     }
  32.     @Bean
  33.     public Binding binding01() {
  34.         return BindingBuilder.bind(queue1()).to(fanoutExchange());
  35.     }
  36.     @Bean
  37.     public Binding binding02() {
  38.         return BindingBuilder.bind(queue2()).to(fanoutExchange());
  39.     }
  40. }
复制代码
  这段代码是一个设置类,用于设置RabbitMQ的队列和交换机。其中:
  
  生产者

  1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Service;
  4. /**
  5. * @Description 生产者(发送消息)
  6. * @Author IT小辉同学
  7. * @Date 2023/06/16
  8. */
  9. @Service
  10. public class MessageSender {
  11.     @Autowired
  12.     private RabbitTemplate rabbitTemplate;
  13.     /**
  14.      * @param message 消息
  15.      * @Description 发送消息
  16.      * @Author IT小辉同学
  17.      * @Date 2023/06/16
  18.      */
  19.     public void sendMessage(String message) {
  20.         System.out.println( message);
  21.         rabbitTemplate.convertAndSend("fanout_exchange","", message);
  22.     }
  23. }
复制代码
  这段代码是使用RabbitTemplate发送消息到名为"fanout_exchange"的交换机上。其中:
  
  斲丧者

  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Service;
  5. /**
  6. * @Description 消息接收器
  7. * @Author IT小辉同学
  8. * @Date 2023/06/16
  9. */
  10. @Service
  11. public class MessageReceiver {
  12.     @Autowired
  13.     private RabbitTemplate rabbitTemplate;
  14.     /**
  15.      * @param message 消息
  16.      * @Description 消费者01
  17.      * @Author IT小辉同学
  18.      * @Date 2023/06/16
  19.      */
  20.     @RabbitListener(queues = "queue01")
  21.     public void receiver01(String message) {
  22.         System.out.println("队列01:奔赴山海," + message);
  23.     }
  24.     /**
  25.      * @param message 消息
  26.      * @Description 消费者012
  27.      * @Author IT小辉同学
  28.      * @Date 2023/06/16
  29.      */
  30.     @RabbitListener(queues = "queue02")
  31.     public void receiver02(String message) {
  32.         System.out.println("队列02:向阳而生," + message);
  33.     }
  34. }
复制代码
测试

  1. @SpringBootTest
  2. public class MQTestDemo {
  3.     @Autowired
  4.     private MessageSender messageSender;
  5.     @Test
  6.     public void testDemo1(){
  7.             messageSender.sendMessage("相信梦想。。。。。。");
  8.     }
  9. }
复制代码

RabbitMQ发布/订阅模式

   RabbitMQ发布/订阅模式,也叫做“广播(Broadcast)模式”,是RabbitMQ的一种高级消息通报模式,重要用于广播消息。
  在发布/订阅模式中,消息发送到Exchange(交换机)上,并携带着一个Routing Key(路由键),Exchange将收到的消息转发到绑定在它上面的全队伍列。每个绑定键(Binding Key)都与一个队列干系联,而队列和消息的吸取者实现了完全解耦,吸取者只必要订阅(subscribe)与该队列干系联的绑定键即可。
  我们将它作为“广播”模式,因为可以将一条消息同时发送到多个斲丧者。比方,我们可以让多个斲丧者吸取网站上发布的新闻消息。
  发布/订阅模式在RabbitMQ中的架构非常简单,重要可以描述为以下四个步骤:
    以下是如何使用Spring Boot实现RabbitMQ发布/订阅模式:
  设置类

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. /**
  5. * @Description MQ配置类
  6. * @Author IT小辉同学
  7. * @Date 2023/06/16
  8. */
  9. @Configuration
  10. public class RabbitmqConfig {
  11.     //队列01
  12.     private static final String QUEUE01= "queue01";
  13.     //队列02
  14.     private static final String QUEUE02= "queue02";
  15.     //交换机
  16.     private static final String EXCHANGE_NAME = "direct_exchange";
  17.     //路由键01
  18.     private static final String ROUTINGKEY01 = "queue_route01";
  19.     //路由键02
  20.     private static final String ROUTINGKEY02 = "queue_route02";
  21.     @Bean
  22.     public Queue queue01(){
  23.         return new Queue(QUEUE01);
  24.     }
  25.     @Bean
  26.     public Queue queue02(){
  27.         return new Queue(QUEUE02);
  28.     }
  29.     @Bean
  30.     public DirectExchange directExchange(){
  31.         return new DirectExchange(EXCHANGE_NAME);
  32.     }
  33.     @Bean
  34.     public Binding binding1(){
  35.         //将列队01绑定到交换机上为给他设置路由键
  36.         return BindingBuilder.bind(queue01()).to(directExchange()).with(ROUTINGKEY01);
  37.     }
  38.     @Bean
  39.     public Binding binding2(){
  40.         //将列队02绑定到交换机上为给他设置路由键
  41.         return BindingBuilder.bind(queue02()).to(directExchange()).with(ROUTINGKEY02);
  42.     }
  43. }
复制代码
  在RabbitmqConfig类中,界说了多个静态变量,用于存储队列、交换机和路由键的名称。同时,还界说了多个@Bean方法,用于创建队列、交换机和绑定对象。
  其中,queue01()和queue02()方法分别创建了两个队列对象,并将它们与对应的队列名称关联起来。directExchange()方法创建了一个直接交换机对象,并将其与交换机的名称关联起来。binding1()和binding2()方法则分别创建了两个绑定对象,将队列绑定到交换机上,并设置了相应的路由键。
  通过这些设置,可以方便地对RabbitMQ举行测试和开辟。比方,可以使用@Autowired注解注入Queue对象,然后使用send()方法发送消息到指定的队列中。
  生产者

  1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Service;
  4. /**
  5. * @Description 生产者(发送消息)
  6. * @Author IT小辉同学
  7. * @Date 2023/06/16
  8. */
  9. @Service
  10. public class MessageSender {
  11.     @Autowired
  12.     private RabbitTemplate rabbitTemplate;
  13.     /**
  14.      * @param message 消息
  15.      * @Description 发送消息
  16.      * @Author IT小辉同学
  17.      * @Date 2023/06/16
  18.      */
  19.     public void sendMessage(String message) {
  20.         System.out.println( message);
  21.         rabbitTemplate.convertAndSend("direct_exchange","queue_route01", message);
  22.     }
  23. }
复制代码
  
  斲丧者

  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Service;
  5. /**
  6. * @Description 消息接收器
  7. * @Author IT小辉同学
  8. * @Date 2023/06/16
  9. */
  10. @Service
  11. public class MessageReceiver {
  12.     @Autowired
  13.     private RabbitTemplate rabbitTemplate;
  14.     /**
  15.      * @param message 消息
  16.      * @Description 消费者01
  17.      * @Author IT小辉同学
  18.      * @Date 2023/06/16
  19.      */
  20.     @RabbitListener(queues = "queue01")
  21.     public void receiver01(String message) {
  22.         System.out.println("队列01——路由01:奔赴山海," + message);
  23.     }
  24.     /**
  25.      * @param message 消息
  26.      * @Description 消费者012
  27.      * @Author IT小辉同学
  28.      * @Date 2023/06/16
  29.      */
  30.     @RabbitListener(queues = "queue02")
  31.     public void receiver02(String message) {
  32.         System.out.println("队列02——路由02:向阳而生," + message);
  33.     }
  34. }
复制代码
测试

  1. import com.xiaohui.service.MessageSender;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
  2. public class MQTestDemo {
  3.     @Autowired
  4.     private MessageSender messageSender;
  5.     @Test
  6.     public void testDemo1(){
  7.             messageSender.sendMessage("相信梦想。。。。。。");
  8.     }
  9. }
复制代码

RabbitMQ路由模式

   RabbitMQ路由模式是一种高级消息通报模式,它可以通过选择路由键(Routing Key)将消息推送到绑定键(Binding Key)与之匹配的队列中,以满足不同的斲丧者必要。
  路由模式重要用于单一应用程序内的消息通报,生产者将消息发送到指定的Exchange(交换机)中,而且Exchange会根据Routing Key将消息放到绑定到Exchange上的队列中。而不同的斲丧者使用不同的Binding Key来决定与哪个队列建立接洽并吸取消息。
  在RabbitMQ中,路由模式有以下几个步骤:
    下面是如何使用Spring Boot实现RabbitMQ路由模式:
  设置类

接着,界说一个交换机和两个队列:
  1. @Configuration
  2. public class RabbitConfig {
  3.     public static final String EXCHANGE_NAME = "direct_exchange";
  4.     public static final String QUEUE_NAME_1 = "queue_1";
  5.     public static final String QUEUE_NAME_2 = "queue_2";
  6.     public static final String ROUTING_KEY_1 = "key_1";
  7.     public static final String ROUTING_KEY_2 = "key_2";
  8.     @Bean
  9.     public DirectExchange directExchange() {
  10.         return new DirectExchange(EXCHANGE_NAME);
  11.     }
  12.     @Bean
  13.     public Queue queue1() {
  14.         return new Queue(QUEUE_NAME_1);
  15.     }
  16.     @Bean
  17.     public Queue queue2() {
  18.         return new Queue(QUEUE_NAME_2);
  19.     }
  20.     @Bean
  21.     public Binding binding1() {
  22.         return BindingBuilder.bind(queue1()).to(directExchange()).with(ROUTING_KEY_1);
  23.     }
  24.     @Bean
  25.     public Binding binding2() {
  26.         return BindingBuilder.bind(queue2()).to(directExchange()).with(ROUTING_KEY_2);
  27.     }
  28. }
复制代码
生产者

  1. @Service
  2. public class MessageProducer {
  3.     @Autowired
  4.     private AmqpTemplate amqpTemplate;
  5.     public void send(String message, String routingKey) {
  6.         amqpTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, routingKey, message);
  7.         System.out.println("Sent message: " + message + ", routing key: " + routingKey);
  8.     }
  9. }
复制代码
斲丧者

  1. @Service
  2. public class MessageConsumer {
  3.     @RabbitListener(queues = RabbitConfig.QUEUE_NAME_1)
  4.     public void receiveFromQueue1(String message) {
  5.         System.out.println("Received message from queue 1: " + message);
  6.     }
  7.     @RabbitListener(queues = RabbitConfig.QUEUE_NAME_2)
  8.     public void receiveFromQueue2(String message) {
  9.         System.out.println("Received message from queue 2: " + message);
  10.     }
  11. }
复制代码
测试

  1. @Service
  2. public class TestService {
  3.     @Autowired
  4.     private MessageProducer producer;
  5.     @PostConstruct
  6.     public void test() {
  7.         producer.send("hello, queue 1", RabbitConfig.ROUTING_KEY_1);
  8.         producer.send("hello, queue 2", RabbitConfig.ROUTING_KEY_2);
  9.     }
  10. }
复制代码
运行程序后,可以看到控制台输出:
  1. Sent message: hello, queue 1, routing key: key_1
  2. Sent message: hello, queue 2, routing key: key_2
  3. Received message from queue 1: hello, queue 1
  4. Received message from queue 2: hello, queue 2
复制代码
说明消息成功发送到了指定的队列。
RabbitMQ主题模式

   RabbitMQ主题模式(Topic Model)是一种高级消息通报模式,它使你可以订阅一个特定的主题(Topic)并吸取全部与该主题干系的消息。主题模式是在发布/订阅模式底子上进一步增强了消息通报的粒度。
  在主题模式中,Exchange不仅可以使用Routing Key来将消息通报到队列中,还可以使用一个模式字符串来匹配Routing Key,这个模式字符串被称为主题(Topic)。斲丧者可以通过订阅不同的主题来吸取不同的消息。
  一个主题可以包含一个或多个单词(Word),单词之间使用"."(点号)来分割。通配符符号“#”表示跟单词数不限,而“”则表示只匹配一个单词。比方,“news.it.#”可以匹配“news.it.abc”、“news.it.cnn”、“news.it.abc.def”等,而“news.it.”只能匹配到“news.it.abc”和“news.it.cnn”,不能匹配多于一个单词的情况。
  主题模式在RabbitMQ中的架构非常简单,重要可以描述为以下四个步骤:
    下面是如何使用Spring Boot实现RabbitMQ主题模式:
  设置类

  1. @Configuration
  2. public class RabbitConfig {
  3.     public static final String EXCHANGE_NAME = "topic_exchange";
  4.     public static final String QUEUE_NAME_1 = "queue_1";
  5.     public static final String QUEUE_NAME_2 = "queue_2";
  6.     public static final String QUEUE_NAME_3 = "queue_3";
  7.     public static final String ROUTING_KEY_1 = "key_1.*";
  8.     public static final String ROUTING_KEY_2 = "key_2.*";
  9.     public static final String ROUTING_KEY_3 = "*.key_3";
  10.     @Bean
  11.     public TopicExchange topicExchange() {
  12.         return new TopicExchange(EXCHANGE_NAME);
  13.     }
  14.     @Bean
  15.     public Queue queue1() {
  16.         return new Queue(QUEUE_NAME_1);
  17.     }
  18.     @Bean
  19.     public Queue queue2() {
  20.         return new Queue(QUEUE_NAME_2);
  21.     }
  22.     @Bean
  23.     public Queue queue3() {
  24.         return new Queue(QUEUE_NAME_3);
  25.     }
  26.     @Bean
  27.     public Binding binding1() {
  28.         return BindingBuilder.bind(queue1()).to(topicExchange()).with(ROUTING_KEY_1);
  29.     }
  30.     @Bean
  31.     public Binding binding2() {
  32.         return BindingBuilder.bind(queue2()).to(topicExchange()).with(ROUTING_KEY_2);
  33.     }
  34.     @Bean
  35.     public Binding binding3() {
  36.         return BindingBuilder.bind(queue3()).to(topicExchange()).with(ROUTING_KEY_3);
  37.     }
  38. }
复制代码
生产者

在生产者端,发送消息到交换机,这里使用了三种不同的路由键:
  1. @Service
  2. public class MessageProducer {
  3.     @Autowired
  4.     private AmqpTemplate amqpTemplate;
  5.     public void send(String message, String routingKey) {
  6.         amqpTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, routingKey, message);
  7.         System.out.println("Sent message: " + message + ", routing key: " + routingKey);
  8.     }
  9. }
复制代码
斲丧者
在斲丧者端,使用通配符连接到交换机,并指定一个斲丧者:
  1. @Service
  2. public class MessageConsumer {
  3.     @RabbitListener(queues = RabbitConfig.QUEUE_NAME_1)
  4.     public void receiveFromQueue1(String message) {
  5.         System.out.println("Received message from queue 1: " + message);
  6.     }
  7.     @RabbitListener(queues = RabbitConfig.QUEUE_NAME_2)
  8.     public void receiveFromQueue2(String message) {
  9.         System.out.println("Received message from queue 2: " + message);
  10.     }
  11.     @RabbitListener(queues = RabbitConfig.QUEUE_NAME_3)
  12.     public void receiveFromQueue3(String message) {
  13.         System.out.println("Received message from queue 3: " + message);
  14.     }
  15. }
复制代码
测试

如今,我们来测试一下,这里发送了三条消息,分别匹配了不同的绑定键:
  1. @Service
  2. public class TestService {
  3.     @Autowired
  4.     private MessageProducer producer;
  5.     @PostConstruct
  6.     public void test() {
  7.         producer.send("hello, queue 1", RabbitConfig.ROUTING_KEY_1);
  8.         producer.send("hello, queue 2", RabbitConfig.ROUTING_KEY_2);
  9.         producer.send("hello, queue 3", RabbitConfig.ROUTING_KEY_3);
  10.     }
  11. }
复制代码
运行程序后,可以看到控制台输出:
  1. Sent message: hello, queue 1, routing key: key_1.*
  2. Sent message: hello, queue 2, routing key: key_2.*
  3. Sent message: hello, queue 3, routing key: *.key_3
  4. Received message from queue 1: hello, queue 1
  5. Received message from queue 2: hello, queue 2
  6. Received message from queue 3: hello, queue 3
复制代码
我的天哪,太不容易了,坑倒是不多,文字东西太多了,还不能违背初心去抄袭,耗费时间很长!假如您看到这里,庆贺你,我们一起成长了!感谢相遇,再会有期!!!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4