35. 干货系列从零用Rust编写负载均衡及代理,代理服务器的源码升级改造 ...

打印 上一主题 下一主题

主题 921|帖子 921|积分 2763

wmproxy

wmproxy已用Rust实现http/https代理, socks5代理, 反向代理, 静态文件服务器,四层TCP/UDP转发,七层负载均衡,内网穿透,后续将实现websocket代理等,会将实现过程分享出来,感兴趣的可以一起造个轮子
项目地址

国内: https://gitee.com/tickbh/wmproxy
github: https://github.com/tickbh/wmproxy
项目设计目标

在同一个端口上同时支持HTTP/HTTPS/SOCKS5代理,即假设监听8090端口,那么可以设置如下:
  1. curl --proxy socks5://127.0.0.1:8090 http://www.baidu.com
  2. curl --proxy http://127.0.0.1:8090 http://www.baidu.com
  3. curl --proxy http://127.0.0.1:8090 https://www.baidu.com
复制代码
以上方案需要都可以兼容打通,才算成功。
初始方案

不做HTTP服务器,仅简单的解析数据流,然后进行数据转发
  1. pub async fn process<T>(
  2.     username: &Option<String>,
  3.     password: &Option<String>,
  4.     mut inbound: T,
  5. ) -> Result<(), ProxyError<T>>
  6. where
  7.     T: AsyncRead + AsyncWrite + Unpin,
  8. {
  9.     let mut outbound;
  10.     let mut request;
  11.     let mut buffer = BinaryMut::new();
  12.     loop {
  13.         let size = {
  14.             let mut buf = ReadBuf::uninit(buffer.chunk_mut());
  15.             inbound.read_buf(&mut buf).await?;
  16.             buf.filled().len()
  17.         };
  18.         if size == 0 {
  19.             return Err(ProxyError::Extension("empty"));
  20.         }
  21.         unsafe {
  22.             buffer.advance_mut(size);
  23.         }
  24.         request = webparse::Request::new();
  25.         // 通过该方法解析标头是否合法, 若是partial(部分)则继续读数据
  26.         // 若解析失败, 则表示非http协议能处理, 则抛出错误
  27.         // 此处clone为浅拷贝,不确定是否一定能解析成功,不能影响偏移
  28.         match request.parse_buffer(&mut buffer.clone()) {
  29.             Ok(_) => match request.get_connect_url() {
  30.                 Some(host) => {
  31.                     match HealthCheck::connect(&host).await {
  32.                         Ok(v) => outbound = v,
  33.                         Err(e) => {
  34.                             Self::err_server_status(inbound, 503).await?;
  35.                             return Err(ProxyError::from(e));
  36.                         }
  37.                     }
  38.                     break;
  39.                 }
  40.                 None => {
  41.                     if !request.is_partial() {
  42.                         Self::err_server_status(inbound, 503).await?;
  43.                         return Err(ProxyError::UnknownHost);
  44.                     }
  45.                 }
  46.             },
  47.             Err(WebError::Http(HttpError::Partial)) => {
  48.                 continue;
  49.             }
  50.             Err(_) => {
  51.                 return Err(ProxyError::Continue((Some(buffer), inbound)));
  52.             }
  53.         }
  54.     }
  55.     match request.method() {
  56.         &Method::Connect => {
  57.             log::trace!(
  58.                 "https connect {:?}",
  59.                 String::from_utf8_lossy(buffer.chunk())
  60.             );
  61.             inbound.write_all(b"HTTP/1.1 200 OK\r\n\r\n").await?;
  62.         }
  63.         _ => {
  64.             outbound.write_all(buffer.chunk()).await?;
  65.         }
  66.     }
  67.     let _ = copy_bidirectional(&mut inbound, &mut outbound).await?;
  68.     Ok(())
  69. }
复制代码
此方案仅做浅解析,处理相当高效,但遇到如下问题:

  • HTTP/HTTPS代理服务器需要验证密码
  • HTTP服务存在不同的协议,此方法只兼容HTTP/1.1,无法兼容明确的HTTP/2协议
  • 请求的协议头有些得做修改,此方法无法修改
改造方案


  • 引入HTTP服务器介入
  • 但是因为需要兼容不同协议,只有等确定协议后才能引入协议,需要预读数据,进行协议判定。
  • HTTPS代理协议只处理一组Connect协议,之后需要解除http协议进行双向绑定。
完整源码

  • 预读数据


  • Socks5:第一个字节为0X05,非ascii字符,其它协议不会影响
  • Https: https代理必须发送Connect方法,所以必须以CONNECT或者connect开头,且查询其它HTTP方法没有以C开头的,这里仅判断第一个字符为C或者c,该协议仅处理一条http请求不参与后续TLS握手协议等保证数据安全
  • 其它开头的均被认为http代理
  1. let mut buffer = BinaryMut::with_capacity(24);
  2. let size = {
  3.     let mut buf = ReadBuf::uninit(buffer.chunk_mut());
  4.     inbound.read_buf(&mut buf).await?;
  5.     buf.filled().len()
  6. };
  7. if size == 0 {
  8.     return Err(ProxyError::Extension("empty"));
  9. }
  10. unsafe {
  11.     buffer.advance_mut(size);
  12. }
  13. // socks5 协议, 直接返回, 交给socks5层处理
  14. if buffer.as_slice()[0] == 5 {
  15.     return Err(ProxyError::Continue((Some(buffer), inbound)));
  16. }
  17. let mut max_req_num = usize::MAX;
  18. // https 协议, 以connect开头, 仅处理一条HTTP请求
  19. if buffer.as_slice()[0] == b'C' || buffer.as_slice()[0] == b'c' {
  20.     max_req_num = 1;
  21. }
复制代码

  • 构建HTTP服务器,构建服务类:
  1. /// http代理类处理类
  2. struct Operate {
  3.     /// 用户名
  4.     username: Option<String>,
  5.     /// 密码
  6.     password: Option<String>,
  7.     /// Stream类, https连接后给后续https使用
  8.     stream: Option<TcpStream>,
  9.     /// http代理keep-alive的复用
  10.     sender: Option<Sender<RecvRequest>>,
  11.     /// http代理keep-alive的复用
  12.     receiver: Option<Receiver<ProtResult<RecvResponse>>>,
  13. }
复制代码
构建HTTP服务
  1. // 需要将已读的数据buffer重新加到server的已读cache中, 否则解析会出错
  2. let mut server = Server::new_by_cache(inbound, None, buffer);
  3. // 构建HTTP服务回调
  4. let mut operate = Operate {
  5.     username: username.clone(),
  6.     password: password.clone(),
  7.     stream: None,
  8.     sender: None,
  9.     receiver: None,
  10. };
  11. server.set_max_req(max_req_num);
  12. let _e = server.incoming(&mut operate).await?;
  13. if let Some(outbound) = &mut operate.stream {
  14.     let mut inbound = server.into_io();
  15.     let _ = copy_bidirectional(&mut inbound, outbound).await?;
  16. }
复制代码
此时我们已将数据用HTTP服务进行处理,收到相应的请求再进行给远端做转发:
HTTP核心处理回调,此处我们用的是async_trait异步回调
  1. #[async_trait]
  2. impl OperateTrait for &mut Operate {
  3.     async fn operate(&mut self, request: &mut RecvRequest) -> ProtResult<RecvResponse> {
  4.         // 已连接直接进行后续处理
  5.         if let Some(sender) = &self.sender {
  6.             sender.send(request.replace_clone(Body::empty())).await?;
  7.             if let Some(res) = self.receiver.as_mut().unwrap().recv().await {
  8.                 return Ok(res?)
  9.             }
  10.             return Err(ProtError::Extension("already close by other"))
  11.         }
  12.         // 获取要连接的对象
  13.         let stream = if let Some(host) = request.get_connect_url() {
  14.             match HealthCheck::connect(&host).await {
  15.                 Ok(v) => v,
  16.                 Err(e) => {
  17.                     return Err(ProtError::from(e));
  18.                 }
  19.             }
  20.         } else {
  21.             return Err(ProtError::Extension("unknow tcp stream"));
  22.         };
  23.         // 账号密码存在,将获取`Proxy-Authorization`进行校验,如果检验错误返回407协议
  24.         if self.username.is_some() && self.password.is_some() {
  25.             let mut is_auth = false;
  26.             if let Some(auth) = request.headers_mut().remove(&"Proxy-Authorization") {
  27.                 if let Some(val) = auth.as_string() {
  28.                     is_auth = self.check_basic_auth(&val);
  29.                 }
  30.             }
  31.             if !is_auth {
  32.                 return Ok(Response::builder().status(407).body("")?.into_type());
  33.             }
  34.         }
  35.         // 判断用户协议
  36.         match request.method() {
  37.             &Method::Connect => {
  38.                 // https返回200内容直接进行远端和客户端的双向绑定
  39.                 self.stream = Some(stream);
  40.                 return Ok(Response::builder().status(200).body("")?.into_type());
  41.             }
  42.             _ => {
  43.                 // http协议,需要将客户端的内容转发到服务端,并将服务端数据转回客户端
  44.                 let client = Client::new(ClientOption::default(), MaybeHttpsStream::Http(stream));
  45.                 let (mut recv, sender) = client.send2(request.replace_clone(Body::empty())).await?;
  46.                 match recv.recv().await {
  47.                     Some(res) => {
  48.                         self.sender = Some(sender);
  49.                         self.receiver = Some(recv);
  50.                         return Ok(res?)
  51.                     },
  52.                     None => return Err(ProtError::Extension("already close by other")),
  53.                 }
  54.             }
  55.         }
  56.     }
  57. }
复制代码
密码校验,由Basic的密码加密方法,先用base64解密,再用:做拆分,再与用户密码比较
  1. pub fn check_basic_auth(&self, value: &str) -> bool
  2. {
  3.     use base64::engine::general_purpose;
  4.     use std::io::Read;
  5.     let vals: Vec<&str> = value.split_whitespace().collect();
  6.     if vals.len() == 1 {
  7.         return false;
  8.     }
  9.     let mut wrapped_reader = Cursor::new(vals[1].as_bytes());
  10.     let mut decoder = base64::read::DecoderReader::new(
  11.         &mut wrapped_reader,
  12.         &general_purpose::STANDARD);
  13.     // handle errors as you normally would
  14.     let mut result: Vec<u8> = Vec::new();
  15.     decoder.read_to_end(&mut result).unwrap();
  16.     if let Ok(value) = String::from_utf8(result) {
  17.         let up: Vec<&str> = value.split(":").collect();
  18.         if up.len() != 2 {
  19.             return false;
  20.         }
  21.         if up[0] == self.username.as_ref().unwrap() ||
  22.             up[1] == self.password.as_ref().unwrap() {
  23.             return true;
  24.         }
  25.     }
  26.     return false;
  27. }
复制代码
小结

代理在计算机网络很常见,比如服务器群组内部通常只会开一个口进行对外访问,就可以通过内网代理来进行处理,从而更好的保护内网服务器。代理让我们网络更安全,但是警惕非正规的代理可能会窃取您的数据。请用HTTPS内容访问更安全。
点击 [关注][在看][点赞] 是对作者最大的支持

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

科技颠覆者

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表