本文整理了 RabbitMQ 最常见和最重要的面试题,包括底子概念、工作原理、高可用架构、性能优化等方面的内容。
一、底子概念篇
1.1 什么是 RabbitMQ?其重要特点是什么?
RabbitMQ 是一个开源的消息代理和队列服务器,用来通过平凡协议在不同的应用之间共享数据。
重要特点:
- 支持多种消息协议(AMQP、MQTT、STOMP等)
- 支持多种语言客户端
- 提供可靠性消息投递模式
- 支持集群和高可用
- 提供管理界面
- 插件化机制
1.2 RabbitMQ 中的重要概念
- Producer(生产者):发送消息的应用步伐
- Consumer(消费者):吸收消息的应用步伐
- Queue(队列):存储消息的缓冲区
- Exchange(互换机):吸收生产者发送的消息,根据路由规则将消息路由到队列
- Binding(绑定):Exchange 和 Queue 之间的假造连接
- Routing Key(路由键):Exchange 根据路由键将消息路由到队列
1.3 Exchange 类型有哪些?
- // 声明 Direct Exchange
- channel.exchangeDeclare("direct_exchange", "direct");
- // 发送消息
- channel.basicPublish("direct_exchange", "routing_key", null, message.getBytes());
复制代码- // 声明 Fanout Exchange
- channel.exchangeDeclare("fanout_exchange", "fanout");
- // 发送消息(忽略 routing key)
- channel.basicPublish("fanout_exchange", "", null, message.getBytes());
复制代码- // 声明 Topic Exchange
- channel.exchangeDeclare("topic_exchange", "topic");
- // 发送消息
- channel.basicPublish("topic_exchange", "user.created", null, message.getBytes());
复制代码- // 声明 Headers Exchange
- channel.exchangeDeclare("headers_exchange", "headers");
- // 发送消息
- Map<String, Object> headers = new HashMap<>();
- headers.put("format", "pdf");
- headers.put("type", "report");
- AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
- .headers(headers)
- .build();
- channel.basicPublish("headers_exchange", "", properties, message.getBytes());
复制代码 二、高级特性篇
2.1 如何保证消息的可靠性投递?
- // 开启发布确认
- channel.confirmSelect();
- // 异步确认
- channel.addConfirmListener(new ConfirmListener() {
- public void handleAck(long deliveryTag, boolean multiple) {
- // 处理确认成功
- }
-
- public void handleNack(long deliveryTag, boolean multiple) {
- // 处理确认失败
- }
- });
复制代码- // 声明持久化队列
- channel.queueDeclare("durable_queue", true, false, false, null);
- // 发送持久化消息
- AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
- .deliveryMode(2) // 持久化消息
- .build();
- channel.basicPublish("", "durable_queue", properties, message.getBytes());
复制代码- // 关闭自动确认
- channel.basicConsume("queue_name", false, new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- AMQP.BasicProperties properties, byte[] body) throws IOException {
- try {
- // 处理消息
- processMessage(body);
- // 手动确认
- channel.basicAck(envelope.getDeliveryTag(), false);
- } catch (Exception e) {
- // 消息处理失败,重新入队
- channel.basicNack(envelope.getDeliveryTag(), false, true);
- }
- }
- });
复制代码 2.2 如何实现消息的次序性?
- // 确保队列只有一个消费者
- channel.basicQos(1); // 限制每次只处理一条消息
复制代码- // 根据业务键将消息发送到不同队列
- String businessKey = message.getBusinessKey();
- int queueNumber = Math.abs(businessKey.hashCode() % QUEUE_COUNT);
- String queueName = "order_queue_" + queueNumber;
- channel.basicPublish("", queueName, null, message.getBytes());
复制代码 2.3 如那边理消息堆积问题?
- // 创建多个消费者
- for (int i = 0; i < consumerCount; i++) {
- Channel channel = connection.createChannel();
- channel.basicConsume("queue_name", false, new DefaultConsumer(channel) {
- // 消费者实现
- });
- }
复制代码- // 创建多个队列
- for (int i = 0; i < QUEUE_COUNT; i++) {
- channel.queueDeclare("queue_" + i, true, false, false, null);
- }
- // 消息分发
- int queueIndex = message.hashCode() % QUEUE_COUNT;
- channel.basicPublish("", "queue_" + queueIndex, null, message.getBytes());
复制代码 三、集群与高可用篇
3.1 RabbitMQ 集群有哪些模式?
- 队列数据只存在于单个节点
- 其他节点存储队列的元数据
- 性能好但可用性差
- # rabbitmq.conf
- cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
- cluster_formation.classic_config.nodes.1 = rabbit@node1
- cluster_formation.classic_config.nodes.2 = rabbit@node2
- cluster_formation.classic_config.nodes.3 = rabbit@node3
- # 设置镜像策略
- rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
复制代码 3.2 如何保证集群的可用性?
- # haproxy.cfg
- frontend rabbitmq_front
- bind *:5672
- mode tcp
- default_backend rabbitmq_back
- backend rabbitmq_back
- mode tcp
- balance roundrobin
- server rabbit1 10.0.0.1:5672 check
- server rabbit2 10.0.0.2:5672 check
- server rabbit3 10.0.0.3:5672 check
复制代码- # keepalived.conf
- vrrp_instance VI_1 {
- state MASTER
- interface eth0
- virtual_router_id 51
- priority 100
- virtual_ipaddress {
- 192.168.1.100
- }
- }
复制代码 四、性能优化篇
4.1 如何提高 RabbitMQ 的性能?
- // 设置预取值
- channel.basicQos(100);
复制代码- // 批量确认
- List<Long> deliveryTags = new ArrayList<>();
- channel.addConfirmListener(new ConfirmListener() {
- public void handleAck(long deliveryTag, boolean multiple) {
- if (multiple) {
- deliveryTags.removeIf(tag -> tag <= deliveryTag);
- } else {
- deliveryTags.remove(deliveryTag);
- }
- }
- });
复制代码- public class RabbitProducer {
- private final ExecutorService executorService =
- new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(1000));
-
- public void sendMessage(final Message message) {
- executorService.submit(() -> {
- try {
- channel.basicPublish("exchange", "routingKey", null, message.getBytes());
- } catch (Exception e) {
- // 处理异常
- }
- });
- }
- }
复制代码 4.2 如何监控 RabbitMQ 的性能?
- # 启用管理插件
- rabbitmq-plugins enable rabbitmq_management
- # 访问管理界面
- http://localhost:15672
复制代码- // 使用 Prometheus 和 Grafana 监控
- rabbitmq_queue_messages_ready{queue="my_queue"}
- rabbitmq_queue_messages_unacknowledged{queue="my_queue"}
- rabbitmq_channel_consumers
复制代码 五、常见问题与解决方案
5.1 消息丢失问题
- try {
- channel.confirmSelect();
- channel.basicPublish("exchange", "routingKey", null, message.getBytes());
- if (!channel.waitForConfirms()) {
- // 消息发送失败,进行重试
- handlePublishFailure(message);
- }
- } catch (Exception e) {
- // 异常处理
- }
复制代码- channel.basicConsume(queueName, false, (consumerTag, delivery) -> {
- try {
- // 处理消息
- processMessage(delivery.getBody());
- // 确认消息
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- } catch (Exception e) {
- // 拒绝消息并重新入队
- channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
- }
- }, consumerTag -> {});
复制代码 5.2 死信队列处理
- // 声明死信交换机和队列
- Map<String, Object> args = new HashMap<>();
- args.put("x-dead-letter-exchange", "dlx.exchange");
- args.put("x-dead-letter-routing-key", "dlx.routing.key");
- channel.queueDeclare("original_queue", true, false, false, args);
- // 声明死信队列
- channel.queueDeclare("dlx.queue", true, false, false, null);
- channel.queueBind("dlx.queue", "dlx.exchange", "dlx.routing.key");
复制代码 5.3 延迟队列实现
- // 使用 TTL 和死信队列实现延迟队列
- Map<String, Object> args = new HashMap<>();
- args.put("x-message-ttl", 5000); // 5秒延迟
- args.put("x-dead-letter-exchange", "delay.exchange");
- args.put("x-dead-letter-routing-key", "delay.routing.key");
- channel.queueDeclare("delay_queue", true, false, false, args);
复制代码 六、面试题精选
6.1 核心问题
- RabbitMQ 如何保证消息不重复消费?
- RabbitMQ 如何保证消息的可靠性传输?
- RabbitMQ 的集群架构原理是什么?
- 如何解决消息队列的延时以及过期失效问题?
- 如何筹划一个消息队列?
6.2 实践问题
- 如那边理消息积压问题?
- 如何保证消息的次序性?
- 如那边理重复消息?
- 如何实现分布式变乱?
- 如何举行限流?
七、总结
本文涵盖了 RabbitMQ 的核心概念、高级特性、集群架构、性能优化等重要内容,这些都是面试中的重点观察内容。发起:
- 深入明白 RabbitMQ 的根本概念和工作原理
- 掌握消息可靠性投递的各种机制
- 相识集群架构和高可用方案
- 熟悉性能优化的方法
- 可以或许解决实际工作中遇到的各种问题
在面试中,不仅要可以或许答复这些问题,更重要的是要联合实际项目经验,说明在实践中是如何应用这些知识的。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |