【RabbitMQ】实现RPC通信的完整指南 [复制链接]
发表于 2025-5-21 04:35:28 | 显示全部楼层 |阅读模式
RPC 通信

RPC (Remote Procedure Call), 即远过程调用。它是一种通过网络从远程计算机上哀求服务,而不必要了解底层网络的技术


  • 类似 Http 远程调用
RabbitMQ 实现 RPC 通信的过程,大概是通过两个队列实现一个可回调的过程



  • 注意

    • 没有生产者和斲丧者,取而代之的是客户端和服务器
    • reply_to:回调队列的名称
    • correlation_id:不能重复,用来确保哀求和相应是一对

  • 大概流程:

    • 客户端发送消息到一个指定的队列,并在消息属性中设置 replyTo 字段,这个字段制定了一个回调队列,服务端处理之后,会把相应结果发送到这个队列
    • 服务端收到哀求后,处理哀求并发送相应到 replyTo 指定的回调队列
    • 客户端再回调队列上等待相应消息,一旦收到相应,客户端会检查消息的 correlationID 属性,以确保它是所期望的相应

      • 等待相应消息,是通过一个阻塞队列来实现
      • 如果没有相应进来,就会一直阻塞。通过一个阻塞队列,来让其等待相应完成
      • 如果阻塞队列里面没有消息,就会一直等待,等到有消息为止



大致流程


  • 客户端:

    • 发送哀求(携带 replyTo、CorrelationID)
    • 接收相应(校验 correlationID)

  • 服务端:

    • 接收哀求,进行相应
    • 发送相应(按照客户端指定的 replyTo,设置 correlationID)

创建相干队列

  1. public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";  
  2.     public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";  
复制代码


  • 涉及到两个队列

    • 哀求队列
    • 相应队列

客户端代码

客户端代码主要流程如下:

  • 声明两个队列,包含回调队列 RPC_REQUEST_QUEUE,声明本次哀求的唯一标志 correlationID
  • 将 RPC_REQUEST_QUEUE 和 correlationID 配置到要发送的消息队列中
  • 利用阻塞队列来阻塞当前进程,监听回调队列中的消息,把哀求放到阻塞队列中
  • 利用阻塞队列有消息后,主线程被唤醒,打印返回内容
声明队列

  1. channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);  
  2. channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
复制代码
发送哀求

  1. //发送请求(使用内置交换机)  
  2. String msg = "hello rpc...";  
  3. //设置请求的唯一标识  
  4. String correlationID = UUID.randomUUID().toString();  
  5. //设置请求的相关属性  
  6. AMQP.BasicProperties props = new AMQP.BasicProperties().builder()  
  7.         .correlationId(correlationID)  
  8.         .replyTo(Constants.RPC_RESPONSE_QUEUE)  
  9.         .build();  
  10. channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());
复制代码
接收相应

利用阻塞队列,来存储回调结果
  1. // 接收响应  
  2. // 如果不阻塞,就会从上到下执行完了。所以要使用一个阻塞队列,完成同步机制  
  3. final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);  
  4. DefaultConsumer consumer = new DefaultConsumer(channel){  
  5.     //逻辑是比对 correlationID 是否一致  
  6.     @Override  
  7.     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  8.         String respMsg = new String(body);  
  9.         System.out.println("接收到回调信息:" + respMsg);  
  10.         if (correlationID.equals(properties.getCorrelationId())) {  
  11.             // 如果 correlationID 校验一致,说明就是我们要的响应  
  12.             response.offer(respMsg);  
  13.         }  
  14.     }  
  15. };  
  16. channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);  
  17. // 获取回调的结果
  18. String result = response.take(); //阻塞队列一直阻塞到这里,直到 response 有值了  
  19. System.out.println("[RPC Client 响应结果]: " + result);
复制代码
完整代码

  1. package rabbitmq.rpc;  
  2.   
  3. import com.rabbitmq.client.*;  
  4. import rabbitmq.constant.Constants;  
  5.   
  6. import java.io.IOException;  
  7. import java.util.UUID;  
  8. import java.util.concurrent.*;  
  9.   
  10. /**  
  11. * RPC Client * 1. 发送请求  
  12. * 2. 接收响应  
  13. */  
  14. public class RpcClient {  
  15.     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  
  16.         //1. 建立连接  
  17.         ConnectionFactory connectionFactory = new ConnectionFactory();  
  18.         connectionFactory.setHost(Constants.HOST);  
  19.         connectionFactory.setPort(Constants.PORT);  
  20.         connectionFactory.setUsername(Constants.USER_NAME);  
  21.         connectionFactory.setPassword(Constants.PASSWORD);  
  22.         connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  
  23.         Connection connection = connectionFactory.newConnection();  
  24.   
  25.         //2. 开启信道  
  26.         Channel channel = connection.createChannel();  
  27.   
  28.         //3. 声明队列  
  29.         channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);  
  30.         channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);  
  31.   
  32.         //4. 发送请求(使用内置交换机)  
  33.         String msg = "hello rpc...";  
  34.         //设置请求的唯一标识  
  35.         String correlationID = UUID.randomUUID().toString();  
  36.         //设置请求的相关属性  
  37.         AMQP.BasicProperties props = new AMQP.BasicProperties().builder()  
  38.                 .correlationId(correlationID)  
  39.                 .replyTo(Constants.RPC_RESPONSE_QUEUE)  
  40.                 .build();  
  41.         channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());  
  42.   
  43.         //5. 接收响应  
  44.         // 如果不阻塞,就会从上到下执行完了。所以要使用一个阻塞队列,完成同步机制  
  45.         final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);  
  46.         DefaultConsumer consumer = new DefaultConsumer(channel){  
  47.             //逻辑是比对 correlationID 是否一致  
  48.             @Override  
  49.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  50.                 String respMsg = new String(body);  
  51.                 System.out.println("接收到回调信息:" + respMsg);  
  52.                 if (correlationID.equals(properties.getCorrelationId())) {  
  53.                     // 如果 correlationID 校验一致,说明就是我们要的响应  
  54.                     response.offer(respMsg);  
  55.                 }  
  56.             }  
  57.         };  
  58.         channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);  
  59.         String result = response.take(); //阻塞队列一直阻塞到这里,直到 response 有值了  
  60.         System.out.println("[RPC Client 响应结果]: " + result);  
  61.     }  
  62. }
复制代码
服务端代码

服务端代码主要流程如下:

  • 接收消息
  • 根据消息内容进行相应处理,把应答结果返回到回调队列中
设置同时只能获取一个消息

  1. //设置一次只能接收一条消息  
  2. channel.basicQos(1);
复制代码
如果不设置 basicQos,RabbitMQ 会利用默认的 Qos 设置,其 prefetchCount 默认值为 0


  • 当 prefetchCount 为 0 时,RabbitMQ 会根据内部实现和当前的网络状况等因素,可能会同时发送多条消息给斲丧者
  • 这意味着在默认情况下,斲丧者可能会同时接收到多条消息,但具体数目不是严酷保证的,可能会有所波动
在 RPC 模式下,同上期望的是一对一的消息处理,即一个哀求对应一个相应。斲丧者在处理完一个消息并确认之后,才会接收到下一条消息
接收消息

接收消息,并做出相应处理
  1. DefaultConsumer consumer = new DefaultConsumer(channel) {  
  2.     @Override  
  3.     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  4.         String request = new String(body, "UTF-8");  
  5.         System.out.println("接收到请求:" + request);  
  6.         String responses = "针对 request:" + request + ",响应成功";  
  7.         AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()  
  8.                 .correlationId(properties.getCorrelationId())  
  9.                 .build();  
  10.         channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, responses.getBytes());  
  11.         channel.basicAck(envelope.getDeliveryTag(), false);  
  12.   
  13.     }  
  14. };  
  15. channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
复制代码
RabbitMQ 消息确定机制


  • 在 RabbitMQ 中,basicConsume 方法的 autoAck 参数用于指定斲丧者是否应该主动向消息对类确认消息

    • 主动确认(autoAck=true):消息对类在将消息发送给斲丧者之后,会立即从内存中删除该消息。这意味着,如果斲丧者处理消息失败,消息将丢失,由于消息队列认为消息已经被乐成斲丧
    • 手动确认(autoAck=false):消息队列在将消息发送给斲丧者之后,必要斲丧者显式地调用 basicAck 方法来确认消息。手动确认提供了更高的可靠性,确保消息不会不测丢失,适用于消息处理紧张且必要确保每个消息都被正确处理的场景

完整代码

  1. package rabbitmq.rpc;  
  2.   
  3. import com.rabbitmq.client.*;  
  4. import rabbitmq.constant.Constants;  
  5.   
  6. import java.io.IOException;  
  7. import java.util.concurrent.TimeoutException;  
  8.   
  9. /**  
  10. * RPC server * 1. 接收请求  
  11. * 2. 发送响应  
  12. */  
  13. public class RpcServer {  
  14.     public static void main(String[] args) throws IOException, TimeoutException {  
  15.   
  16.         //1. 建立连接  
  17.         ConnectionFactory connectionFactory = new ConnectionFactory();  
  18.         connectionFactory.setHost(Constants.HOST);  
  19.         connectionFactory.setPort(Constants.PORT);  
  20.         connectionFactory.setUsername(Constants.USER_NAME);  
  21.         connectionFactory.setPassword(Constants.PASSWORD);  
  22.         connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  
  23.         Connection connection = connectionFactory.newConnection();  
  24.   
  25.         //2. 开启信道  
  26.         Channel channel = connection.createChannel();  
  27.   
  28.         //3. 接收请求  
  29.         //设置一次只能接收一条消息  
  30.         channel.basicQos(1);  
  31.         DefaultConsumer consumer = new DefaultConsumer(channel) {  
  32.             @Override  
  33.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  34.                 String request = new String(body, "UTF-8");  
  35.                 System.out.println("接收到请求:" + request);  
  36.                 String responses = "针对 request:" + request + ",响应成功";  
  37.                 AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()  
  38.                         .correlationId(properties.getCorrelationId())  
  39.                         .build();  
  40.                 channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, responses.getBytes());  
  41.                 channel.basicAck(envelope.getDeliveryTag(), false);  
  42.   
  43.             }  
  44.         };  
  45.         channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);  
  46.     }  
  47. }
复制代码
运行程序

启动客户端


启动服务端

运行服务端
  1. 接收到请求:hello rpc...
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表