tsx81429 发表于 2024-6-15 00:50:19

Golang——RPC

一. RPC简介



[*]长途过程调用(Remote Procedure Call,RPC)是一个盘算机通信协议。
[*]该协议运行于一台盘算机的程序调用别的一台盘算机的子程序,而程序员无需额外的为这个交互作用编程。
[*]如果涉及的软件采用面向对象编程,那么长途过程调用亦可称作长途调用或长途方法调用。
[*]长途过程调用是一个分布式盘算的客户端-服务器(Client/Server)的例子。
[*]长途过程调用总是由客户端对服务器发出一个实行多少过程请求,并由客户端提供参数,实行效果将返回客户端。由于存在各式各样的变体和差异,对应的派生了各式长途过程调用协议,而他们并不相互兼容。
[*]RPC本身是client-server模型,也是一种request-response协议。有些实现扩展了长途调用的模型。实现了双向的服务调用,但是不管怎么样,调用过程还是由一个客户端发起,服务端提供响应。
[*]调用过程为:

[*]client调用client stub,这是一次本地过程调用
[*]client stub将参数打包成一个消息,然后发送这个消息
[*]client地点的系统将消息发送给server
[*]server的系统将收到的包传给server stub
[*]server stub解包得到参数
[*]末了server stub调用服务过程,返回效果按照相反的步骤传给client

二. golang中如何实现RPC



[*]golang中实现RPC非常简单,官方提供了封装好的库,还有一些第三方的库。
[*]golang官方的net/rpc库利用encoding/gob举行编解码,支持tcp和http数据传输方式。由于其它语言不支持gob编解码,所以golang的RPC只支持golang开发的服务器与客户端之间的交互。
[*]官方还提供了net/rpc/jsonrpc库实现RPC方法,jsonrpc采用json举行数据编解码,因而支持跨语言调用,如今jsonrpc库是基于tcp协议实现的,暂不支持http传输方式。
例题:golang实现RPC程序,实现求矩形面积和周长。
服务器:
        注册服务,监听请求,处理请求。
package main

import (
        "fmt"
        "log"
        "net/http"
        "net/rpc"
)

// 接收参数
type Params struct {
        Length int
        Heigth int
}

// 用于注册
type Rect struct{}

func (r *Rect) Area(p Params, ret *int) error {
        fmt.Println("参数:", p.Heigth, p.Length)
        *ret = p.Heigth * p.Length
        return nil
}

func (r *Rect) Perimeter(p Params, ret *int) error {
        fmt.Println("参数:", p.Heigth, p.Length)
        *ret = 2 * (p.Heigth + p.Length)
        return nil
}

func main() {
        //1. 注册服务
        rect := new(Rect)
        rpc.Register(rect)

        //2. 服务器处理绑定到http协议上
        rpc.HandleHTTP()
        //3. 监听服务
        err := http.ListenAndServe(":8080", nil)
        if err != nil {
                log.Panicln(err)
        }
}
客户端:
        连接rpc服务,发送请求调用方法。
package main

import (
        "fmt"
        "log"
        "net/rpc"
)

// 参数字段要和服务器的一样
type Params struct {
        Length int
        Heigth int
}

func main() {
        conn, err := rpc.DialHTTP("tcp", ":8080")
        if err != nil {
                log.Fatal(err)
        }

        ret := 0
        p := Params{
                20,
                30,
        }
        //调用服务器的方法
        err = conn.Call("Rect.Area", p, &ret)
        if err != nil {
                log.Fatal(err)
        }
        fmt.Println("面积", ret)

        err = conn.Call("Rect.Perimeter", p, &ret)
        if err != nil {
                log.Fatal(err)
        }
        fmt.Println("周长", ret)
}


[*]golang写RPC程序必须符合4个基本条件,不然RPC用不了

[*]结构体字段首字母必须大写,可以别人调用
[*]函数名必须首字母大写
[*]函数(服务方法)第一参数是担当参数,第二个参数是返回客户端的参数,必须是指针范例
[*]函数(服务方法)还必须有一个返回值error

   别的,net/rpc/jsonrpc库通过json格式编解码,支持跨语言调用。
服务端:
package main

import (
        "fmt"
        "log"
        "net"
        "net/rpc"
        "net/rpc/jsonrpc"
)

// 接收参数
type Params struct {
        A int
        B int
}

type Rect struct{}

func (r *Rect) Area(p Params, ret *int) error {
        *ret = p.A * p.B
        return nil
}

func (r *Rect) Perimeter(p Params, ret *int) error {
        *ret = 2 * (p.A + p.B)
        return nil
}

func main() {
        //注册服务
        rpc.Register(new(Rect))
        //监听连接
        l, err := net.Listen("tcp", ":8080")
        if err != nil {
                log.Fatal(err)
        }

        for {
                //接收客户端连接
                conn, err := l.Accept()
                if err != nil {
                        continue
                }
                //创建协程处理请求
                go func(conn net.Conn) {
                        fmt.Println("get a new client")
                        //只是处理服务,没有监听和接收请求
                        jsonrpc.ServeConn(conn)
                }(conn)
        }
} 客户端:
package main

import (
        "fmt"
        "log"
        "net/rpc/jsonrpc"
)

// 参数字段要和服务器的一样
type Params struct {
        A int
        B int
}

func main() {
        //远程连接rpc服务
        conn, err := jsonrpc.Dial("tcp", ":8080")
        if err != nil {
                log.Fatal(err)
        }
        //调用方法
        ret := 0
        err = conn.Call("Rect.Area", Params{10, 20}, &ret)
        if err != nil {
                log.Fatal(err)
        }
        fmt.Println("ret=", ret)

        err = conn.Call("Rect.Perimeter", Params{10, 20}, &ret)
        if err != nil {
                log.Fatal(err)
        }
        fmt.Println("ret=", ret)
} 三.  RPC调用流程



[*]微服务架构下数据交互一般是对内RPC,对外REST
[*]将业务按功能模块拆分到各个微服务,具有进步项目协作效率,降低模块耦合度,进步系统可用性等优点
[*]一般情况下,我们将功能代码在本地直接调用,微服务框架下,我们需要将这个函数作为单独的服务运行,客户端通过网络调用。
我理解的微服务(RPC),将项目中须要的功能注册成一个服务,客户端可以直接调用。
四. 网络传输数据格式



[*]两头要约定好数据包的格式
[*]成熟的RPC框架会有自界说传输协议,这里网络传输格式界说如下,前面是固定长度的消息头,后面是变长消息体
https://img-blog.csdnimg.cn/direct/024246f98aca4b429955c099c87ce722.png


[*]本身界说数据格式的读写
package rpc

import (
        "encoding/binary"
        "fmt"
        "io"
        "net"
)

type Session struct {
        Conn net.Conn
}

func (s *Session) Write(data []byte) error {
        //定义写数据格式
        //4字节包头 + 变长数据
        buf := make([]byte, 4+len(data))
        //写入头部
        binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))
        //将有效数据写到头部后面
        copy(buf, data)
        //发送
        _, err := s.Conn.Write(buf)
        if err != nil {
                return err
        }
        return nil
}

func (s *Session) Read() ([]byte, error) {
        //读取头部
        header := make([]byte, 4)
        //按长度读取数据
        _, err := io.ReadFull(s.Conn, header)
        if err != nil {
                return nil, err
        }

        //读取数据,报头保存的是有效数据长度
        dataLen := binary.BigEndian.Uint32(header)
        fmt.Println(dataLen)
        data := make([]byte, dataLen)

        _, err = io.ReadFull(s.Conn, data)
        if err != nil {
                return nil, err
        }
        return data, nil
}
测试代码:
package rpc

import (
        "errors"
        "fmt"
        "net"
        "sync"
        "testing"
)

func TestSession_ReadWrite(t *testing.T) {
        //地址和数据
        addr := "127.0.0.1:8080"
        my_data := "hello"

        //等待组
        wg := sync.WaitGroup{}
        wg.Add(2)
        go func() {
                //监听
                defer wg.Done()
                l, err := net.Listen("tcp", addr)
                if err != nil {
                        t.Fatal(err)
                }
                //接收连接
                conn, err := l.Accept()
                if err != nil {
                        t.Fatal(err)
                }
                //写数据
                s := Session{Conn: conn}
                err = s.Write([]byte(my_data))
                if err != nil {
                        t.Fatal(err)
                }
        }()

        go func() {
                defer wg.Done()
                conn, err := net.Dial("tcp", addr)
                if err != nil {
                        t.Fatal(err)
                }

                s := Session{Conn: conn}
                data, err := s.Read()
                if err != nil {
                        t.Fatal(err)
                }

                //校验
                if string(data) != my_data {
                        t.Fatal(errors.New("数据错误"))
                }
                fmt.Println(string(data))
        }()
        wg.Wait()
}
https://img-blog.csdnimg.cn/direct/1251c41dcc274de491c2ded4ebb6fbfb.png
 编码息争码:
func Encode(data RPCData) ([]byte, error) {
        //得到字节数组编码器
        var buf bytes.Buffer
        buffEnc := gob.NewEncoder(&buf)
        //编码器对数据编码
        if err := buffEnc.Encode(data); err != nil {
                return nil, err
        }

        return buf.Bytes(), nil
}

func Decode(data []byte) (RPCData, error) {
        buff := bytes.NewBuffer(data)
        //得到字节数组解码器
        buffDec := gob.NewDecoder(buff)
        //解码器对数据解码
        var res RPCData
        if err := buffDec.Decode(&res); err != nil {
                return res, err
        }

        return res, nil
} 五. 实现RPC服务端

        要实现的一个功能为,接收到客户端发过来需要调用的函数和参数,实现对应函数的调用,将效果返回。


[*]服务端接收的数据包罗

[*]调用的函数名,参数,返回值

[*]服务器端需要办理的题目

[*]需要维护一个map,来生存调用的函数

[*]服务器的焦点功能

[*]维护函数map
[*]接收到客户端的数据,并解析
[*]找到调用的函数,将返回值打包穿给客户端

package main

import (
        "bytes"
        "encoding/binary"
        "encoding/gob"
        "errors"
        "fmt"
        "io"
        "log"
        "net"
        "reflect"
)

// 定义交互的数据结构
type RPCData struct {
        //访问的函数
        Name string
        //参数
        Args []interface{}
}

func encode(data RPCData) ([]byte, error) {
        //得到字节数组编码器
        var buf bytes.Buffer
        gob.Register(User{})
        buffEnc := gob.NewEncoder(&buf)
        //编码器对数据编码
        if err := buffEnc.Encode(data); err != nil {
                return nil, err
        }

        return buf.Bytes(), nil
}

func decode(data []byte) (RPCData, error) {
        buff := bytes.NewBuffer(data)
        //得到字节数组解码器
        gob.Register(User{})
        buffDec := gob.NewDecoder(buff)
        //解码器对数据解码
        var res RPCData
        if err := buffDec.Decode(&res); err != nil {
                return res, err
        }

        return res, nil
}

// 发送和接收数据结构
type Session struct {
        Conn net.Conn
}

func NewSession(conn net.Conn) *Session {
        return &Session{Conn: conn}
}

func (s *Session) Write(data []byte) error {
        //定义写数据格式
        //4字节包头 + 变长数据
        buf := make([]byte, 4+len(data))
        //写入头部
        binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))
        //将有效数据写到头部后面
        copy(buf, data)
        //发送
        _, err := s.Conn.Write(buf)
        if err != nil {
                return err
        }
        return nil
}

func (s *Session) Read() ([]byte, error) {
        //读取头部
        header := make([]byte, 4)
        //按长度读取数据
        _, err := io.ReadFull(s.Conn, header)
        if err != nil {
                return nil, err
        }

        //读取数据,报头保存的是有效数据长度
        dataLen := binary.BigEndian.Uint32(header)
        fmt.Println(dataLen)
        data := make([]byte, dataLen)

        _, err = io.ReadFull(s.Conn, data)
        if err != nil {
                return nil, err
        }
        return data, nil
}

// 服务器
type Server struct {
        //访问地址
        addr string
        //维护函数map
        funcs mapreflect.Value
}

func NewServer(addr string) *Server {
        return &Server{addr: addr, funcs: make(mapreflect.Value)}
}

func (s *Server) Register(key string, value interface{}) error {
        if _, ok := s.funcs; ok {
                return errors.New(key + "已存在")
        }

        s.funcs = reflect.ValueOf(value)
        return nil
}

func (s *Server) Run() {
        //监听
        ls, err := net.Listen("tcp", s.addr)
        if err != nil {
                log.Printf("监听addr:%s, 失败", s.addr)
                return
        }

        for {
                conn, err := ls.Accept()
                if err != nil {
                        return
                }

                //读数据
                session := NewSession(conn)
                data, err := session.Read()
                if err != nil {
                        return
                }
                //解码
                rpc_data, err := decode(data)
                if err != nil {
                        return
                }

                //查找访问的函数
                f, ok := s.funcs
                if !ok {
                        log.Printf("%s不存在\n", rpc_data.Name)
                        continue
                }
                //参数
                args := make([]reflect.Value, 0, len(rpc_data.Args))
                for _, v := range rpc_data.Args {
                        if v != nil {
                                args = append(args, reflect.ValueOf(v))
                        }
                }
                //反射调用方法
                //返回value类型,用于客户端传递返回结果,out是所有返回结果
                out := f.Call(args)
                outInters := make([]interface{}, 0, len(out))
                for _, v := range out {
                        outInters = append(outInters, v.Interface())
                }
                fmt.Println("args: ", args, "-", len(args), "-", outInters)

                //发送给客户端
                //编码
                respRpcData := RPCData{Name: rpc_data.Name, Args: outInters}
                respdata, err := encode(respRpcData)
                if err != nil {
                        log.Printf("encode fail %v\n", err)
                        continue
                }

                err = session.Write(respdata)
                if err != nil {
                        log.Printf("Write fail %v\n", err)
                        continue
                }
        }

}

type User struct {
        Name string
        Ageint
}

func queryUser(uid int) (User, error) {
        users := make(mapUser)
        users = User{"zs", 10}
        users = User{"ls", 20}
        users = User{"ww", 25}
        users = User{"lc", 18}
        fmt.Println(uid)
        if v, ok := users; ok {
                fmt.Println(v)
                return v, nil
        }
        return User{}, errors.New("Not Found")
}

func main() {
        s := NewServer("127.0.0.1:8080")

        //给服务器注册函数
        s.Register("queryUser", queryUser)

        s.Run()
}
六. 实现RPC客户端



[*]客户端只有函数原型,利用reflect.MakeFunc()可以完成原型到函数的调用。

[*]即reflect.MakeFunc()函数可以将一个函数范例和函数实现结合起来,通过函数原型调用对应函数实现。

https://img-blog.csdnimg.cn/direct/3a05958ac5594324a4fa4942f4da7e1a.png
package main

import (
        "bytes"
        "encoding/binary"
        "encoding/gob"
        "fmt"
        "io"
        "log"
        "net"
        "reflect"
)

type RPCData struct {
        //访问的函数
        Name string
        //参数
        Args []interface{}
}

func encode(data RPCData) ([]byte, error) {
        //得到字节数组编码器
        var buf bytes.Buffer
        //注册接口类型
        gob.Register(User{})
        buffEnc := gob.NewEncoder(&buf)
        //编码器对数据编码
        if err := buffEnc.Encode(data); err != nil {
                return nil, err
        }

        return buf.Bytes(), nil
}

func decode(data []byte) (RPCData, error) {
        buff := bytes.NewBuffer(data)
        //得到字节数组解码器
        gob.Register(User{})
        buffDec := gob.NewDecoder(buff)
        //解码器对数据解码
        var res RPCData
        if err := buffDec.Decode(&res); err != nil {
                return res, err
        }

        return res, nil
}

type Session struct {
        Conn net.Conn
}

func NewSession(conn net.Conn) *Session {
        return &Session{Conn: conn}
}

func (s *Session) Write(data []byte) error {
        //定义写数据格式
        //4字节包头 + 变长数据
        buf := make([]byte, 4+len(data))
        //写入头部
        binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))
        //将有效数据写到头部后面
        copy(buf, data)
        //发送
        _, err := s.Conn.Write(buf)
        if err != nil {
                return err
        }
        return nil
}

func (s *Session) Read() ([]byte, error) {
        //读取头部
        header := make([]byte, 4)
        //按长度读取数据
        _, err := io.ReadFull(s.Conn, header)
        if err != nil {
                return nil, err
        }

        //读取数据,报头保存的是有效数据长度
        dataLen := binary.BigEndian.Uint32(header)
        fmt.Println(dataLen)
        data := make([]byte, dataLen)

        _, err = io.ReadFull(s.Conn, data)
        if err != nil {
                return nil, err
        }
        return data, nil
}

// 客户端
type Client struct {
        Conn net.Conn
}

func NewClient(conn net.Conn) *Client {
        return &Client{Conn: conn}
}

// fname为访问服务器函数名
// fptr为函数原型
func (c *Client) callRpc(fname string, fptr interface{}) {

        //获取函数原型
        fnptr := reflect.ValueOf(fptr).Elem()
        //函数实现,发送数据给服务器,收到服务端数据
        //args调用时传进来的参数
        //返回值为得到的结果
        f := func(args []reflect.Value) []reflect.Value {
                //获得参数
                argSlice := make([]interface{}, len(args))
                for _, v := range args {
                        argSlice = append(argSlice, v.Interface())
                }

                session := NewSession(c.Conn)

                //发送到服务器
                rpcData := RPCData{Name: fname, Args: argSlice}
                //编码
                data, err := encode(rpcData)
                if err != nil {
                        log.Println("encode fail", err)
                        return nil
                }
                err = session.Write(data)
                if err != nil {
                        log.Println("Write fail", err)
                        return nil
                }

                //接收服务器响应
                respData, err := session.Read()
                if err != nil {
                        log.Println("Read fail", err)
                        return nil
                }
                //结果在Args里
                respRpcData, err := decode(respData)
                if err != nil {
                        log.Println("decode fail", err)
                        return nil
                }

                res := make([]reflect.Value, 0, len(respRpcData.Args))
                for i, v := range respRpcData.Args {
                        if v == nil {
                                //因为返回err可能为nil
                                //进行nil转换
                                //reflect.Zero会返回类型的零值的value
                                //.out会返回函数输出的参数类型
                                res = append(res, reflect.Zero(fnptr.Type().Out(i)))
                        } else {
                                res = append(res, reflect.ValueOf(v))
                        }
                }
                return res
        }

        real_fn := reflect.MakeFunc(fnptr.Type(), f)
        //为函数原型赋值函数实现
        fnptr.Set(real_fn)
}

type User struct {
        Name string
        Ageint
}

func main() {
        conn, err := net.Dial("tcp", ":8080")
        if err != nil {
                log.Fatal(err)
                return
        }

        c := NewClient(conn)
        var query func(int) (User, error)
        c.callRpc("queryUser", &query)
    //进行查询
        u, err := query(2)
        if err != nil {
                log.Fatal(err)
                return
        }
        fmt.Println(u)
}

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Golang——RPC