40. 干货系列从零用Rust编写负载均衡及代理,websocket的实现 ...

打印 上一主题 下一主题

主题 874|帖子 874|积分 2622

wmproxy

wmproxy已用Rust实现http/https代理, socks5代理, 反向代理, 静态文件服务器,四层TCP/UDP转发,七层负载均衡,内网穿透,后续将实现websocket代理等,会将实现过程分享出来,感兴趣的可以一起造个轮子
项目地址

国内: https://gitee.com/tickbh/wmproxy
github: https://github.com/tickbh/wmproxy
简单介绍websocket

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,它使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。WebSocket 通信协议于 2011 年被 IETF 定为标准 RFC 6455,并由 RFC7936 补充规范。WebSocket API 也被 W3C 定为标准。
也就是在web环境中,websocket就是socket的一种标准形式的体现。类似的还要SSE基于HTTP中的text/event-stream
源码文件含义

协议层的编码解码主要在webparse/ws

  • frame_header协议头的解码与编码
  • dataframe 基础单位为帧,存在多帧组成一个数据包的情况
  • message 协议的基本信息,包含Text必须为UTF-8字符串文本,Binary二进制数据流,Close关闭信息,Ping,Pong用来做心跳包相关的信息。
  • mask 是否为数据进行基本的加密,服务端要求客户端传来的数据必须加密
网络处理层的源码主要在wenmeng/ws

  • codec/framed_read 每一帧的读,以帧为单位进行读取
  • codec/framed_write 每一帧的写,以帧为单位进行写入
  • state/state_handshake websocket连接内部的握手状态
  • client_connection 客户端的状态连接
  • server_connection 服务端的状态连接
  • control 状态的控制,写入读取的pending等,核心处理源码
  • handshake 定义on_open回调后的WsHandshake类
  • option 定义on_open回调后返回的WsOption类,当下只包含定时器,即客户端多久时间唤醒一次interval
  • ws_trait websocket的核心回调
  1. #[async_trait]
  2. pub trait WsTrait: Send {
  3.     /// 通过请求连接构建出返回的握手连接信息
  4.     #[inline]
  5.     fn on_request(&mut self, req: &RecvRequest) -> ProtResult<RecvResponse> {
  6.         // warn!("Handler received request:\n{}", req);
  7.         WsHandshake::build_request(req)
  8.     }
  9.     /// 握手完成后之后的回调,服务端返回了Response之后就认为握手成功
  10.     fn on_open(&mut self, shake: WsHandshake) -> ProtResult<Option<WsOption>>;
  11.     /// 接受到远端的关闭消息
  12.     async fn on_close(&mut self, reason: &Option<CloseData>) {}
  13.     /// 服务内部出现了错误代码
  14.     async fn on_error(&mut self, err: ProtError) {}
  15.     /// 收到来在远端的ping消息, 默认返回pong消息
  16.     async fn on_ping(&mut self, val: Vec<u8>) -> ProtResult<OwnedMessage> {
  17.         return Ok(OwnedMessage::Pong(val));
  18.     }
  19.     /// 收到来在远端的pong消息, 默认不做任何处理, 可自定义处理如ttl等
  20.     async fn on_pong(&mut self, val: Vec<u8>) {}
  21.     /// 收到来在远端的message消息, 必须覆写该函数
  22.     async fn on_message(&mut self, msg: OwnedMessage) -> ProtResult<()>;
  23.     /// 定时器定时按间隔时间返回
  24.     async fn on_interval(&mut self, option: &mut Option<WsOption>) -> ProtResult<()> {
  25.         Ok(())
  26.     }
  27.    
  28.     /// 将当前trait转化成Any,仅需当需要重新获取回调处理的时候进行处理
  29.     fn as_any(&self) -> Option<&dyn Any> {
  30.         None
  31.     }
  32.     /// 将当前trait转化成mut Any,仅需当需要重新获取回调处理的时候进行处理
  33.     fn as_any_mut(&mut self) -> Option<&mut dyn Any> {
  34.         None
  35.     }
  36. }
复制代码
服务端基础demo

建立一个本地监听8081的ws端口,完整源码ws_server
建立监听类:
  1. struct Operate {
  2.     sender: Option<Sender<OwnedMessage>>,
  3. }
  4. #[async_trait]
  5. impl WsTrait for Operate {
  6.     fn on_open(&mut self, shake: WsHandshake) -> ProtResult<Option<WsOption>> {
  7.         self.sender = Some(shake.sender);
  8.         Ok(Some(WsOption::new(Duration::from_secs(10))))
  9.     }
  10.     async fn on_message(&mut self, msg: OwnedMessage) -> ProtResult<()> {
  11.         println!("callback on message = {:?}", msg);
  12.         let _ = self
  13.             .sender
  14.             .as_mut()
  15.             .unwrap()
  16.             .send(OwnedMessage::Text("from server".to_string()))
  17.             .await;
  18.         let _ = self.sender.as_mut().unwrap().send(msg).await;
  19.         Ok(())
  20.     }
  21.     async fn on_interval(&mut self, _option: &mut Option<WsOption>) -> ProtResult<()> {
  22.         println!("on_interval!!!!!!!");
  23.         Ok(())
  24.     }
  25. }
复制代码
然后启动服务器监听:
  1. async fn run_main() -> Result<(), Box<dyn Error>> {
  2.     let addr = "127.0.0.1:8081".to_string();
  3.     let server = TcpListener::bind(&addr).await?;
  4.     println!("Listening on: {}", addr);
  5.     loop {
  6.         let (stream, addr) = server.accept().await?;
  7.         tokio::spawn(async move {
  8.             let mut server = Server::new(stream, Some(addr));
  9.             let operate = Operate { sender: None };
  10.             // 设置服务回调
  11.             server.set_callback_ws(Box::new(operate));
  12.             let e = server.incoming().await;
  13.             println!("close server ==== addr = {:?} e = {:?}", addr, e);
  14.         });
  15.     }
  16. }
复制代码
此时即可实现websocket的监听及处理。
客户端demo

当下客户端demo需要能接受终端的输入,并向服务器发送数据,所以需要自己构建sender
建立客户端连接,在这里我们手动构建了一个sender/receiver对。
  1. async fn run_main() -> ProtResult<()> {
  2.     // 自己手动构建数据对,并将receiver传给服务端
  3.     let (sender, receiver) = channel(10);
  4.     let sender_clone = sender.clone();
  5.     tokio::spawn(async move {
  6.         let url = "ws://127.0.0.1:8081";
  7.         let mut client = Client::builder()
  8.             .url(url)
  9.             .unwrap()
  10.             .connect()
  11.             .await
  12.             .unwrap();
  13.         client.set_callback_ws(Box::new(Operate { sender:Some(sender_clone), receiver: Some(receiver) }));
  14.         client.wait_ws_operate().await.unwrap();
  15.     });
  16.     loop {
  17.         let mut buffer = String::new();
  18.         let stdin = io::stdin(); // We get `Stdin` here.
  19.         stdin.read_line(&mut buffer)?;
  20.         sender.send(OwnedMessage::Text(buffer)).await?;
  21.     }
  22.     Ok(())
  23. }
复制代码
监听实现
  1. struct Operate {
  2.     sender: Option<Sender<OwnedMessage>>,
  3.     receiver: Option<Receiver<OwnedMessage>>,
  4. }
  5. #[async_trait]
  6. impl WsTrait for Operate {
  7.     fn on_open(&mut self, shake: WsHandshake) -> ProtResult<Option<WsOption>> {
  8.         // 将receiver传给控制中心, 以让其用该receiver做接收
  9.         let mut option = WsOption::new(Duration::from_secs(1000));
  10.         if self.receiver.is_some() {
  11.             option.set_receiver(self.receiver.take().unwrap());
  12.         }
  13.         if self.sender.is_none() {
  14.             self.sender = Some(shake.sender);
  15.         }
  16.         Ok(Some(option))
  17.     }
  18.     async fn on_message(&mut self, msg: OwnedMessage) -> ProtResult<()> {
  19.         println!("callback on message = {:?}", msg);
  20.         let _ = self
  21.             .sender
  22.             .as_mut()
  23.             .unwrap()
  24.             .send(OwnedMessage::Text("from client".to_string()))
  25.             .await;
  26.         let _ = self.sender.as_mut().unwrap().send(msg).await;
  27.         Ok(())
  28.     }
  29.     async fn on_interval(&mut self, _option: &mut Option<WsOption>) -> ProtResult<()> {
  30.         println!("on_interval!!!!!!!");
  31.         Ok(())
  32.     }
  33. }
复制代码
接口说明

Client和Server为了同时兼容HTTP服务,即握手用的为HTTP的前半段请求,选择了回调用Box的形式来做回调函数的处理。
  1. pub struct Server<T>
  2. where
  3.     T: AsyncRead + AsyncWrite + Unpin + Sized,
  4. {
  5.     /// http的接口回调, 处理http服务器
  6.     callback_http: Option<Box<dyn HttpTrait>>,
  7.     /// websocket的接口回调, 处理websocket服务器
  8.     callback_ws: Option<Box<dyn WsTrait>>,
  9.     // ...
  10. }
复制代码
他们两个可能是同时存在,或者单个存在的,即当作服务的时候,可能仅对/ws进行websocket的升级,其它的仅仅是http服务,所以需要能单独又能聚合的处理数据。而单存的websocket仅需WsTrait回调。
即在pub async fn incoming(&mut self) -> ProtResult处理服务的时候不在传入回调地址,改成预先设置。达到灵活处理的目的。且接口比较清晰。
小结

wenmeng库当前已支持HTTP1.1/HTTP2/WEBSOCKET,在浏览器的环境中websocket是必不可缺少的存在,当然有很多原生的服务中用的都是socket,下一章中,我们将实现websocket与tcp的互转,以便一些tcp的程序可以服务web的服务。
点击 [关注][在看][点赞] 是对作者最大的支持

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

渣渣兔

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表