1.RPC基本原理

打印 上一主题 下一主题

主题 850|帖子 850|积分 2550

RPC

1.界说

远程过程调用(remote procedure call)
2.概念

广义:全部通过网络进行通讯,的调用统称为RPC调用
狭义:不采用http协议的方式,采用自界说格式的二进制方式
3.优缺点



  • 优点

    • 服从高
    • 发起rpc调用的一方,可以忽略RPC的具体实现,犹如编写当地函数调用

  • 缺点

    • 通用性不高

4.RPC结构



  • client(caller):调用者
  • client stub(bundle args/unbundle ret vals):客户端存根
  • client network service
  • server network service
  • server stub(bundle ret vals/unbundle args)
5.RPC消息协议

5.1 消息界限



  • 分隔符(\r\n)
  • 长度声明法(例如HTTP中 Content-Length)
5.2 内容



  • 二进制
  • 文本内容
5.3 压缩



  • 压缩处理是一把双刃剑,淘汰数据量减轻带宽压力同时,额外增长了压缩和解压缩的时间
6.RPC的实现

6.1 divide_protocol.py

  1. import struct
  2. from io import BytesIO
  3. class InvalidOperation(Exception):
  4.     ...
  5. class DivideProtocol(object):
  6.     """
  7.     float divide(1:int num1, 2:int num2=1)
  8.     """
  9.     def _read_all(self, size):
  10.         """
  11.         读取指定长度的字节
  12.         :param size: 长度
  13.         :return: 读取出的二进制数据
  14.         """
  15.         if isinstance(self.conn, BytesIO):
  16.             # BytesIO类型,用于演示
  17.             buff = b''
  18.             have = 0
  19.             while have < size:
  20.                 chunk = self.conn.read(size - have)
  21.                 have += len(chunk)
  22.                 buff += chunk
  23.             return buff
  24.         else:
  25.             # socket类型
  26.             buff = b''
  27.             have = 0
  28.             while have < size:
  29.                 chunk = self.conn.recv(size - have)
  30.                 have += len(chunk)
  31.                 buff += chunk
  32.                 # 客户端关闭了连接
  33.                 if len(chunk) == 0:
  34.                     raise EOFError()
  35.             return buff
  36.     def args_encode(self, num1, num2=1):
  37.         """
  38.         对调用参数进行编码
  39.         :param num1: int
  40.         :param num2: int
  41.         :return: 编码后的二进制数据
  42.         """
  43.         # 处理参数num1, 4字节整型
  44.         buff = struct.pack('!B', 1)
  45.         buff += struct.pack('!i', num1)
  46.         # 处理参数num2, 4字节整型,如为默认值1,则不再放到消息中
  47.         if num2 != 1:
  48.             buff += struct.pack('!B', 2)
  49.             buff += struct.pack('!i', num2)
  50.         # 处理消息总长度,4字节无符号整型
  51.         length = len(buff)
  52.         # 处理方法名,字符串类型
  53.         name = 'divide'
  54.         # 字符串长度,4字节无符号整型
  55.         msg = struct.pack('!I', len(name))
  56.         msg += name.encode()
  57.         msg += struct.pack('!I', length) + buff
  58.         return msg
  59.     def args_decode(self, connection):
  60.         """
  61.         获取调用参数并进行解码
  62.         :param connection: 传输工具对象,如socket对象或者BytesIO对象,从中可以读取消息数据
  63.         :return: 解码后的参数字典
  64.         """
  65.         # 保存到当前对象中,供_read_all方式使用
  66.         self.conn = connection
  67.         param_name_map = {
  68.             1: 'num1',
  69.             2: 'num2'
  70.         }
  71.         param_len_map = {
  72.             1: 4,
  73.             2: 4
  74.         }
  75.         # 用于保存解码后的参数字典
  76.         args = dict()
  77.         # 读取消息总长度,4字无节符号整数
  78.         buff = self._read_all(4)
  79.         length = struct.unpack('!I', buff)[0]
  80.         # 记录已读取的长度
  81.         have = 0
  82.         # 读取第一个参数,4字节整型
  83.         buff = self._read_all(1)
  84.         have += 1
  85.         param_seq = struct.unpack('!B', buff)[0]
  86.         param_len = param_len_map[param_seq]
  87.         buff = self._read_all(param_len)
  88.         have += param_len
  89.         args[param_name_map[param_seq]] = struct.unpack('!i', buff)[0]
  90.         if have >= length:
  91.             return args
  92.         # 读取第二个参数,4字节整型
  93.         buff = self._read_all(1)
  94.         have += 1
  95.         param_seq = struct.unpack('!B', buff)[0]
  96.         param_len = param_len_map[param_seq]
  97.         buff = self._read_all(param_len)
  98.         have += param_len
  99.         args[param_name_map[param_seq]] = struct.unpack('!i', buff)[0]
  100.         return args
  101.     def result_encode(self, result):
  102.         """
  103.         对调用的结果进行编码
  104.         :param result: float 或 InvalidOperation对象
  105.         :return: 编码后的二进制数据
  106.         """
  107.         if isinstance(result, float):
  108.             # 没有异常,正常执行
  109.             # 处理结果类型,1字节无符号整数
  110.             buff = struct.pack('!B', 1)
  111.             # 处理结果值, 4字节float
  112.             buff += struct.pack('!f', result)
  113.         else:
  114.             # 发生了InvalidOperation异常
  115.             # 处理结果类型,1字节无符号整数
  116.             buff = struct.pack('!B', 2)
  117.             # 处理异常结果值, 字符串
  118.             # 处理字符串长度, 4字节无符号整数
  119.             buff += struct.pack('!I', len(result.message))
  120.             # 处理字符串内容
  121.             buff += result.message.encode()
  122.         return buff
  123.     def result_decode(self, connection):
  124.         """
  125.         对调用结果进行解码
  126.         :param connection: 传输工具对象,如socket对象或者BytesIO对象,从中可以读取消息数据
  127.         :return: 结果数据
  128.         """
  129.         self.conn = connection
  130.         # 取出结果类型, 1字节无符号整数
  131.         buff = self._read_all(1)
  132.         result_type = struct.unpack('!B', buff)[0]
  133.         if result_type == 1:
  134.             # float的结果值, 4字节float
  135.             buff = self._read_all(4)
  136.             result = struct.unpack('!f', buff)[0]
  137.             return result
  138.         else:
  139.             # InvalidOperation对象
  140.             # 取出字符串长度, 4字节无符号整数
  141.             buff = self._read_all(4)
  142.             str_len = struct.unpack('!I', buff)[0]
  143.             buff = self._read_all(str_len)
  144.             message = buff.decode()
  145.             return InvalidOperation(message)
  146. class MethodProtocol(object):
  147.     def __init__(self, connection):
  148.         self.conn = connection
  149.     def _read_all(self, size):
  150.         """
  151.         读取指定长度的字节
  152.         :param size: 长度
  153.         :return: 读取出的二进制数据
  154.         """
  155.         if isinstance(self.conn, BytesIO):
  156.             # BytesIO类型,用于演示
  157.             buff = b''
  158.             have = 0
  159.             while have < size:
  160.                 chunk = self.conn.read(size - have)
  161.                 have += len(chunk)
  162.                 buff += chunk
  163.             return buff
  164.         else:
  165.             # socket类型
  166.             buff = b''
  167.             have = 0
  168.             while have < size:
  169.                 print('have=%d size=%d' % (have, size))
  170.                 chunk = self.conn.recv(size - have)
  171.                 have += len(chunk)
  172.                 buff += chunk
  173.                 if len(chunk) == 0:
  174.                     raise EOFError()
  175.             return buff
  176.     def get_method_name(self):
  177.         # 获取方法名
  178.         # 读取字符串长度,4字节无符号整型
  179.         buff = self._read_all(4)
  180.         str_len = struct.unpack('!I', buff)[0]
  181.         # 读取字符串
  182.         buff = self._read_all(str_len)
  183.         name = buff.decode()
  184.         return name
复制代码
6.2 server.py

  1. import socket
  2. import threading
  3. from customize_rpc.divide_protocol import DivideProtocol, MethodProtocol, InvalidOperation
  4. class Handlers:
  5.     @staticmethod
  6.     def divide(num1, num2=1):
  7.         """
  8.         除法
  9.         :param num1:
  10.         :param num2:
  11.         :return:
  12.         """
  13.         if num2 == 0:
  14.             raise InvalidOperation()
  15.         val = num1 / num2
  16.         return val
  17. class ServerStub(object):
  18.     def __init__(self, connection, handlers):
  19.         """
  20.         服务器存根
  21.         :param connection: 与客户端的socket连接
  22.         :param handlers: 存放被调用的方法
  23.         """
  24.         self._process_map = {
  25.             'divide': self._process_divide,
  26.         }
  27.         self.conn = connection
  28.         self.method_proto = MethodProtocol(self.conn)
  29.         self.handlers = handlers
  30.     def process(self):
  31.         """
  32.         被服务器调用的入口,服务器收到请求后调用该方法
  33.         """
  34.         # 获取解析调用请求的方法名
  35.         name = self.method_proto.get_method_name()
  36.         # 调用对应的处理方法
  37.         self._process_map[name]()
  38.     def _process_divide(self):
  39.         """
  40.         执行divide本地调用,并将结果返回给客户端
  41.         """
  42.         # 接收调用参数
  43.         proto = DivideProtocol()
  44.         args = proto.args_decode(self.conn)
  45.         # 进行本地divide调用
  46.         try:
  47.             result = self.handlers.divide(**args)
  48.         except InvalidOperation as e:
  49.             result = e
  50.         # 构造返回值消息并返回
  51.         result = proto.result_encode(result)
  52.         self.conn.sendall(result)
  53. class Server(object):
  54.     def __init__(self, host, port, handlers):
  55.         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  56.         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  57.         self.host = host
  58.         self.port = port
  59.         self.sock.bind((host, port))
  60.         self.handlers = handlers
  61.     def serve(self):
  62.         """
  63.         开始服务
  64.         """
  65.         self.sock.listen(128)
  66.         print("开始监听")
  67.         while True:
  68.             conn, addr = self.sock.accept()
  69.             print("建立链接%s" % str(addr))
  70.             stub = ServerStub(conn, self.handlers)
  71.             try:
  72.                 while True:
  73.                     stub.process()
  74.             except EOFError:
  75.                 print("客户端关闭连接")
  76.             # 关闭服务端连接
  77.             conn.close()
  78. class ThreadServer(object):
  79.     def __init__(self, host, port, handlers):
  80.         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  81.         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  82.         self.host = host
  83.         self.port = port
  84.         self.sock.bind((host, port))
  85.         self.handlers = handlers
  86.     def serve(self):
  87.         """
  88.         开始服务
  89.         """
  90.         self.sock.listen(128)
  91.         print("开始监听")
  92.         while True:
  93.             conn, addr = self.sock.accept()
  94.             print("建立链接%s" % str(addr))
  95.             t = threading.Thread(target=self.handle, args=(conn,))
  96.             t.start()
  97.     def handle(self, client):
  98.         stub = ServerStub(client, self.handlers)
  99.         try:
  100.             while True:
  101.                 stub.process()
  102.         except EOFError:
  103.             print("客户端关闭连接")
  104.         client.close()
  105. if __name__ == '__main__':
  106.     server = Server('127.0.0.1', 8000, Handlers)
  107.     server.serve()   
复制代码
6.3 client.py

  1. import time
  2. import socket
  3. from customize_rpc.divide_protocol import DivideProtocol, InvalidOperation
  4. class Channel(object):
  5.     """
  6.     连接通道
  7.     """
  8.     def __init__(self, host, port):
  9.         self.host = host
  10.         self.port = port
  11.     def get_connection(self):
  12.         """
  13.         获取一个tcp连接
  14.         """
  15.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  16.         sock.connect((self.host, self.port))
  17.         return sock
  18. class ClientStub(object):
  19.     """
  20.     客户端存根
  21.     """
  22.     def __init__(self, channel: Channel):
  23.         self.channel = channel
  24.         self.conn = self.channel.get_connection()
  25.     def divide(self, num1, num2=1):
  26.         # 构造
  27.         proto = DivideProtocol()
  28.         args = proto.args_encode(num1, num2)
  29.         self.conn.sendall(args)
  30.         result = proto.result_decode(self.conn)
  31.         if isinstance(result, InvalidOperation):
  32.             raise result
  33.         else:
  34.             return result
  35. if __name__ == '__main__':
  36.     channel = Channel('127.0.0.1', 8000)
  37.     stub = ClientStub(channel)
  38.     for i in range(5):
  39.         try:
  40.             val = stub.divide(i * 100, 10)
  41.         except InvalidOperation as e:
  42.             print(e.message)
  43.         else:
  44.             print(val)
  45.         time.sleep(1)
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

道家人

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表