聊一聊 C#线程池 的线程动态注入 (中)

海哥  金牌会员 | 2024-12-24 16:00:33 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 876|帖子 876|积分 2628

一:背景

1. 讲故事

上一篇我们用 Thread.Sleep 的方式演示了线程池饥饿场景下的动态线程注入,可以观察到大概 1s 产生 1~2 个新线程,很显然如许的增长速率扛不住上游请求对线程池的DDOS攻击,导致线程池队列越来越大,但C#团队这么优秀,能优化的地方绝对会给各人尽可能的优化,比如这篇我们聊到的 Task.Result 场景下的注入。
二:Task.Result 角度下的动态注入

1. 测试代码

为了直观的体会到优化效果,先上一段测试代码观察一下。
  1.         static void Main(string[] args)
  2.         {
  3.             for (int i = 0; i < 10000; i++)
  4.             {
  5.                 ThreadPool.QueueUserWorkItem((idx) =>
  6.                 {
  7.                     Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")} -> {idx}: 这是耗时任务");
  8.                     try
  9.                     {
  10.                         var client = new HttpClient();
  11.                         var content = client.GetStringAsync("https://youtube.com").Result;
  12.                         Console.WriteLine(content.Length);
  13.                     }
  14.                     catch (Exception ex)
  15.                     {
  16.                         Console.WriteLine(ex.Message);
  17.                     }
  18.                 }, i);
  19.             }
  20.             Console.ReadLine();
  21.         }
复制代码

从卦象上来看大概1s产生4个新线程,再细致看的话大概是250ms一个,固然250不大好听,但不管怎么说确实比 Thread.Sleep 场景下只产生 1~2 个线程要快了好几倍,以终为始,我们再反向的看下这个优化的底层逻辑在哪?
2. 底层逻辑在哪里

还是那句话,千言万语不抵一张图,流程图大概如下:

接下来解释下其中的几个元素。

  • NotifyThreadBlocked
这是主动关照 GateThread 线程赶紧醒来,通过上一篇的知识各人应该知道 GateThread 会500ms一次被动唤醒,但为了提速不可能再这么干了,需要让人强制唤醒它,修剪后的源码如下:
  1.     private bool SpinThenBlockingWait(int millisecondsTimeout, CancellationToken cancellationToken)
  2.     {
  3.         var mres = new SetOnInvokeMres();
  4.         AddCompletionAction(mres, addBeforeOthers: true);
  5.         bool notifyWhenUnblocked = ThreadPool.NotifyThreadBlocked();
  6.         var returnValue = mres.Wait((int)(millisecondsTimeout - elapsedTimeTicks), cancellationToken);
  7.         return returnValue;
  8.     }
  9.     public bool NotifyThreadBlocked()
  10.     {
  11.         GateThread.Wake(this);
  12.         return true;
  13.     }
  14.     public static void Wake(PortableThreadPool threadPoolInstance)
  15.     {
  16.         DelayEvent.Set();
  17.     }
复制代码
卦中的 DelayEvent.Set(); 正是强制唤醒 GateThread 的 event 事件。

  • HasBlockingAdjustmentDelayElapsed
GateThread 是注入线程的官方通道,那到底要不要注入线程呢?肯定少不了一些判定,其中一个判定就是当前的延迟周期是否超过了 250ms,这个250ms的阈值终极由 BlockingConfig.MaxDelayMs 变量指定,这是能否调用 CreateWorkerThread方法需要闯的一个关口,参考代码如下:
  1.         private static class BlockingConfig
  2.         {
  3.             MaxDelayMs =(uint) AppContextConfigHelper.GetInt32Config(
  4.                         "System.Threading.ThreadPool.Blocking.MaxDelayMs",
  5.                             250,
  6.                             false);
  7.         }
  8.         private static void GateThreadStart()
  9.         {
  10.             while (true)
  11.             {
  12.                 bool wasSignaledToWake = DelayEvent.WaitOne((int)delayHelper.GetNextDelay(currentTimeMs));
  13.                 currentTimeMs = Environment.TickCount;
  14.                 do
  15.                 {
  16.                     previousDelayElapsed = delayHelper.HasBlockingAdjustmentDelayElapsed(currentTimeMs, wasSignaledToWake);
  17.                     if (pendingBlockingAdjustment == PendingBlockingAdjustment.WithDelayIfNecessary && !previousDelayElapsed)
  18.                     {
  19.                         break;
  20.                     }
  21.                     uint nextDelayMs = threadPoolInstance.PerformBlockingAdjustment(previousDelayElapsed);
  22.                 } while (false);
  23.             }
  24.         }
  25.         public bool HasBlockingAdjustmentDelayElapsed(int currentTimeMs, bool wasSignaledToWake)
  26.         {
  27.             if (!wasSignaledToWake && _adjustForBlockingAfterNextDelay)
  28.             {
  29.                 return true;
  30.             }
  31.             uint elapsedMsSincePreviousBlockingAdjustmentDelay = (uint)(currentTimeMs - _previousBlockingAdjustmentDelayStartTimeMs);
  32.             return elapsedMsSincePreviousBlockingAdjustmentDelay >= _previousBlockingAdjustmentDelayMs;
  33.         }
复制代码
从上面的代码可以看到一旦 previousDelayElapsed =false 就直接 break 了,不再调用PerformBlockingAdjustment 方法来闯第二个关口。

  • PerformBlockingAdjustment
一旦满足了250ms阈值之后,接下来就需要观察ThreadPool当前的负载能力,由内部的 ThreadCounts 提供支持,比如 NumProcessingWork 表现当前线程池正在处置惩罚的任务数, NumThreadsGoal 表现线程不要超过此上限值,如果超过了就进入动态注入阶段,参考代码如下:
  1.     private struct ThreadCounts
  2.     {
  3.         public short NumProcessingWork;
  4.         public short NumExistingThreads;
  5.         public short NumThreadsGoal;
  6.     }
复制代码
有了这个基础之后,接下来再上一段注入线程需要满足的第二个关口。
  1.         private static void GateThreadStart()
  2.         {
  3.             uint nextDelayMs = threadPoolInstance.PerformBlockingAdjustment(previousDelayElapsed);
  4.         }
  5.         private uint PerformBlockingAdjustment(bool previousDelayElapsed)
  6.         {
  7.             var nextDelayMs = PerformBlockingAdjustment(previousDelayElapsed, out addWorker);
  8.             if (addWorker)
  9.             {
  10.                 WorkerThread.MaybeAddWorkingWorker(this);
  11.             }
  12.             return nextDelayMs;
  13.         }
  14.         private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWorker)
  15.         {
  16.             if (counts.NumProcessingWork >= numThreadsGoal && _separated.numRequestedWorkers > 0)
  17.             {
  18.                 addWorker = true;
  19.             }
  20.         }
复制代码
从卦中代码可以看到,一旦线程池中 处置惩罚的任务数 >= 线程上限值,这就表现当前线程池正在满负荷的跑,numRequestedWorkers>0 表现有新任务来了需要线程来处置惩罚,所以这两组条件一旦满足,就必须要创建新线程。
3. 怎样眼见为实

刚才啰嗦了那么多,那怎样眼见为实呢?非常简单,还是用 dnspy 的断点日志功能观察,我们下三个断点。

  • 第一个条件 HasBlockingAdjustmentDelayElapsed 处增长 1.  {!wasSignaledToWake}  {this._adjustForBlockingAfterNextDelay}, 延迟时间:{currentTimeMs - this._previousBlockingAdjustmentDelayStartTimeMs} ,上一次延迟:{_previousBlockingAdjustmentDelayMs}。


  • 第二个条件 PerformBlockingAdjustment 处增长 2. 正在处置惩罚任务数:{threadCounts.NumProcessingWork} ,符合线程数:{num},是否要新增线程:{this._separated.numRequestedWorkers>0} 。


  • 线程创建 WorkerThread.CreateWorkerThread 处增长 3.  已成功创建线程  。

末了把步伐跑起来,观察 output窗口 的结果,非常清爽,吉卦。

三:总结

采用主动关照的方式唤醒GateThread可以让每秒线程注入数由原来的 1~2 个提拔到 4 个,固然有所优化,但面对上游洪水猛兽般的请求,很显然也是杯水车薪,终极还是酿成了线程饥饿的悲剧,下一篇我们继续研究怎样让线程注入的快一点,再快一点。。。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

海哥

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表