f 数据仓库与分析-gRPC学习条记记录以及整合gin开辟 - Powered by qidao123.com技术社区

gRPC学习条记记录以及整合gin开辟

打印 上一主题 下一主题

主题 2035|帖子 2035|积分 6105

gprc底子

前置环境准备

grpc下载

项目目录下执行
  1. go get google.golang.org/grpc@latest
复制代码
Protocol Buffers v3


https://github.com/protocolbuffers/protobuf/releases/download/v3.20.1/protoc-3.20.1-linux-x86_64.zip
go语言插件:

  1. go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
复制代码
rpc插件

  1. go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
复制代码

gRPC Hello World快速上手


基本rpc调用


服务端protoc编写


定义一个hello.proto文件
  1. syntax = "proto3"; // 版本声明,使用Protocol Buffers v3版本
  2. option go_package = "hello_server/pb";  // 指定生成的Go代码在你项目中的导入路径
  3. package pb; // 包名
  4. // 定义服务
  5. service Greeter {
  6.     // SayHello 方法
  7.     rpc SayHello (HelloRequest) returns (HelloResponse) {}
  8. }
  9. // 请求消息
  10. message HelloRequest {
  11.     string name = 1;
  12. }
  13. // 响应消息
  14. message HelloResponse {
  15.     string reply = 1;
  16. }
复制代码
服务端编写

  1. package main
  2. import (
  3.         "context"
  4.         "fmt"
  5.         "hello_server/pb"
  6.         "net"
  7.         "google.golang.org/grpc"
  8. )
  9. // hello server
  10. type server struct {
  11.         pb.UnimplementedGreeterServer
  12. }
  13. func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
  14.         return &pb.HelloResponse{Reply: "Hello " + in.Name}, nil
  15. }
  16. func main() {
  17.         // 监听本地的8972端口
  18.         lis, err := net.Listen("tcp", ":8972")
  19.         if err != nil {
  20.                 fmt.Printf("failed to listen: %v", err)
  21.                 return
  22.         }
  23.         s := grpc.NewServer()                  // 创建gRPC服务器
  24.         pb.RegisterGreeterServer(s, &server{}) // 在gRPC服务端注册服务
  25.         // 启动服务
  26.         err = s.Serve(lis)
  27.         if err != nil {
  28.                 fmt.Printf("failed to serve: %v", err)
  29.                 return
  30.         }
  31. }
复制代码
客户端protoc编写

  1. syntax = "proto3"; // 版本声明,使用Protocol Buffers v3版本
  2. option go_package = "hello_client/pb"; // 指定生成的Go代码在你项目中的导入路径
  3. package pb; // 包名
  4. // 定义服务
  5. service Greeter {
  6.     // SayHello 方法
  7.     rpc SayHello (HelloRequest) returns (HelloResponse) {}
  8. }
  9. // 请求消息
  10. message HelloRequest {
  11.     string name = 1;
  12. }
  13. // 响应消息
  14. message HelloResponse {
  15.     string reply = 1;
  16. }
复制代码
客户端编写

  1. package main
  2. import (
  3.         "context"
  4.         "flag"
  5.         "log"
  6.         "time"
  7.         "hello_client/pb"
  8.         "google.golang.org/grpc"
  9.         "google.golang.org/grpc/credentials/insecure"
  10. )
  11. // hello_client
  12. const (
  13.         defaultName = "world"
  14. )
  15. var (
  16.         addr = flag.String("addr", "127.0.0.1:8972", "the address to connect to")
  17.         name = flag.String("name", defaultName, "Name to greet")
  18. )
  19. func main() {
  20.         flag.Parse()
  21.         // 连接到server端,此处禁用安全传输
  22.         conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  23.         if err != nil {
  24.                 log.Fatalf("did not connect: %v", err)
  25.         }
  26.         defer conn.Close()
  27.         c := pb.NewGreeterClient(conn)
  28.         // 执行RPC调用并打印收到的响应数据
  29.         ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  30.         defer cancel()
  31.         r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
  32.         if err != nil {
  33.                 log.Fatalf("could not greet: %v", err)
  34.         }
  35.         log.Printf("Greeting: %s", r.GetReply())
  36. }
复制代码

分别在客户端和服务端执行如下步伐生成代码
  1. protoc --go_out=. --go_opt=paths=source_relative \
  2. --go-grpc_out=. --go-grpc_opt=paths=source_relative \
  3. pb/hello.proto
复制代码

生成服务端目录布局


生成客户端目录布局

服务端编译执行

go build编译生成hello_server
执行


客户段编译执行



如今来看看为啥客户端会打印”Hello 哈哈哈“
起首看下客户端main函数
其中有一行关键代码
  1.         r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
复制代码
这里调用了SayHello函数而客户段的Sayhello函数又调用了服务端的SayHello函数
服务端SayHello函数
  1. func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
  2.         return &pb.HelloResponse{Reply: "Hello " + in.Name}, nil
  3. }
复制代码



流式rpc调用

服务端流式调用

在原有底子上proto文件上增加如下函数
  1. rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
复制代码
重新生成代码
重新go build
服务端增加LotsOfReplies实现

  1. // LotsOfReplies 返回使用多种语言打招呼
  2. func (s *server) LotsOfReplies(in *pb.HelloRequest, stream pb.Greeter_LotsOfRepliesServer) error {
  3.         words := []string{
  4.                 "你好",
  5.                 "hello",
  6.                 "こんにちは",
  7.                 "안녕하세요",
  8.         }
  9.         for _, word := range words {
  10.                 data := &pb.HelloResponse{
  11.              //循环拼接打招呼信息与客户端传过来的用户
  12.                         Reply: word + in.GetName(),
  13.                 }
  14.                 // 拼接打招呼信息使用流式的Send方法返回多个数据
  15.                 if err := stream.Send(data); err != nil {
  16.                         return err
  17.                 }
  18.         }
  19.         return nil
  20. }
复制代码
客户端增加LotsOfReplies实现

  1. func runLotsOfReplies(c pb.GreeterClient) {
  2.         // server端流式RPC
  3.         // 延长超时时间避免中断
  4.         ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  5.         defer cancel()
  6.         stream, err := c.LotsOfReplies(ctx, &pb.HelloRequest{Name: *name})
  7.         if err != nil {
  8.                 log.Fatalf("c.LotsOfReplies failed, err: %v", err)
  9.         }
  10.         for {
  11.                 // 接收服务端返回的流式数据,服务端通过send发送,客户端通过Recv接受,当收到io.EOF或错误时退出
  12.                 res, err := stream.Recv()
  13.                 if err == io.EOF {
  14.                         break
  15.                 }
  16.                 if err != nil {
  17.                         log.Fatalf("c.LotsOfReplies failed, err: %v", err)
  18.                 }
  19.                 log.Printf("got reply: %q\n", res.GetReply())
  20.         }
  21. }
复制代码
留意这里服务端是通过stream.Send(data)发送数据的 客户端是通过stream.Recv()接受数据的

执行


留意此时的流的流向为服务端流向客户端 所以称之为服务端流式调用
服务端实时发送数据到流中,客户端实时监听流中有无数据,当监听到没有数据了流关闭,客户端关闭
场景举例:


  • 股票行情推送:客户端哀求某股票代码后,服务端连续推送实时价格颠簸数据
  • 物联网物联网设备监控监控:服务端连续推送温度传感器、GPS定位等实时收罗数据流
  • 在线游戏状态同步:服务端向玩家客户端连续推送其他玩家的位置和动作数据
  • 视频传播输:客户端哀求视频文件后,服务端分块传输视频流数据
  • 日记文件传输:服务端将大型日记文件拆分为多个数据包流式传输
  • 数据库查询结果集传输:当查询结果包含百万级记录时,服务端分批次流式返回数据
客户端流式调用

在客户端和服务端的proto文件依次增加如下步伐
  1.     rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
复制代码
重新生成代码
重新go build

服务端增加LotsOfGreetings实现

  1. func (s *server) LotsOfGreetings(stream pb.Greeter_LotsOfGreetingsServer) error {
  2.         reply := "你好:"
  3.         for {
  4.                 // 接收客户端发来的流式数据
  5.                 res, err := stream.Recv()
  6.                 if err == io.EOF {
  7.                         // 最终统一回复
  8.                         return stream.SendAndClose(&pb.HelloResponse{
  9.                                 Reply: reply,
  10.                         })
  11.                 }
  12.                 if err != nil {
  13.                         return err
  14.                 }
  15.                 reply += res.GetName()
  16.         }
  17. }
复制代码

客户端增加LotsOfGreetings实现

  1. func runLotsOfGreeting(c pb.GreeterClient) {
  2.         ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  3.         defer cancel()
  4.         // 客户端流式RPC
  5.         stream, err := c.LotsOfGreetings(ctx)
  6.         if err != nil {
  7.                 log.Fatalf("c.LotsOfGreetings failed, err: %v", err)
  8.         }
  9.         names := []string{"风清扬,", "扫地僧,", "无嗔大师"}
  10.         for _, name := range names {
  11.                 // 发送流式数据
  12.                 err := stream.Send(&pb.HelloRequest{Name: name})
  13.                 if err != nil {
  14.                         log.Fatalf("c.LotsOfGreetings stream.Send(%v) failed, err: %v", name, err)
  15.                 }
  16.         }
  17.         res, err := stream.CloseAndRecv()
  18.         if err != nil {
  19.                 log.Fatalf("c.LotsOfGreetings failed: %v", err)
  20.         }
  21.         log.Printf("got reply: %v", res.GetReply())
  22. }
复制代码
这里的调用和服务端流式调用反过来了
流式数据由客户端进行发送多次数据stream.Send,客户端统一做接受stream.Recv()
执行

场景举例:


  • 日记聚合系统:多个客户端步伐连续发送日记片段,服务端进行归并存储并返回写入状态
  • 图片分块上传:移动端将大图拆分为多个数据包流式传输,服务端完成重组后返回MD5校验
  • 直播推流场景:客户端分片上传视频流,服务端转码后返回转码乐成相应


双向流式调用

在客户端和服务端的proto文件中加上如下步伐
  1. // 双向流式数据
  2. rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
复制代码
重新生成代码
重新go build
服务端增加BidiHello实现

  1. func (s *server) BidiHello(stream pb.Greeter_BidiHelloServer) error {
  2.         for {
  3.                 // 接收流式请求
  4.                 in, err := stream.Recv()
  5.                 if err == io.EOF {
  6.                         return nil
  7.                 }
  8.                 if err != nil {
  9.                         return err
  10.                 }
  11.                 reply := in.GetName() + "收到了你的问候,祝你生活愉快!" // 对收到的数据做些处理
  12.                 // 返回流式响应
  13.                 if err := stream.Send(&pb.HelloResponse{Reply: reply}); err != nil {
  14.                         return err
  15.                 }
  16.         }
  17. }
复制代码

客户端增加BidiHello实现

  1. func runBidiHello(c pb.GreeterClient) {
  2.         ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
  3.         defer cancel()
  4.         // 双向流模式
  5.         stream, err := c.BidiHello(ctx)
  6.         if err != nil {
  7.                 log.Fatalf("c.BidiHello failed, err: %v", err)
  8.         }
  9.         waitc := make(chan struct{})
  10.         go func() {
  11.                 for {
  12.                         // 接收服务端返回的响应
  13.                         in, err := stream.Recv()
  14.                         if err == io.EOF {
  15.                                 // read done.
  16.                                 close(waitc)
  17.                                 return
  18.                         }
  19.                         if err != nil {
  20.                                 log.Fatalf("c.BidiHello stream.Recv() failed, err: %v", err)
  21.                         }
  22.                         fmt.Printf("回答:%s\n", in.GetReply())
  23.                 }
  24.         }()
  25.         // 从标准输入获取用户输入
  26.         reader := bufio.NewReader(os.Stdin) // 从标准输入生成读对象
  27.         for {
  28.                 cmd, _ := reader.ReadString('\n') // 读到换行
  29.                 cmd = strings.TrimSpace(cmd)
  30.                 if len(cmd) == 0 {
  31.                         continue
  32.                 }
  33.                 if strings.ToUpper(cmd) == "QUIT" {
  34.                         break
  35.                 }
  36.                 // 将获取到的数据发送至服务端
  37.                 if err := stream.Send(&pb.HelloRequest{Name: cmd}); err != nil {
  38.                         log.Fatalf("c.BidiHello stream.Send(%v) failed: %v", cmd, err)
  39.                 }
  40.         }
  41.         stream.CloseSend()
  42.         <-waitc
  43. }
复制代码

main函数调用
  1.         runBidiHello(c)
复制代码
执行

这里的流式数据传输是双向的
调用步骤

1,客户端创建流式调用
  1.         stream, err := c.BidiHello(ctx)
复制代码
2,客户端发送终端输入的指令进行send发送流式数据给服务端
  1. if err := stream.Send(&pb.HelloRequest{Name: cmd}); err != nil {
  2.                         log.Fatalf("c.BidiHello stream.Send(%v) failed: %v", cmd, err)
  3.                 }
复制代码
3,服务端循环接受客户端的流式相应数据
  1. in, err := stream.Recv()
复制代码
4,服务端对于客户端的做加工处理和返回
  1. reply := in.GetName() + "收到了你的问候,祝你生活愉快!" // 对收到的数据做些处理
  2.                 // 返回流式响应
  3.                 if err := stream.Send(&pb.HelloResponse{Reply: reply}); err != nil {
  4.                         return err
  5.                 }
复制代码




gRPC联合gin开辟用户注册接口

需求:利用gin+grpc+gorm实现用户注册接口,分模块微服务设计
项目总目录

[code]douyin
├─
继续阅读请点击广告

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

罪恶克星

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表