ToB企服应用市场:ToB评测及商务社交产业平台

标题: RabbitMQ 面试题精选:核心概念与实践总结 [打印本页]

作者: 知者何南    时间: 2024-12-29 07:17
标题: RabbitMQ 面试题精选:核心概念与实践总结
本文整理了 RabbitMQ 最常见和最重要的面试题,包括底子概念、工作原理、高可用架构、性能优化等方面的内容。
  一、底子概念篇

1.1 什么是 RabbitMQ?其重要特点是什么?

RabbitMQ 是一个开源的消息代理和队列服务器,用来通过平凡协议在不同的应用之间共享数据。
重要特点:
1.2 RabbitMQ 中的重要概念

1.3 Exchange 类型有哪些?

  1. // 声明 Direct Exchange
  2. channel.exchangeDeclare("direct_exchange", "direct");
  3. // 发送消息
  4. channel.basicPublish("direct_exchange", "routing_key", null, message.getBytes());
复制代码
  1. // 声明 Fanout Exchange
  2. channel.exchangeDeclare("fanout_exchange", "fanout");
  3. // 发送消息(忽略 routing key)
  4. channel.basicPublish("fanout_exchange", "", null, message.getBytes());
复制代码
  1. // 声明 Topic Exchange
  2. channel.exchangeDeclare("topic_exchange", "topic");
  3. // 发送消息
  4. channel.basicPublish("topic_exchange", "user.created", null, message.getBytes());
复制代码
  1. // 声明 Headers Exchange
  2. channel.exchangeDeclare("headers_exchange", "headers");
  3. // 发送消息
  4. Map<String, Object> headers = new HashMap<>();
  5. headers.put("format", "pdf");
  6. headers.put("type", "report");
  7. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  8.     .headers(headers)
  9.     .build();
  10. channel.basicPublish("headers_exchange", "", properties, message.getBytes());
复制代码
二、高级特性篇

2.1 如何保证消息的可靠性投递?

  1. // 开启发布确认
  2. channel.confirmSelect();
  3. // 异步确认
  4. channel.addConfirmListener(new ConfirmListener() {
  5.     public void handleAck(long deliveryTag, boolean multiple) {
  6.         // 处理确认成功
  7.     }
  8.    
  9.     public void handleNack(long deliveryTag, boolean multiple) {
  10.         // 处理确认失败
  11.     }
  12. });
复制代码
  1. // 声明持久化队列
  2. channel.queueDeclare("durable_queue", true, false, false, null);
  3. // 发送持久化消息
  4. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  5.     .deliveryMode(2) // 持久化消息
  6.     .build();
  7. channel.basicPublish("", "durable_queue", properties, message.getBytes());
复制代码
  1. // 关闭自动确认
  2. channel.basicConsume("queue_name", false, new DefaultConsumer(channel) {
  3.     @Override
  4.     public void handleDelivery(String consumerTag, Envelope envelope,
  5.                              AMQP.BasicProperties properties, byte[] body) throws IOException {
  6.         try {
  7.             // 处理消息
  8.             processMessage(body);
  9.             // 手动确认
  10.             channel.basicAck(envelope.getDeliveryTag(), false);
  11.         } catch (Exception e) {
  12.             // 消息处理失败,重新入队
  13.             channel.basicNack(envelope.getDeliveryTag(), false, true);
  14.         }
  15.     }
  16. });
复制代码
2.2 如何实现消息的次序性?

  1. // 确保队列只有一个消费者
  2. channel.basicQos(1); // 限制每次只处理一条消息
复制代码
  1. // 根据业务键将消息发送到不同队列
  2. String businessKey = message.getBusinessKey();
  3. int queueNumber = Math.abs(businessKey.hashCode() % QUEUE_COUNT);
  4. String queueName = "order_queue_" + queueNumber;
  5. channel.basicPublish("", queueName, null, message.getBytes());
复制代码
2.3 如那边理消息堆积问题?

  1. // 创建多个消费者
  2. for (int i = 0; i < consumerCount; i++) {
  3.     Channel channel = connection.createChannel();
  4.     channel.basicConsume("queue_name", false, new DefaultConsumer(channel) {
  5.         // 消费者实现
  6.     });
  7. }
复制代码
  1. // 创建多个队列
  2. for (int i = 0; i < QUEUE_COUNT; i++) {
  3.     channel.queueDeclare("queue_" + i, true, false, false, null);
  4. }
  5. // 消息分发
  6. int queueIndex = message.hashCode() % QUEUE_COUNT;
  7. channel.basicPublish("", "queue_" + queueIndex, null, message.getBytes());
复制代码
三、集群与高可用篇

3.1 RabbitMQ 集群有哪些模式?


  1. # rabbitmq.conf
  2. cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
  3. cluster_formation.classic_config.nodes.1 = rabbit@node1
  4. cluster_formation.classic_config.nodes.2 = rabbit@node2
  5. cluster_formation.classic_config.nodes.3 = rabbit@node3
  6. # 设置镜像策略
  7. rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
复制代码
3.2 如何保证集群的可用性?

  1. # haproxy.cfg
  2. frontend rabbitmq_front
  3.     bind *:5672
  4.     mode tcp
  5.     default_backend rabbitmq_back
  6. backend rabbitmq_back
  7.     mode tcp
  8.     balance roundrobin
  9.     server rabbit1 10.0.0.1:5672 check
  10.     server rabbit2 10.0.0.2:5672 check
  11.     server rabbit3 10.0.0.3:5672 check
复制代码
  1. # keepalived.conf
  2. vrrp_instance VI_1 {
  3.     state MASTER
  4.     interface eth0
  5.     virtual_router_id 51
  6.     priority 100
  7.     virtual_ipaddress {
  8.         192.168.1.100
  9.     }
  10. }
复制代码
四、性能优化篇

4.1 如何提高 RabbitMQ 的性能?

  1. // 设置预取值
  2. channel.basicQos(100);
复制代码
  1. // 批量确认
  2. List<Long> deliveryTags = new ArrayList<>();
  3. channel.addConfirmListener(new ConfirmListener() {
  4.     public void handleAck(long deliveryTag, boolean multiple) {
  5.         if (multiple) {
  6.             deliveryTags.removeIf(tag -> tag <= deliveryTag);
  7.         } else {
  8.             deliveryTags.remove(deliveryTag);
  9.         }
  10.     }
  11. });
复制代码
  1. public class RabbitProducer {
  2.     private final ExecutorService executorService =
  3.         new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS,
  4.             new ArrayBlockingQueue<>(1000));
  5.             
  6.     public void sendMessage(final Message message) {
  7.         executorService.submit(() -> {
  8.             try {
  9.                 channel.basicPublish("exchange", "routingKey", null, message.getBytes());
  10.             } catch (Exception e) {
  11.                 // 处理异常
  12.             }
  13.         });
  14.     }
  15. }
复制代码
4.2 如何监控 RabbitMQ 的性能?

  1. # 启用管理插件
  2. rabbitmq-plugins enable rabbitmq_management
  3. # 访问管理界面
  4. http://localhost:15672
复制代码
  1. // 使用 Prometheus 和 Grafana 监控
  2. rabbitmq_queue_messages_ready{queue="my_queue"}
  3. rabbitmq_queue_messages_unacknowledged{queue="my_queue"}
  4. rabbitmq_channel_consumers
复制代码
五、常见问题与解决方案

5.1 消息丢失问题

  1. try {
  2.     channel.confirmSelect();
  3.     channel.basicPublish("exchange", "routingKey", null, message.getBytes());
  4.     if (!channel.waitForConfirms()) {
  5.         // 消息发送失败,进行重试
  6.         handlePublishFailure(message);
  7.     }
  8. } catch (Exception e) {
  9.     // 异常处理
  10. }
复制代码
  1. channel.basicConsume(queueName, false, (consumerTag, delivery) -> {
  2.     try {
  3.         // 处理消息
  4.         processMessage(delivery.getBody());
  5.         // 确认消息
  6.         channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  7.     } catch (Exception e) {
  8.         // 拒绝消息并重新入队
  9.         channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
  10.     }
  11. }, consumerTag -> {});
复制代码
5.2 死信队列处理

  1. // 声明死信交换机和队列
  2. Map<String, Object> args = new HashMap<>();
  3. args.put("x-dead-letter-exchange", "dlx.exchange");
  4. args.put("x-dead-letter-routing-key", "dlx.routing.key");
  5. channel.queueDeclare("original_queue", true, false, false, args);
  6. // 声明死信队列
  7. channel.queueDeclare("dlx.queue", true, false, false, null);
  8. channel.queueBind("dlx.queue", "dlx.exchange", "dlx.routing.key");
复制代码
5.3 延迟队列实现

  1. // 使用 TTL 和死信队列实现延迟队列
  2. Map<String, Object> args = new HashMap<>();
  3. args.put("x-message-ttl", 5000); // 5秒延迟
  4. args.put("x-dead-letter-exchange", "delay.exchange");
  5. args.put("x-dead-letter-routing-key", "delay.routing.key");
  6. channel.queueDeclare("delay_queue", true, false, false, args);
复制代码
六、面试题精选

6.1 核心问题

6.2 实践问题

七、总结

本文涵盖了 RabbitMQ 的核心概念、高级特性、集群架构、性能优化等重要内容,这些都是面试中的重点观察内容。发起:
在面试中,不仅要可以或许答复这些问题,更重要的是要联合实际项目经验,说明在实践中是如何应用这些知识的。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4