ToB企服应用市场:ToB评测及商务社交产业平台

标题: 18. 从零开始编写一个类nginx工具, 主动式健康检查源码实现 [打印本页]

作者: 来自云龙湖轮廓分明的月亮    时间: 2023-11-17 10:40
标题: 18. 从零开始编写一个类nginx工具, 主动式健康检查源码实现
wmproxy

wmproxy将用Rust实现http/https代理, socks5代理, 反向代理, 静态文件服务器,后续将实现websocket代理, 内外网穿透等, 会将实现过程分享出来, 感兴趣的可以一起造个轮子法
项目地址

gite: https://gitee.com/tickbh/wmproxy
github: https://github.com/tickbh/wmproxy
为什么我们需要主动

  主动可以让我们掌握好系统的稳定性,假设我们有一条连接不可达,连接超时的判定是5秒,需要检测失败3次才认定为失败,那么此时从我们开始检测,到判定失败需要耗时15秒。
  如果此时我们是个高并发的系统,每秒的QPS是1000,我们有三个地址判定,那么此时我们有1/3的失败概率。那么在15秒内,我们会收到15000个请求,会造成5000个请求失败,如果是重要的数据,我们会丢失很多重要数据。
  如果此时客户端拥有重试机制,那么客户端在失败的时候会发起重试,而且系统可能会反复的分配到那台不可达的系统,将会造成短时间内请求数激增,可能引发系统的雪崩。
  所以此时我们主动知道目标端的系统稳定性极其重要。
网络访问示意图

以下是没有主动健康检查
sequenceDiagram    participant 客户端    participant 代理服务器    客户端->>代理服务器: 请求数据(0.5s)    代理服务器->>后端1: 连接并请求数据(5s)失败    Note right of 后端1: 机器宕机不可达        代理服务器-->>客户端: 返回失败0.5s(总耗时6s)    客户端->>代理服务器: 重新请求数据(0.5s)    代理服务器->>后端2: 请求数据成功(0.2s)    后端2-->>代理服务器: 返回数据成功(0.2s)    代理服务器-->> 客户端: 返回数据成功0.5s(总耗时1.4s)如果出错的时候,一个请求的平均时长可能会达到(1.4s + 5s) / 2 = (3.2s),比正常访问多了(3.2 - 1.4) = 1.8s,节点的宕机会对系统的稳定性产生较大的影响
以下是主动健康检查,它保证了访问后端服务器组均是正常的状态
sequenceDiagram    客户端->>代理服务器: 请求数据(0.5s)    loop 健康检查        代理服务器->>服务器组(只访问1): 定时请求,保证存活,1检查成功,2检查失败    end    Note right of 服务器组(只访问1): 处理客户端数据    代理服务器 -->> 服务器组(只访问1): 请求数据(0.2s)    服务器组(只访问1) -->> 代理服务器: 返回数据成功(0.2s)    代理服务器-->>客户端: 返回数据成功(0.5s)(总耗时1.4s)服务器2出错的时候,主动检查已经检查出服务器2不可用,负载均衡的时候选择已经把服务器2摘除,所以系统的平均耗时1.4s,系统依然保持稳定
健康检查的种类

在目前的系统中有以下两分类:
健康检查的准备

我们需要从配置中读出所有的需要健康检查的类型,即需要去重,把同一个指向的地址过滤掉
配置有可能被重新加载,所以我们需要预留发送配置的方式(或者后续类似nginx用新开进程的方式则不需要),此处做一个预留。

部分实现源码

主要源码定义在check/active.rs中,主要的定义两个类
  1. /// 单项健康检查
  2. #[derive(Debug, Clone)]
  3. pub struct OneHealth {
  4.     /// 主动检查地址
  5.     pub addr: SocketAddr,
  6.     /// 主动检查方法, 有http/https/tcp等
  7.     pub method: String,
  8.     /// 每次检查间隔
  9.     pub interval: Duration,
  10.     /// 最后一次记录时间
  11.     pub last_record: Instant,
  12. }
  13. /// 主动式健康检查
  14. pub struct ActiveHealth {
  15.     /// 所有的健康列表
  16.     pub healths: Vec<OneHealth>,
  17.     /// 接收健康列表,当配置变更时重新载入
  18.     pub receiver: Receiver<Vec<OneHealth>>,
  19. }
复制代码
我们在配置的时候获取所有需要主动检查的数据
  1. /// 获取所有待健康检查的列表
  2. pub fn get_health_check(&self) -> Vec<OneHealth> {
  3.     let mut result = vec![];
  4.     let mut already: HashSet<SocketAddr> = HashSet::new();
  5.     if let Some(proxy) = &self.proxy {
  6.         // ...
  7.     }
  8.     if let Some(http) = &self.http {
  9.         // ...
  10.     }
  11.     result
  12. }
复制代码
主要的检查源码,所有的最终信息都落在HealthCheck中的静态变量里:
  1. pub async fn do_check(&self) -> ProxyResult<()> {
  2.     // 防止短时间内健康检查的连接过多, 做一定的超时处理, 或者等上一条消息处理完毕
  3.     if !HealthCheck::check_can_request(&self.addr, self.interval) {
  4.         return Ok(())
  5.     }
  6.     if self.method.eq_ignore_ascii_case("http") {
  7.         match tokio::time::timeout(self.interval + Duration::from_secs(1), self.connect_http()).await {
  8.             Ok(r) => match r {
  9.                 Ok(r) => {
  10.                     if r.status().is_server_error() {
  11.                         log::trace!("主动健康检查:HTTP:{}, 返回失败:{}", self.addr, r.status());
  12.                         HealthCheck::add_fall_down(self.addr);
  13.                     } else {
  14.                         HealthCheck::add_rise_up(self.addr);
  15.                     }
  16.                 }
  17.                 Err(e) => {
  18.                     log::trace!("主动健康检查:HTTP:{}, 发生错误:{:?}", self.addr, e);
  19.                     HealthCheck::add_fall_down(self.addr);
  20.                 }
  21.             },
  22.             Err(e) => {
  23.                 log::trace!("主动健康检查:HTTP:{}, 发生超时:{:?}", self.addr, e);
  24.                 HealthCheck::add_fall_down(self.addr);
  25.             },
  26.         }
  27.     } else {
  28.         match tokio::time::timeout(Duration::from_secs(3), self.connect_http()).await {
  29.             Ok(r) => {
  30.                 match r {
  31.                     Ok(_) => {
  32.                         HealthCheck::add_rise_up(self.addr);
  33.                     }
  34.                     Err(e) => {
  35.                         log::trace!("主动健康检查:TCP:{}, 发生错误:{:?}", self.addr, e);
  36.                         HealthCheck::add_fall_down(self.addr);
  37.                     }
  38.                 }
  39.             }
  40.             Err(e) => {
  41.                 log::trace!("主动健康检查:TCP:{}, 发生超时:{:?}", self.addr, e);
  42.                 HealthCheck::add_fall_down(self.addr);
  43.             }
  44.         }
  45.     }
  46.     Ok(())
  47. }
复制代码
结语

主动检查可以及时的更早的发现系统中不稳定的因素,是系统稳定性的基石,也可以通过更早的发现因素来通知运维介入,我们的目的是使系统更稳定,更健壮,处理延时更少。
点击 [关注][在看][点赞] 是对作者最大的支持

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4