详解RabbitMQ工作模式之RPC通信模式

打印 上一主题 下一主题

主题 2469|帖子 2469|积分 7407


目录

RPC通信模式
概述
工作流程
特点
应用场景
代码案例
引入依赖
常量类
编写客户端代码
编写服务端代码
运行步伐(先运行客户端,再运行服务端)


RPC通信模式

概述

   在RabbitMQ中,RPC模式通过消息队列实现长途调勤奋能。客户端(生产者)发送消息到消费队列,服务端(消费者)进行消息消费并实行相应的步伐,然后将效果发送到回调队列供客户端利用。这是一种双向的生产消费模式,其中客户端既是生产者又是消费者,服务端则专注于处理消息并生成响应。
  

在RPC通信的过程中, 没有⽣产者和消费者, ⽐较像咱们RPC长途调⽤, ⼤概就是通过两个队列实现了⼀个可回调的过程.
工作流程


1.客户端发送请求:
   客户端连接到RabbitMQ服务器。
客户端声明一个用于发送RPC请求的队列(通常是固定的,如rpc_queue)。
客户端创建一个临时的回调队列,并在发送请求时,将回调队列的名称作为消息属性(reply_to)发送给交换机。
客户端为每个请求生成一个唯一的correlation_id,并将其作为消息属性发送,以便在接收响应时能够匹配请求与响应。
  2.交换机路由请求:
   交换机接收到RPC请求后,根据路由键将请求路由到服务端监听的队列。
  3.服务端处理请求:
   服务端(消费者)从队列中接收请求。
服务端处理请求,并生成响应。
服务端将响应发送到客户端指定的回调队列,并在消息属性中设置相同的correlation_id。
  4.客户端接收响应:
   客户端监听其回调队列以接收响应。
当接收到响应时,客户端检查correlation_id以确定响应是否与之前的请求匹配。
如果匹配,客户端处理响应;如果不匹配,客户端大概丢弃该响应。
  特点

   1.解耦:客户端和服务端之间不需要直接通信,低落了系统间的耦合度。
2.灵活性:支持多种语言和平台之间的长途调用。
3.可扩展性:通过增加服务端(消费者)的数量,可以轻松扩展RPC服务。
4.性能开销:由于涉及到网络传输和消息队列的处理,RPC调用的性能通常低于本地调用。
5.复杂性:需要处理消息队列的可靠性、持久性、消息确认等复杂问题。
6.安全性:长途调用大概面对更多的安全风险,如消息篡改、中间人攻击等。
  应用场景

   RabbitMQ的RPC通信模式适用于需要长途调用服务的场景,如分布式系统中的服务调用、微服务架构中的服务通信等。通过RabbitMQ的消息队列机制,可以实现跨系统、跨语言的长途调用,进步系统的灵活性和可扩展性。
  代码案例

引入依赖

  
  1. <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
  2. <dependency>
  3.     <groupId>com.rabbitmq</groupId>
  4.     <artifactId>amqp-client</artifactId>
  5.     <version>5.21.0</version>
  6. </dependency>
复制代码
常量类

  1. public class Constants {
  2.     public static final String HOST = "47.98.109.138";
  3.     public static final int PORT = 5672;
  4.     public static final String USER_NAME = "study";
  5.     public static final String PASSWORD = "study";
  6.     public static final String VIRTUAL_HOST = "aaa";
  7.     //rpc 模式
  8.     public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
  9.     public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
  10. }
复制代码
编写客户端代码

  1. import com.rabbitmq.client.*;
  2. import rabbitmq.constant.Constants;
  3. import java.io.IOException;
  4. import java.util.UUID;
  5. import java.util.concurrent.ArrayBlockingQueue;
  6. import java.util.concurrent.BlockingQueue;
  7. import java.util.concurrent.TimeoutException;
  8. /**
  9. * rpc 客户端
  10. * 1. 发送请求
  11. * 2. 接收响应
  12. */
  13. public class RpcClient {
  14.     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  15.         //1. 建立连接
  16.         ConnectionFactory connectionFactory = new ConnectionFactory();
  17.         connectionFactory.setHost(Constants.HOST);
  18.         connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
  19.         connectionFactory.setUsername(Constants.USER_NAME);//账号
  20.         connectionFactory.setPassword(Constants.PASSWORD);  //密码
  21.         connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
  22.         Connection connection = connectionFactory.newConnection();
  23.         //2. 开启信道
  24.         Channel channel = connection.createChannel();
  25.         channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
  26.         channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
  27.         //3. 发送请求
  28.         String msg = "hello rpc...";
  29.         //设置请求的唯一标识
  30.         String correlationID = UUID.randomUUID().toString();
  31.         //设置请求的相关属性
  32.         AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
  33.                 .correlationId(correlationID)
  34.                 .replyTo(Constants.RPC_RESPONSE_QUEUE)
  35.                 .build();
  36.         channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());
  37.         //4. 接收响应
  38.         //使用阻塞队列, 来存储响应信息
  39.         final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
  40.         DefaultConsumer consumer = new DefaultConsumer(channel){
  41.             @Override
  42.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  43.                 String respMsg = new String(body);
  44.                 System.out.println("接收到回调消息: "+ respMsg);
  45.                 if (correlationID.equals(properties.getCorrelationId())){
  46.                     //如果correlationID校验一致
  47.                     response.offer(respMsg);
  48.                 }
  49.             }
  50.         };
  51.         channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
  52.         String result = response.take();
  53.         System.out.println("[RPC Client 响应结果]:"+ result);
  54.     }
  55. }
复制代码
编写服务端代码

  1. import com.rabbitmq.client.*;
  2. import rabbitmq.constant.Constants;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * RPC server
  7. * 1. 接收请求
  8. * 2. 发送响应
  9. */
  10. public class RpcServer {
  11.     public static void main(String[] args) throws IOException, TimeoutException {
  12.         //1. 建立连接
  13.         ConnectionFactory connectionFactory = new ConnectionFactory();
  14.         connectionFactory.setHost(Constants.HOST);
  15.         connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
  16.         connectionFactory.setUsername(Constants.USER_NAME);//账号
  17.         connectionFactory.setPassword(Constants.PASSWORD);  //密码
  18.         connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
  19.         Connection connection = connectionFactory.newConnection();
  20.         //2. 开启信道
  21.         Channel channel = connection.createChannel();
  22.         //3. 接收请求
  23.         channel.basicQos(1);
  24.         DefaultConsumer consumer = new DefaultConsumer(channel){
  25.             @Override
  26.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  27.                 String request = new String(body,"UTF-8");
  28.                 System.out.println("接收到请求:"+ request);
  29.                 String response = "针对request:"+ request +", 响应成功";
  30.                 AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
  31.                         .correlationId(properties.getCorrelationId())
  32.                         .build();
  33.                 channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());
  34.                 channel.basicAck(envelope.getDeliveryTag(), false);
  35.             }
  36.         };
  37.         channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
  38.     }
  39. }
复制代码
运行步伐(先运行客户端,再运行服务端)

可以在管理界面看到其中一个队列中有1条消息



我们可以看到,服务端接收到了消息并给客户端发送了响应,与预期符合。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

立聪堂德州十三局店

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表