慢吞云雾缓吐愁 发表于 2024-8-5 03:26:33

RabbitMQ概述

RabbitMQ

RabbitMQ概述

RabbitMQ是一个开源的消息署理(message broker)系统,最初由Rabbit Technologies Ltd开发,并在开源社区的支持下不断发展和完善。它提供了强盛的消息传递机制,被广泛应用于构建分布式系统和应用。RabbitMQ实现了AMQP(高级消息队列协议),并支持多种消息传递模式,包罗点对点、发布/订阅和路由等。
常用消息中间件特点及应用场景



[*]RabbitMQ:RabbitMQ是一个开源的消息署理和队列服务器,支持多种协议和消息持久化。它广泛应用于须要高可靠性、高性能的消息通信场景。
[*]Kafka:Kafka是一个分布式流处理平台,主要用于构建实时数据管道和流应用。它适用于大规模、高吞吐量的数据传输和处理场景。
[*]ActiveMQ:ActiveMQ是一个功能丰富的消息中间件,支持多种语言和协议。它适用于须要跨平台、跨语言通信的场景。
[*]RocketMQ 是一款低耽误、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的本领,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
RabbitMQ的焦点特性



[*]高性能:RabbitMQ是一个高性能的消息署理系统,能够处理大量的并发连接和消息传递。
[*]可靠性:RabbitMQ利用多种机制来保证消息的可靠性,如持久化、传输确认、发布确认等。这些机制确保即使在系统瓦解或重启的环境下,消息也不会丢失。
[*]灵活的路由:RabbitMQ通过交换机(Exchange)来实现消息的灵活路由。交换机可以根据消息的路由键(Routing Key)将消息路由到一个或多个队列。RabbitMQ提供了多种交换机类型,如直接交换机、扇出交换机、主题交换机等,以满意不同的消息传递需求。
[*]消息集群:多个RabbitMQ服务器可以构成一个集群,形成一个逻辑Broker(服务)。这样可以提高系统的可扩展性和容错性。
[*]高可用:队列可以在集群中的机器上举行镜像,使得在部分节点出问题的环境下队列仍然可用。这种可伸缩性确保了系统的高可用性。
[*]多语言支持:RabbitMQ几乎支持所有的常用语言,如Java、.NET、Ruby、Python等。这使得RabbitMQ可以与各种语言和框架举行集成。
与其他消息中间件的对比


[*]Kafka:


[*]持久性:Kafka以日志的情势存储消息,提供高度的持久性和可重放性。
[*]高吞吐量:设计用于处理大规模数据流,适用于高吞吐量的场景。
分布式:构建为分布式系统,支持程度扩展。
适用场景:大规模数据处理,实时数据流分析。

[*]ActiveMQ:


[*]JMS支持:完全支持Java Message Service(JMS),提供强盛的消息模型。
[*]集成:可以与各种应用服务器和开发框架集成。
[*]适用场景:Java生态系统中的应用,须要支持JMS标准的企业级应用。

[*]RocketMQ:


[*]分布式架构:支持程度扩展,适应高并发场景。
[*]可靠性:提供强盛的消息持久性和可靠性,支持同步和异步传输方式。
[*]实时性:适用于实时数据传输和大规模消息处理。
[*]高性能:具有高吞吐量和低耽误的特性。
总结与归纳:
RabbitMQ:强调灵活性和易用性,适用于须要简单、可靠消息传递的应用,特别是对AMQP标准有需求的企业。
Kafka:专注于构建实时数据管道,支持流式处理,适用于大规模数据处理和实时数据流分析。
ActiveMQ:完全支持JMS标准,适合Java生态系统中的应用,特别是在企业级应用和微服务架构中。
RocketMQ:具有高性能和分布式架构,适用于须要实时数据传输和大规模消息处理的场景。
RabbitMQ的工作原理

RabbitMQ的工作原理可以概括为以下几个步骤:


[*]生产者发送消息:生产者将消息发送到指定的交换机。
[*]交换机分发消息:交换机根据路由规则将消息分发到不同的队列。
[*]队列存储消息:队列负责存储消息,直到消费者从队列中获取消息。
[*]消费者获取并处理消息:消费者从队列中获取消息,并举行相应的处理。在处理完消息后,消费者会向RabbitMQ发送确认信号,表现消息已被乐成处理。
[*]确认消息处理完成:RabbitMQ在吸收到消费者的确认信号后,会将该消息从队列中删除。如果消费者在处理消息时发生异常或瓦解,RabbitMQ会将消息重新放回队列中,等候其他消费者再次处理。
RabbitMQ的利用场景

RabbitMQ的利用场景非常广泛,包罗但不限于以下几个方面:


[*]异步通信:在微服务架构中,服务与服务之间的通信经常是异步的。RabbitMQ可以作为通信的桥梁,实现服务的解耦和异步处理。
[*]任务调度:RabbitMQ可以作为任务调度的中心,将须要定时执行的任务发送到队列中。任务执行器作为消费者订阅该队列,当有新的任务到达时立即执行。
[*]日志网络:在分布式系统中,各个服务都会产生大量的日志信息。可以将这些日志信息发送到RabbitMQ的队列中,由专门的日志网络服务举行统一处理和分析。
[*]消息通知:RabbitMQ可以用于实现各种消息通知功能,如用户注册乐成后的通知、订单状态变动的通知等。通过发布/订阅模式,可以将消息广播到所有订阅了该主题的消费者。
RabbitMQ实践案例

1.异步处理


[*]假设有一个订单系统,用户在提交订单后须要等候系统处理。为了提高系统的响应速度,可以将订单处理逻辑放入RabbitMQ的消息队列中举行异步处理
[*]当用户提交订单时,生产者将订单信息封装成消息,发送到名为orders_queue的队列中。订单处理服务作为消费者订阅该队列,当有新的订单消息到达时,立即举行处理。
[*]消费者处理完订单后,可以发送一个确认消息到另一个队列,通知前端或其他服务订单处理完成。

[*]微服务间通信


[*]在微服务架构中,不同服务之间的通信通常通过REST API或gRPC等方式举行。然而,在某些场景下,利用消息队枚举行通信可能更加符合。
[*]比方,在电商系统中,当用户下单后,订单服务须要将订单信息发送给库存服务举行库存扣减。这时,可以利用RabbitMQ举行服务间的通信。
[*]订单服务作为生产者,将订单信息发送到名为order_to_stock_queue的队列中。库存服务作为消费者订阅该队列,当有新的订单消息到达时,举行库存扣减操作。

[*]耽误任务处理


[*]耽误任务是指须要在将来某个时间点执行的任务,如发送耽误邮件、耽误删除数据等。利用RabbitMQ的耽误队列功能可以方便地实现耽误任务的处理。
[*]起首,须要安装rabbitmq-delayed-message-exchange插件来启用耽误队列功能。然后,在生产者发送消息时,设置消息的耽误时间。RabbitMQ会消息放入耽误队列中,并在指定的耽误时间后将消息发送到目的队列中。
[*]消费者从目的队列中获取消息并举行处理,从而实现了耽误任务的处理。

[*]流量削峰


[*]在高并发的场景下,系统的处理本领可能会受到限定。为了应对这种环境,可以利用RabbitMQ举行流量削峰。
[*]当系统吸收到大量的请求时,生产者可以将这些请求封装成消息,并按照肯定的速率发送到RabbitMQ的队列中。消费者从队列中获取请求并举行处理,从而制止了系统因为过载而瓦解。
举个栗子

https://img-blog.csdnimg.cn/direct/c6bc13d4a7674beaa03916ec8f6bcec4.png
RabbitMQ的实践应用可以通过Java代码来展示。以下是一个简单的例子,展示如何利用Java AMQP客户端库(RabbitMQ的官方Java客户端)来发送和吸收消息。

[*]依靠配置
起首,你须要在你的pom.xml(如果你利用的是Maven)中添加RabbitMQ Java客户端的依靠:
<dependencies>
    <!-- RabbitMQ Java Client -->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.13.4</version> <!-- 请检查并使用最新版本 -->
    </dependency>
</dependencies>

[*]发送消息(生产者)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost"); // RabbitMQ服务器地址
      try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" Sent '" + message + "'");
      }
    }
}

[*]吸收消息(消费者)
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws IOException, TimeoutException {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost"); // RabbitMQ服务器地址
      try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                          AMQP.BasicProperties properties, byte[] body) throws IOException {
                  String message = new String(body, "UTF-8");
                  System.out.println(" Received '" + message + "'");
                }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);
      }
    }
}

[*]运行
起首,确保你的RabbitMQ服务已经启动。
运行Send类,你会看到它发送了一个消息到名为"hello"的队列。
然后,运行Recv类,你会看到它开始监听"hello"队列,并吸收并打印出从该队列中吸收到的消息。
这个例子展示了RabbitMQ的基本功能:一个生产者发送消息到一个队列,一个消费者从该队列吸收并处理消息。当然,RabbitMQ的功能远不止于此,它还支持多种交换机类型、消息持久化、消费者确认机制等高级功能。
总结来说,RabbitMQ是一个功能强盛、性能杰出的消息署理系统,它支持多种消息传递模式和高级特性,能够满意各种分布式系统和应用的需求。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: RabbitMQ概述