42 干货系列从零用Rust编写负载均衡及代理,wmproxy中配置tcp转websocket ...

打印 上一主题 下一主题

主题 827|帖子 827|积分 2481

wmproxy

wmproxy已用Rust实现http/https代理, socks5代理, 反向代理, 静态文件服务器,四层TCP/UDP转发,七层负载均衡,内网穿透,后续将实现websocket代理等,会将实现过程分享出来,感兴趣的可以一起造个轮子
项目地址

国内: https://gitee.com/tickbh/wmproxy
github: https://github.com/tickbh/wmproxy
设计目标

通过简单配置方便用户快速使用tcp转websocket及websocket转tcp,也可支持http升级到websocket协议。
改造http升级websocket

因为负载均衡的不确定性,在未读取数据前,未能确定当前的处理逻辑

  • /root/proxy.png 访问当前的件服务器
  • /api/up 通过负载均衡访问后端服务器
  • /ws 将连接升级成websocket
  • 其它情况
    所以我们得预备能支持websocket的可能,那我们将同时设置回调HTTP及websocket,源码在reverse/http.rs
  1. let timeout = oper.servers[0].comm.build_client_timeout();
  2. let mut server = Server::builder()
  3.     .addr(addr)
  4.     .timeout_layer(timeout)
  5.     .stream(inbound);
  6. // 设置HTTP回调
  7. server.set_callback_http(Box::new(Operate { inner: oper }));
  8. // 设置websocket回调,客户端有可能升级到websocket协议
  9. server.set_callback_ws(Box::new(ServerWsOperate::new(servers)));
  10. if let Err(e) = server.incoming().await {
  11.     if server.get_req_num() == 0 {
  12.         log::info!("反向代理:未处理任何请求时发生错误:{:?}", e);
  13.     } else {
  14.         if !e.is_io() {
  15.             log::info!("反向代理:处理信息时发生错误:{:?}", e);
  16.         }
  17.     }
  18. }
复制代码
ServerWsOperate中定义了服务的内部信息,及向远程websocket发送的sender,以做绑定
  1. pub struct ServerWsOperate {
  2.     inner: InnerWsOper,
  3.     sender: Option<Sender<OwnedMessage>>,
  4. }
复制代码
在on_open的时候建立和远程websocket的双向绑定:
  1. #[async_trait]
  2. impl WsTrait for ServerWsOperate {
  3.     /// 握手完成后之后的回调,服务端返回了Response之后就认为握手成功
  4.     async fn on_open(&mut self, shake: WsHandshake) -> ProtResult<Option<WsOption>> {
  5.         if shake.request.is_none() {
  6.             return Err(ProtError::Extension("miss request"));
  7.         }
  8.         let mut option = WsOption::new();
  9.         if let Some(location) =
  10.             ReverseHelper::get_location_by_req(&self.inner.servers, shake.request.as_ref().unwrap())
  11.         {
  12.             if !location.is_ws {
  13.                 return Err(ProtError::Extension("Not Support Ws"));
  14.             }
  15.             if let Ok((url, domain)) = location.get_reverse_url() {
  16.                 println!("connect url = {}, domain = {:?}", url, domain);
  17.                 let mut client = Client::builder()
  18.                     .url(url)?
  19.                     .connect_with_domain(&domain)
  20.                     .await?;
  21.                 let (serv_sender, serv_receiver) = channel::<OwnedMessage>(10);
  22.                 let (cli_sender, cli_receiver) = channel::<OwnedMessage>(10);
  23.                 option.set_receiver(serv_receiver);
  24.                 self.sender = Some(cli_sender);
  25.                 client.set_callback_ws(Box::new(ClientWsOperate {
  26.                     sender: Some(serv_sender),
  27.                     receiver: Some(cli_receiver),
  28.                 }));
  29.                 tokio::spawn(async move {
  30.                     if let Err(e) = client
  31.                         .wait_ws_operate_with_req(shake.request.unwrap())
  32.                         .await
  33.                     {
  34.                         println!("error = {:?}", e);
  35.                     };
  36.                     println!("client close!!!!!!!!!!");
  37.                 });
  38.             }
  39.             return Ok(Some(option));
  40.         }
  41.         return Err(ProtError::Extension("miss match"));
  42.     }
  43. }
复制代码
在此地方,我们是用负载均衡来做配置location.get_reverse_url(),远程端的域名和ip要映射成本地的ip,所以这边可能要读取负载的ip而不是从dns中解析ip。
获取正确的连接域名和ip地址。
  1. pub fn get_reverse_url(&self) -> ProtResult<(Url, String)> {
  2.     if let Some(addr) = self.get_upstream_addr() {
  3.         if let Some(r) = &self.comm.proxy_url {
  4.             let mut url = r.clone();
  5.             let domain = url.domain.clone().unwrap_or(String::new());
  6.             url.domain = Some(format!("{}", addr.ip()));
  7.             url.port = Some(addr.port());
  8.             Ok((url, domain))
  9.         } else {
  10.             let url = Url::parse(format!("http://{}/", addr).into_bytes())?;
  11.             let domain = format!("{}", addr.ip());
  12.             Ok((url, domain))
  13.         }
  14.     } else {
  15.         Err(ProtError::Extension("error"))
  16.     }
  17. }
复制代码
此处的处理方式与nginx不同,nginx是将所有升级请求的头信息全部删除,再根据配置的过行补充
  1. Upgrade: websocket
  2. Connection: Upgrade
复制代码
所以在nginx中配置支持websocket通常如下配置,也就是通常配置的时候需要查找资料进行copy
  1. proxy_set_header Upgrade $http_upgrade;
  2. proxy_set_header Connection "upgrade";
复制代码
在wmproxy中并不会对客户端的请求做特殊的处理,也就是发了升级远程的websocket服务器接受了升级,我们当前协议就会升级。所以我们在配置中加入了一个字段is_ws,如果升级成websocket但是并不支持websocket的时候直接进行报错,告知不支持协议
  1. [[http.server.location]]
  2. rule = "/ws"
  3. is_ws = true
  4. reverse_proxy = "http://ws"
复制代码
如此在该问该url的时候就可以转websocket了,比如websocat ws://127.0.0.1/ws
tcp转websocket

利用上章讲述的StreamToWs,且利用stream流的转发,将转发类型配置tcp2ws非安全的ws,或者tcp2wss带tls的wss,实现源码在reverse/stream.rs
  1. [[stream.server]]
  2. bind_addr = "0.0.0.0:85"
  3. proxy_url = "ws://127.0.0.1:8081/"
  4. bind_mode = "tcp2ws"
复制代码
这样子,我们就可以将本地监听的85端口的地址,流量转发成8081的websocket远程地址。如果远程端验证域名可以配置上相应的domain = "wmproxy.com"
  1. if s.bind_mode == "tcp2ws" {
  2.     let mut stream_to_ws = StreamToWs::new(inbound, format!("ws://{}", addr))?;
  3.     if domain.is_some() {
  4.         stream_to_ws.set_domain(domain.unwrap());
  5.     }
  6.     let _ = stream_to_ws.copy_bidirectional().await;
  7. }
复制代码
如此我们就可以轻松的获取tcp流量转websocket的能力。
websocket转tcp

利用上章讲述的WsToStream,且利用stream流的转发,将转发类型配置ws2tcp转发为tcp,实现源码在reverse/stream.rs
  1. [[stream.server]]
  2. bind_addr = "0.0.0.0:86"
  3. up_name = "ws1"
  4. proxy_url = "tcp://127.0.0.1:8082"
  5. bind_mode = "ws2tcp"
复制代码
这样子,我们就可以将本地监听的86端口websocket的地址,流量转发成8082的tcp远程地址。如果远程端验证域名可以配置上相应的domain = "wmproxy.com"
  1. if s.bind_mode == "ws2tcp" {
  2.     let mut ws_to_stream = WsToStream::new(inbound, addr)?;
  3.     if domain.is_some() {
  4.         ws_to_stream.set_domain(domain.unwrap());
  5.     }
  6.     let _ = ws_to_stream.copy_bidirectional().await;
  7. }
复制代码
如此我们就可以轻松的获取websocket流量转tcp的能力。
小结

利用wmproxy可以轻松的转化tcp到websocket的流量互转,配置简单。可以利用现成的websocket高速通道辅助我们的tcp程序获取更稳定的流量通道。
点击 [关注][在看][点赞] 是对作者最大的支持

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

大连密封材料

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表