day1 服务端与消息编码

打印 上一主题 下一主题

主题 527|帖子 527|积分 1581

本文代码地址:
本文是7天用Go从零实现RPC框架GeeRPC的第一篇。


  • 使用 encoding/gob 实现消息的编解码(序列化与反序列化)
  • 实现一个浅易的服务端,仅接受消息,不处理,代码约 200 行
消息的序列化与反序列化

一个典型的 RPC 调用如下:
  1. err = client.Call("Arith.Multiply", args, &reply)
复制代码
客户端发送的哀求包括服务名 Arith,方法名 Multiply,参数 args 三个,服务端的响应包括错误 error,返回值 reply 2 个。我们将哀求和响应中的参数和返回值抽象为 body,剩余的信息放在 header 中,那么就可以抽象出数据布局 Header:
day1-codec/codec/codec.go
  1. package codec
  2. import "io"
  3. type Header struct {
  4.         ServiceMethod string // format "Service.Method"
  5.         Seq           uint64 // sequence number chosen by client
  6.         Error         string
  7. }
复制代码


  • ServiceMethod 是服务名和方法名,通常与 Go 语言中的布局体和方法相映射。
  • Seq 是哀求的序号,也可以认为是某个哀求的 ID,用来区分不同的哀求。
  • Error 是错误信息,客户端置为空,服务端如果如果发生错误,将错误信息置于 Error 中。
我们将和消息编解码相关的代码都放到 codec 子目录中,在此之前,还需要在根目录下使用 go mod init geerpc 初始化项目,方便后续子 package 之间的引用。
进一步,抽象出对消息体进行编解码的接口 Codec,抽象出接口是为了实现不同的 Codec 实例:
  1. type Codec interface {
  2.         io.Closer
  3.         ReadHeader(*Header) error
  4.         ReadBody(interface{}) error
  5.         Write(*Header, interface{}) error
  6. }
复制代码
紧接着,抽象出 Codec 的构造函数,客户端和服务端可以通过 Codec 的 Type 得到构造函数,从而创建 Codec 实例。这部门代码和工厂模式类似,与工厂模式不同的是,返回的是构造函数,而非实例。
  1. type NewCodecFunc func(io.ReadWriteCloser) Codec
  2. type Type string
  3. const (
  4.         GobType  Type = "application/gob"
  5.         JsonType Type = "application/json" // not implemented
  6. )
  7. var NewCodecFuncMap map[Type]NewCodecFunc
  8. func init() {
  9.         NewCodecFuncMap = make(map[Type]NewCodecFunc)
  10.         NewCodecFuncMap[GobType] = NewGobCodec
  11. }
复制代码
我们定义了 2 种 Codec,Gob 和 Json,但是现实代码中只实现了 Gob 一种,究竟上,2 者的实现非常靠近,甚至只需要把 gob 换成 json 即可。
首先定义 GobCodec 布局体,这个布局体由四部门构成,conn 是由构建函数传入,通常是通过 TCP 或者 Unix 建立 socket 时得到的链接实例,dec 和 enc 对应 gob 的 Decoder 和 Encoder,buf 是为了防止壅闭而创建的带缓冲的 Writer,一样平常这么做能提拔性能。
day1-codec/codec/gob.go
  1. package codec
  2. import (
  3.         "bufio"
  4.         "encoding/gob"
  5.         "io"
  6.         "log"
  7. )
  8. type GobCodec struct {
  9.         conn io.ReadWriteCloser
  10.         buf  *bufio.Writer
  11.         dec  *gob.Decoder
  12.         enc  *gob.Encoder
  13. }
  14. var _ Codec = (*GobCodec)(nil)
  15. func NewGobCodec(conn io.ReadWriteCloser) Codec {
  16.         buf := bufio.NewWriter(conn)
  17.         return &GobCodec{
  18.                 conn: conn,
  19.                 buf:  buf,
  20.                 dec:  gob.NewDecoder(conn),
  21.                 enc:  gob.NewEncoder(buf),
  22.         }
  23. }
复制代码
  在 Go 语言中,json.NewDecoder 和 json.Unmarshal 都用于将 JSON 数据解析为 Go中的数据布局,但它们有一些区别:
  

  • json.NewDecoder 是通过创建一个 Decoder 对象,从一个 io.Reader(如os.Stdin、文件、网络连接等)中读取 JSON 数据并进行解码。
  • json.Unmarshal 则是直接将 JSON 数据(以字节切片 []byte 或者字符串的情势)解析并映射到指定的数据布局。
  使用场景上,如果数据是从一个输入流中读取,通常使用 json.NewDecoder;如果已经有了 JSON 数据的字节切片或字符串,使用json.Unmarshal 会更方便。json.NewEncoder 和 json.Marshal 同理。
  接着实现 ReadHeader、ReadBody、Write 和 Close 方法。
  1. func (c *GobCodec) ReadHeader(h *Header) error {
  2.         return c.dec.Decode(h)
  3. }
  4. func (c *GobCodec) ReadBody(body interface{}) error {
  5.         return c.dec.Decode(body)
  6. }
  7. func (c *GobCodec) Write(h *Header, body interface{}) (err error) {
  8.         defer func() {
  9.                 _ = c.buf.Flush()
  10.                 if err != nil {
  11.                         _ = c.Close()
  12.                 }
  13.         }()
  14.         if err := c.enc.Encode(h); err != nil {
  15.                 log.Println("rpc codec: gob error encoding header:", err)
  16.                 return err
  17.         }
  18.         if err := c.enc.Encode(body); err != nil {
  19.                 log.Println("rpc codec: gob error encoding body:", err)
  20.                 return err
  21.         }
  22.         return nil
  23. }
  24. func (c *GobCodec) Close() error {
  25.         return c.conn.Close()
  26. }
复制代码
通信过程

客户端与服务端的通信需要协商一些内容,比方 HTTP 报文,分为header和 body 2 部门,body 的格式和长度通过 header 中的 Content-Type 和 Content-Length 指定,服务端通过解析 header 就能够知道如何从 body 中读取需要的信息。对于 RPC 协议来说,这部门协商是需要自主设计的。为了提拔性能,一样平常在报文的最开始会规划固定的字节,来协商相关的信息。比如第1个字节用来体现序列化方式,第2个字节体现压缩方式,第3-6字节体现 header 的长度,7-10 字节体现 body 的长度。
对于 GeeRPC 来说,现在需要协商的唯逐一项内容是消息的编解码方式。我们将这部门信息,放到布局体 Option 中承载。现在,已经进入到服务端的实现阶段了。
day1-codec/server.go
  1. package geerpc
  2. const MagicNumber = 0x3bef5c
  3. type Option struct {
  4.         MagicNumber int        // MagicNumber marks this's a geerpc request
  5.         CodecType   codec.Type // client may choose different Codec to encode body
  6. }
  7. var DefaultOption = &Option{
  8.         MagicNumber: MagicNumber,
  9.         CodecType:   codec.GobType,
  10. }
复制代码
一样平常来说,涉及协议协商的这部门信息,需要设计固定的字节来传输的。但是为了实现上更简单,GeeRPC 客户端固定接纳 JSON 编码 Option,后续的 header 和 body 的编码方式由 Option 中的 CodeType 指定,服务端首先使用 JSON 解码 Option,然后通过 Option 的 CodeType 解码剩余的内容。即报文将以这样的情势发送:
  1. | Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
  2. | <------      固定 JSON 编码      ------>  | <-------   编码方式由 CodeType 决定   ------->|
复制代码
在一次连接中,Option 固定在报文的最开始,Header 和 Body 可以有多个,即报文可能是这样的。
  1. | Option | Header1 | Body1 | Header2 | Body2 | ...
复制代码
服务端的实现

通信过程已经定义清晰了,那么服务端的实现就比力直接了。
day1-codec/server.go
  1. // Server represents an RPC Server.
  2. type Server struct{}
  3. // NewServer returns a new Server.
  4. func NewServer() *Server {
  5.         return &Server{}
  6. }
  7. // DefaultServer is the default instance of *Server.
  8. var DefaultServer = NewServer()
  9. // Accept accepts connections on the listener and serves requests
  10. // for each incoming connection.
  11. func (server *Server) Accept(lis net.Listener) {
  12.         for {
  13.                 conn, err := lis.Accept()
  14.                 if err != nil {
  15.                         log.Println("rpc server: accept error:", err)
  16.                         return
  17.                 }
  18.                 go server.ServeConn(conn)
  19.         }
  20. }
  21. // Accept accepts connections on the listener and serves requests
  22. // for each incoming connection.
  23. func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
复制代码


  • 首先定义告终构体 Server,没有任何的成员字段。
  • 实现了 Accept 方式,net.Listener 作为参数,for 循环等候 socket 连接建立,并开启子协程处理,处理过程交给了 ServerConn 方法。
  • DefaultServer 是一个默认的 Server 实例,主要为了用户使用方便。
如果想启动服务,过程黑白常简单的,传入 listener 即可,tcp 协议和 unix 协议都支持。
  1. lis, _ := net.Listen("tcp", ":9999")
  2. geerpc.Accept(lis)
复制代码
ServeConn 的实现就和之前讨论的通信过程紧密相关了,首先使用 json.NewDecoder 反序列化得到 Option 实例,查抄 MagicNumber 和 CodeType 的值是否精确。然后根据 CodeType 得到对应的消息编解码器,接下来的处理交给 serverCodec。
  1. // ServeConn runs the server on a single connection.
  2. // ServeConn blocks, serving the connection until the client hangs up.
  3. func (server *Server) ServeConn(conn io.ReadWriteCloser) {
  4.         defer func() { _ = conn.Close() }()
  5.         var opt Option
  6.         if err := json.NewDecoder(conn).Decode(&opt); err != nil {
  7.                 log.Println("rpc server: options error: ", err)
  8.                 return
  9.         }
  10.         if opt.MagicNumber != MagicNumber {
  11.                 log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)
  12.                 return
  13.         }
  14.         f := codec.NewCodecFuncMap[opt.CodecType]
  15.         if f == nil {
  16.                 log.Printf("rpc server: invalid codec type %s", opt.CodecType)
  17.                 return
  18.         }
  19.         server.serveCodec(f(conn))
  20. }
  21. // invalidRequest is a placeholder for response argv when error occurs
  22. var invalidRequest = struct{}{}
  23. func (server *Server) serveCodec(cc codec.Codec) {
  24.         sending := new(sync.Mutex) // make sure to send a complete response
  25.         wg := new(sync.WaitGroup)  // wait until all request are handled
  26.         for {
  27.                 req, err := server.readRequest(cc)
  28.                 if err != nil {
  29.                         if req == nil {
  30.                                 break // it's not possible to recover, so close the connection
  31.                         }
  32.                         req.h.Error = err.Error()
  33.                         server.sendResponse(cc, req.h, invalidRequest, sending)
  34.                         continue
  35.                 }
  36.                 wg.Add(1)
  37.                 go server.handleRequest(cc, req, sending, wg)
  38.         }
  39.         wg.Wait()
  40.         _ = cc.Close()
  41. }
复制代码
serveCodec 的过程非常简单。主要包罗三个阶段


  • 读取哀求 readRequest
  • 处理哀求 handleRequest
  • 复兴哀求 sendResponse
之条件到过,在一次连接中,允许吸收多个哀求,即多个 request header 和 request body,因此这里使用了for无限制地等候哀求的到来,直到发生错误(比方连接被关闭,吸收到的报文有题目等),这里需要注意的点有三个:


  • handleRequest 使用了协程并发实行哀求。
  • 处理哀求是并发的,但是复兴哀求的报文必须是逐个发送的,并发容易导致多个复兴报文交织在一起,客户端无法解析。在这里使用锁(sending)保证。
  • 尽力而为,只有在 header 解析失败时,才终止循环。
  1. // request stores all information of a call
  2. type request struct {
  3.         h            *codec.Header // header of request
  4.         argv, replyv reflect.Value // argv and replyv of request
  5. }
  6. func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {
  7.         var h codec.Header
  8.         if err := cc.ReadHeader(&h); err != nil {
  9.                 if err != io.EOF && err != io.ErrUnexpectedEOF {
  10.                         log.Println("rpc server: read header error:", err)
  11.                 }
  12.                 return nil, err
  13.         }
  14.         return &h, nil
  15. }
  16. func (server *Server) readRequest(cc codec.Codec) (*request, error) {
  17.         h, err := server.readRequestHeader(cc)
  18.         if err != nil {
  19.                 return nil, err
  20.         }
  21.         req := &request{h: h}
  22.         // TODO: now we don't know the type of request argv
  23.         // day 1, just suppose it's string
  24.         req.argv = reflect.New(reflect.TypeOf(""))
  25.         if err = cc.ReadBody(req.argv.Interface()); err != nil {
  26.                 log.Println("rpc server: read argv err:", err)
  27.         }
  28.         return req, nil
  29. }
  30. func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex) {
  31.         sending.Lock()
  32.         defer sending.Unlock()
  33.         if err := cc.Write(h, body); err != nil {
  34.                 log.Println("rpc server: write response error:", err)
  35.         }
  36. }
  37. func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {
  38.         // TODO, should call registered rpc methods to get the right replyv
  39.         // day 1, just print argv and send a hello message
  40.         defer wg.Done()
  41.         log.Println(req.h, req.argv.Elem())
  42.         req.replyv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq))
  43.         server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
  44. }
复制代码
现在还不能判定 body 的类型,因此在readRequest和 handleRequest 中,day1 将 body 作为字符串处理。吸收到哀求,打印 header,并复兴 geerpc resp ${req.h.Seq}。这一部门后续再实现。
main 函数(一个浅易的客户端)

day1 的内容就到此为止了,在这里我们已经实现了一个消息的编解码器 GobCodec,并且客户端与服务端实现了简单的协议交换(protocol exchange),即允许客户端使用不同的编码方式。同时实现了服务端的雏形,建立连接,读取、处理并复兴客户端的哀求。
接下来,我们就在 main 函数中看看如何使用刚实现的 GeeRPC 吧。
day1-codec/main/main.go
  1. package main
  2. import (
  3.         "encoding/json"
  4.         "fmt"
  5.         "geerpc"
  6.         "geerpc/codec"
  7.         "log"
  8.         "net"
  9.         "time"
  10. )
  11. func startServer(addr chan string) {
  12.         // pick a free port
  13.         l, err := net.Listen("tcp", ":0")
  14.         if err != nil {
  15.                 log.Fatal("network error:", err)
  16.         }
  17.         log.Println("start rpc server on", l.Addr())
  18.         addr <- l.Addr().String()
  19.         geerpc.Accept(l)
  20. }
  21. func main() {
  22.         addr := make(chan string)
  23.         go startServer(addr)
  24.         // in fact, following code is like a simple geerpc client
  25.         conn, _ := net.Dial("tcp", <-addr)
  26.         defer func() { _ = conn.Close() }()
  27.         time.Sleep(time.Second)
  28.         // send options
  29.         _ = json.NewEncoder(conn).Encode(geerpc.DefaultOption)
  30.         cc := codec.NewGobCodec(conn)
  31.         // send request & receive response
  32.         for i := 0; i < 5; i++ {
  33.                 h := &codec.Header{
  34.                         ServiceMethod: "Foo.Sum",
  35.                         Seq:           uint64(i),
  36.                 }
  37.                 _ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))
  38.                 _ = cc.ReadHeader(h)
  39.                 var reply string
  40.                 _ = cc.ReadBody(&reply)
  41.                 log.Println("reply:", reply)
  42.         }
  43. }
复制代码


  • 在 startServer 中使用了信道 addr,确保服务端端口监听成功,客户端再发起哀求。
  • 客户端首先发送 Option 进行协议交换,接下来发送消息头 h := &codec.Header{},和消息体 geerpc req ${h.Seq}。
  • 最后解析服务端的响应 reply,并打印出来。
实行结果如下:
  1. start rpc server on [::]:63662
  2. &{Foo.Sum 0 } geerpc req 0
  3. reply: geerpc resp 0
  4. &{Foo.Sum 1 } geerpc req 1
  5. reply: geerpc resp 1
  6. &{Foo.Sum 2 } geerpc req 2
  7. reply: geerpc resp 2
  8. &{Foo.Sum 3 } geerpc req 3
  9. reply: geerpc resp 3
  10. &{Foo.Sum 4 } geerpc req 4
  11. reply: geerpc resp 4
复制代码
原文链接:https://geektutu.com/post/geerpc-day1.html

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

缠丝猫

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表