RPC
1.界说
远程过程调用(remote procedure call)
2.概念
广义:全部通过网络进行通讯,的调用统称为RPC调用
狭义:不采用http协议的方式,采用自界说格式的二进制方式
3.优缺点
- 优点
- 服从高
- 发起rpc调用的一方,可以忽略RPC的具体实现,犹如编写当地函数调用
- 缺点
4.RPC结构
- client(caller):调用者
- client stub(bundle args/unbundle ret vals):客户端存根
- client network service
- server network service
- server stub(bundle ret vals/unbundle args)
5.RPC消息协议
5.1 消息界限
- 分隔符(\r\n)
- 长度声明法(例如HTTP中 Content-Length)
5.2 内容
5.3 压缩
- 压缩处理是一把双刃剑,淘汰数据量减轻带宽压力同时,额外增长了压缩和解压缩的时间
6.RPC的实现
6.1 divide_protocol.py
- import struct
- from io import BytesIO
- class InvalidOperation(Exception):
- ...
- class DivideProtocol(object):
- """
- float divide(1:int num1, 2:int num2=1)
- """
- def _read_all(self, size):
- """
- 读取指定长度的字节
- :param size: 长度
- :return: 读取出的二进制数据
- """
- if isinstance(self.conn, BytesIO):
- # BytesIO类型,用于演示
- buff = b''
- have = 0
- while have < size:
- chunk = self.conn.read(size - have)
- have += len(chunk)
- buff += chunk
- return buff
- else:
- # socket类型
- buff = b''
- have = 0
- while have < size:
- chunk = self.conn.recv(size - have)
- have += len(chunk)
- buff += chunk
- # 客户端关闭了连接
- if len(chunk) == 0:
- raise EOFError()
- return buff
- def args_encode(self, num1, num2=1):
- """
- 对调用参数进行编码
- :param num1: int
- :param num2: int
- :return: 编码后的二进制数据
- """
- # 处理参数num1, 4字节整型
- buff = struct.pack('!B', 1)
- buff += struct.pack('!i', num1)
- # 处理参数num2, 4字节整型,如为默认值1,则不再放到消息中
- if num2 != 1:
- buff += struct.pack('!B', 2)
- buff += struct.pack('!i', num2)
- # 处理消息总长度,4字节无符号整型
- length = len(buff)
- # 处理方法名,字符串类型
- name = 'divide'
- # 字符串长度,4字节无符号整型
- msg = struct.pack('!I', len(name))
- msg += name.encode()
- msg += struct.pack('!I', length) + buff
- return msg
- def args_decode(self, connection):
- """
- 获取调用参数并进行解码
- :param connection: 传输工具对象,如socket对象或者BytesIO对象,从中可以读取消息数据
- :return: 解码后的参数字典
- """
- # 保存到当前对象中,供_read_all方式使用
- self.conn = connection
- param_name_map = {
- 1: 'num1',
- 2: 'num2'
- }
- param_len_map = {
- 1: 4,
- 2: 4
- }
- # 用于保存解码后的参数字典
- args = dict()
- # 读取消息总长度,4字无节符号整数
- buff = self._read_all(4)
- length = struct.unpack('!I', buff)[0]
- # 记录已读取的长度
- have = 0
- # 读取第一个参数,4字节整型
- buff = self._read_all(1)
- have += 1
- param_seq = struct.unpack('!B', buff)[0]
- param_len = param_len_map[param_seq]
- buff = self._read_all(param_len)
- have += param_len
- args[param_name_map[param_seq]] = struct.unpack('!i', buff)[0]
- if have >= length:
- return args
- # 读取第二个参数,4字节整型
- buff = self._read_all(1)
- have += 1
- param_seq = struct.unpack('!B', buff)[0]
- param_len = param_len_map[param_seq]
- buff = self._read_all(param_len)
- have += param_len
- args[param_name_map[param_seq]] = struct.unpack('!i', buff)[0]
- return args
- def result_encode(self, result):
- """
- 对调用的结果进行编码
- :param result: float 或 InvalidOperation对象
- :return: 编码后的二进制数据
- """
- if isinstance(result, float):
- # 没有异常,正常执行
- # 处理结果类型,1字节无符号整数
- buff = struct.pack('!B', 1)
- # 处理结果值, 4字节float
- buff += struct.pack('!f', result)
- else:
- # 发生了InvalidOperation异常
- # 处理结果类型,1字节无符号整数
- buff = struct.pack('!B', 2)
- # 处理异常结果值, 字符串
- # 处理字符串长度, 4字节无符号整数
- buff += struct.pack('!I', len(result.message))
- # 处理字符串内容
- buff += result.message.encode()
- return buff
- def result_decode(self, connection):
- """
- 对调用结果进行解码
- :param connection: 传输工具对象,如socket对象或者BytesIO对象,从中可以读取消息数据
- :return: 结果数据
- """
- self.conn = connection
- # 取出结果类型, 1字节无符号整数
- buff = self._read_all(1)
- result_type = struct.unpack('!B', buff)[0]
- if result_type == 1:
- # float的结果值, 4字节float
- buff = self._read_all(4)
- result = struct.unpack('!f', buff)[0]
- return result
- else:
- # InvalidOperation对象
- # 取出字符串长度, 4字节无符号整数
- buff = self._read_all(4)
- str_len = struct.unpack('!I', buff)[0]
- buff = self._read_all(str_len)
- message = buff.decode()
- return InvalidOperation(message)
- class MethodProtocol(object):
- def __init__(self, connection):
- self.conn = connection
- def _read_all(self, size):
- """
- 读取指定长度的字节
- :param size: 长度
- :return: 读取出的二进制数据
- """
- if isinstance(self.conn, BytesIO):
- # BytesIO类型,用于演示
- buff = b''
- have = 0
- while have < size:
- chunk = self.conn.read(size - have)
- have += len(chunk)
- buff += chunk
- return buff
- else:
- # socket类型
- buff = b''
- have = 0
- while have < size:
- print('have=%d size=%d' % (have, size))
- chunk = self.conn.recv(size - have)
- have += len(chunk)
- buff += chunk
- if len(chunk) == 0:
- raise EOFError()
- return buff
- def get_method_name(self):
- # 获取方法名
- # 读取字符串长度,4字节无符号整型
- buff = self._read_all(4)
- str_len = struct.unpack('!I', buff)[0]
- # 读取字符串
- buff = self._read_all(str_len)
- name = buff.decode()
- return name
复制代码 6.2 server.py
- import socket
- import threading
- from customize_rpc.divide_protocol import DivideProtocol, MethodProtocol, InvalidOperation
- class Handlers:
- @staticmethod
- def divide(num1, num2=1):
- """
- 除法
- :param num1:
- :param num2:
- :return:
- """
- if num2 == 0:
- raise InvalidOperation()
- val = num1 / num2
- return val
- class ServerStub(object):
- def __init__(self, connection, handlers):
- """
- 服务器存根
- :param connection: 与客户端的socket连接
- :param handlers: 存放被调用的方法
- """
- self._process_map = {
- 'divide': self._process_divide,
- }
- self.conn = connection
- self.method_proto = MethodProtocol(self.conn)
- self.handlers = handlers
- def process(self):
- """
- 被服务器调用的入口,服务器收到请求后调用该方法
- """
- # 获取解析调用请求的方法名
- name = self.method_proto.get_method_name()
- # 调用对应的处理方法
- self._process_map[name]()
- def _process_divide(self):
- """
- 执行divide本地调用,并将结果返回给客户端
- """
- # 接收调用参数
- proto = DivideProtocol()
- args = proto.args_decode(self.conn)
- # 进行本地divide调用
- try:
- result = self.handlers.divide(**args)
- except InvalidOperation as e:
- result = e
- # 构造返回值消息并返回
- result = proto.result_encode(result)
- self.conn.sendall(result)
- class Server(object):
- def __init__(self, host, port, handlers):
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.host = host
- self.port = port
- self.sock.bind((host, port))
- self.handlers = handlers
- def serve(self):
- """
- 开始服务
- """
- self.sock.listen(128)
- print("开始监听")
- while True:
- conn, addr = self.sock.accept()
- print("建立链接%s" % str(addr))
- stub = ServerStub(conn, self.handlers)
- try:
- while True:
- stub.process()
- except EOFError:
- print("客户端关闭连接")
- # 关闭服务端连接
- conn.close()
- class ThreadServer(object):
- def __init__(self, host, port, handlers):
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.host = host
- self.port = port
- self.sock.bind((host, port))
- self.handlers = handlers
- def serve(self):
- """
- 开始服务
- """
- self.sock.listen(128)
- print("开始监听")
- while True:
- conn, addr = self.sock.accept()
- print("建立链接%s" % str(addr))
- t = threading.Thread(target=self.handle, args=(conn,))
- t.start()
- def handle(self, client):
- stub = ServerStub(client, self.handlers)
- try:
- while True:
- stub.process()
- except EOFError:
- print("客户端关闭连接")
- client.close()
- if __name__ == '__main__':
- server = Server('127.0.0.1', 8000, Handlers)
- server.serve()
复制代码 6.3 client.py
- import time
- import socket
- from customize_rpc.divide_protocol import DivideProtocol, InvalidOperation
- class Channel(object):
- """
- 连接通道
- """
- def __init__(self, host, port):
- self.host = host
- self.port = port
- def get_connection(self):
- """
- 获取一个tcp连接
- """
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect((self.host, self.port))
- return sock
- class ClientStub(object):
- """
- 客户端存根
- """
- def __init__(self, channel: Channel):
- self.channel = channel
- self.conn = self.channel.get_connection()
- def divide(self, num1, num2=1):
- # 构造
- proto = DivideProtocol()
- args = proto.args_encode(num1, num2)
- self.conn.sendall(args)
- result = proto.result_decode(self.conn)
- if isinstance(result, InvalidOperation):
- raise result
- else:
- return result
- if __name__ == '__main__':
- channel = Channel('127.0.0.1', 8000)
- stub = ClientStub(channel)
- for i in range(5):
- try:
- val = stub.divide(i * 100, 10)
- except InvalidOperation as e:
- print(e.message)
- else:
- print(val)
- time.sleep(1)
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |