用go从零构建写一个RPC(仿gRPC,tRPC)--- 版本1(Client端) ...

打印 上一主题 下一主题

主题 1749|帖子 1749|积分 5247

这里我们来实现这个RPC的client端
为了实现RPC的效果,我们调用的Hello方法,即server端的方法,应该是由署理来调用,让proxy内里封装网络请求,消息的发送和接受处置惩罚。而上一篇文章提到的服务端的署理已经在.rpc.go文件中实现,我们将客户端的实现也写在这里
ClientProxy

  1. // 客户端代理接口
  2. type HelloClientProxy interface  {
  3.         Hello(ctx context.Context, in *HelloRequest, opts ...client.Option) (*HelloReply, error)
  4. }
  5. // 客户端代理实现
  6. type HelloClientProxyImpl struct {
  7.         client client.Client
  8.         opts   []client.Option
  9. }
  10. // 创建客户端代理
  11. func NewHelloClientProxy(opts ...client.Option) HelloClientProxy {
  12.         return &HelloClientProxyImpl{
  13.                 client: client.DefaultClient,
  14.                 opts:   opts,
  15.         }
  16. }
复制代码


  • 这里的HelloClientProxyImpl此中的client类主要是负责invoke方法,抽象网络IO和编解码,opts主要是记录客户端启动时传入的配置项,如server的ip地址等
  • 创建出客户端署理,我们就可以通过署理来调用Hello方法
  1. // 实现Hello方法
  2. func (c *HelloClientProxyImpl) Hello(ctx context.Context, req *HelloRequest, opts ...client.Option) (*HelloReply, error) {
  3.         // 创建一个msg结构,存储service相关的数据,如serviceName等,并放到context中
  4.         // 用msg结构可以避免在context中太多withValue传递过多的参数
  5.        
  6.         msg := internel.NewMsg()
  7.         msg.WithServiceName("helloworld")
  8.         msg.WithMethodName("Hello")
  9.         ctx = context.WithValue(ctx, internel.ContextMsgKey, msg)
  10.         rsp := &HelloReply{}
  11.         // 这里需要将opts添加前面newProxy时传入的opts
  12.         newOpts := append(c.opts, opts...)
  13.         err := c.client.Invoke(ctx, req, rsp, newOpts...)
  14.         if err != nil {
  15.                 return nil, err
  16.         }
  17.         return rsp, nil
  18. }
复制代码


  • 这里需要明确service的名字和对应方法,为了后续封装在协议数据里,到达server端才能正确路由。当署理类实现了这个Hello后,我们就可以通过proxy.Hello得到相应结果,Invoke方法隐藏了详细的网络处置惩罚,我们跟进Invoke方法
Client(clientTransPort)

上文提到,client类主要处置惩罚invoke方法,我们可以预见它的职责就是,

  • 序列化请求体
  • 编码
  • 发送请求,接受响应
  • 解码
  • 反序列化响应体
  • 返回客户端
    为了代码的解耦,我们和server的处置惩罚一样,将以上操纵放到clientTransPort上,client持有transPort,让transPort处置惩罚以上的逻辑
  1. // 实现Send方法
  2. func (c *clientTransport) Send(ctx context.Context, reqBody interface{}, rspBody interface{}, opt *ClientTransportOption) error {
  3.         // 获取连接
  4.         // TODO 这里的连接后续可以优化从连接池获取
  5.         conn, err := net.Dial("tcp", opt.Address)
  6.         if err != nil {
  7.                 return err
  8.         }
  9.         defer conn.Close()
  10.         // reqbody序列化
  11.         reqData, err := codec.Marshal(reqBody)
  12.         if err != nil {
  13.                 return err
  14.         }
  15.         // reqbody编码,返回请求帧
  16.         framedata, err := opt.Codec.Encode(ctx, reqData)
  17.         if err != nil {
  18.                 return err
  19.         }
  20.         // 写数据到连接中
  21.         err = c.tcpWriteFrame(ctx, conn, framedata)
  22.         if err != nil {
  23.                 return err
  24.         }
  25.         // 读取tcp帧
  26.         rspDataBuf, err := c.tcpReadFrame(ctx, conn)
  27.         if err != nil {
  28.                 return err
  29.         }
  30.         // 获取msg
  31.         ctx, msg := internel.GetMessage(ctx)
  32.         // rspDataBuf解码,提取响应体数据
  33.         rspData, err := opt.Codec.Decode(msg, rspDataBuf)
  34.         if err != nil {
  35.                 return err
  36.         }
  37.         // 将rspData反序列化为rspBody
  38.     err = codec.Unmarshal(rspData, rspBody)
  39.         if err != nil {
  40.                 return err
  41.         }
  42.         return nil
  43. }
复制代码


  • 序列化是根据protobuf协议,编码的格式我们之间写Server的时间提到,我们需要将数据编码成以下格式:

  • 当编码完成后,我们就需要写数据到连接中,并监听该连接的数据,当有数据后,我们再依次解码得到响应体,再将响应体反序列化,返回客户端。
  • 写数据到连接和读连接中的数据也很简朴,这里我们直接开启一个连接,调用Write写,而codec.ReadFrame在server端的时间已经介绍过
  1. func (c *clientTransport) tcpWriteFrame(ctx context.Context, conn net.Conn, frame []byte) error {
  2.         // 写入tcp
  3.         _, err := conn.Write(frame)
  4.         if err != nil {
  5.                 return fmt.Errorf("write frame error: %v", err)
  6.         }
  7.         return nil
  8. }
  9. func (c *clientTransport) tcpReadFrame(ctx context.Context, conn net.Conn) ([]byte, error) {
  10.         return codec.ReadFrame(conn)
  11. }
复制代码
效果测试

至此client端处置惩罚完毕,我们来看看效果:
  1. //client端的测试main.go:
  2. func main() {
  3.         c := pb.NewHelloClientProxy(client.WithTarget("127.0.0.1:8000"))
  4.         if c == nil {
  5.                 fmt.Println("Failed to create client")
  6.                 return
  7.         }
  8.         rsp, err := c.Hello(context.Background(), &pb.HelloRequest{Msg: "world"})
  9.         if err != nil {
  10.                 fmt.Println("RPC call error:", err)
  11.                 return
  12.         }
  13.         fmt.Println("Response:", rsp.Msg)
  14. }
  15. // server端的测试的main.go
  16. func main() {
  17.         // Create a new server instance
  18.         s := server.NewServer()
  19.         // Register the HelloService with the server
  20.         pb.RegisterHelloServer(s, &HelloServerImpl{})
  21.         // Start the server on port 50051
  22.         if err := s.Serve(":8000"); err != nil {
  23.                 panic(err)
  24.         }
  25.         fmt.Print("启动成功")
  26. }
  27. // 创建一个HelloServer的实现类
  28. type HelloServerImpl struct{}
  29. // 实现HelloServer接口的Hello方法
  30. func (h *HelloServerImpl) Hello(req *pb.HelloRequest) (*pb.HelloReply, error) {
  31.         // 这里可以实现具体的业务逻辑
  32.         reply := &pb.HelloReply{
  33.                 Msg: "Hello " + req.Msg,
  34.         }
  35.         return reply, nil
  36. }
复制代码
server端启动:

server端吸收到client的连接请求:

client收到响应:

如今version1开发完了,现在的版本主要是实现底子功能,而且为了考虑后续的扩展性做了比力多的解耦,在后面的版本,我们可以徐徐提拔这个rpc的性能和融入更多的功能

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

十念

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表