ToB企服应用市场:ToB评测及商务社交产业平台

标题: Golang——RPC [打印本页]

作者: tsx81429    时间: 2024-6-15 00:50
标题: Golang——RPC
一. RPC简介


二. golang中如何实现RPC


例题:golang实现RPC程序,实现求矩形面积和周长。
服务器:
        注册服务,监听请求,处理请求。
  1. package main
  2. import (
  3.         "fmt"
  4.         "log"
  5.         "net/http"
  6.         "net/rpc"
  7. )
  8. // 接收参数
  9. type Params struct {
  10.         Length int
  11.         Heigth int
  12. }
  13. // 用于注册
  14. type Rect struct{}
  15. func (r *Rect) Area(p Params, ret *int) error {
  16.         fmt.Println("参数:", p.Heigth, p.Length)
  17.         *ret = p.Heigth * p.Length
  18.         return nil
  19. }
  20. func (r *Rect) Perimeter(p Params, ret *int) error {
  21.         fmt.Println("参数:", p.Heigth, p.Length)
  22.         *ret = 2 * (p.Heigth + p.Length)
  23.         return nil
  24. }
  25. func main() {
  26.         //1. 注册服务
  27.         rect := new(Rect)
  28.         rpc.Register(rect)
  29.         //2. 服务器处理绑定到http协议上
  30.         rpc.HandleHTTP()
  31.         //3. 监听服务
  32.         err := http.ListenAndServe(":8080", nil)
  33.         if err != nil {
  34.                 log.Panicln(err)
  35.         }
  36. }
复制代码
客户端:
        连接rpc服务,发送请求调用方法。
  1. package main
  2. import (
  3.         "fmt"
  4.         "log"
  5.         "net/rpc"
  6. )
  7. // 参数字段要和服务器的一样
  8. type Params struct {
  9.         Length int
  10.         Heigth int
  11. }
  12. func main() {
  13.         conn, err := rpc.DialHTTP("tcp", ":8080")
  14.         if err != nil {
  15.                 log.Fatal(err)
  16.         }
  17.         ret := 0
  18.         p := Params{
  19.                 20,
  20.                 30,
  21.         }
  22.         //调用服务器的方法
  23.         err = conn.Call("Rect.Area", p, &ret)
  24.         if err != nil {
  25.                 log.Fatal(err)
  26.         }
  27.         fmt.Println("面积", ret)
  28.         err = conn.Call("Rect.Perimeter", p, &ret)
  29.         if err != nil {
  30.                 log.Fatal(err)
  31.         }
  32.         fmt.Println("周长", ret)
  33. }
复制代码

   别的,net/rpc/jsonrpc库通过json格式编解码,支持跨语言调用。
  服务端:
  1. package main
  2. import (
  3.         "fmt"
  4.         "log"
  5.         "net"
  6.         "net/rpc"
  7.         "net/rpc/jsonrpc"
  8. )
  9. // 接收参数
  10. type Params struct {
  11.         A int
  12.         B int
  13. }
  14. type Rect struct{}
  15. func (r *Rect) Area(p Params, ret *int) error {
  16.         *ret = p.A * p.B
  17.         return nil
  18. }
  19. func (r *Rect) Perimeter(p Params, ret *int) error {
  20.         *ret = 2 * (p.A + p.B)
  21.         return nil
  22. }
  23. func main() {
  24.         //注册服务
  25.         rpc.Register(new(Rect))
  26.         //监听连接
  27.         l, err := net.Listen("tcp", ":8080")
  28.         if err != nil {
  29.                 log.Fatal(err)
  30.         }
  31.         for {
  32.                 //接收客户端连接
  33.                 conn, err := l.Accept()
  34.                 if err != nil {
  35.                         continue
  36.                 }
  37.                 //创建协程处理请求
  38.                 go func(conn net.Conn) {
  39.                         fmt.Println("get a new client")
  40.                         //只是处理服务,没有监听和接收请求
  41.                         jsonrpc.ServeConn(conn)
  42.                 }(conn)
  43.         }
  44. }
复制代码
客户端:
  1. package main
  2. import (
  3.         "fmt"
  4.         "log"
  5.         "net/rpc/jsonrpc"
  6. )
  7. // 参数字段要和服务器的一样
  8. type Params struct {
  9.         A int
  10.         B int
  11. }
  12. func main() {
  13.         //远程连接rpc服务
  14.         conn, err := jsonrpc.Dial("tcp", ":8080")
  15.         if err != nil {
  16.                 log.Fatal(err)
  17.         }
  18.         //调用方法
  19.         ret := 0
  20.         err = conn.Call("Rect.Area", Params{10, 20}, &ret)
  21.         if err != nil {
  22.                 log.Fatal(err)
  23.         }
  24.         fmt.Println("ret=", ret)
  25.         err = conn.Call("Rect.Perimeter", Params{10, 20}, &ret)
  26.         if err != nil {
  27.                 log.Fatal(err)
  28.         }
  29.         fmt.Println("ret=", ret)
  30. }
复制代码
三.  RPC调用流程


我理解的微服务(RPC),将项目中须要的功能注册成一个服务,客户端可以直接调用。
四. 网络传输数据格式




  1. package rpc
  2. import (
  3.         "encoding/binary"
  4.         "fmt"
  5.         "io"
  6.         "net"
  7. )
  8. type Session struct {
  9.         Conn net.Conn
  10. }
  11. func (s *Session) Write(data []byte) error {
  12.         //定义写数据格式
  13.         //4字节包头 + 变长数据
  14.         buf := make([]byte, 4+len(data))
  15.         //写入头部
  16.         binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))
  17.         //将有效数据写到头部后面
  18.         copy(buf[4:], data)
  19.         //发送
  20.         _, err := s.Conn.Write(buf)
  21.         if err != nil {
  22.                 return err
  23.         }
  24.         return nil
  25. }
  26. func (s *Session) Read() ([]byte, error) {
  27.         //读取头部
  28.         header := make([]byte, 4)
  29.         //按长度读取数据
  30.         _, err := io.ReadFull(s.Conn, header)
  31.         if err != nil {
  32.                 return nil, err
  33.         }
  34.         //读取数据,报头保存的是有效数据长度
  35.         dataLen := binary.BigEndian.Uint32(header)
  36.         fmt.Println(dataLen)
  37.         data := make([]byte, dataLen)
  38.         _, err = io.ReadFull(s.Conn, data)
  39.         if err != nil {
  40.                 return nil, err
  41.         }
  42.         return data, nil
  43. }
复制代码
测试代码:
  1. package rpc
  2. import (
  3.         "errors"
  4.         "fmt"
  5.         "net"
  6.         "sync"
  7.         "testing"
  8. )
  9. func TestSession_ReadWrite(t *testing.T) {
  10.         //地址和数据
  11.         addr := "127.0.0.1:8080"
  12.         my_data := "hello"
  13.         //等待组
  14.         wg := sync.WaitGroup{}
  15.         wg.Add(2)
  16.         go func() {
  17.                 //监听
  18.                 defer wg.Done()
  19.                 l, err := net.Listen("tcp", addr)
  20.                 if err != nil {
  21.                         t.Fatal(err)
  22.                 }
  23.                 //接收连接
  24.                 conn, err := l.Accept()
  25.                 if err != nil {
  26.                         t.Fatal(err)
  27.                 }
  28.                 //写数据
  29.                 s := Session{Conn: conn}
  30.                 err = s.Write([]byte(my_data))
  31.                 if err != nil {
  32.                         t.Fatal(err)
  33.                 }
  34.         }()
  35.         go func() {
  36.                 defer wg.Done()
  37.                 conn, err := net.Dial("tcp", addr)
  38.                 if err != nil {
  39.                         t.Fatal(err)
  40.                 }
  41.                 s := Session{Conn: conn}
  42.                 data, err := s.Read()
  43.                 if err != nil {
  44.                         t.Fatal(err)
  45.                 }
  46.                 //校验
  47.                 if string(data) != my_data {
  48.                         t.Fatal(errors.New("数据错误"))
  49.                 }
  50.                 fmt.Println(string(data))
  51.         }()
  52.         wg.Wait()
  53. }
复制代码

 编码息争码:
  1. func Encode(data RPCData) ([]byte, error) {
  2.         //得到字节数组编码器
  3.         var buf bytes.Buffer
  4.         buffEnc := gob.NewEncoder(&buf)
  5.         //编码器对数据编码
  6.         if err := buffEnc.Encode(data); err != nil {
  7.                 return nil, err
  8.         }
  9.         return buf.Bytes(), nil
  10. }
  11. func Decode(data []byte) (RPCData, error) {
  12.         buff := bytes.NewBuffer(data)
  13.         //得到字节数组解码器
  14.         buffDec := gob.NewDecoder(buff)
  15.         //解码器对数据解码
  16.         var res RPCData
  17.         if err := buffDec.Decode(&res); err != nil {
  18.                 return res, err
  19.         }
  20.         return res, nil
  21. }
复制代码
五. 实现RPC服务端

        要实现的一个功能为,接收到客户端发过来需要调用的函数和参数,实现对应函数的调用,将效果返回。

  1. package main
  2. import (
  3.         "bytes"
  4.         "encoding/binary"
  5.         "encoding/gob"
  6.         "errors"
  7.         "fmt"
  8.         "io"
  9.         "log"
  10.         "net"
  11.         "reflect"
  12. )
  13. // 定义交互的数据结构
  14. type RPCData struct {
  15.         //访问的函数
  16.         Name string
  17.         //参数
  18.         Args []interface{}
  19. }
  20. func encode(data RPCData) ([]byte, error) {
  21.         //得到字节数组编码器
  22.         var buf bytes.Buffer
  23.         gob.Register(User{})
  24.         buffEnc := gob.NewEncoder(&buf)
  25.         //编码器对数据编码
  26.         if err := buffEnc.Encode(data); err != nil {
  27.                 return nil, err
  28.         }
  29.         return buf.Bytes(), nil
  30. }
  31. func decode(data []byte) (RPCData, error) {
  32.         buff := bytes.NewBuffer(data)
  33.         //得到字节数组解码器
  34.         gob.Register(User{})
  35.         buffDec := gob.NewDecoder(buff)
  36.         //解码器对数据解码
  37.         var res RPCData
  38.         if err := buffDec.Decode(&res); err != nil {
  39.                 return res, err
  40.         }
  41.         return res, nil
  42. }
  43. // 发送和接收数据结构
  44. type Session struct {
  45.         Conn net.Conn
  46. }
  47. func NewSession(conn net.Conn) *Session {
  48.         return &Session{Conn: conn}
  49. }
  50. func (s *Session) Write(data []byte) error {
  51.         //定义写数据格式
  52.         //4字节包头 + 变长数据
  53.         buf := make([]byte, 4+len(data))
  54.         //写入头部
  55.         binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))
  56.         //将有效数据写到头部后面
  57.         copy(buf[4:], data)
  58.         //发送
  59.         _, err := s.Conn.Write(buf)
  60.         if err != nil {
  61.                 return err
  62.         }
  63.         return nil
  64. }
  65. func (s *Session) Read() ([]byte, error) {
  66.         //读取头部
  67.         header := make([]byte, 4)
  68.         //按长度读取数据
  69.         _, err := io.ReadFull(s.Conn, header)
  70.         if err != nil {
  71.                 return nil, err
  72.         }
  73.         //读取数据,报头保存的是有效数据长度
  74.         dataLen := binary.BigEndian.Uint32(header)
  75.         fmt.Println(dataLen)
  76.         data := make([]byte, dataLen)
  77.         _, err = io.ReadFull(s.Conn, data)
  78.         if err != nil {
  79.                 return nil, err
  80.         }
  81.         return data, nil
  82. }
  83. // 服务器
  84. type Server struct {
  85.         //访问地址
  86.         addr string
  87.         //维护函数map
  88.         funcs map[string]reflect.Value
  89. }
  90. func NewServer(addr string) *Server {
  91.         return &Server{addr: addr, funcs: make(map[string]reflect.Value)}
  92. }
  93. func (s *Server) Register(key string, value interface{}) error {
  94.         if _, ok := s.funcs[key]; ok {
  95.                 return errors.New(key + "已存在")
  96.         }
  97.         s.funcs[key] = reflect.ValueOf(value)
  98.         return nil
  99. }
  100. func (s *Server) Run() {
  101.         //监听
  102.         ls, err := net.Listen("tcp", s.addr)
  103.         if err != nil {
  104.                 log.Printf("监听addr:%s, 失败", s.addr)
  105.                 return
  106.         }
  107.         for {
  108.                 conn, err := ls.Accept()
  109.                 if err != nil {
  110.                         return
  111.                 }
  112.                 //读数据
  113.                 session := NewSession(conn)
  114.                 data, err := session.Read()
  115.                 if err != nil {
  116.                         return
  117.                 }
  118.                 //解码
  119.                 rpc_data, err := decode(data)
  120.                 if err != nil {
  121.                         return
  122.                 }
  123.                 //查找访问的函数
  124.                 f, ok := s.funcs[rpc_data.Name]
  125.                 if !ok {
  126.                         log.Printf("%s不存在\n", rpc_data.Name)
  127.                         continue
  128.                 }
  129.                 //参数
  130.                 args := make([]reflect.Value, 0, len(rpc_data.Args))
  131.                 for _, v := range rpc_data.Args {
  132.                         if v != nil {
  133.                                 args = append(args, reflect.ValueOf(v))
  134.                         }
  135.                 }
  136.                 //反射调用方法
  137.                 //返回value类型,用于客户端传递返回结果,out是所有返回结果
  138.                 out := f.Call(args)
  139.                 outInters := make([]interface{}, 0, len(out))
  140.                 for _, v := range out {
  141.                         outInters = append(outInters, v.Interface())
  142.                 }
  143.                 fmt.Println("args: ", args, "-", len(args), "-", outInters)
  144.                 //发送给客户端
  145.                 //编码
  146.                 respRpcData := RPCData{Name: rpc_data.Name, Args: outInters}
  147.                 respdata, err := encode(respRpcData)
  148.                 if err != nil {
  149.                         log.Printf("encode fail %v\n", err)
  150.                         continue
  151.                 }
  152.                 err = session.Write(respdata)
  153.                 if err != nil {
  154.                         log.Printf("Write fail %v\n", err)
  155.                         continue
  156.                 }
  157.         }
  158. }
  159. type User struct {
  160.         Name string
  161.         Age  int
  162. }
  163. func queryUser(uid int) (User, error) {
  164.         users := make(map[int]User)
  165.         users[0] = User{"zs", 10}
  166.         users[1] = User{"ls", 20}
  167.         users[2] = User{"ww", 25}
  168.         users[3] = User{"lc", 18}
  169.         fmt.Println(uid)
  170.         if v, ok := users[uid]; ok {
  171.                 fmt.Println(v)
  172.                 return v, nil
  173.         }
  174.         return User{}, errors.New("Not Found")
  175. }
  176. func main() {
  177.         s := NewServer("127.0.0.1:8080")
  178.         //给服务器注册函数
  179.         s.Register("queryUser", queryUser)
  180.         s.Run()
  181. }
复制代码
六. 实现RPC客户端



  1. package main
  2. import (
  3.         "bytes"
  4.         "encoding/binary"
  5.         "encoding/gob"
  6.         "fmt"
  7.         "io"
  8.         "log"
  9.         "net"
  10.         "reflect"
  11. )
  12. type RPCData struct {
  13.         //访问的函数
  14.         Name string
  15.         //参数
  16.         Args []interface{}
  17. }
  18. func encode(data RPCData) ([]byte, error) {
  19.         //得到字节数组编码器
  20.         var buf bytes.Buffer
  21.         //注册接口类型
  22.         gob.Register(User{})
  23.         buffEnc := gob.NewEncoder(&buf)
  24.         //编码器对数据编码
  25.         if err := buffEnc.Encode(data); err != nil {
  26.                 return nil, err
  27.         }
  28.         return buf.Bytes(), nil
  29. }
  30. func decode(data []byte) (RPCData, error) {
  31.         buff := bytes.NewBuffer(data)
  32.         //得到字节数组解码器
  33.         gob.Register(User{})
  34.         buffDec := gob.NewDecoder(buff)
  35.         //解码器对数据解码
  36.         var res RPCData
  37.         if err := buffDec.Decode(&res); err != nil {
  38.                 return res, err
  39.         }
  40.         return res, nil
  41. }
  42. type Session struct {
  43.         Conn net.Conn
  44. }
  45. func NewSession(conn net.Conn) *Session {
  46.         return &Session{Conn: conn}
  47. }
  48. func (s *Session) Write(data []byte) error {
  49.         //定义写数据格式
  50.         //4字节包头 + 变长数据
  51.         buf := make([]byte, 4+len(data))
  52.         //写入头部
  53.         binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))
  54.         //将有效数据写到头部后面
  55.         copy(buf[4:], data)
  56.         //发送
  57.         _, err := s.Conn.Write(buf)
  58.         if err != nil {
  59.                 return err
  60.         }
  61.         return nil
  62. }
  63. func (s *Session) Read() ([]byte, error) {
  64.         //读取头部
  65.         header := make([]byte, 4)
  66.         //按长度读取数据
  67.         _, err := io.ReadFull(s.Conn, header)
  68.         if err != nil {
  69.                 return nil, err
  70.         }
  71.         //读取数据,报头保存的是有效数据长度
  72.         dataLen := binary.BigEndian.Uint32(header)
  73.         fmt.Println(dataLen)
  74.         data := make([]byte, dataLen)
  75.         _, err = io.ReadFull(s.Conn, data)
  76.         if err != nil {
  77.                 return nil, err
  78.         }
  79.         return data, nil
  80. }
  81. // 客户端
  82. type Client struct {
  83.         Conn net.Conn
  84. }
  85. func NewClient(conn net.Conn) *Client {
  86.         return &Client{Conn: conn}
  87. }
  88. // fname为访问服务器函数名
  89. // fptr为函数原型
  90. func (c *Client) callRpc(fname string, fptr interface{}) {
  91.         //获取函数原型
  92.         fnptr := reflect.ValueOf(fptr).Elem()
  93.         //函数实现,发送数据给服务器,收到服务端数据
  94.         //args调用时传进来的参数
  95.         //返回值为得到的结果
  96.         f := func(args []reflect.Value) []reflect.Value {
  97.                 //获得参数
  98.                 argSlice := make([]interface{}, len(args))
  99.                 for _, v := range args {
  100.                         argSlice = append(argSlice, v.Interface())
  101.                 }
  102.                 session := NewSession(c.Conn)
  103.                 //发送到服务器
  104.                 rpcData := RPCData{Name: fname, Args: argSlice}
  105.                 //编码
  106.                 data, err := encode(rpcData)
  107.                 if err != nil {
  108.                         log.Println("encode fail", err)
  109.                         return nil
  110.                 }
  111.                 err = session.Write(data)
  112.                 if err != nil {
  113.                         log.Println("Write fail", err)
  114.                         return nil
  115.                 }
  116.                 //接收服务器响应
  117.                 respData, err := session.Read()
  118.                 if err != nil {
  119.                         log.Println("Read fail", err)
  120.                         return nil
  121.                 }
  122.                 //结果在Args里
  123.                 respRpcData, err := decode(respData)
  124.                 if err != nil {
  125.                         log.Println("decode fail", err)
  126.                         return nil
  127.                 }
  128.                 res := make([]reflect.Value, 0, len(respRpcData.Args))
  129.                 for i, v := range respRpcData.Args {
  130.                         if v == nil {
  131.                                 //因为返回err可能为nil
  132.                                 //进行nil转换
  133.                                 //reflect.Zero会返回类型的零值的value
  134.                                 //.out会返回函数输出的参数类型
  135.                                 res = append(res, reflect.Zero(fnptr.Type().Out(i)))
  136.                         } else {
  137.                                 res = append(res, reflect.ValueOf(v))
  138.                         }
  139.                 }
  140.                 return res
  141.         }
  142.         real_fn := reflect.MakeFunc(fnptr.Type(), f)
  143.         //为函数原型赋值函数实现
  144.         fnptr.Set(real_fn)
  145. }
  146. type User struct {
  147.         Name string
  148.         Age  int
  149. }
  150. func main() {
  151.         conn, err := net.Dial("tcp", ":8080")
  152.         if err != nil {
  153.                 log.Fatal(err)
  154.                 return
  155.         }
  156.         c := NewClient(conn)
  157.         var query func(int) (User, error)
  158.         c.callRpc("queryUser", &query)
  159.     //进行查询
  160.         u, err := query(2)
  161.         if err != nil {
  162.                 log.Fatal(err)
  163.                 return
  164.         }
  165.         fmt.Println(u)
  166. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4