文盘Rust -- tonic-Rust grpc初体验

打印 上一主题 下一主题

主题 894|帖子 894|积分 2682

gRPC 是开发中常用的开源高性能远程过程调用(RPC)框架,tonic 是基于 HTTP/2 的 gRPC 实现,专注于高性能、互操作性和灵活性。该库的创建是为了对 async/await 提供一流的支持,并充当用 Rust 编写的生产系统的核心构建块。今天我们聊聊使用 tonic 调用 gRPC 的具体过程。
工程规划

RPC 程序一般包含 server 端和 client 端,为了方便,我们把两个程序放到同一个工程里。新建 tonic_sample 工程:
  1. cargo new tonic_sample
复制代码
Cargo.toml 如下
  1. [package]
  2. name = "tonic_sample"
  3. version = "0.1.0"
  4. edition = "2021"
  5. [[bin]] # Bin to run the gRPC server
  6. name = "stream-server"
  7. path = "src/stream_server.rs"
  8. [[bin]] # Bin to run the gRPC client
  9. name = "stream-client"
  10. path = "src/stream_client.rs"
  11. [dependencies]
  12. tokio = { version = "1", features = ["full"] } # a project created with `cargo new` has no workspace to inherit from
  13. tonic = "0.9"
  14. tonic-reflection = "0.9.2"
  15. prost = "0.11"
  16. tokio-stream = "0.1"
  17. async-stream = "0.2"
  18. serde = { version = "1.0", features = ["derive"] }
  19. serde_json = "1.0"
  20. rand = "0.7"
  21. h2 = { version = "0.3" }
  22. anyhow = "1.0.75"
  23. futures-util = "0.3.28"
  24. [build-dependencies]
  25. tonic-build = "0.9"
复制代码
tonic 的示例代码还是比较齐全的,本次我们参考 tonic 的 streaming example
首先编写 proto 文件,用来描述报文。 proto/echo.proto
  1. syntax = "proto3";
  2. package stream;
  3. // EchoRequest is the request for echo; it carries a single string message.
  4. message EchoRequest { string message = 1; }
  5. // EchoResponse is the response for echo; it carries a single string message.
  6. message EchoResponse { string message = 1; }
  7. // Echo is the echo service.
  8. service Echo {
  9.   // UnaryEcho is unary echo: one request, one response.
  10.   rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
  11.   // ServerStreamingEcho is server side streaming: one request, a stream of responses.
  12.   rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
  13.   // ClientStreamingEcho is client side streaming: a stream of requests, one response.
  14.   rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
  15.   // BidirectionalStreamingEcho is bidi streaming: a stream of requests, a stream of responses.
  16.   rpc BidirectionalStreamingEcho(stream EchoRequest)
  17.       returns (stream EchoResponse) {}
  18. }
复制代码
文件并不复杂,只有两个 message:一个请求、一个返回。之所以选择这个示例,是因为它包含了 RPC 中的流式处理,包括 server 流、client 流以及双向流的操作。接下来编辑 build.rs 文件:
  1. use std::{env, path::PathBuf};
  2. fn main() -> Result<(), Box<dyn std::error::Error>> {
  3.     tonic_build::compile_protos("proto/echo.proto")?;
  4.     Ok(())
  5. }
复制代码
该文件用来通过 tonic-build 生成 grpc 的 rust 基础代码
完成上述工作后就可以构建 server 和 client 代码了
stream_server.rs
  1. pub mod pb {
  2.     tonic::include_proto!("stream");
  3. }
  4. use anyhow::Result;
  5. use futures_util::FutureExt;
  6. use pb::{EchoRequest, EchoResponse};
  7. use std::{
  8.     error::Error,
  9.     io::ErrorKind,
  10.     net::{SocketAddr, ToSocketAddrs},
  11.     pin::Pin,
  12.     thread,
  13.     time::Duration,
  14. };
  15. use tokio::{
  16.     net::TcpListener,
  17.     sync::{
  18.         mpsc,
  19.         oneshot::{self, Receiver, Sender},
  20.         Mutex,
  21.     },
  22.     task::{self, JoinHandle},
  23. };
  24. use tokio_stream::{
  25.     wrappers::{ReceiverStream, TcpListenerStream},
  26.     Stream, StreamExt,
  27. };
  28. use tonic::{transport::Server, Request, Response, Status, Streaming};
  29. type EchoResult<T> = Result<Response<T>, Status>;
  30. type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send>>;
  31. fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> { // walks the source() chain; returns the first underlying std::io::Error, or None
  32.     let mut err: &(dyn Error + 'static) = err_status;
  33.     loop {
  34.         if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
  35.             return Some(io_err);
  36.         }
  37.         // h2::Error does not expose std::io::Error through `source()`, so ask it directly
  38.         // https://github.com/hyperium/h2/pull/462
  39.         if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
  40.             if let Some(io_err) = h2_err.get_io() {
  41.                 return Some(io_err);
  42.             }
  43.         }
  44.         err = match err.source() { // step to the next error in the chain
  45.             Some(err) => err,
  46.             None => return None, // end of the chain: no io::Error found
  47.         };
  48.     }
  49. }
  50. #[derive(Debug)]
  51. pub struct EchoServer {} // field-less handler type implementing the Echo service
  52. #[tonic::async_trait]
  53. impl pb::echo_server::Echo for EchoServer {
  54.     async fn unary_echo(&self, req: Request<EchoRequest>) -> EchoResult<EchoResponse> {
  55.         let req_str = req.into_inner().message;
  56.         let response = EchoResponse { message: req_str }; // echo the request message back unchanged
  57.         Ok(Response::new(response))
  58.     }
  59.     type ServerStreamingEchoStream = ResponseStream;
  60.     async fn server_streaming_echo(
  61.         &self,
  62.         req: Request<EchoRequest>,
  63.     ) -> EchoResult<Self::ServerStreamingEchoStream> {
  64.         println!("EchoServer::server_streaming_echo");
  65.         println!("\tclient connected from: {:?}", req.remote_addr());
  66.         // create an infinite stream that repeats the requested message (throttled to 200 ms below)
  67.         let repeat = std::iter::repeat(EchoResponse {
  68.             message: req.into_inner().message,
  69.         });
  70.         let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200)));
  71.         let (tx, rx) = mpsc::channel(128); // the spawned task feeds tx; rx is returned as the response stream
  72.         tokio::spawn(async move {
  73.             while let Some(item) = stream.next().await {
  74.                 match tx.send(Result::<_, Status>::Ok(item)).await {
  75.                     Ok(_) => {
  76.                         // item (server response) was queued to be sent to the client
  77.                     }
  78.                     Err(_item) => {
  79.                         // output_stream was built from rx and both have been dropped
  80.                         break;
  81.                     }
  82.                 }
  83.             }
  84.             println!("\tclient disconnected");
  85.         });
  86.         let output_stream = ReceiverStream::new(rx);
  87.         Ok(Response::new(
  88.             Box::pin(output_stream) as Self::ServerStreamingEchoStream
  89.         ))
  90.     }
  91.     async fn client_streaming_echo(
  92.         &self,
  93.         _: Request<Streaming<EchoRequest>>,
  94.     ) -> EchoResult<EchoResponse> {
  95.         Err(Status::unimplemented("not implemented")) // client streaming is left unimplemented in this example
  96.     }
  97.     type BidirectionalStreamingEchoStream = ResponseStream;
  98.     async fn bidirectional_streaming_echo(
  99.         &self,
  100.         req: Request<Streaming<EchoRequest>>,
  101.     ) -> EchoResult<Self::BidirectionalStreamingEchoStream> {
  102.         println!("EchoServer::bidirectional_streaming_echo");
  103.         let mut in_stream = req.into_inner();
  104.         let (tx, rx) = mpsc::channel(128);
  105.         tokio::spawn(async move {
  106.             while let Some(result) = in_stream.next().await {
  107.                 match result {
  108.                     Ok(v) => tx
  109.                         .send(Ok(EchoResponse { message: v.message }))
  110.                         .await
  111.                         .expect("working rx"), // NOTE(review): presumably panics this task if rx was already dropped - confirm
  112.                     Err(err) => {
  113.                         if let Some(io_err) = match_for_io_error(&err) {
  114.                             if io_err.kind() == ErrorKind::BrokenPipe {
  115.                                 eprintln!("\tclient disconnected: broken pipe");
  116.                                 break;
  117.                             }
  118.                         }
  119.                         match tx.send(Err(err)).await {
  120.                             Ok(_) => (),
  121.                             Err(_err) => break, // response was dropped
  122.                         }
  123.                     }
  124.                 }
  125.             }
  126.             println!("\tstream ended");
  127.         });
  128.         // echo just writes back the same data that was received
  129.         let out_stream = ReceiverStream::new(rx);
  130.         Ok(Response::new(
  131.             Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream
  132.         ))
  133.     }
  134. }
  135. #[tokio::main]
  136. async fn main() -> Result<(), Box<dyn std::error::Error>> {
  137.     // 基础server
  138.     let server = EchoServer {};
  139.     Server::builder()
  140.         .add_service(pb::echo_server::EchoServer::new(server))
  141.         .serve("0.0.0.0:50051".to_socket_addrs().unwrap().next().unwrap())
  142.         .await
  143.         .unwrap();
  144.     Ok(())
  145. }
复制代码
server 端的代码还是比较清晰的:首先通过 tonic::include_proto! 宏引入 gRPC 定义,参数是 proto 文件中定义的 package。我们重点说说 server_streaming_echo 函数,这个函数的处理流程明白了,其他的流式处理大同小异。首先通过 std::iter::repeat 函数定义一个迭代器;然后用它构建 tokio_stream,在本示例中每 200 毫秒产生一个 repeat 元素;最后构建一个 channel,tx 用来发送从 stream 中获取的内容,rx 封装到 response 中返回。最后由 main 函数拉起服务。
client 代码如下
  1. pub mod pb {
  2.     tonic::include_proto!("stream");
  3. }
  4. use std::time::Duration;
  5. use tokio_stream::{Stream, StreamExt};
  6. use tonic::transport::Channel;
  7. use pb::{echo_client::EchoClient, EchoRequest};
  8. fn echo_requests_iter() -> impl Stream<Item = EchoRequest> { // stream of requests "msg 01", "msg 02", ...
  9.     tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest {
  10.         message: format!("msg {:02}", i),
  11.     })
  12. }
  13. async fn unary_echo(client: &mut EchoClient<Channel>, num: usize) { // sends `num` unary requests in sequence and prints each reply
  14.     for i in 0..num {
  15.         let req = tonic::Request::new(EchoRequest {
  16.             message: "msg".to_string() + &i.to_string(),
  17.         });
  18.         let resp = client.unary_echo(req).await.unwrap(); // panics if the call returns an error
  19.         println!("resp:{}", resp.into_inner().message);
  20.     }
  21. }
  22. async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) { // prints the first `num` items of the server stream
  23.     let stream = client
  24.         .server_streaming_echo(EchoRequest {
  25.             message: "foo".into(),
  26.         })
  27.         .await
  28.         .unwrap()
  29.         .into_inner();
  30.     // stream is infinite - take just `num` elements and then disconnect
  31.     let mut stream = stream.take(num);
  32.     while let Some(item) = stream.next().await {
  33.         println!("\treceived: {}", item.unwrap().message);
  34.     }
  35.     // stream is dropped here and the disconnect info is sent to the server
  36. }
  37. async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) { // sends `num` requests and prints every echoed response
  38.     let in_stream = echo_requests_iter().take(num);
  39.     let response = client
  40.         .bidirectional_streaming_echo(in_stream)
  41.         .await
  42.         .unwrap();
  43.     let mut resp_stream = response.into_inner();
  44.     while let Some(received) = resp_stream.next().await {
  45.         let received = received.unwrap(); // panics if the server sent an error status
  46.         println!("\treceived message: `{}`", received.message);
  47.     }
  48. }
  49. async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) { // like bidirectional_streaming_echo, but unbounded and throttled by `dur`
  50.     let in_stream = echo_requests_iter().throttle(dur);
  51.     let response = client
  52.         .bidirectional_streaming_echo(in_stream)
  53.         .await
  54.         .unwrap();
  55.     let mut resp_stream = response.into_inner();
  56.     while let Some(received) = resp_stream.next().await {
  57.         let received = received.unwrap(); // panics if the server sent an error status
  58.         println!("\treceived message: `{}`", received.message);
  59.     }
  60. }
  61. #[tokio::main]
  62. async fn main() -> Result<(), Box<dyn std::error::Error>> {
  63.     let mut client = EchoClient::connect("http://127.0.0.1:50051").await.unwrap();
  64.     println!("Unary echo:");
  65.     unary_echo(&mut client, 10).await;
  66.     tokio::time::sleep(Duration::from_secs(1)).await;
  67.     println!("Streaming echo:");
  68.     streaming_echo(&mut client, 5).await;
  69.     tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions
  70.     // Echo stream that sends 17 requests then graceful end that connection
  71.     println!("\r\nBidirectional stream echo:");
  72.     bidirectional_streaming_echo(&mut client, 17).await;
  73.     // Echo stream that sends up to `usize::MAX` requests. One request each 2s.
  74.     // Exiting client with CTRL+C demonstrate how to distinguish broken pipe from
  75.     // graceful client disconnection (above example) on the server side.
  76.     println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
  77.     bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
  78.     Ok(())
  79. }
复制代码
测试一下,分别运行 server 和 client
  1. cargo run --bin stream-server
  2. cargo run --bin stream-client
复制代码
在开发中,我们通常不会在 client 和 server 都开发好的情况下才开始测试,通常在开发 server 端的时候就采用 grpcurl 工具进行测试工作:
  1. grpcurl -import-path ./proto -proto echo.proto list
  2. grpcurl -import-path ./proto -proto  echo.proto describe stream.Echo
  3. grpcurl -plaintext -import-path ./proto -proto  echo.proto -d '{"message":"1234"}' 127.0.0.1:50051 stream.Echo/UnaryEcho
复制代码
此时,如果我们不指定 -import-path 和 -proto 参数,执行如下命令:
  1. grpcurl -plaintext 127.0.0.1:50051 list
复制代码
会出现如下报错信息
  1. Failed to list services: server does not support the reflection API
复制代码
让服务端程序支持 reflection API

首先改造build.rs
  1. use std::{env, path::PathBuf};
  2. fn main() -> Result<(), Box<dyn std::error::Error>> {
  3.     let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
  4.     tonic_build::configure()
  5.         .file_descriptor_set_path(out_dir.join("stream_descriptor.bin"))
  6.         .compile(&["proto/echo.proto"], &["proto"])
  7.         .unwrap();
  8.     Ok(())
  9. }
复制代码
file_descriptor_set_path 指定一个输出文件,其中包含该 protobuf 模块编码后的 prost_types::FileDescriptorSet。这是实现 gRPC 服务端反射所必需的。
接下来改造一下 stream_server.rs,涉及两处更改。
新增 STREAM_DESCRIPTOR_SET 常量
  1. pub mod pb {
  2.     tonic::include_proto!("stream"); // generated code for the `stream` proto package
  3.     pub const STREAM_DESCRIPTOR_SET: &[u8] =
  4.         tonic::include_file_descriptor_set!("stream_descriptor"); // encoded descriptor set, registered with the reflection service
  5. }
复制代码
修改main函数
  1. #[tokio::main]
  2. async fn main() -> Result<(), Box<dyn std::error::Error>> {
  3.     // 基础server
  4.     // let server = EchoServer {};
  5.     // Server::builder()
  6.     //     .add_service(pb::echo_server::EchoServer::new(server))
  7.     //     .serve("0.0.0.0:50051".to_socket_addrs().unwrap().next().unwrap())
  8.     //     .await
  9.     //     .unwrap();
  10.     // tonic_reflection
  11.     let service = tonic_reflection::server::Builder::configure()
  12.         .register_encoded_file_descriptor_set(pb::STREAM_DESCRIPTOR_SET)
  13.         .with_service_name("stream.Echo")
  14.         .build()
  15.         .unwrap();
  16.     let addr = "0.0.0.0:50051".parse().unwrap();
  17.     let server = EchoServer {};
  18.     Server::builder()
  19.         .add_service(service)
  20.         .add_service(pb::echo_server::EchoServer::new(server))
  21.         .serve(addr)
  22.         .await?;
  23.     Ok(())
  24. }
复制代码
register_encoded_file_descriptor_set 将包含已编码 prost_types::FileDescriptorSet 的 byte slice 注册到 gRPC Reflection 服务的构建器(Builder)中。
再次测试
  1. grpcurl -plaintext 127.0.0.1:50051 list
  2. grpcurl -plaintext 127.0.0.1:50051 describe stream.Echo
复制代码
返回正确结果。
以上完整代码地址
作者:京东科技 贾世闻
来源:京东云开发者社区 转载请注明来源

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

美丽的神话

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表