RabbitMQ安装与使用教程(含Spring Boot整合)

打印 上一主题 下一主题

主题 1591|帖子 1591|积分 4773

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
RabbitMQ安装与使用教程(含Spring Boot整合)

一、RabbitMQ简介

RabbitMQ是一个开源的消息代理软件,实现了高级消息队列协议(AMQP),在分布式系统中广泛用于异步通信、解耦组件以及流量削峰等场景。它支持多种消息模式,如点对点、发布/订阅等,具备高可靠性、可扩展性和丰富的功能特性。
二、RabbitMQ安装

1. 安装Erlang情况

RabbitMQ是用Erlang语言编写的,因此需要先安装Erlang运行情况。以下以Ubuntu系统为例进行安装:
更新软件包列表
bash sudo apt-get update
安装Erlang
bash sudo apt-get install erlang
2. 安装RabbitMQ

添加RabbitMQ官方仓库
bash echo "deb https://dl.bintray.com/rabbitmq/debian $(lsb_release -sc) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
导入仓库密钥
bash wget -O - 'https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc' | sudo apt-key add -
再次更新软件包列表并安装RabbitMQ
bash sudo apt-get update sudo apt-get install rabbitmq-server
3. 启动RabbitMQ服务

启动服务
bash sudo systemctl start rabbitmq-server
查看服务状态
bash sudo systemctl status rabbitmq-server
4. 访问RabbitMQ管理界面

RabbitMQ自带一个方便的管理界面,通过以下命令开启:
  1. sudo rabbitmq-plugins enable rabbitmq_management
复制代码
然后在浏览器中访问 http://服务器IP地址:15672 ,默认用户名和密码都是 guest ,登录后可以进行各种管理操作。
三、Spring Boot整合RabbitMQ

1. 创建Spring Boot项目

可以使用Spring Initializr(https://start.spring.io/ )快速创建一个Spring Boot项目,在依赖中添加 Spring for RabbitMQ 。在项目创建过程中,选择合适的Java版本和Spring Boot版本,并添加 spring-boot-starter-amqp 依赖。
2. 设置application.properties或application.yml

在 src/main/resources 目录下的 application.properties (或 application.yml )文件中添加RabbitMQ连接信息:
  1. spring.rabbitmq.host=localhost
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=guest
  4. spring.rabbitmq.password=guest
复制代码
四、定义一样平常队列

1. 设置队列

创建一个设置类来定义队列:
  1. import org.springframework.amqp.core.Queue;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. @Configuration
  5. public class RabbitConfig {
  6.     @Bean
  7.     public Queue normalQueue() {
  8.         return new Queue("normalQueue", true, false, false);
  9.     }
  10. }
复制代码
这里创建了一个名为 normalQueue 的队列,true 表示队列持久化(消息在服务器重启后仍存在),第二个 false 表示非排他队列(多个消耗者可以同时消耗),第三个 false 表示自动删除(消息处理完后队列是否自动删除)。
2. 生产者发送消息

创建一个消息生产者类:
  1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class NormalQueueProducer {
  6.     @Autowired
  7.     private RabbitTemplate rabbitTemplate;
  8.     public void sendMessage(String message) {
  9.         rabbitTemplate.convertAndSend("normalQueue", message);
  10.     }
  11. }
复制代码
3. 消耗者接收消息

创建一个消息消耗者类:
  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class NormalQueueConsumer {
  5.     @RabbitListener(queues = "normalQueue")
  6.     public void receiveMessage(String message) {
  7.         System.out.println("Received message from normalQueue: " + message);
  8.     }
  9. }
复制代码
五、定义私信队列(Fanout Exchange)

1. 场景阐明

假设有一个消息发布系统,需要将同一条消息广播给多个不同的接收方,比方消息发布系统将一条消息消息同时发送给多个订阅者,这种情况下就可以使用 Fanout Exchange (私信队列)。Fanout Exchange 会将接收到的消息广播到所有与之绑定的队列,而不考虑路由键。
2. 设置交换机和队列

  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.FanoutExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class FanoutConfig {
  9.     @Bean
  10.     public Queue fanoutQueue1() {
  11.         return new Queue("fanoutQueue1", true, false, false);
  12.     }
  13.     @Bean
  14.     public Queue fanoutQueue2() {
  15.         return new Queue("fanoutQueue2", true, false, false);
  16.     }
  17.     @Bean
  18.     public FanoutExchange fanoutExchange() {
  19.         return new FanoutExchange("fanoutExchange");
  20.     }
  21.     @Bean
  22.     public Binding binding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {
  23.         return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
  24.     }
  25.     @Bean
  26.     public Binding binding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {
  27.         return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
  28.     }
  29. }
复制代码
这里创建了两个队列 fanoutQueue1 和 fanoutQueue2 ,并将它们绑定到一个 fanoutExchange 交换机上。Fanout Exchange 会将收到的消息广播到所有绑定的队列。
3. 生产者发送消息

  1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class FanoutProducer {
  6.     @Autowired
  7.     private RabbitTemplate rabbitTemplate;
  8.     public void sendMessage(String message) {
  9.         rabbitTemplate.convertAndSend("fanoutExchange", "", message);
  10.     }
  11. }
复制代码
注意这里的路由键为空字符串,因为 Fanout Exchange 不需要路由键。
4. 消耗者接收消息

分别创建两个消耗者类来接收两个队列的消息,这里只展示一个示例:
  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class FanoutQueue1Consumer {
  5.     @RabbitListener(queues = "fanoutQueue1")
  6.     public void receiveMessage(String message) {
  7.         System.out.println("Received message from fanoutQueue1: " + message);
  8.     }
  9. }
复制代码
六、定义延时队列

1. 场景阐明

在很多业务场景中,需要对消息的发送进行延时处理。比方,在电商系统中,当用户下单后,如果用户在一定时间内没有支付,系统需要自动取消订单。这种情况下就可以使用延时队列来实现消息的延时发送和处理。
2. TTL + DLX方式(保举)

2.1 设置死信交换机和队列

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import java.util.HashMap;
  5. import java.util.Map;
  6. @Configuration
  7. public class DelayQueueConfig {
  8.     // 定义延时队列
  9.     @Bean
  10.     public Queue delayQueue() {
  11.         Map<String, Object> args = new HashMap<>();
  12.         args.put("x-message-ttl", 5000); // 消息存活时间为5秒
  13.         args.put("x-dead-letter-exchange", "delayDeadLetterExchange"); // 死信交换机
  14.         args.put("x-dead-letter-routing-key", "delayDeadLetterQueue"); // 死信路由键
  15.         return new Queue("delayQueue", true, false, false, args);
  16.     }
  17.     // 定义死信队列
  18.     @Bean
  19.     public Queue delayDeadLetterQueue() {
  20.         return new Queue("delayDeadLetterQueue", true, false, false);
  21.     }
  22.     // 定义死信交换机
  23.     @Bean
  24.     public DirectExchange delayDeadLetterExchange() {
  25.         return new DirectExchange("delayDeadLetterExchange");
  26.     }
  27.     // 绑定死信队列到死信交换机
  28.     @Bean
  29.     public Binding bindingDelayDeadLetter(Queue delayDeadLetterQueue, DirectExchange delayDeadLetterExchange) {
  30.         return BindingBuilder.bind(delayDeadLetterQueue).to(delayDeadLetterExchange).with("delayDeadLetterQueue");
  31.     }
  32. }
复制代码
这里创建了一个延时队列 delayQueue ,设置了消息存活时间为5秒,当消息过期后会转发到 delayDeadLetterExchange 交换机的 delayDeadLetterQueue 队列。下面具体解释一下各个参数的作用:
• x-message-ttl:表示消息在队列中的存活时间,单位是毫秒。当消息在队列中的存活时间超过这个值后,消息就会被标记为过期。
• x-dead-letter-exchange:指定死信交换机的名称。当消息过期后,会被发送到这个指定的交换机。
• x-dead-letter-routing-key:指定死信路由键。当消息过期后,会根据这个路由键将消息发送到死信交换机对应的队列。
2.2 生产者发送消息

  1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class DelayQueueProducer {
  6.     @Autowired
  7.     private RabbitTemplate rabbitTemplate;
  8.     public void sendMessage(String message) {
  9.         rabbitTemplate.convertAndSend("delayQueue", message);
  10.     }
  11. }
复制代码
2.3 消耗者接收消息

  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class DelayDeadLetterQueueConsumer {
  5.     @RabbitListener(queues = "delayDeadLetterQueue")
  6.     public void receiveMessage(String message) {
  7.         System.out.println("Received expired message from delayDeadLetterQueue: " + message);
  8.     }
  9. }
复制代码
3. 使用RabbitMQ官方插件(需额外安装插件)

3.1 安装插件

下载插件:从RabbitMQ官方网站(https://www.rabbitmq.com/community-plugins.html )下载 rabbitmq_delayed_message_exchange 插件。选择与你的RabbitMQ版本匹配的插件版本进行下载。
放置插件文件:将下载的插件文件(通常是一个 .ez 文件)复制到RabbitMQ服务器的插件目录下。在Ubuntu系统上,插件目录通常是 /usr/lib/rabbitmq/plugins/ 。如果没有该目录,可以创建它。
启用插件:安装完成后,需要启用插件。执行以下命令:
bash sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启RabbitMQ服务:插件启用后,需要重启RabbitMQ服务使设置见效:
bash sudo systemctl restart rabbitmq-server
3.2 设置延时交换机和队列

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. @Configuration
  5. public class DelayedExchangeConfig {
  6.     // 定义延时交换机
  7.     @Bean
  8.     public CustomExchange delayedExchange() {
  9.         Map<String, Object> args = new HashMap<>();
  10.         args.put("x-delayed-type", "direct"); // 延时消息类型,这里设置为direct
  11.         return new CustomExchange("delayedExchange", "x-delayed-message", true, false, args);
  12.     }
  13.     // 定义队列
  14.     @Bean
  15.     public Queue delayedQueue() {
  16.         return new Queue("delayedQueue", true, false, false);
  17.     }
  18.     // 绑定队列到延时交换机
  19.     @Bean
  20.     public Binding bindingDelayedQueue(CustomExchange delayedExchange, Queue delayedQueue) {
  21.         return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayedRoutingKey").noargs();
  22.     }
  23. }
复制代码
这里创建了一个名为 delayedExchange 的延时交换机,范例为 x-delayed-message ,并设置了 x-delayed-type 参数为 direct 。然后创建了一个队列 delayedQueue 并将其绑定到延时交换机上。
3.3 生产者发送消息

  1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2. import org.springframework.amqp.support.converter.MessageConverter;
  3. import org.springframework.amqp.support.converter.SimpleMessageConverter;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.messaging.support.MessageBuilder;
  6. import org.springframework.stereotype.Service;
  7. import com.rabbitmq.client.MessageProperties;
  8. @Service
  9. public class DelayedExchangeProducer {
  10.     @Autowired
  11.     private RabbitTemplate rabbitTemplate;
  12.     public void sendMessage(String message, int delay) {
  13.         MessageProperties messageProperties = new MessageProperties();
  14.         messageProperties.setHeader("x-delay", delay); // 设置延时时间,单位为毫秒
  15.         org.springframework.amqp.core.Message amqpMessage = new org.springframework.amqp.core.Message(message.getBytes(), messageProperties);
  16.         rabbitTemplate.send("delayedExchange", "delayedRoutingKey", amqpMessage);
  17.     }
  18. }
复制代码
在生产者发送消息时,通过设置消息头 x-delay 来指定消息的延时时间。
3.4 消耗者接收消息

  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class DelayedExchangeConsumer {
  5.     @RabbitListener(queues = "delayedQueue")
  6.     public void receiveMessage(String message) {
  7.         System.out.println("Received delayed message from delayedQueue: " + message);
  8.     }
  9. }
复制代码
七、总结

通过以上步骤,我们完成了RabbitMQ的安装以及与Spring Boot的整合,并实现了普通队列、私信队列和延时队列的定义和使用。在实际应用中,可以根据具体业务需求选择合适的消息模式和设置方式,以实现高效、可靠的消息通报和处理。同时,对于RabbitMQ的高级特性,如消息确认机制、消息持久化、集群设置等,可以根据实际情况进行深入学习和应用,以满意更复杂的业务场景。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

锦通

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表