ToB企服应用市场:ToB评测及商务社交产业平台

标题: 19. 从零开始编写一个类nginx工具, 配置数据的热更新原理及实现 [打印本页]

作者: 花瓣小跑    时间: 2023-11-22 02:30
标题: 19. 从零开始编写一个类nginx工具, 配置数据的热更新原理及实现
wmproxy

wmproxy是由Rust编写,已实现http/https代理,socks5代理, 反向代理,静态文件服务器,内网穿透,配置热更新等, 后续将实现websocket代理等,同时会将实现过程分享出来, 感兴趣的可以一起造个轮子法
项目地址

gite: https://gitee.com/tickbh/wmproxy
github: https://github.com/tickbh/wmproxy
配置数据

数据通常配置在配置文件中,如果需要变更配置,我们通常将配置文件进行更新,并通知程序重新加载配置以便生效。
nginx的变更方式

在nginx中,我们通常用
  1. nginx -s reload
复制代码
进行数据的安全无缝的重载。在nginx中,是多进程的模式,也就是在nginx -s reload信号发出后master进程通知之前的work进程停止接收新的流,也就是accpet暂停,但是会服务完当前的数据请求,并同时会启用新的work进程来接受新的请求

缺点:nginx只能整体的配置做全部重置,且无法查看当前的配置(除非看配置文件,配置可能被重新修改过和内存中的值可能不匹配)
当前选取的方式

当前选择的是用HTTP请求的方式,也就是对本地的端口进行监听(http://127.0.0.1:8837),对本地端口监听也不会造成对外暴露端口带来的安全问题,这样子可以高度的自定义。具有比较高的活跃性,也可以实时查询内存中的数据。
例如访问:
功能实现的原理

所以这里涉及一个分平台的编码,我们在此使用的是,这和C/C++中的#ifdef WINDOWS类似,但是只能在函数级的做调整,所以此处额外在封装了两个函数来做调用。
  1. /// 非windows平台
  2. #[cfg(not(target_os = "windows"))]
  3. fn set_reuse_port(socket: &Socket, reuse: bool) -> io::Result<()> {
  4.     socket.set_reuse_port(true)?;
  5.     Ok(())
  6. }
  7. /// windows平台,空实现
  8. #[cfg(target_os = "windows")]
  9. fn set_reuse_port(_socket: &Socket, _sreuse: bool) -> io::Result<()> {
  10.     Ok(())
  11. }
复制代码
然后将原来的TcpListener::bind(addr)函数改成Helper::bind即可无缝切换到支持端口复用的功能,针对代理端及反向代理端:
  1. /// 可端口复用的绑定方式,该端口可能被多个进程同时使用
  2. pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
  3.     let addrs = addr.to_socket_addrs()?;
  4.     let mut last_err = None;
  5.     for addr in addrs {
  6.         let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
  7.         socket.set_nonblocking(true)?;
  8.         let _ = socket.set_only_v6(false);
  9.         socket.set_reuse_address(true)?;
  10.         Self::set_reuse_port(&socket, true)?;
  11.         socket.bind(&addr.into())?;
  12.         match socket.listen(128) {
  13.             Ok(_) => {
  14.                 let listener: std::net::TcpListener = socket.into();
  15.                 return TcpListener::from_std(listener);
  16.             }
  17.             Err(e) => {
  18.                 log::info!("绑定端口地址失败,原因: {:?}", addr);
  19.                 last_err = Some(e);
  20.             }
  21.         }
  22.     }
  23.     Err(last_err.unwrap_or_else(|| {
  24.         io::Error::new(
  25.             io::ErrorKind::InvalidInput,
  26.             "could not resolve to any address",
  27.         )
  28.     }))
  29. }
复制代码
测试功能

测试配置加载reload,一开始我们绑定81的端口

进程启动后改为绑定82的端口,然后调用reload(curl.exe http://127.0.0.1:8837/reload)

此时,再调用stop(curl.exe http://127.0.0.1:8837/stop),正确的预期应该显示关闭,且82端口不可再访问

符合功能预期,初步测试完毕
相关源码实现

以下是启动及发送重载配置的流程示意图
flowchart TD    A[加载配置]    B[绑定端口]    C[控制端]    D[服务1]    E[服务2]    F[控制窗户端]        A -->|加载数据后绑定| B    B -->|"(1)绑定端口后启动"| C    B -->|"(1)异步的方式启动"| D    F -->|发送重载入命令| C    C -->|"(3)发送关闭服务命令"| D    C -->|"(2)启动新的服务后关闭原服务"| E以下是中控的定义,消息的通知主要通过Sender/Receiver来进行数据的通知。
  1. /// 控制端,可以对配置进行热更新
  2. pub struct ControlServer {
  3.     /// 控制端当前的配置文件,如果部分修改将直接修改数据进行重启
  4.     option: ConfigOption,
  5.     /// 通知服务进行关闭的Sender,服务相关如果收到该消息则停止Accept
  6.     server_sender_close: Option<Sender<()>>,
  7.     /// 通知中心服务的Sender,每个服务拥有一个该Sender,可反向通知中控关闭
  8.     control_sender_close: Sender<()>,
  9.     /// 通知中心服务的Receiver,收到一次则将当前的引用计数-1,如果为0则表示需要关闭服务器
  10.     control_receiver_close: Option<Receiver<()>>,
  11.     /// 服务的引用计数
  12.     count: i32,
  13. }
复制代码
启动控制终端,接收HTTP的指令和关闭的指令,此时control已经变成了Arc,方便在各各线程间传播,同步修改数据。
  1. pub async fn start_control(control: Arc<Mutex<ControlServer>>) -> ProxyResult<()> {
  2.     let listener = {
  3.         let value = &control.lock().await.option;
  4.         TcpListener::bind(format!("127.0.0.1:{}", value.control)).await?
  5.     };
  6.     loop {
  7.         let mut receiver = {
  8.             let value = &mut control.lock().await;
  9.             value.control_receiver_close.take()
  10.         };
  11.         
  12.         tokio::select! {
  13.             Ok((conn, addr)) = listener.accept() => {
  14.                 let cc = control.clone();
  15.                 tokio::spawn(async move {
  16.                     let mut server = Server::new_data(conn, Some(addr), cc);
  17.                     if let Err(e) = server.incoming(Self::operate).await {
  18.                         log::info!("反向代理:处理信息时发生错误:{:?}", e);
  19.                     }
  20.                 });
  21.                 let value = &mut control.lock().await;
  22.                 value.control_receiver_close = receiver;
  23.             }
  24.             _ = Self::receiver_await(&mut receiver) => {
  25.                 let value = &mut control.lock().await;
  26.                 value.count -= 1;
  27.                 log::info!("反向代理:控制端收到关闭信号,当前:{}", value.count);
  28.                 if value.count <= 0 {
  29.                     break;
  30.                 }
  31.                 value.control_receiver_close = receiver;
  32.             }
  33.         }
  34.     }
  35.     Ok(())
  36. }
复制代码
结语

此时以不同于nginx的另一种配置的加载已经开发完毕,配置的热加载可以让您更从容的保护好您的系统。
点击 [关注][在看][点赞] 是对作者最大的支持

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4