笑看天下无敌手 发表于 2024-10-29 00:40:01

RabbitMQ核心架构

RabbitMQ架构设计 

https://i-blog.csdnimg.cn/direct/9ef37fb4ebec40b3b4f79f4fb7985221.png

[*] Producer:负责产生消息。
[*] Connection:RabbitMQ客户端和署理服务器之间的TCP连接。
[*] Channel:创建在连接之上的虚拟连接,RabbitMQ操作都是在信道中举行。
[*] Broker:一个Broker可以看做一个RabbitMQ服务节点大概服务实例。
[*] Exchange:生产者发送消息到互换器,互换器根据路由key投递到相应的队列。
[*] Queue:存储消息的队列 。
[*] RoutingKey:路由键,指定消息的路由规则。
[*] BindingKey:绑定键,关联互换器和队列。
[*] Consumer:消费消息。
路由机制


[*]Direct:默认方式,根据消息的路由键完全匹配队列的绑定键来分发消息。
[*]fanout:广播模式,将消息投递到所有绑定到互换器的队列。
[*]topic:使用含糊匹配的方式根据路由键将消息分发到不同的队列中,支持通配符(*和#)举行匹配。
[*]header:不依赖路由键,而是根据消息的头部信息来举行匹配和分发。
连接RabbitMQ

final ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VIRTUAL_HOST);
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
Connection 可以用来创建多个 Channel,但是 Channel 不能线程共享使用。channel 的开启有一个 isOpen 方法可以得知
com.rabbitmq.client.impl.ShutdownNotifierComponent#isOpen   
@Override
    public boolean isOpen() {
    synchronized(this.monitor) {
      return this.shutdownCause == null;
    }
}
生产者发送消息

https://i-blog.csdnimg.cn/direct/2eb87939442b4123a162c9bf42015e6a.png

[*]生产者连接到 RabbitMO Broker,创建一个连接(Connection),开启一个信道(Channel)
[*]生产者声明一个互换器,并设置相关属性,好比互换器范例、是否长期化等
[*]生产者声明一个队列并设置相关属性,好比是否排他、是否长期化、是否主动删除等
[*]生产者通过路由键将互换器和队列绑定起来
[*]生产者发送消息至 RabbitMO Broker,此中包罗路由键、互换器等信息
[*]相应的互换器根据接收到的路由键查找相匹配的队列。
[*]假如找到,则将从生产者发送过来的消息存入相应的队列中。
[*]假如没有找到,则根据生产者设置的属性选择丢弃照旧回退给生产者
[*]关闭信道。
[*]关闭连接。
互换器和队列

创建临时队列

channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
final String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, EXCHANGE_NAME, routingKey);
上面创建一个长期化的、绑定范例为 direct 的互换器,同时也创建了一个非长期化的、排他的、主动删除的队列(队列名称由 RabbitMQ 主动生成)。
创建长期化队列

channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(queue, EXCHANGE_NAME, routingKey);
分配一个固定的队列名称,并设置长期化、非排他的、非主动删除的队列
生产者和消费者都可以声明一个互换器或则队列,假如尝试声明一个已经存在的互换器或队列(只要声明的参数完全匹配已存在的互换器或队列),RabbitMQ 则什么都不做,直接返回成功。假如参数不匹配则会抛出异常。
创建互换器

public Exchange.DeclareOk exchangeDeclare(String exchange, String type,
                                          boolean durable,
                                          boolean autoDelete,
                                          boolean internal,
                                          Map<String, Object> arguments)
返回 Exchange.DeclareOk 标识成功声明了一个互换器


[*] exchange:互换器名称
[*] type:互换器范例;常见的有:fanout、direct、topic...
com.rabbitmq.client.BuiltinExchangeType 类界说了互换器范例
[*] durable:是否长期化
长期化将互换器存盘,服务重启时不会丢失相关信息
[*] autoDelete:是否主动删除;主动删除的前提是:至少有一个队列或则互换器与这个互换器绑定,之后所有与这个互换器绑定的队列或则互换器都与此解绑。
**留意:**这里主动删除,不是当连接断开时,主动删除这个互换器。
[*] internal:是否内置的;假如是内置的互换器,客户端步伐无法直接发送消息到这个互换器 中,只能通过互换器路由 到互换器这种方式。
[*] arguments:其他一些结构化参数
删除互换器

void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;


[*]exchange:互换器名称
[*]isUnused:设置为 true ,则只有互换器没有被使用时,才被删除。
创建队列

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                           Map<String, Object> arguments) throws IOException;

[*] queue:队列名称
[*] durable:是否长期化
[*] exclusive:是否排他;当一个队列被声明为排他队列,该队列 仅对初次声明它 的连接可见,并在连接断开时主动删除。这里需要留意一点:就算是长期化的,一旦连接关闭,这个排他队列也会被主动删除。
[*] autoDelete:是否主动删除,与互换器界说一致;
[*] arguments:设置队列的其他一些参数
如 x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority
删除队列

Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
 队列绑定互换器

Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;


[*]queue:队列名
[*]exchange:互换器名称
[*]routingKey:用来绑定队列和互换器的路由键
[*]arguments:界说绑定的一些参数
互换器与互换器绑定

https://i-blog.csdnimg.cn/direct/231b943b47ab48f0a01a25370b6937c4.png
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
 发送消息

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
    throws IOException;


[*]exchange:互换器名称,假如为空,则会发送到 RabbitMQ 默认的互换器中
[*]routingKey:路由键
[*]mandatory:mandatory 参数设为 true 时,互换器无法根据自身的范例和路由键找到一个符合条件的队列,那么 RabbitMQ会调用 Basic.Return 下令将消息返回给生产者 。当 mandatory
数设置为 false 时,出现上述环境,则消息直接被丢弃那么生产者怎样获取到没有被正确路由到合适队列的消息呢?这时间可以通过调用channel addReturnListener 来添加 ReturnListener 监昕器实现。
[*]props:消息的根本属性集
消费消息

RabbitMQ 消费模式分两种:


[*] Push:推模式;接纳 Basic.Consume 举行消费
[*] Pull:拉模式;则使用 Basic.Get 举行消费
消息分发

当RabbitMQ 队列拥有多个消费者时 ,队列收到的消息将以轮询 (round-robin )的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。这种方式非常得当扩展,而且它是专门为并发步伐设计的。假如现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。
默认环境下,假如有 个消费者,那么 RabbitMQ会将第 条消息分发给第 m%n (取余的方式)个消费者, RabbitMQ 不管消费者是否消费并己经确认 (Basic.Ack) 了消息。
假如某些消费者任务繁重,来不及消费那么多的消息,而某些其他消费者由于某些原因很快地处理完了所分配到的消息,进而进程空闲,这样就会造成团体应用吞吐量的下降。
这里就要用到 channel.basicQos(int prefetchCount) 这个方法,channel.basicQos 方法答应限定信道上的消费者所能保持的最大未确认消息的数目。例如在订阅消费队列之前,消费端步伐调用了 channel.basicQos(5) ,之后订阅了某个队枚举行消费。 RabbitMQ会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,假如到达了所设定的上限,那么 RabbitMQ 就不会向这个消费者再发送任何消息。直到消费者确认了某条消息之后 RabbitMQ 将相应的计数减1,之后消费者可以继续接收消息,直到再次到达计数上限。
   Basic.Qos 的使用对于拉模式的消费方式无效.

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: RabbitMQ核心架构