什么是RabbitMQ?RabbitMQ有什么长处?我们该怎样利用呢

打印 上一主题 下一主题

主题 571|帖子 571|积分 1713

RabbitMQ概述

RabbitMQ是一个开源的消息署理软件,也称为面向消息的中间件。它实现了高级消息队列协议(AMQP),由Erlang语言编写,旨在提供高性能、健壮以及可伸缩性消息队列服务。RabbitMQ在分布式系统开发中应用广泛,支持多种操作系统和编程语言,如Linux、Windows、macOS等操作系统,以及Python、Java、Ruby、PHP、C#、JavaScript等编程语言。
简朴来说RabbitMQ 是一个流行的消息署理队列服务器,它允许应用步调之间进行异步通信

RabbitMQ的利用案例


下面我们来看看RabbitMQ的长处及利用场景
发布与订阅模式:


   - 在此模式下,RabbitMQ充当了消息队列的角色,连接了生产者和消费者。生产者将消息发送到队列中,而消费者则从队列中获取消息进行处理。

   - 例如,一个新闻网站可能利用RabbitMQ来处理新闻稿的发布。当新闻稿被创建时,它会被发送到RabbitMQ的某个队列中。然后,该网站的多个服务(如网站前端、移动应用后端等)可以作为消费者从该队列中获取新闻稿,并实时更新其表现内容。
   - 配置参数可能包括队列名称、交换机类型(如直接交换机、主题交换机等)、路由键等。在Spring Boot项目中,可以通过添加`spring-boot-starter-amqp`依靠来集成RabbitMQ。

消息长期化

   - RabbitMQ支持将消息从内存长期化到硬盘,以确保在服务器重启或故障时不会丢失消息。
   - 例如,一个电子商务网站可能利用RabbitMQ来处理订单。当用户提交订单时,订单信息会被发送到RabbitMQ的队列中,并立即长期化到硬盘。然后,订单处理服务可以从队列中获取订单并处理它。假如订单处理服务在处理订单时瓦解,RabbitMQ可以确保订单信息不会丢失,并在服务恢复后继承处理

集群服务

- RabbitMQ支持集群服务,可以通过添加多个RabbitMQ节点来提高系统的可伸缩性和容错性。
   - 例如,一个大型交际媒体应用可能利用RabbitMQ来处理用户之间的消息通报。为了处理大量的并发消息,该应用可以摆设一个RabbitMQ集群,由多个节点组成。每个节点都可以处理一部分消息,从而提高整个系统的吞吐量

高可用性

   - RabbitMQ的集群和故障转移机制可以确保在高负载或故障情况下系统的可用性和可靠性。
   - 例如,在一个银行系统中,RabbitMQ可能被用于处理交易消息。为了确保系统的稳定性和可靠性,可以利用RabbitMQ的集群和故障转移机制来确保即使部分节点出现故障,整个系统仍旧可以正常运行并处理交易消息。

下面我们来举几个RabbitMQ的例子来帮你加深一下印象
1. 简朴的消息发送和接收
发送者(Producer)
  1. import com.rabbitmq.client.ConnectionFactory;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.Channel;
  4. public class Send {
  5.     private final static String QUEUE_NAME = "hello";
  6.     public static void main(String[] argv) throws Exception {
  7.         ConnectionFactory factory = new ConnectionFactory();
  8.         factory.setHost("localhost");
  9.         try (Connection connection = factory.newConnection();
  10.              Channel channel = connection.createChannel()) {
  11.             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  12.             String message = "Hello World!";
  13.             channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
  14.             System.out.println(" [x] Sent '" + message + "'");
  15.         }
  16.     }
  17. }
复制代码
接收者(Consumer)
  1. import com.rabbitmq.client.ConnectionFactory;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Consumer;
  5. import com.rabbitmq.client.DefaultConsumer;
  6. import com.rabbitmq.client.AMQP.BasicProperties;
  7. public class Recv {
  8.     private final static String QUEUE_NAME = "hello";
  9.     public static void main(String[] argv) throws Exception {
  10.         ConnectionFactory factory = new ConnectionFactory();
  11.         factory.setHost("localhost");
  12.         Connection connection = factory.newConnection();
  13.         Channel channel = connection.createChannel();
  14.         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  15.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  16.         Consumer consumer = new DefaultConsumer(channel) {
  17.             @Override
  18.             public void handleDelivery(String consumerTag, Envelope envelope,
  19.                                        AMQP.BasicProperties properties, byte[] body) throws IOException {
  20.                 String message = new String(body, "UTF-8");
  21.                 System.out.println(" [x] Received '" + message + "'");
  22.             }
  23.         };
  24.         channel.basicConsume(QUEUE_NAME, true, consumer);
  25.     }
  26. }
复制代码
2. 订单处理系统
订单发送(Order Producer)
  1. // 省略了部分代码,只显示发送订单到队列的部分
  2. // ...
  3. channel.basicPublish("orders_queue", "", null, orderData.getBytes("UTF-8"));
  4. // ...
复制代码
订单处理(Order Consumer)
  1. // 省略了部分代码,只显示从队列接收订单并处理的部分
  2. // ...
  3. Consumer consumer = new DefaultConsumer(channel) {
  4.     @Override
  5.     public void handleDelivery(String consumerTag, Envelope envelope,
  6.                                BasicProperties properties, byte[] body) throws IOException {
  7.         String orderData = new String(body, "UTF-8");
  8.         // 处理订单...
  9.         System.out.println("Processing order: " + orderData);
  10.     }
  11. };
  12. channel.basicConsume("orders_queue", true, consumer);
  13. // ...
复制代码
3.日志发送和网络
日志发送者(Log Producer)
  1. // 省略了部分代码,只显示发送日志到队列的部分
  2. // ...
  3. String logEntry = "Application log entry at " + new Date();
  4. channel.basicPublish("logs", "", null, logEntry.getBytes("UTF-8"));
  5. // ...
复制代码
日志网络器(Log Consumer)
作为日志网络器(Log Consumer)来消费 RabbitMQ 队列中的日志消息。该应用步调将消费名为 "logs" 的队列,并将收到的消息打印到控制台
  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. public class LogConsumer {
  5.     public static void main(String[] args) throws IOException, TimeoutException {
  6.         // 创建连接工厂
  7.         ConnectionFactory factory = new ConnectionFactory();
  8.         factory.setHost("localhost");
  9.         factory.setPort(5672);
  10.         factory.setUsername("guest");
  11.         factory.setPassword("guest");
  12.         // 获取连接
  13.         Connection connection = factory.newConnection();
  14.         // 获取信道
  15.         Channel channel = connection.createChannel();
  16.         // 声明队列(如果队列已存在,则不需要声明)
  17.         channel.queueDeclare("logs", false, false, false, null);
  18.         // 定义消费者
  19.         Consumer consumer = new DefaultConsumer(channel) {
  20.             @Override
  21.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  22.                 String message = new String(body, "UTF-8");
  23.                 System.out.println(" [x] Received message: " + message);
  24.             }
  25.         };
  26.         // 开始消费队列中的消息
  27.         channel.basicConsume("logs", true, consumer);
  28.     }
  29. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

傲渊山岳

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表