文章目次
一、什么是消息队列?
二、消息队列的作用(优点)
1、解耦
2、流量削峰
3、异步
4、次序性
三、RabbitMQ根本结构
四、RabbitMQ队列模式
1、简单队列模式
2、工作队列模式
3、发布/订阅模式
4、路由模式
5、主题模式
6、RPC模式
7、发布者确认模式
五、RabbitMQ相关属性描述
总结
一、什么是消息队列?
消息队列是一种用于在分布式体系中举行通信的技术。它是一种存储和转发消息的中间件,可以用
于将应用程序之间的通信解耦,从而实现高效的异步通信。消息队列允许发送者将消息发送到队列
中,而吸收者则可以从队列中获取消息并举行处理。这种方式可以资助体系实现高可用性、高性
能、松耦合和可伸缩性。消息队列通常包括生产者(发送消息的应用程序)、消费者(吸收消息的
应用程序)和队列(存储消息的缓冲区)。
RabbitMQ:是由erlang语言开辟,基于AMQP(高级消息队列协议)协议实现的一种消息队列。市面
上还有许多消息队列,比如Kafka、RocketMQ、Redis等,各有优劣,本文重要先容RabbitMQ。
官方文档:RabbitMQ Tutorials | RabbitMQ
二、消息队列的作用(优点)
1、解耦
应用程序解耦,通过引入消息队列,不同的应用程序之间可以通过消息队列举行通信,而无需直接
调用对方的接口或方法。如许可以低落体系中各个应用程序之间的耦合度,使得它们可以独立演化
和扩展,而不会由于对方的变革而受到影响。
2、流量削峰
消息队列可以作为一个缓冲区,暂时存储流入的消息,直到体系有足够的资源来处理它们。当体系
出现流量高峰时,消息队列可以暂时存储过多的消息,以平滑处理流量的波动,避免体系被突发的
高负载压垮。
3、异步
发送者在发送消息后可以立即继续执行其他操作,而不需要等待吸收者的相应。如许可以进步体系
的并发性和相应速度。也可以资助进步体系的吞吐量,特别是在面临大量请求或处理复杂盘算时。
发送者可以并行地向多个吸收者发送消息,而不会由于等待吸收者的相应而阻塞。
4、次序性
虽然并不是所有消息队列都能包管消息的绝对次序性,但是在许多情况下,消息队列可以包管消息
的相对次序性。即按照发送次序举行处理,对某些场景要求次序执行很适合。
三、RabbitMQ根本结构
名称
描述
Connection(连接 )
连接是生产者和消费者与RabbitMQ之间的连接。每个生产者和消费者都需要与RabbitMQ创建一个连接,以便发送和吸收消息。连接通常是长连接,可以重用以进步性能和效率。
Channel(信道)
Channel是连接(Connection)内的逻辑通道,用于完成大部分 AMQP 操作,如声明队列、发送和吸收消息等。在 RabbitMQ 中引入 Channel(信道)的重要目的是为了进步体系的性能、机动性和效率。利用 Channel 可以避免频仍地创建和销毁连接,由于一个连接可以包罗多个 Channel。如许可以减少连接的开销,节省体系资源,并进步性能。
Exchange(交换机)
交换机是消息的吸收和分发中央,负责吸收生产者发送的消息,并根据指定的路由规则发送到一个或多个队列中。
(Exchange相称于Queue的代理,可以设置不同的写入战略,写入到对应的队列中。对于队列的写入,更加机动)
交换机的类型有:fanout扇出、topic主题、direct直接
Queue(队列)
队列是消息的缓存区,用于存储交换机发送的消息。生产者发送的消息最终会被存储在队列中,等待消费者举行消费。队列可以长期化到磁盘,以确保消息不会在RabbitMQ宕机或重启后丢失。
Producer(生产者)
生产者是发送消息到RabbitMQ的应用程序。生产者负责创建消息并将其发送到RabbitMQ的消息队列中。
Consumer(消费者)
消费者是从RabbitMQ队列中吸收消息并举行处理的应用程序。消费者可以订阅一个或多个队列,并在消息到达队列时吸收并处理它们。消费者负责监听队列中的消息,并将其取出举行处理。
四、RabbitMQ队列模式
基于Exchange交换机,RabbitMQ截至目前有七种队列模式。
1、简单队列模式
一个消息生产者,一个消息消费者,一个队列。也称为点对点模式。

图中P代表生产者,C代表消费者,Queue是队列名称。
我们看到是没有Exchange的,但是RabbitMQ也会有一个默认的交换机。这个默认的交换机通常被
称为"amq.default"或者""(空字符串),是RabbitMQ自动创建的,用于在没有指定交换机的情况
下将消息发送到队列。
- //生产者
- var factory = new ConnectionFactory { HostName = "localhost"}; //初始化连接信息
- using var connection = factory.CreateConnection(); //创建连接
- using var channel = connection.CreateModel(); //创建信道
- //声明一个队列,并将信道与队列绑定
- channel.QueueDeclare(queue: "hello",
- durable: false,
- exclusive: false,
- autoDelete: false,
- arguments: null);
- //发送消息的内容
- string message = $"Hello World!";
- var body = Encoding.UTF8.GetBytes(message);
- //信道绑定交换机
- channel.BasicPublish(exchange: string.Empty,
- routingKey: string.Empty,
- basicProperties: null,
- body: body);
- Console.WriteLine($" [x] Sent {message}");
- Console.WriteLine(" Press [enter] to exit.");
- //消费者
- var factory = new ConnectionFactory { HostName = "localhost" };
- using var connection = factory.CreateConnection();
- using var channel = connection.CreateModel();
- channel.QueueDeclare(queue: "hello",
- durable: false,
- exclusive: false,
- autoDelete: false,
- arguments: null);
- Console.WriteLine(" [*] Waiting for messages.");
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (model, ea) =>
- {
- var body = ea.Body.ToArray();
- var message = Encoding.UTF8.GetString(body);
- Console.WriteLine($" [x] Received {message}");
- };
- channel.BasicConsume(queue: "hello",
- autoAck: true,
- consumer: consumer);
- Console.WriteLine(" Press [enter] to exit.");
复制代码 此时就会生产者发送一条消息,消费者就会吸收一条消息。
2、工作队列模式
_工作队列又叫做任务队列,正常_会按次序把消息发送给每一个订阅的消费者,均匀而言,每个消费
者将获得雷同数量的消息。(不是P发送一条消息,C1和C2都会收到,而是第一条C1消费,第二
条C2消费。每个消息只会被一个消费者吸收和处理)。
如许的好处是可以进步吞吐量,由于生产者发送了许多消息,但是消费者只有一个,消费者处理很
慢,就会造成消息积存。

- //生产者
- var factory = new ConnectionFactory { HostName = "localhost"};
- using var connection = factory.CreateConnection();
- using var channel = connection.CreateModel();
- channel.QueueDeclare(queue: "task_queue",
- durable: true,
- exclusive: false,
- autoDelete: false,
- arguments: null);
- var message = $"work queue";
- var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(exchange: string.Empty,
- routingKey: string.Empty,
- basicProperties: null,
- body: body);
- Console.WriteLine($" [x] Sent {message}");
- Console.WriteLine(" Press [enter] to exit.");
- //消费者
- var factory = new ConnectionFactory { HostName = "localhost" };
- using var connection = factory.CreateConnection();
- using var channel = connection.CreateModel();
- channel.QueueDeclare(queue: "task_queue",
- durable: true,
- exclusive: false,
- autoDelete: false,
- arguments: null);
- Console.WriteLine(" [*] Waiting for messages.");
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (model, ea) =>
- {
- byte[] body = ea.Body.ToArray();
- var message = Encoding.UTF8.GetString(body);
- Console.WriteLine($" [x] Received {message}");
- channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
- };
- channel.BasicConsume(queue: "task_queue",
- autoAck: false,
- consumer: consumer);
- Console.WriteLine(" Press [enter] to exit.");
复制代码 工作队列与简单队列一致,会有一个默认的交换机。
3、发布/订阅模式
发布/订阅模式是一种消息通报模式,它允许发送者(发布者)将消息发布到多个吸收者(订阅
者)。消息通报模型的核心头脑是生产者从不直接向队列发送任何消息。实际上,生产者通常根本
不知道消息是否会被通报到任何队列。
所以消息通报模式,发布者不需要指定队列。
发布/订阅模式交换机类型为Fanout。

- //发布者
- var factory = new ConnectionFactory { HostName = "localhost"};
- using var connection = factory.CreateConnection();
- using var channel = connection.CreateModel();
- //声明一个交换机,叫做logs,并且交换机的类型是Fanout
- channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
- var message = "publish_subscribe";
- var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(exchange: "logs",
- routingKey: string.Empty,
- basicProperties: null,
- body: body);
- Console.WriteLine($" [x] Sent {message}");
- Console.WriteLine(" Press [enter] to exit.");
- //接收者
- var factory = new ConnectionFactory { HostName = "localhost"};
- using var connection = factory.CreateConnection();
- using var channel = connection.CreateModel();
- channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
- //创建一个具有生成名称的非持久、独占、自动删除队列
- var queueName = channel.QueueDeclare().QueueName;
- channel.QueueBind(queue: queueName,
- exchange: "logs",
- routingKey: string.Empty);
- Console.WriteLine(" [*] Waiting for logs.");
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (model, ea) =>
- {
- byte[] body = ea.Body.ToArray();
- var message = Encoding.UTF8.GetString(body);
- Console.WriteLine($" [x] {message}");
- };
- channel.BasicConsume(queue: queueName,
- autoAck: false,
- consumer: consumer);
- Console.WriteLine(" Press [enter] to exit.");
复制代码 注:如果发布者已经发布消息到交换机,但还没有队列绑定到交换机,消息将会丢失。
4、路由模式
路由模式也是一种消息通报模式,是基于消息的路由键(routing key)来将消息从交换机
(exchange)发送到一个或多个队列中。相比较于发布/订阅模式,路由模式多了一个routing key
的概念。
路由模式交换机类型为Direct。

- //生产者
- var factory = new ConnectionFactory { HostName = "localhost"};
- using var connection = factory.CreateConnection();
- using var channel = connection.CreateModel();
- //定义交换机名称以及类型为Direct
- channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
- //定义路由键
- string routingKey = "direct_test";
- //发送消息体
- string message = "direct_message";
- var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(exchange: "direct_logs",
- routingKey: routingKey,
- basicProperties: null,
- body: body);
- Console.WriteLine($" [x] Sent '{routingKey}':'{message}'");
- Console.WriteLine(" Press [enter] to exit.");
- //消费者
- var factory = new ConnectionFactory { HostName = "localhost" };
- using var connection = factory.CreateConnection();
- using var channel = connection.CreateModel();
- channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
- //创建一个具有生成名称的非持久、独占、自动删除队列
- var queueName = channel.QueueDeclare().QueueName;
- //路由键集合
- var routeKeyArr = new string[] { "direct_test", "direct_test2" };
- foreach (var routeKey in routeKeyArr)
- {
- channel.QueueBind(queue: queueName,
- exchange: "direct_logs",
- routingKey: routeKey);
- }
- Console.WriteLine(" [*] Waiting for messages.");
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (model, ea) =>
- {
- var body = ea.Body.ToArray();
- var message = Encoding.UTF8.GetString(body);
- var routingKey = ea.RoutingKey;
- Console.WriteLine($" [x] Received '{routingKey}':'{message}'");
- };
- channel.BasicConsume(queue: queueName,
- autoAck: true,
- consumer: consumer);
- Console.WriteLine(" Press [enter] to exit.");
复制代码 路由模式,消费者可以监听多个路由键。
5、主题模式
基于路由模式,仍旧有局限性——它不能基于多个尺度举行路由。也就是一个消费者只能吸收完全
与routing key相匹配的交换机。主题模式重要办理路由模式的不足,可以模糊匹配routing key。
路由模式交换机类型为Topic。
在生产者方面,基于 . 作为分隔符,用于routing key。比如“stock.usd.nyse”、“nyse.vmw”、
“quick.orange.rabbit”。可以是任何单词,但最多只有255 个字节。
在消费者方面,绑定routing key有两种重要的情况:
(1)*(星号):匹配一个单词。
具体语法:
- var routeing_key = "info.debug.error";
- //匹配 info
- "info.*.*"
- //匹配debug
- "*.debug.*"
- //匹配error
- "*.*.error"
复制代码 (2)#(散列):匹配零个或多个单词。
具体语法:
- var routeing_key = "info.debug.error";
- //匹配 info
- "info.#"
- //匹配debug
- "#.debug.#"
- //匹配error
- "*.*.error"
复制代码 6、RPC模式
RPC模式又叫"请求/回复模式"。
RPC(Remote Procedure Call,远程过程调用)是一种用于在分布式体系中举行通信的技术。它
允许一个进程(或线程)调用另一个进程(或线程)的过程(函数或方法),就像调用本地函数一
样,而不需要开辟者显式处理底层通信细节。
(就是生产者发送一条消息,消费者端执行某个方法,获取值的同时,并返回到生产者。)

- //生产者
- var factory = new ConnectionFactory { HostName = "localhost"};
- using var connection = factory.CreateConnection();
- using var channel = connection.CreateModel();
- //定义接收返回结果的队列
- var replyQueueName = channel.QueueDeclare().QueueName;
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (model, ea) =>
- {
- var body = ea.Body.ToArray();
- var response = Encoding.UTF8.GetString(body);
- Console.WriteLine(" [.] Got '{0}'", response);
- };
- //发送消息
- var correlationId = Guid.NewGuid().ToString(); //消息唯一性
- var props = channel.CreateBasicProperties();
- props.CorrelationId = correlationId;
- props.ReplyTo = replyQueueName; //回调队列名称
- string message = "30";
- var messageBytes = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish(
- exchange: "",
- routingKey: "rpc_queue",
- basicProperties: props,
- body: messageBytes);
- channel.BasicConsume(
- consumer: consumer,
- queue: replyQueueName,
- autoAck: true);
- //消费者
- var factory = new ConnectionFactory { HostName = "localhost" };
- using var connection = factory.CreateConnection();
- using var channel = connection.CreateModel();
- channel.QueueDeclare(queue: "rpc_queue",
- durable: false,
- exclusive: false,
- autoDelete: false,
- arguments: null);
- channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
- var consumer = new EventingBasicConsumer(channel);
- channel.BasicConsume(queue: "rpc_queue",
- autoAck: false,
- consumer: consumer);
- Console.WriteLine(" [x] Awaiting RPC requests");
- consumer.Received += (model, ea) =>
- {
- string response = string.Empty;
- var body = ea.Body.ToArray();
- var props = ea.BasicProperties;
- var replyProps = channel.CreateBasicProperties();
- replyProps.CorrelationId = props.CorrelationId;
- try
- {
- var message = Encoding.UTF8.GetString(body);
- int n = int.Parse(message);
- Console.WriteLine($" [.] Fib({message})");
- response = FibHelper.Fib(n).ToString();
- }
- catch (Exception e)
- {
- Console.WriteLine($" [.] {e.Message}");
- response = string.Empty;
- }
- finally
- {
- //回调到生产者队列
- var responseBytes = Encoding.UTF8.GetBytes(response);
- channel.BasicPublish(exchange: string.Empty,
- routingKey: props.ReplyTo,
- basicProperties: replyProps,
- body: responseBytes);
- channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
- }
- };
- //执行函数,并返回结果
- public class FibHelper
- {
- public static int Fib(int n)
- {
- if (n == 0 || n == 1)
- return n;
- return Fib(n - 1) + Fib(n - 2);
- }
- }
复制代码 RPC模式不是消息通报模式,消息只会被一个消费者消费。
7、发布者确认模式
发布者确认模式(Publisher Confirmation)是 RabbitMQ 提供的一种机制,用于确保消息被乐成
发送到交换机(exchange)并被吸收到,以及确保消息被正确地路由到队列中。在传统的消息发
布过程中,发布者发送消息到交换机后,并不知道消息是否已经被正确地处理。为了办理这个问
题,RabbitMQ 提供了发布者确认模式,允许发布者确认消息是否已经被乐成吸收到。
- //生产者
- var factory = new ConnectionFactory() { HostName = "localhost" };
- using (var connection = factory.CreateConnection())
- using (var channel = connection.CreateModel())
- {
- // 设置信道为确认模式
- channel.ConfirmSelect();
- // 声明一个队列
- channel.QueueDeclare(queue: "hello",
- durable: false,
- exclusive: false,
- autoDelete: false,
- arguments: null);
- // 消息内容
- string message = "Hello World!";
- var body = Encoding.UTF8.GetBytes(message);
- try
- {
- // 发送消息
- channel.BasicPublish(exchange: "",
- routingKey: "",
- basicProperties: null,
- body: body);
- // 等待消息确认
- if (channel.WaitForConfirms())
- {
- Console.WriteLine(" [x] Sent {0}", message);
- }
- else
- {
- Console.WriteLine(" [x] Failed to send {0}", message);
- }
- }
- catch (Exception ex)
- {
- Console.WriteLine($"An error occurred: {ex.Message}");
- }
- }
- Console.WriteLine(" Press [enter] to exit.");
- Console.ReadLine();
- //消费者
- var factory = new ConnectionFactory() { HostName = "localhost" };
- using (var connection = factory.CreateConnection())
- using (var channel = connection.CreateModel())
- {
- // 声明一个队列
- channel.QueueDeclare(queue: "hello",
- durable: false,
- exclusive: false,
- autoDelete: false,
- arguments: null);
- // 创建消费者
- var consumer = new EventingBasicConsumer(channel);
- // 消费消息
- consumer.Received += (model, ea) =>
- {
- var body = ea.Body;
- var message = Encoding.UTF8.GetString(body);
- Console.WriteLine(" [x] Received {0}", message);
- };
- // 消费者开始接收消息
- channel.BasicConsume(queue: "hello",
- autoAck: true,
- consumer: consumer);
- Console.WriteLine(" Press [enter] to exit.");
- Console.ReadLine();
- }
复制代码 五、RabbitMQ相关属性描述
上述代码中,有许多属性的设置,下面表明一下。
名称
值
描述
durable
布尔
队列是否长期化。如果将队列声明为长期化,那么当 RabbitMQ 服务器重启时,队列将被重新声明。长期化队列中的消息会被存储在磁盘上,因此即使在服务器重启后,消息也不会丢失。
exclusive
布尔
是否为排他队列。当队列被声明为排他时,只有声明该队列的连接(connection)才能利用它。一旦连接关闭,排他队列就会被删除。这种队列通常是用于暂时任务,只允许声明它的连接利用,不会被其他连接访问。
autoDelete
布尔
队列是否自动删除。如果将队列声明为自动删除,则在末了一个连接订阅它的消费者取消订阅后,队列将被自动删除。这种属性通常与暂时队列一起利用,以确保在不再需要队列时它会被清理。
arguments
类
用于设置队列或交换机的额外参数的选项。这些参数可以用于定制化队列或交换机的行为,以满足特定的需求。比如,设置队列的最大长度、消息在队列中的最大存活时间。
autoAck
布尔
消息是否自动确认。它是指在消费者从队列中吸收到消息后,是否自动确认消息的消费。当设置为 true 时,表示消费者会自动确认收到的消息,此时队列中表示该消息已被消费乐成了。当设置为 false 时,表示消费者需要显式地调用确认方法来告知 RabbitMQ 已经乐成处理了消息,否则消息将被重新放回队列,等待其他消费者处理。
basicProperties
类
是指在发布消息时可以携带的消息属性。这些属性包罗了有关消息的元数据信息,例如消息的优先级、消息的过期时间、消息的类型等等。
总结
RabbitMQ 是一个消息队列,重要作用就是异步、次序性、削峰等。
七种队列模式,可以根据不同的场景具体利用。
1. 简单队列模式
最简单的消息模式。一个生产者发送消息到一个队列,一个消费者从队列中吸收消息并处理。实用
于单个生产者-单个消费者的简单场景。
2. 工作队列模式
多个消费者共同消费消息。消费者从队列中取出消息并处理,消息会均匀地分配给消费者。
是基于简单队列模式的缺点,做了提升。实用于负载均衡和任务分发的场景。
3. 发布/订阅模式
生产者将消息发送到交换机,交换机将消息广播到所有与之绑定的队列。多个消费者可以订阅不同
的队列,从而吸收消息的副本。实用于消息广播和关照的场景。
4. 路由模式
生产者发送消息到交换机,并利用路由键指定消息的目标队列。交换机根据消息的路由键将消息路
由到与之匹配的队列中。实用于根据消息内容举行精确路由的场景。
5. 主题模式
雷同于路由模式,但是路由键可以利用通配符举行匹配。实用于消息的多样化路由和机动的匹配需
求。
6. RPC模式
客户端(RPC请求者)发送请求消息到队列中,并等待服务器(RPC相应者)返回相应消息。
服务器监听请求队列,处理请求并将相应发送回客户端指定的队列。实用于需要请求-相应式通信
的场景,雷同于远程调用。
7. 发布者确认模式
发布者确认模式是 RabbitMQ 提供的一种机制,用于确保消息在发送到交换机并被路由到队列时的
可靠性。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |