马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
一. RPC简介
- 长途过程调用(Remote Procedure Call,RPC)是一个盘算机通信协议。
- 该协议运行于一台盘算机的程序调用别的一台盘算机的子程序,而程序员无需额外的为这个交互作用编程。
- 如果涉及的软件采用面向对象编程,那么长途过程调用亦可称作长途调用或长途方法调用。
- 长途过程调用是一个分布式盘算的客户端-服务器(Client/Server)的例子。
- 长途过程调用总是由客户端对服务器发出一个实行多少过程请求,并由客户端提供参数,实行效果将返回客户端。由于存在各式各样的变体和差异,对应的派生了各式长途过程调用协议,而他们并不相互兼容。
- RPC本身是client-server模型,也是一种request-response协议。有些实现扩展了长途调用的模型。实现了双向的服务调用,但是不管怎么样,调用过程还是由一个客户端发起,服务端提供响应。
- 调用过程为:
- client调用client stub,这是一次本地过程调用
- client stub将参数打包成一个消息,然后发送这个消息
- client地点的系统将消息发送给server
- server的系统将收到的包传给server stub
- server stub解包得到参数
- 末了server stub调用服务过程,返回效果按照相反的步骤传给client
二. golang中如何实现RPC
- golang中实现RPC非常简单,官方提供了封装好的库,还有一些第三方的库。
- golang官方的net/rpc库利用encoding/gob举行编解码,支持tcp和http数据传输方式。由于其它语言不支持gob编解码,所以golang的RPC只支持golang开发的服务器与客户端之间的交互。
- 官方还提供了net/rpc/jsonrpc库实现RPC方法,jsonrpc采用json举行数据编解码,因而支持跨语言调用,如今jsonrpc库是基于tcp协议实现的,暂不支持http传输方式。
例题:golang实现RPC程序,实现求矩形面积和周长。
服务器:
注册服务,监听请求,处理请求。
- package main
- import (
- "fmt"
- "log"
- "net/http"
- "net/rpc"
- )
- // 接收参数
- type Params struct {
- Length int
- Heigth int
- }
- // 用于注册
- type Rect struct{}
- func (r *Rect) Area(p Params, ret *int) error {
- fmt.Println("参数:", p.Heigth, p.Length)
- *ret = p.Heigth * p.Length
- return nil
- }
- func (r *Rect) Perimeter(p Params, ret *int) error {
- fmt.Println("参数:", p.Heigth, p.Length)
- *ret = 2 * (p.Heigth + p.Length)
- return nil
- }
- func main() {
- //1. 注册服务
- rect := new(Rect)
- rpc.Register(rect)
- //2. 服务器处理绑定到http协议上
- rpc.HandleHTTP()
- //3. 监听服务
- err := http.ListenAndServe(":8080", nil)
- if err != nil {
- log.Panicln(err)
- }
- }
复制代码 客户端:
连接rpc服务,发送请求调用方法。
- package main
- import (
- "fmt"
- "log"
- "net/rpc"
- )
- // 参数字段要和服务器的一样
- type Params struct {
- Length int
- Heigth int
- }
- func main() {
- conn, err := rpc.DialHTTP("tcp", ":8080")
- if err != nil {
- log.Fatal(err)
- }
- ret := 0
- p := Params{
- 20,
- 30,
- }
- //调用服务器的方法
- err = conn.Call("Rect.Area", p, &ret)
- if err != nil {
- log.Fatal(err)
- }
- fmt.Println("面积", ret)
- err = conn.Call("Rect.Perimeter", p, &ret)
- if err != nil {
- log.Fatal(err)
- }
- fmt.Println("周长", ret)
- }
复制代码
- golang写RPC程序必须符合4个基本条件,不然RPC用不了
- 结构体字段首字母必须大写,可以别人调用
- 函数名必须首字母大写
- 函数(服务方法)第一参数是担当参数,第二个参数是返回客户端的参数,必须是指针范例
- 函数(服务方法)还必须有一个返回值error
别的,net/rpc/jsonrpc库通过json格式编解码,支持跨语言调用。
服务端:
- package main
- import (
- "fmt"
- "log"
- "net"
- "net/rpc"
- "net/rpc/jsonrpc"
- )
- // 接收参数
- type Params struct {
- A int
- B int
- }
- type Rect struct{}
- func (r *Rect) Area(p Params, ret *int) error {
- *ret = p.A * p.B
- return nil
- }
- func (r *Rect) Perimeter(p Params, ret *int) error {
- *ret = 2 * (p.A + p.B)
- return nil
- }
- func main() {
- //注册服务
- rpc.Register(new(Rect))
- //监听连接
- l, err := net.Listen("tcp", ":8080")
- if err != nil {
- log.Fatal(err)
- }
- for {
- //接收客户端连接
- conn, err := l.Accept()
- if err != nil {
- continue
- }
- //创建协程处理请求
- go func(conn net.Conn) {
- fmt.Println("get a new client")
- //只是处理服务,没有监听和接收请求
- jsonrpc.ServeConn(conn)
- }(conn)
- }
- }
复制代码 客户端:
- package main
- import (
- "fmt"
- "log"
- "net/rpc/jsonrpc"
- )
- // 参数字段要和服务器的一样
- type Params struct {
- A int
- B int
- }
- func main() {
- //远程连接rpc服务
- conn, err := jsonrpc.Dial("tcp", ":8080")
- if err != nil {
- log.Fatal(err)
- }
- //调用方法
- ret := 0
- err = conn.Call("Rect.Area", Params{10, 20}, &ret)
- if err != nil {
- log.Fatal(err)
- }
- fmt.Println("ret=", ret)
- err = conn.Call("Rect.Perimeter", Params{10, 20}, &ret)
- if err != nil {
- log.Fatal(err)
- }
- fmt.Println("ret=", ret)
- }
复制代码 三. RPC调用流程
- 微服务架构下数据交互一般是对内RPC,对外REST
- 将业务按功能模块拆分到各个微服务,具有进步项目协作效率,降低模块耦合度,进步系统可用性等优点
- 一般情况下,我们将功能代码在本地直接调用,微服务框架下,我们需要将这个函数作为单独的服务运行,客户端通过网络调用。
我理解的微服务(RPC),将项目中须要的功能注册成一个服务,客户端可以直接调用。
四. 网络传输数据格式
- 两头要约定好数据包的格式
- 成熟的RPC框架会有自界说传输协议,这里网络传输格式界说如下,前面是固定长度的消息头,后面是变长消息体
- package rpc
- import (
- "encoding/binary"
- "fmt"
- "io"
- "net"
- )
- type Session struct {
- Conn net.Conn
- }
- func (s *Session) Write(data []byte) error {
- //定义写数据格式
- //4字节包头 + 变长数据
- buf := make([]byte, 4+len(data))
- //写入头部
- binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))
- //将有效数据写到头部后面
- copy(buf[4:], data)
- //发送
- _, err := s.Conn.Write(buf)
- if err != nil {
- return err
- }
- return nil
- }
- func (s *Session) Read() ([]byte, error) {
- //读取头部
- header := make([]byte, 4)
- //按长度读取数据
- _, err := io.ReadFull(s.Conn, header)
- if err != nil {
- return nil, err
- }
- //读取数据,报头保存的是有效数据长度
- dataLen := binary.BigEndian.Uint32(header)
- fmt.Println(dataLen)
- data := make([]byte, dataLen)
- _, err = io.ReadFull(s.Conn, data)
- if err != nil {
- return nil, err
- }
- return data, nil
- }
复制代码 测试代码:
- package rpc
- import (
- "errors"
- "fmt"
- "net"
- "sync"
- "testing"
- )
- func TestSession_ReadWrite(t *testing.T) {
- //地址和数据
- addr := "127.0.0.1:8080"
- my_data := "hello"
- //等待组
- wg := sync.WaitGroup{}
- wg.Add(2)
- go func() {
- //监听
- defer wg.Done()
- l, err := net.Listen("tcp", addr)
- if err != nil {
- t.Fatal(err)
- }
- //接收连接
- conn, err := l.Accept()
- if err != nil {
- t.Fatal(err)
- }
- //写数据
- s := Session{Conn: conn}
- err = s.Write([]byte(my_data))
- if err != nil {
- t.Fatal(err)
- }
- }()
- go func() {
- defer wg.Done()
- conn, err := net.Dial("tcp", addr)
- if err != nil {
- t.Fatal(err)
- }
- s := Session{Conn: conn}
- data, err := s.Read()
- if err != nil {
- t.Fatal(err)
- }
- //校验
- if string(data) != my_data {
- t.Fatal(errors.New("数据错误"))
- }
- fmt.Println(string(data))
- }()
- wg.Wait()
- }
复制代码
编码息争码:
- func Encode(data RPCData) ([]byte, error) {
- //得到字节数组编码器
- var buf bytes.Buffer
- buffEnc := gob.NewEncoder(&buf)
- //编码器对数据编码
- if err := buffEnc.Encode(data); err != nil {
- return nil, err
- }
- return buf.Bytes(), nil
- }
- func Decode(data []byte) (RPCData, error) {
- buff := bytes.NewBuffer(data)
- //得到字节数组解码器
- buffDec := gob.NewDecoder(buff)
- //解码器对数据解码
- var res RPCData
- if err := buffDec.Decode(&res); err != nil {
- return res, err
- }
- return res, nil
- }
复制代码 五. 实现RPC服务端
要实现的一个功能为,接收到客户端发过来需要调用的函数和参数,实现对应函数的调用,将效果返回。
- 服务端接收的数据包罗
- 服务器端需要办理的题目
- 服务器的焦点功能
- 维护函数map
- 接收到客户端的数据,并解析
- 找到调用的函数,将返回值打包穿给客户端
- package main
- import (
- "bytes"
- "encoding/binary"
- "encoding/gob"
- "errors"
- "fmt"
- "io"
- "log"
- "net"
- "reflect"
- )
- // 定义交互的数据结构
- type RPCData struct {
- //访问的函数
- Name string
- //参数
- Args []interface{}
- }
- func encode(data RPCData) ([]byte, error) {
- //得到字节数组编码器
- var buf bytes.Buffer
- gob.Register(User{})
- buffEnc := gob.NewEncoder(&buf)
- //编码器对数据编码
- if err := buffEnc.Encode(data); err != nil {
- return nil, err
- }
- return buf.Bytes(), nil
- }
- func decode(data []byte) (RPCData, error) {
- buff := bytes.NewBuffer(data)
- //得到字节数组解码器
- gob.Register(User{})
- buffDec := gob.NewDecoder(buff)
- //解码器对数据解码
- var res RPCData
- if err := buffDec.Decode(&res); err != nil {
- return res, err
- }
- return res, nil
- }
- // 发送和接收数据结构
- type Session struct {
- Conn net.Conn
- }
- func NewSession(conn net.Conn) *Session {
- return &Session{Conn: conn}
- }
- func (s *Session) Write(data []byte) error {
- //定义写数据格式
- //4字节包头 + 变长数据
- buf := make([]byte, 4+len(data))
- //写入头部
- binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))
- //将有效数据写到头部后面
- copy(buf[4:], data)
- //发送
- _, err := s.Conn.Write(buf)
- if err != nil {
- return err
- }
- return nil
- }
- func (s *Session) Read() ([]byte, error) {
- //读取头部
- header := make([]byte, 4)
- //按长度读取数据
- _, err := io.ReadFull(s.Conn, header)
- if err != nil {
- return nil, err
- }
- //读取数据,报头保存的是有效数据长度
- dataLen := binary.BigEndian.Uint32(header)
- fmt.Println(dataLen)
- data := make([]byte, dataLen)
- _, err = io.ReadFull(s.Conn, data)
- if err != nil {
- return nil, err
- }
- return data, nil
- }
- // 服务器
- type Server struct {
- //访问地址
- addr string
- //维护函数map
- funcs map[string]reflect.Value
- }
- func NewServer(addr string) *Server {
- return &Server{addr: addr, funcs: make(map[string]reflect.Value)}
- }
- func (s *Server) Register(key string, value interface{}) error {
- if _, ok := s.funcs[key]; ok {
- return errors.New(key + "已存在")
- }
- s.funcs[key] = reflect.ValueOf(value)
- return nil
- }
- func (s *Server) Run() {
- //监听
- ls, err := net.Listen("tcp", s.addr)
- if err != nil {
- log.Printf("监听addr:%s, 失败", s.addr)
- return
- }
- for {
- conn, err := ls.Accept()
- if err != nil {
- return
- }
- //读数据
- session := NewSession(conn)
- data, err := session.Read()
- if err != nil {
- return
- }
- //解码
- rpc_data, err := decode(data)
- if err != nil {
- return
- }
- //查找访问的函数
- f, ok := s.funcs[rpc_data.Name]
- if !ok {
- log.Printf("%s不存在\n", rpc_data.Name)
- continue
- }
- //参数
- args := make([]reflect.Value, 0, len(rpc_data.Args))
- for _, v := range rpc_data.Args {
- if v != nil {
- args = append(args, reflect.ValueOf(v))
- }
- }
- //反射调用方法
- //返回value类型,用于客户端传递返回结果,out是所有返回结果
- out := f.Call(args)
- outInters := make([]interface{}, 0, len(out))
- for _, v := range out {
- outInters = append(outInters, v.Interface())
- }
- fmt.Println("args: ", args, "-", len(args), "-", outInters)
- //发送给客户端
- //编码
- respRpcData := RPCData{Name: rpc_data.Name, Args: outInters}
- respdata, err := encode(respRpcData)
- if err != nil {
- log.Printf("encode fail %v\n", err)
- continue
- }
- err = session.Write(respdata)
- if err != nil {
- log.Printf("Write fail %v\n", err)
- continue
- }
- }
- }
- type User struct {
- Name string
- Age int
- }
- func queryUser(uid int) (User, error) {
- users := make(map[int]User)
- users[0] = User{"zs", 10}
- users[1] = User{"ls", 20}
- users[2] = User{"ww", 25}
- users[3] = User{"lc", 18}
- fmt.Println(uid)
- if v, ok := users[uid]; ok {
- fmt.Println(v)
- return v, nil
- }
- return User{}, errors.New("Not Found")
- }
- func main() {
- s := NewServer("127.0.0.1:8080")
- //给服务器注册函数
- s.Register("queryUser", queryUser)
- s.Run()
- }
复制代码 六. 实现RPC客户端
- 客户端只有函数原型,利用reflect.MakeFunc()可以完成原型到函数的调用。
- 即reflect.MakeFunc()函数可以将一个函数范例和函数实现结合起来,通过函数原型调用对应函数实现。

- package main
- import (
- "bytes"
- "encoding/binary"
- "encoding/gob"
- "fmt"
- "io"
- "log"
- "net"
- "reflect"
- )
- type RPCData struct {
- //访问的函数
- Name string
- //参数
- Args []interface{}
- }
- func encode(data RPCData) ([]byte, error) {
- //得到字节数组编码器
- var buf bytes.Buffer
- //注册接口类型
- gob.Register(User{})
- buffEnc := gob.NewEncoder(&buf)
- //编码器对数据编码
- if err := buffEnc.Encode(data); err != nil {
- return nil, err
- }
- return buf.Bytes(), nil
- }
- func decode(data []byte) (RPCData, error) {
- buff := bytes.NewBuffer(data)
- //得到字节数组解码器
- gob.Register(User{})
- buffDec := gob.NewDecoder(buff)
- //解码器对数据解码
- var res RPCData
- if err := buffDec.Decode(&res); err != nil {
- return res, err
- }
- return res, nil
- }
- type Session struct {
- Conn net.Conn
- }
- func NewSession(conn net.Conn) *Session {
- return &Session{Conn: conn}
- }
- func (s *Session) Write(data []byte) error {
- //定义写数据格式
- //4字节包头 + 变长数据
- buf := make([]byte, 4+len(data))
- //写入头部
- binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))
- //将有效数据写到头部后面
- copy(buf[4:], data)
- //发送
- _, err := s.Conn.Write(buf)
- if err != nil {
- return err
- }
- return nil
- }
- func (s *Session) Read() ([]byte, error) {
- //读取头部
- header := make([]byte, 4)
- //按长度读取数据
- _, err := io.ReadFull(s.Conn, header)
- if err != nil {
- return nil, err
- }
- //读取数据,报头保存的是有效数据长度
- dataLen := binary.BigEndian.Uint32(header)
- fmt.Println(dataLen)
- data := make([]byte, dataLen)
- _, err = io.ReadFull(s.Conn, data)
- if err != nil {
- return nil, err
- }
- return data, nil
- }
- // 客户端
- type Client struct {
- Conn net.Conn
- }
- func NewClient(conn net.Conn) *Client {
- return &Client{Conn: conn}
- }
- // fname为访问服务器函数名
- // fptr为函数原型
- func (c *Client) callRpc(fname string, fptr interface{}) {
- //获取函数原型
- fnptr := reflect.ValueOf(fptr).Elem()
- //函数实现,发送数据给服务器,收到服务端数据
- //args调用时传进来的参数
- //返回值为得到的结果
- f := func(args []reflect.Value) []reflect.Value {
- //获得参数
- argSlice := make([]interface{}, len(args))
- for _, v := range args {
- argSlice = append(argSlice, v.Interface())
- }
- session := NewSession(c.Conn)
- //发送到服务器
- rpcData := RPCData{Name: fname, Args: argSlice}
- //编码
- data, err := encode(rpcData)
- if err != nil {
- log.Println("encode fail", err)
- return nil
- }
- err = session.Write(data)
- if err != nil {
- log.Println("Write fail", err)
- return nil
- }
- //接收服务器响应
- respData, err := session.Read()
- if err != nil {
- log.Println("Read fail", err)
- return nil
- }
- //结果在Args里
- respRpcData, err := decode(respData)
- if err != nil {
- log.Println("decode fail", err)
- return nil
- }
- res := make([]reflect.Value, 0, len(respRpcData.Args))
- for i, v := range respRpcData.Args {
- if v == nil {
- //因为返回err可能为nil
- //进行nil转换
- //reflect.Zero会返回类型的零值的value
- //.out会返回函数输出的参数类型
- res = append(res, reflect.Zero(fnptr.Type().Out(i)))
- } else {
- res = append(res, reflect.ValueOf(v))
- }
- }
- return res
- }
- real_fn := reflect.MakeFunc(fnptr.Type(), f)
- //为函数原型赋值函数实现
- fnptr.Set(real_fn)
- }
- type User struct {
- Name string
- Age int
- }
- func main() {
- conn, err := net.Dial("tcp", ":8080")
- if err != nil {
- log.Fatal(err)
- return
- }
- c := NewClient(conn)
- var query func(int) (User, error)
- c.callRpc("queryUser", &query)
- //进行查询
- u, err := query(2)
- if err != nil {
- log.Fatal(err)
- return
- }
- fmt.Println(u)
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |