道家人 发表于 2024-12-29 21:26:34

1.RPC基本原理

RPC

1.界说

远程过程调用(remote procedure call)
2.概念

广义:全部通过网络进行通讯,的调用统称为RPC调用
狭义:不采用http协议的方式,采用自界说格式的二进制方式
3.优缺点



[*]优点

[*]服从高
[*]发起rpc调用的一方,可以忽略RPC的具体实现,犹如编写当地函数调用

[*]缺点

[*]通用性不高

4.RPC结构



[*]client(caller):调用者
[*]client stub(bundle args/unbundle ret vals):客户端存根
[*]client network service
[*]server network service
[*]server stub(bundle ret vals/unbundle args)
5.RPC消息协议

5.1 消息界限



[*]分隔符(\r\n)
[*]长度声明法(例如HTTP中 Content-Length)
5.2 内容



[*]二进制
[*]文本内容
5.3 压缩



[*]压缩处理是一把双刃剑,淘汰数据量减轻带宽压力同时,额外增长了压缩和解压缩的时间
6.RPC的实现

6.1 divide_protocol.py

import struct
from io import BytesIO


class InvalidOperation(Exception):
    ...


class DivideProtocol(object):
    """
    float divide(1:int num1, 2:int num2=1)
    """

    def _read_all(self, size):
      """
      读取指定长度的字节
      :param size: 长度
      :return: 读取出的二进制数据
      """
      if isinstance(self.conn, BytesIO):
            # BytesIO类型,用于演示
            buff = b''
            have = 0
            while have < size:
                chunk = self.conn.read(size - have)
                have += len(chunk)
                buff += chunk
            return buff

      else:
            # socket类型
            buff = b''
            have = 0
            while have < size:
                chunk = self.conn.recv(size - have)
                have += len(chunk)
                buff += chunk
                # 客户端关闭了连接
                if len(chunk) == 0:
                  raise EOFError()
            return buff

    def args_encode(self, num1, num2=1):
      """
      对调用参数进行编码
      :param num1: int
      :param num2: int
      :return: 编码后的二进制数据
      """
      # 处理参数num1, 4字节整型
      buff = struct.pack('!B', 1)
      buff += struct.pack('!i', num1)

      # 处理参数num2, 4字节整型,如为默认值1,则不再放到消息中
      if num2 != 1:
            buff += struct.pack('!B', 2)
            buff += struct.pack('!i', num2)

      # 处理消息总长度,4字节无符号整型
      length = len(buff)

      # 处理方法名,字符串类型
      name = 'divide'
      # 字符串长度,4字节无符号整型
      msg = struct.pack('!I', len(name))
      msg += name.encode()

      msg += struct.pack('!I', length) + buff

      return msg

    def args_decode(self, connection):
      """
      获取调用参数并进行解码
      :param connection: 传输工具对象,如socket对象或者BytesIO对象,从中可以读取消息数据
      :return: 解码后的参数字典
      """
      # 保存到当前对象中,供_read_all方式使用
      self.conn = connection
      param_name_map = {
            1: 'num1',
            2: 'num2'
      }
      param_len_map = {
            1: 4,
            2: 4
      }
      # 用于保存解码后的参数字典
      args = dict()

      # 读取消息总长度,4字无节符号整数
      buff = self._read_all(4)
      length = struct.unpack('!I', buff)

      # 记录已读取的长度
      have = 0

      # 读取第一个参数,4字节整型
      buff = self._read_all(1)
      have += 1
      param_seq = struct.unpack('!B', buff)
      param_len = param_len_map
      buff = self._read_all(param_len)
      have += param_len
      args] = struct.unpack('!i', buff)

      if have >= length:
            return args

      # 读取第二个参数,4字节整型
      buff = self._read_all(1)
      have += 1
      param_seq = struct.unpack('!B', buff)
      param_len = param_len_map
      buff = self._read_all(param_len)
      have += param_len
      args] = struct.unpack('!i', buff)

      return args

    def result_encode(self, result):
      """
      对调用的结果进行编码
      :param result: float 或 InvalidOperation对象
      :return: 编码后的二进制数据
      """
      if isinstance(result, float):
            # 没有异常,正常执行
            # 处理结果类型,1字节无符号整数
            buff = struct.pack('!B', 1)

            # 处理结果值, 4字节float
            buff += struct.pack('!f', result)
      else:
            # 发生了InvalidOperation异常
            # 处理结果类型,1字节无符号整数
            buff = struct.pack('!B', 2)

            # 处理异常结果值, 字符串
            # 处理字符串长度, 4字节无符号整数
            buff += struct.pack('!I', len(result.message))
            # 处理字符串内容
            buff += result.message.encode()

      return buff

    def result_decode(self, connection):
      """
      对调用结果进行解码
      :param connection: 传输工具对象,如socket对象或者BytesIO对象,从中可以读取消息数据
      :return: 结果数据
      """
      self.conn = connection

      # 取出结果类型, 1字节无符号整数
      buff = self._read_all(1)
      result_type = struct.unpack('!B', buff)
      if result_type == 1:
            # float的结果值, 4字节float
            buff = self._read_all(4)
            result = struct.unpack('!f', buff)
            return result
      else:
            # InvalidOperation对象
            # 取出字符串长度, 4字节无符号整数
            buff = self._read_all(4)
            str_len = struct.unpack('!I', buff)
            buff = self._read_all(str_len)
            message = buff.decode()
            return InvalidOperation(message)


class MethodProtocol(object):
    def __init__(self, connection):
      self.conn = connection

    def _read_all(self, size):
      """
      读取指定长度的字节
      :param size: 长度
      :return: 读取出的二进制数据
      """
      if isinstance(self.conn, BytesIO):
            # BytesIO类型,用于演示
            buff = b''
            have = 0
            while have < size:
                chunk = self.conn.read(size - have)
                have += len(chunk)
                buff += chunk

            return buff

      else:
            # socket类型
            buff = b''
            have = 0
            while have < size:
                print('have=%d size=%d' % (have, size))
                chunk = self.conn.recv(size - have)
                have += len(chunk)
                buff += chunk

                if len(chunk) == 0:
                  raise EOFError()

            return buff

    def get_method_name(self):
      # 获取方法名
      # 读取字符串长度,4字节无符号整型
      buff = self._read_all(4)
      str_len = struct.unpack('!I', buff)

      # 读取字符串
      buff = self._read_all(str_len)
      name = buff.decode()
      return name

6.2 server.py

import socket
import threading

from customize_rpc.divide_protocol import DivideProtocol, MethodProtocol, InvalidOperation


class Handlers:
    @staticmethod
    def divide(num1, num2=1):
      """
      除法
      :param num1:
      :param num2:
      :return:
      """
      if num2 == 0:
            raise InvalidOperation()
      val = num1 / num2
      return val


class ServerStub(object):
    def __init__(self, connection, handlers):
      """
      服务器存根
      :param connection: 与客户端的socket连接
      :param handlers: 存放被调用的方法
      """
      self._process_map = {
            'divide': self._process_divide,
      }
      self.conn = connection
      self.method_proto = MethodProtocol(self.conn)
      self.handlers = handlers

    def process(self):
      """
      被服务器调用的入口,服务器收到请求后调用该方法
      """
      # 获取解析调用请求的方法名
      name = self.method_proto.get_method_name()

      # 调用对应的处理方法
      self._process_map()

    def _process_divide(self):
      """
      执行divide本地调用,并将结果返回给客户端
      """
      # 接收调用参数
      proto = DivideProtocol()
      args = proto.args_decode(self.conn)

      # 进行本地divide调用
      try:
            result = self.handlers.divide(**args)
      except InvalidOperation as e:
            result = e

      # 构造返回值消息并返回
      result = proto.result_encode(result)
      self.conn.sendall(result)


class Server(object):
    def __init__(self, host, port, handlers):
      self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
      self.host = host
      self.port = port
      self.sock.bind((host, port))
      self.handlers = handlers

    def serve(self):
      """
      开始服务
      """
      self.sock.listen(128)
      print("开始监听")
      while True:
            conn, addr = self.sock.accept()
            print("建立链接%s" % str(addr))
            stub = ServerStub(conn, self.handlers)
            try:
                while True:
                  stub.process()
            except EOFError:
                print("客户端关闭连接")
            # 关闭服务端连接
            conn.close()


class ThreadServer(object):
    def __init__(self, host, port, handlers):
      self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
      self.host = host
      self.port = port
      self.sock.bind((host, port))
      self.handlers = handlers

    def serve(self):
      """
      开始服务
      """
      self.sock.listen(128)
      print("开始监听")
      while True:
            conn, addr = self.sock.accept()
            print("建立链接%s" % str(addr))
            t = threading.Thread(target=self.handle, args=(conn,))
            t.start()

    def handle(self, client):
      stub = ServerStub(client, self.handlers)
      try:
            while True:
                stub.process()
      except EOFError:
            print("客户端关闭连接")

      client.close()


if __name__ == '__main__':
    server = Server('127.0.0.1', 8000, Handlers)
    server.serve()   
6.3 client.py

import time
import socket

from customize_rpc.divide_protocol import DivideProtocol, InvalidOperation


class Channel(object):
    """
    连接通道
    """

    def __init__(self, host, port):
      self.host = host
      self.port = port

    def get_connection(self):
      """
      获取一个tcp连接
      """
      sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      sock.connect((self.host, self.port))
      return sock


class ClientStub(object):
    """
    客户端存根
    """

    def __init__(self, channel: Channel):
      self.channel = channel
      self.conn = self.channel.get_connection()

    def divide(self, num1, num2=1):
      # 构造
      proto = DivideProtocol()
      args = proto.args_encode(num1, num2)
      self.conn.sendall(args)
      result = proto.result_decode(self.conn)
      if isinstance(result, InvalidOperation):
            raise result
      else:
            return result


if __name__ == '__main__':
    channel = Channel('127.0.0.1', 8000)
    stub = ClientStub(channel)

    for i in range(5):
      try:
            val = stub.divide(i * 100, 10)
      except InvalidOperation as e:
            print(e.message)
      else:
            print(val)
      time.sleep(1)


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 1.RPC基本原理