ToB企服应用市场:ToB评测及商务社交产业平台

标题: RabbitMQ高频面试题整理 [打印本页]

作者: 冬雨财经    时间: 2024-10-1 09:46
标题: RabbitMQ高频面试题整理
1、RabbitMQ怎样包管消息不丢失

RabbitMQ 提供了相应的解决方案:
1)confirm 消息确认机制 (生产者)

confirm 模式是 RabbitMQ 提供的一种消息可靠性保障机制。当生产者通过 confirm 模式发送消息时,它会期待RabbitMQ 的确认,确保消息已经被正确地投递到了指定的 Exchange 中。

使用方法:

2)消息持久化机制 (RabbitMQ 服务)

持久化机制是指将消息存储到磁盘,以包管在 RabbitMQ 服务器宕机或重启时,消息不会丢失使用方法:

注意事项:

3)ACK 事务机制(消费者)

ACK 事务机制用于确保消息被正确消费。当消息被消费者成功处理后,消费者发送确认 (ACK)给 RabbitMQ,告知消息可以被移除。这个过程是自动处理的,也可以关闭举行手工发送 ACK。
使用方法:

注意事项:

2、RabbitMQ 中有哪几种交换机类型?

RabbitMQ 提供了5种不同类型的交换机,每种交换机都有其特定的路由逻辑:
1) Direct Exchange

Direct Exchange 根据消息的路由键(Routing Key)精确地将消息路由到队列。

  1. channel.exchangeDeclare("directExchange", "direct");
  2. channel.queueBind("queue1", "directExchange", "routingKey1");
  3. channel.queueBind("queue2", "directExchange", "routingKey2");
  4. // 发送消息
  5. channel.basicPublish("directExchange", "routingKey1", null, "Message to queue1".getBytes());
  6. channel.basicPublish("directExchange", "routingKey2", null, "Message to queue2".getBytes());
复制代码
2)Fanout Exchange

Fanout Exchange 将消息广播到绑定到该交换机的全部队列。

  1. channel.exchangeDeclare("fanoutExchange", "fanout");
  2. channel.queueBind("queue1", "fanoutExchange", "");
  3. channel.queueBind("queue2", "fanoutExchange", "");
  4. // 发送消息
  5. channel.basicPublish("fanoutExchange", "", null, "Broadcast Message".getBytes());
复制代码
3)Topic Exchange

Topic Exchange 根据消息的路由键模式(通常是带点号的字符串)将消息路由到匹配的队列。

  1. channel.exchangeDeclare("topicExchange", "topic");
  2. channel.queueBind("queue1", "topicExchange", "key1.*");
  3. channel.queueBind("queue2", "topicExchange", "key2.#");
  4. // 发送消息
  5. channel.basicPublish("topicExchange", "key1.test", null, "Message to queue1".getBytes());
  6. channel.basicPublish("topicExchange", "key2.test.sub", null, "Message to queue2".getBytes());
复制代码
4)Headers Exchange

Headers Exchange 根据消息的头属性(Headers)举行路由。与其他交换机不同,Headers Exchange 不使用路由键。

  1. Map<String, Object> headers = new HashMap<>();
  2. headers.put("header1", "value1");
  3. headers.put("header2", "value2");
  4. channel.exchangeDeclare("headersExchange", "headers");
  5. channel.queueBind("queue1", "headersExchange", "", new AMQP.BasicProperties.Builder().headers(headers).build());
  6. // 发送消息
  7. AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();
  8. channel.basicPublish("headersExchange", "", props, "Message to queue1".getBytes());
复制代码
5) Default Exchange

Default Exchange 是 RabbitMQ 内置的一个隐式交换机,每个队列在创建时会自动绑定到这个交换机上,路由键为队列的名称。

  1. // 直接发送消息到名为 "queue1" 的队列
  2. channel.basicPublish("", "queue1", null, "Message to queue1".getBytes());
复制代码
综合使用:在实际应用中,可以根据需求选择符合的交换机类型,并结合多种类型的交换机举行复杂的消息路由和处理。
3、什么是AMQP?

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一种用于消息传递的开放尺度协议,广泛用于消息队列和消息中间件系统中。RabbitMQ 是 AMQP 协议的一个实现。
AMQP 定义了一套尺度的消息传递机制,包罗以下几个核心组件:

AMQP 的工作原理:
AMQP 的消息传递流程可以概括为以下几个步调:

4、RabbitMQ中怎样解决消息堆积标题


1)增加消费者的数量

增加消费者的数量是解决消息堆积标题的直接方法。通过增加更多的消费者来并行处理消息,可以有用地提高消息的处理速度。
  1. // 示例:启动多个消费者
  2. for (int i = 0; i < 5; i++) {
  3.     new Thread(() -> {
  4.         // 消费者逻辑
  5.     }).start();
  6. }
复制代码
2)优化消费者的处理逻辑

优化消费者的处理逻辑,淘汰每条消息的处理时间,从而提高整体处理效率。这可以包罗:

3) 使用消息预取(Prefetch)机制

RabbitMQ 允许消费者设置预取值,控制消费者一次可以预取多少条消息。合理设置预取值可以确保消费者不会一次获取过多的消息,导致处理速度变慢。
  1. channel.basicQos(10); // 设置预取值为 10
复制代码
4)消息分发策略

使用符合的消息分发策略将消息均匀地分发到多个消费者。例如,可以使用轮询分发策略(Round-robin dispatching)来确保每个消费者都能公平地分配到消息。
5)消息优先级队列

使用消息优先级队列,确保高优先级的消息可以优先被处理,从而克制关键消息被低优先级消息淹没。
  1. Map<String, Object> args = new HashMap<String, Object>();
  2. args.put("x-max-priority", 10);
  3. channel.queueDeclare("priority_queue", true, false, false, args);
  4. // 发送带有优先级的消息
  5. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  6.     .priority(5)
  7.     .build();
  8. channel.basicPublish("", "priority_queue", properties, message.getBytes());
复制代码
6)分布式摆设和集群化

将 RabbitMQ 摆设在集群情况中,通过多个节点来分担消息处理压力。RabbitMQ 支持集群模式,可以水平扩展来处理大量消息。
7) 流量控制

使用 RabbitMQ 提供的流量控制机制(Flow Control)来限制生产者的消息发送速率,防止生产者过快地发送消息导致队列积蓄。
8)延长队列和死信队列

使用延长队列和死信队列处理无法立即处理的消息。通过设置消息的 TTL(Time To Live),可以将处理不了的消息重新入队,或转移到死信队罗列行后续处理。
9)监控和报警

建立完善的监控和报警系统,及时发现和处理消息堆积标题。可以使用 RabbitMQ 提供的管理插件或第三方监控工具(如 Prometheus、Grafana)来监控队列长度、消费者数量等关键指标。
5、RabbitMQ 是怎样实现死信队列的?

死信队列是 RabbitMQ 提供的一种特殊序列,处理那些无法被正常消费的消息。有三种情况会产存亡信:

在 RabbitMQ 中,实现死信队列只需要给正常队列增加三个核心参数即可:

接下来,就可以往正常队列中发送消息。如果消息满意了某些条件就会成为死信,并被重新发送到对应的死信队列中。而此时,RabbitMQ会在消息的头部添加一些与死信相干的补充信息,例如时间、成为死信的原因、原队列等。
应用程序可以按需处理这些补充的信息,最后,死信队列中的消息都是正常业务处理失败的消息,应用程序需要创建一个消费者来专门处理这些被遗漏的消息。例如记录日记、发送警报等。这样才能包管业务数据的完整性。
6、RabbitMQ中怎样包管消息不被重复消费

什么情况会导致消息被重复消费呢?

解决方案:
1)消息确认机制
启用消息确认机制,最好是手动确定。确保消费者成功处理消息后才将消息从队列中删除。

  1. // 手动确认消息
  2. channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
  3.     @Override
  4.     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  5.         String message = new String(body, "UTF-8");
  6.         try {
  7.             // 处理消息
  8.             System.out.println(" [x] Received '" + message + "'");
  9.             // 确认消息已处理成功
  10.             channel.basicAck(envelope.getDeliveryTag(), false);
  11.         } catch (Exception e) {
  12.             // 处理失败,消息未被确认,将被重新投递
  13.             channel.basicNack(envelope.getDeliveryTag(), false, true);
  14.         }
  15.     }
  16. });
复制代码
2)消息重谋利制
使用消息重投(Requeue)机制,确保处理失败的消息重新进入队列,供其他消费者再次处理。
  1. catch (Exception e) {
  2.     // 处理失败,消息未被确认,将被重新投递
  3.     channel.basicNack(envelope.getDeliveryTag(), false, true);
  4. }
复制代码
3)消息幂等性设计
确保消费者在处理消息时具有幂等性,即无论雷同的消息被处理多少次,结果都是划一的。这可以通过以下方法实现:

  1. // 在发送消息时设置唯一ID
  2. AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
  3.     .messageId(UUID.randomUUID().toString())
  4.     .build();
  5. channel.basicPublish("", "my_queue", props, message.getBytes("UTF-8"));
复制代码
综合案例:
  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.HashSet;
  4. import java.util.Set;
  5. import java.util.concurrent.TimeoutException;
  6. public class RabbitMQConsumer {
  7.     private final static String QUEUE_NAME = "my_durable_queue";
  8.     private static Set<String> processedMessageIds = new HashSet<>();
  9.     public static void main(String[] argv) throws IOException, TimeoutException {
  10.         ConnectionFactory factory = new ConnectionFactory();
  11.         factory.setHost("localhost");
  12.         Connection connection = factory.newConnection();
  13.         Channel channel = connection.createChannel();
  14.         boolean durable = true;
  15.         channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
  16.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  17.         channel.basicQos(1); // 仅处理一个未确认的消息
  18.         DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  19.             String message = new String(delivery.getBody(), "UTF-8");
  20.             String messageId = delivery.getProperties().getMessageId();
  21.             if (processedMessageIds.contains(messageId)) {
  22.                 // 已处理过的消息,直接确认
  23.                 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  24.             } else {
  25.                 try {
  26.                     // 处理消息
  27.                     System.out.println(" [x] Received '" + message + "'");
  28.                     // 记录已处理的消息ID
  29.                     processedMessageIds.add(messageId);
  30.                     // 确认消息已处理成功
  31.                     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  32.                 } catch (Exception e) {
  33.                     // 处理失败,消息未被确认,将被重新投递
  34.                     channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
  35.                 }
  36.             }
  37.         };
  38.         channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  39.     }
  40. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4