东湖之滨 发表于 2024-10-3 02:48:57

golang rpc

RPC(Remote Procedure Call)长途过程调用,简单的理解是一个节点请求另一个节点提供的服务,对应rpc的是本地过程调用,函数调用是最常用的本地过程调用,将本地过程调用变成长途调用碰面对着各种问题。
以两数相加为例
package main

import "fmt"

func add(a, b int) int {
        return a + b
}
func main() {
        fmt.Println(add(1, 2))
}

函数调用过程:
(1)将1和2压入函数的栈中
(2)进入add函数,从栈中取出1和2分别赋值给a和b
(3)实行a+b将效果压栈
(4)将栈中的效果取出打印
长途过程面对的问题
1.本来的本地函数放到另外一个服务器上去运行,但是引入了很多新问题
2.Call的id映射
我们怎么告诉长途呆板我们要调用add,而不是sub或者Foo呢?在本地调用中,函数体是直接通过函数指针来指定的,我们调用add,编译器就自动帮我们调用它相应的函数指针。但是在长途调用中,函数指针是不可的,因为两个进程的地点空间是完全不一样的。以是,在RPC中,所有的函数都必须有自己的一个ID。这个ID在所有进程中都是唯一确定的。客户端在做长途过程调用时,必须附上这个ID。然后我们还需要在客户端和服务端分别维护一个(函数<–>Call ID}的对应表。两者的表不一定需要完全类似,但类似的函数对应的Call ID必须类似。当客户端需要进行长途调用时,它就查一下这个表,找出相应的Call ID,然后把它传给服务端,服务端也通过查表,来确定客户端需要调用的函数,然后实行相应函数的代码。
3.序列化和反序列化
客户端怎么把参数值传给长途的函数呢?在本地调用中,我们只需要把参数压到栈里,然后让函数自己去栈里读就行。但是在长途过程调用时,客户端跟服务端是不同的进程,不能通过内存来传递参数。甚至偶然间客户端和服务端使用的都不是同一种语言(比如服务端用C++,客户端用Java或者Python)。这时间就需要客户端把参数先转成一个字节流,传给服务端后,再把字节流转成自己能读取的格式。这个过程叫序列化和反序列化。同理,从服务端返回的值也需要序列化反序列化的过程。
4.网络传输
长途调用每每用在网络上,客户端和服务端是通过网络连接的。所有的数据都需要通过网络传输,因此就需要有一个网络传输层。网络传输层需要把Call ID和序列化后的参数字节流传给服务端,然后再把序列化后的调用效果传给客户端。只要能完成这两者的,都可以作为传输层使用。因此,他所使用的协议其实是不限的,能完成传输就行,尽管大部分RPC框架都使用TCP协议,但其实UDP也可以,而RPC干脆就用了HTTP2。Java的Netty也属于这层的东西。
RPC第一个要点:数据编码协议

一般采用将数据传输到gin,gin传输到服务端,服务端负责剖析数据
客户端流程
        1.建立连接tcp/http
        2.将employee对象序列化成json字符串-序列化
        3.发送json字符串-调用成功后实际上你接受到的是一个二进制的数据
        4.等待服务器发送结果
        5将服务返回的数据解析成PrintResult对象-反序列化
服务端流程
        1.监听网络接口80
        2.读取数据-二进制的json数据
        3.对数据进行反序列化Employee对象
        4.开始处理业务逻辑
        5.将处理的结果PrintReuslt发序列化成json二进制数据-序列化
        6.将数据返回
序列化和反序列化是可以选择的,不一定要采用json、xml、protobuf、msgpack
RPC第二个要点:传输协议

http协议:http1.x http2.0协议
http协议底层使用的也是tcp,http如今主流的是http1.x,这种协议有性能问题(一次性),一旦效果返回,连接就断开。我们可以直接基于tcp/udp协议去封装一层协议myhttp,没有通用型,http2.0既有http的特性也有长连接的特性(grpc就是基于http2.0的)
http协议是文本协议,http底层的传输协议是tcp。grpc基于http2.0,传输协议也是tcp
https://i-blog.csdnimg.cn/direct/282f30fc8f6e44e5b41e4d7799596401.png
http协议具有一次性的问题:一旦对方返回了效果,连接断开,http2.0通过长连接办理了这个问题。
基于Http Server实现rpc请求

server端
package main

import (
        "encoding/json"
        "fmt"
        "net/http"
        "strconv"
)

func add(a, b int) int {
        return a + b
}

func main() {
        //get方法http://127.0.0.1:8000/add?a=1&b=2或http://127.0.0.1:8000?method=add&a=1&b=2
        //返回的格式化:json{"data":3}
        //1、callId的问题:r.URL.Path,2、数据的传输协议:url的参数传输协议,3、网络传输协议:http
        http.HandleFunc("/add", func(w http.ResponseWriter, r *http.Request) {
                err := r.ParseForm() //解析参数
                if err != nil {
                        panic("error")
                }
                fmt.Println("path:", r.URL.Path)
                a, err := strconv.Atoi(r.Form["a"])
                if err != nil {
                        panic("transform error")
                }
                b, err := strconv.Atoi(r.Form["b"])
                if err != nil {
                        panic("transform error")
                }
                w.Header().Set("Content-Type", "application/json")
                jData, err := json.Marshal(mapint{
                        "data": a + b,
                })
                w.Write(jData)
        })
        _ = http.ListenAndServe(":8000", nil)
}

https://i-blog.csdnimg.cn/direct/533edbbd37544982bceb7d64a988b4a9.png
缺点:http1.x,麻烦,性能不高
客户端:
package main

import (
        "encoding/json"
        "fmt"
        "time"

        "github.com/kirinlabs/HttpRequest"
)

type ResponseData struct {
        Data int `json:"data"`
}

func Add(a, b int) int {
        req := HttpRequest.NewRequest()
        res, _ := req.Get(fmt.Sprintf("http://127.0.0.1:8000/%s?a=%d&b=%d", "add", a, b))
        body, _ := res.Body()
        rspData := ResponseData{}
        _ = json.Unmarshal(body, &rspData)
        return rspData.Data
}
func main() {
        fmt.Println(Add(2, 2))
}
这里遇到了一个Get “http://127.0.0.1:8000/add?a=1&b=2”: context deadline exceeded (Client.Timeout exceeded while awaiting headers)的问题,主要是客户端默认网络请求时间太短
修改之后
package main

import (
        "encoding/json"
        "fmt"
        "time"

        "github.com/kirinlabs/HttpRequest"
)

type ResponseData struct {
        Data int `json:"data"`
}

func Add(a, b int) int {
        req := HttpRequest.NewRequest()
        req.SetTimeout(10 * time.Second)
        res, _ := req.Get(fmt.Sprintf("http://127.0.0.1:8000/%s?a=%d&b=%d", "add", a, b))
        body, _ := res.Body()
        rspData := ResponseData{}
        _ = json.Unmarshal(body, &rspData)
        return rspData.Data
}
func main() {
        fmt.Println(Add(2, 2))
}
rpc开发的要素分析

RPC技能在架构设计上有四部分构成,分别是:客户端、客户端存根、服务端、服务端存根。
客户端(Client):服务调用发起方,也称为服务消费者。
客户端存根(Client Stub):该步伐运行在客户端地点的计算机呆板上,主要用来存储要调用的服务器的地点,另外,该步伐还负责将客户端请求远端服务器步伐的数据信息打包成数据包,通过网络发送给服务端Stub步伐;其次,还要接收服务端Stub步伐发送的调用效果数据包,并剖析返回给客户端。
服务端(Server):远端的计算机呆板上运行的步伐,此中有客户端要调用的方法。
服务端存根(Server Stub):接收客户Stub步伐通过网络发送的请求消息数据包,并调用服务端中真正的步伐功能方法,完乐成能调用;其次,将服务端实行调用的效果进行数据处理打包发送给客户端Stub步伐。
rpc需要使用到的术语

1、动态署理技能: 上文中我们提到的Client Stub和Sever Stub步伐,在具体的编码和开发实践过程中,都是使用动态署理技能自动生成的一段步伐。
序列化和反序列化: 在RPC调用的过程中,我们可以看到数据需要在一台呆板上传输到另外一台呆板上。在互联网上,所有的数据都是以字节的形式进行传输的。而我们在编程的过程中,每每都是使用数据对象,因此想要在网络上将数据对象和相关变量进行传输,就需要对数据对象做序列化和反序列化的操作。
序列化:把对象转换为字节序列的过程称为对象的序列化,也就是编码的过程。
反序列化:把字节序列规复为对象的过程称为对象的反序列化,也就是解码的过程。
我们常见的Json,XML等相关框架都可以对数据做序列化和反序列化编解码操作。背面我们要学习的Protobuf协议,这也是一种数据编解码的协议,在RPC框架中使用的更广泛。
简单的rpc实例

服务端
package main

import (
        "net"
        "net/rpc"
)

type HelloService struct {
}

func (s *HelloService) Hello(request string, reply *string) error {
        //返回值是通过修改reply的值
        *reply = "hello, " + request
        return nil
}
func main() {
        //rpc快速开发体验
        //1.实例化一个server
        listener, _ := net.Listen("tcp", ":1234")
        //2.注册处理逻辑
        _ = rpc.RegisterName("HelloService", &HelloService{})
        //3.启动服务
        conn, _ := listener.Accept() //当一个新的连接进来的时候
        rpc.ServeConn(conn)
}

客户端
package main

import (
        "encoding/json"
        "fmt"
        "net/rpc"
        "time"
)

func main() {

        //1.建立连接
        client, err := rpc.Dial("tcp", "localhost:1234")
        if err != nil {
                panic("连接失败")
        }
        var reply *string = new(string)
        err = client.Call("HelloService.Hello", "bobby", reply)
        if err != nil {
                panic("调用失败")
        }
        fmt.Println(*reply)
}

替换rpc的序列化协议为json

序列化协议为json,各种语言都可以调用服务端的内容
服务端
package main

import (
        "net"
        "net/rpc"
        "net/rpc/jsonrpc"
)

type HelloService struct {
}

func (s *HelloService) Hello(request string, reply *string) error {
        //返回值是通过修改reply的值
        *reply = "hello, " + request
        return nil
}
func main() {
        //替换rpc的序列化协议为json
        //1.实例化一个server
        listener, _ := net.Listen("tcp", ":1234")
        //2.注册处理逻辑
        _ = rpc.RegisterName("HelloService", &HelloService{})
        //3.启动服务
        conn, _ := listener.Accept() //当一个新的连接进来的时候
        rpc.ServeCodec(jsonrpc.NewServerCodec(conn))

}

客户端
package main

import (
        "encoding/json"
        "fmt"
        "net"
        "net/rpc"
        "net/rpc/jsonrpc"
        "time"

        "github.com/kirinlabs/HttpRequest"
)

func main() {
        //替换rpc的序列化协议为json
        //1.建立连接
        conn, err := net.Dial("tcp", "localhost:1234")
        if err != nil {
                panic("连接失败")
        }
        var reply *string = new(string)
        client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
        err = client.Call("HelloService.Hello", "bobby", reply)
        if err != nil {
                panic("调用失败")
        }
        fmt.Println(*reply)
}

python连接rpc,序列化协议为json

服务端
package main

import (
        "net"
        "net/rpc"
        "net/rpc/jsonrpc"
)

type HelloService struct {
}

func (s *HelloService) Hello(request string, reply *string) error {
        //返回值是通过修改reply的值
        *reply = "hello, " + request
        return nil
}
func main() {
        //替换rpc的序列化协议为json
        //1.实例化一个server
        listener, _ := net.Listen("tcp", ":1234")
        //2.注册处理逻辑
        _ = rpc.RegisterName("HelloService", &HelloService{})
        //3.启动服务
        conn, _ := listener.Accept() //当一个新的连接进来的时候
        rpc.ServeCodec(jsonrpc.NewServerCodec(conn))

}

客户端
import json
import socket
request={
    "id":0,
    "params":["bobby"],
    "method":"HelloService.Hello"
}
client=socket.create_connection(("localhost",1234))
client.sendall(json.dumps(request).encode())
#获取服务器返回的数据
rsp=client.recv(4096)
rsp=json.loads(rsp.decode())
print(rsp)
替换rpc的传输协议为http

服务端
package main

import (
        "io"
        "net/http"
        "net/rpc"
        "net/rpc/jsonrpc"
)
type HelloService struct {
}

func (s *HelloService) Hello(request string, reply *string) error {
        //返回值是通过修改reply的值
        *reply = "hello, " + request
        return nil
}
func main() {
        //替换rpc的序列化协议为http
        //2.注册处理逻辑
        _ = rpc.RegisterName("HelloService", &HelloService{})
        http.HandleFunc("/jsonrpc", func(w http.ResponseWriter, r *http.Request) {
                var conn io.ReadWriteCloser = struct {
                        io.Writer
                        io.ReadCloser
                }{
                        ReadCloser: r.Body,
                        Writer:   w,
                }
                rpc.ServeRequest(jsonrpc.NewServerCodec(conn))
        })
        http.ListenAndServe(":1234", nil)
}

客户端
import requests
request={
    "id":0,
    "params":["bobby"],
    "method":"HelloService.Hello"
}
rsp=requests.post("http://localhost:1234/jsonrpc",json=request)
print(rsp.text)
客户端
package main

import (
        "encoding/json"
        "fmt"
        "net/rpc"
        "time"

        "github.com/kirinlabs/HttpRequest"
)

func main() {
        //1.实例化一个
        client, err := rpc.Dial("tcp", "127.0.0.1:1234")
        if err != nil {
                panic("连接失败")
        }
        var reply string
        err = client.Call("HelloService.Hello", "bobby", &reply)
        if err != nil {
                // panic("调用失败")
                fmt.Println(err)
        }
        fmt.Println(reply)
}

署理封装

署理类
package handler

import "net/rpc"

const HelloServiceName = "HelloServiceName"

type HelloServiceStub struct {
        *rpc.Client
}

func NewHelloServiceClient(protcol, address string) HelloServiceStub {
        conn, err := rpc.Dial(protcol, address)
        if err != nil {
                panic("connect error!")
        }
        return HelloServiceStub{conn}
}
func (c *HelloServiceStub) Hello(request string, reply *string) error {
        err := c.Call(HelloServiceName+".Hello", request, reply)
        if err != nil {
                return err
        }
        return nil
}

type HelloService struct {
}

func (s *HelloService) Hello(request string, reply *string) error {
        //返回值是通过修改reply的值
        *reply = "hello, " + request
        return nil
}

func RegisterHelloService() error {
        return rpc.RegisterName(HelloServiceName, &HelloService{})
}

服务端
package main

import (
        "TEMP/handler"
        "net"
        "net/rpc"
)

func main() {
        //进一步改造rpc调用的代码
        //1.实例化一个server
        listener, _ := net.Listen("tcp", ":1234")
        //2.注册处理逻辑handler
        _ = handler.RegisterHelloService()
        //3.启动服务
        conn, _ := listener.Accept() //当一个新的连接进来的时候
        rpc.ServeConn(conn)

}

客户端
package main

import (
        "TEMP/handler"
        "fmt"
)

func main() {
        //进一步改造rpc调用的代码
        //1.实例化一个
        client := handler.NewHelloServiceClient("tcp", "127.0.0.1:1234")
        var reply string
        err := client.Hello("bobby", &reply)
        if err != nil {
                // panic("调用失败")
                fmt.Println(err)
        }
        fmt.Println(reply)
}

解耦合

将服务端的Hello函数传递变换成接口类型
署理类
package handler

import "net/rpc"

const HelloServiceName = "HelloServiceName"

type HelloServiceStub struct {
        *rpc.Client
}

func NewHelloServiceClient(protcol, address string) HelloServiceStub {
        conn, err := rpc.Dial(protcol, address)
        if err != nil {
                panic("connect error!")
        }
        return HelloServiceStub{conn}
}
func (c *HelloServiceStub) Hello(request string, reply *string) error {
        err := c.Call(HelloServiceName+".Hello", request, reply)
        if err != nil {
                return err
        }
        return nil
}

type NewHelloService struct {
}
type HelloServicer interface {
        Hello(request string, reply *string) error
}

func (s *NewHelloService) Hello(request string, reply *string) error {
        //返回值是通过修改reply的值
        *reply = "hello, " + request
        return nil
}

func RegisterHelloService() error {
        return rpc.RegisterName(HelloServiceName, &NewHelloService{})
}
服务端
package main

import (
        "TEMP/handler"
        "net"
        "net/rpc"
)

func main() {
        //进一步改造rpc调用的代码
        //1.实例化一个server
        listener, _ := net.Listen("tcp", ":1234")
        //2.注册处理逻辑handler
        _ = handler.RegisterHelloService()
        //3.启动服务
        conn, _ := listener.Accept() //当一个新的连接进来的时候
        rpc.ServeConn(conn)

}

客户端
package main

import (
        "TEMP/handler"
        "fmt"
)

func main() {
        //进一步改造rpc调用的代码
        //1.实例化一个
        client := handler.NewHelloServiceClient("tcp", "127.0.0.1:1234")
        var reply string
        err := client.Hello("bobby", &reply)
        if err != nil {
                // panic("调用失败")
                fmt.Println(err)
        }
        fmt.Println(reply)
}


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: golang rpc