ToB企服应用市场:ToB评测及商务社交产业平台

标题: 聊一聊 C#线程池 的线程动态注入 (上) [打印本页]

作者: 小秦哥    时间: 3 天前
标题: 聊一聊 C#线程池 的线程动态注入 (上)
一:配景

1. 讲故事

在线程饥饿的场景中,我们首先要相识的就是线程是怎样动态注入的?着实现如今的ThreadPool内部的实现逻辑非常复杂,而且随着版本的迭代内部逻辑也在不断的变化,有时间也没必要详细的去相识,只需在稍微宏观的角度去明确一下即可,我准备用三篇来详细的聊一聊线程注入的流程走向来作为线程饥饿的铺垫系列,这篇我们先从 Thread.Sleep 的角度观察线程的动态注入。
二:Sleep 角度下的动态注入

1. 测试代码

为了方便研究,我们用 Thread.Sleep 的方式阻塞线程池线程,然后观察线程的注入速度,参考代码如下:
  1.         static void Main(string[] args)
  2.         {
  3.             for (int i = 0; i < 10000; i++)
  4.             {
  5.                 ThreadPool.QueueUserWorkItem((idx) =>
  6.                 {
  7.                     Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")} -> {idx}: 这是耗时任务");
  8.                     Thread.Sleep(int.MaxValue);
  9.                 }, i);
  10.             }
  11.             Console.ReadLine();
  12.         }
复制代码

仔细观察卦中的输出,除了初始的12个线程喷涌而出,后面你会发现它的线程动态注入有时间大概是 500ms 一次,有时间会是 1000ms 一次,所以我们可以得到一个简单的结论:Thread.Sleep 场景下1s 大概会动态注入1~2个线程。
有了这个结论之后,接下来我们探究下它的底层逻辑在哪?
2. 底层代码逻辑在哪

千言万语不及一张图,截图如下:

接下来我们来聊一下卦中的各个元素吧。
在 PortableThreadPool 中有一个 GateThread 类,专门掌管着线程的动态注入,默认环境下它大概是 500ms 被叫醒一次。这个是有许多逻辑源码支撑的。
  1.     private static class GateThread
  2.     {
  3.         public const uint GateActivitiesPeriodMs = 500;
  4.         private static void GateThreadStart()
  5.         {
  6.             while (true)
  7.             {
  8.                 bool wasSignaledToWake = DelayEvent.WaitOne((int)delayHelper.GetNextDelay(currentTimeMs));
  9.                 ...
  10.             }
  11.         }
  12.         public uint GetNextDelay(int currentTimeMs)
  13.         {
  14.             uint elapsedMsSincePreviousGateActivities = (uint)(currentTimeMs - _previousGateActivitiesTimeMs);
  15.             uint nextDelayForGateActivities =
  16.                 elapsedMsSincePreviousGateActivities < GateActivitiesPeriodMs
  17.                     ? GateActivitiesPeriodMs - elapsedMsSincePreviousGateActivities
  18.                     : 1;
  19.             ...
  20.         }
  21.     }
复制代码

这个方法是用来判断任务最后一次出队的时间,即内部的lastDequeueTime 字段,这也是为什么有时间是1个周期(500ms),有时间是2个周期的底层缘故原由,假如在一个周期内判断lastDequeueTime(490ms) 0 &&                SufficientDelaySinceLastDequeue(threadPoolInstance))            {                bool addWorker = false;                              if (addWorker)                {                    WorkerThread.MaybeAddWorkingWorker(threadPoolInstance);                }            }        }        private static bool SufficientDelaySinceLastDequeue(PortableThreadPool threadPoolInstance)        {            uint delay = (uint)(Environment.TickCount - threadPoolInstance._separated.lastDequeueTime);            uint minimumDelay;            if (threadPoolInstance._cpuUtilization < CpuUtilizationLow)            {                minimumDelay = GateActivitiesPeriodMs;            }            else            {                minimumDelay = (uint)threadPoolInstance._separated.counts.NumThreadsGoal * DequeueDelayThresholdMs;            }            return delay > minimumDelay;        }        private static void WorkerDoWork(PortableThreadPool threadPoolInstance, ref bool spinWait)        {            bool alreadyRemovedWorkingWorker = false;            while (TakeActiveRequest(threadPoolInstance))            {                threadPoolInstance._separated.lastDequeueTime = Environment.TickCount;                if (!ThreadPoolWorkQueue.Dispatch())                {                }            }        }[/code]这个方法是用来创建线程的主体逻辑,在线程池中由上层的 MaybeAddWorkingWorker 调用,参考如下:
  1.         private static void GateThreadStart()
  2.         {
  3.             if (!disableStarvationDetection &&
  4.                 threadPoolInstance._pendingBlockingAdjustment == PendingBlockingAdjustment.None &&
  5.                 threadPoolInstance._separated.numRequestedWorkers > 0 &&
  6.                 SufficientDelaySinceLastDequeue(threadPoolInstance))
  7.             {
  8.                 bool addWorker = false;
  9.               
  10.                 if (addWorker)
  11.                 {
  12.                     WorkerThread.MaybeAddWorkingWorker(threadPoolInstance);
  13.                 }
  14.             }
  15.         }
  16.         private static bool SufficientDelaySinceLastDequeue(PortableThreadPool threadPoolInstance)
  17.         {
  18.             uint delay = (uint)(Environment.TickCount - threadPoolInstance._separated.lastDequeueTime);
  19.             uint minimumDelay;
  20.             if (threadPoolInstance._cpuUtilization < CpuUtilizationLow)
  21.             {
  22.                 minimumDelay = GateActivitiesPeriodMs;
  23.             }
  24.             else
  25.             {
  26.                 minimumDelay = (uint)threadPoolInstance._separated.counts.NumThreadsGoal * DequeueDelayThresholdMs;
  27.             }
  28.             return delay > minimumDelay;
  29.         }
  30.         private static void WorkerDoWork(PortableThreadPool threadPoolInstance, ref bool spinWait)
  31.         {
  32.             bool alreadyRemovedWorkingWorker = false;
  33.             while (TakeActiveRequest(threadPoolInstance))
  34.             {
  35.                 threadPoolInstance._separated.lastDequeueTime = Environment.TickCount;
  36.                 if (!ThreadPoolWorkQueue.Dispatch())
  37.                 {
  38.                 }
  39.             }
  40.         }
复制代码
这里有一个留意点:上面的  while (toCreate > 0) 代码预示着一个周期内(500ms)可能会一连创建多个工作线程,但在饥饿的大多数环境下都是toCreate=1的环境。
3.怎样眼见为实

说了这么多,能不能用一些手段让我眼见为实呢?要想眼见为实也不难,可以用 dnspy 断点日志功能观察即可,分别在如下三个方法上下断点。
在此处下断点的目标用于观察 GateThread 的叫醒周期时间,截图如下:

这里下断点主要是观察当前的延迟假如超过 500ms 时是否真的会通过 CreateWorkerThread 创建线程。截图如下:

最后我们在 MaybeAddWorkingWorker 方法的底层的线程创建方法 CreateWorkerThread 中下一个断点。

所有的埋点下好之后,我们让步伐跑起来,观察 output 窗口的输出。

从输出窗口中可以清晰的看到以500ms为界限判断啥时该创建,啥时不该创建。
三:总结

可能有些朋侪很感慨,线程的动态注入咋怎么慢?1s才1-2个,难怪会出现线程饥饿。。。哈哈,下一篇我们聊一聊Task.Result下的注入优化。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4