C# 简单实现线程池

立山  金牌会员 | 2022-11-21 20:44:44 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 645|帖子 645|积分 1935

NET 6 环境开发 实现 线程数量,任务队列,非核心线程,及核心线程活跃时间的管理。
  1. namespace CustomThreadPool;
  2. /// <summary>
  3. /// 线程池类
  4. /// </summary>
  5. public class ThreadPoolExecutor
  6. {
  7.     /// <summary>
  8.     /// 核心线程的任务队列
  9.     /// </summary>
  10.     private readonly Queue<WorkTask> tasks = new Queue<WorkTask>();
  11.     /// <summary>
  12.     /// 最大核心线程数
  13.     /// </summary>
  14.     private int coreThreadCount;
  15.     /// <summary>
  16.     /// 最大非核心线程数
  17.     /// </summary>
  18.     private int noneCoreThreadCount;
  19.     /// <summary>
  20.     /// 当前运行的核心线程的数量
  21.     /// </summary>
  22.     private int runCoreThreadCount;
  23.     /// <summary>
  24.     /// 当前运行的非核心线程的数量
  25.     /// </summary>
  26.     private int runNoneCoreThreadCount;
  27.     /// <summary>
  28.     /// 核心线程队列的最大数
  29.     /// </summary>
  30.     private int maxQueueCount;
  31.     /// <summary>
  32.     /// 当核心线程空闲时最大活跃时间
  33.     /// </summary>
  34.     private int keepAliveTimeout;
  35.     /// <summary>
  36.     /// 设置是否为后台线程
  37.     /// </summary>
  38.     private bool isBackground;
  39.     private ThreadPoolExecutor() { }
  40.     /// <summary>
  41.     ///
  42.     /// </summary>
  43.     /// <param name="CoreThreadCount">核心线程数</param>
  44.     /// <param name="TotalThreadCount">总线程数</param>
  45.     /// <param name="IsBackground">是否为后台线程</param>
  46.     /// <param name="QueueCount">核心队列的最大数</param>
  47.     /// <param name="KeepAliveTimeout">当核心线程空闲时最大活跃时间</param>
  48.     /// <exception cref="ArgumentOutOfRangeException"></exception>
  49.     /// <exception cref="ArgumentException"></exception>
  50.     public ThreadPoolExecutor(int CoreThreadCount = 5, int TotalThreadCount = 10, bool IsBackground = true, int QueueCount = 200, int KeepAliveTimeout = 0)
  51.     {
  52.         if (CoreThreadCount < 1) throw new ArgumentOutOfRangeException(nameof(CoreThreadCount), CoreThreadCount, null);
  53.         if (TotalThreadCount < CoreThreadCount) throw new ArgumentException($"{nameof(TotalThreadCount)}:{TotalThreadCount} must be greater than {nameof(CoreThreadCount)}:{CoreThreadCount}");
  54.         if (QueueCount < 0) throw new ArgumentOutOfRangeException(nameof(QueueCount), QueueCount, null);
  55.         if (KeepAliveTimeout < 0) throw new ArgumentOutOfRangeException(nameof(KeepAliveTimeout), KeepAliveTimeout, null);
  56.         coreThreadCount = CoreThreadCount;
  57.         noneCoreThreadCount = TotalThreadCount - CoreThreadCount;
  58.         keepAliveTimeout = KeepAliveTimeout;
  59.         maxQueueCount = QueueCount;
  60.         isBackground = IsBackground;
  61.     }
  62.     /// <summary>
  63.     /// 执行任务
  64.     /// </summary>
  65.     /// <param name="task">一个自定义任务</param>
  66.     /// <exception cref="ArgumentNullException">任务为null时,抛出该错误</exception>
  67.     /// <exception cref="NotSupportedException">当核心任务队列已满且非核心线程最大数为0时抛出该错误</exception>
  68.     public void QueueTask(WorkTask task)
  69.     {
  70.         if (task == null) throw new ArgumentNullException(nameof(task));
  71.         lock (tasks)
  72.         {
  73.             tasks.Enqueue(task);
  74.             if (tasks.Count <= maxQueueCount)
  75.             {
  76.                 if (runCoreThreadCount < coreThreadCount)
  77.                 {
  78.                     ++runCoreThreadCount;
  79.                     Run(true);
  80.                 }
  81.             }
  82.             else
  83.             {
  84.                 if (noneCoreThreadCount > 0 && runNoneCoreThreadCount < noneCoreThreadCount)
  85.                 {
  86.                     ++runNoneCoreThreadCount;
  87.                     Run(false);
  88.                 }
  89.             }
  90.         }
  91.     }
  92.     private void Run(bool isCore)
  93.     {
  94.         Tuple<int, bool> state = new(keepAliveTimeout, isCore);
  95.         Thread thread = new(t => Excute(t))
  96.         {
  97.             Name = Guid.NewGuid().ToString("D"),
  98.             IsBackground = isBackground
  99.         };
  100.         thread.Start(state);
  101.     }
  102.     private void Excute(object? state)
  103.     {
  104.         if (state == null) return;
  105.         var parameter = (Tuple<int, bool>)state;
  106.         bool first = true;
  107.         DateTime firstTime = DateTime.Now;
  108.         while (true)
  109.         {
  110.             WorkTask? item = null;
  111.             lock (tasks)
  112.             {
  113.                 if (tasks.Count > 0)
  114.                 {
  115.                     first = true;
  116.                     item = tasks.Dequeue();
  117.                 }
  118.                 else
  119.                 {
  120.                     if (parameter.Item2)
  121.                     {
  122.                         if (first)
  123.                         {
  124.                             firstTime = DateTime.Now;
  125.                             first = false;
  126.                         }
  127.                         if ((DateTime.Now - firstTime).TotalMilliseconds > parameter.Item1)
  128.                         {
  129.                             --runCoreThreadCount;
  130.                             break;
  131.                         }
  132.                     }
  133.                     else
  134.                     {
  135.                         --runNoneCoreThreadCount;
  136.                         break;
  137.                     }
  138.                 }
  139.             }
  140.             item?.Runsynchronous();
  141.         }
  142.     }
  143. }
复制代码
  1. namespace CustomThreadPool;
  2. /// <summary>
  3. /// 包装的任务类
  4. /// </summary>
  5. public class WorkTask
  6. {
  7.     public static WorkTaskFactory Factory { get; private set; } = new WorkTaskFactory();
  8.     /// <summary>
  9.     /// 任务运行结束时触发该事件
  10.     /// </summary>
  11.     public event Action<WorkTask>? TaskCompleted;
  12.     /// <summary>
  13.     /// 任务ID
  14.     /// </summary>
  15.     private static int _id = 0;
  16.     /// <summary>
  17.     /// 委托给任务不带执行参数的代码
  18.     /// </summary>
  19.     private readonly Action? action;
  20.     /// <summary>
  21.     /// 委托给任务执行的带输入参数代码
  22.     /// </summary>
  23.     private readonly Action<object?>? actionWithParamter;
  24.     /// <summary>
  25.     /// 线程间的同步事件
  26.     /// </summary>
  27.     public AutoResetEvent WaitHandle { get; protected set; } = new AutoResetEvent(false);
  28.     /// <summary>
  29.     /// 执行代码的参数
  30.     /// </summary>
  31.     public object? State { get; protected set; }
  32.     /// <summary>
  33.     /// 接收任务抛出的异常
  34.     /// </summary>
  35.     public WorkTaskException? Exception { get; protected set; }
  36.     /// <summary>
  37.     /// 任务是否完成标志
  38.     /// </summary>
  39.     public bool IsCompleted { get; protected set; } = false;
  40.     /// <summary>
  41.     /// 任务知否有异常
  42.     /// </summary>
  43.     public bool IsFaulted { get; protected set; } = false;
  44.     /// <summary>
  45.     /// 任务状态
  46.     /// </summary>
  47.     public WorkTaskStatus Status { get; protected set; } = WorkTaskStatus.Created;
  48.     public int Id { get { return Interlocked.Increment(ref _id); } }
  49.     protected WorkTask() { }
  50.     protected void OnTaskCompleted(WorkTask sender)
  51.     {
  52.         TaskCompleted?.Invoke(sender);
  53.     }
  54.     public WorkTask(Action action)
  55.     {
  56.         this.action = action ?? throw new ArgumentNullException(nameof(action));
  57.     }
  58.     public WorkTask(Action<object?> action, object state)
  59.     {
  60.         actionWithParamter = action ?? throw new ArgumentNullException(nameof(action));
  61.         this.State = state;
  62.     }
  63.     /// <summary>
  64.     /// 任务的同步方法
  65.     /// </summary>
  66.     public virtual void Runsynchronous()
  67.     {
  68.         if (Status != WorkTaskStatus.Created) return;
  69.         Status = WorkTaskStatus.Running;
  70.         try
  71.         {
  72.             action?.Invoke();
  73.             actionWithParamter?.Invoke(State);
  74.         }
  75.         catch (Exception ex)
  76.         {
  77.             Exception = new WorkTaskException(ex.Message, ex);
  78.             IsFaulted = true;
  79.         }
  80.         finally
  81.         {
  82.             OnTaskCompleted(this);
  83.             WaitHandle.Set();
  84.             IsCompleted = true;
  85.             Status = WorkTaskStatus.RanToCompleted;
  86.         }
  87.     }
  88.     /// <summary>
  89.     /// 通过调用线程执行的方法
  90.     /// </summary>
  91.     public void Start()
  92.     {
  93.         Factory.ThreadPoolExcutor?.QueueTask(this);
  94.     }
  95.     /// <summary>
  96.     /// 通过调用线程执行的方法
  97.     /// </summary>
  98.     /// <param name="executor">线程池管理类</param>
  99.     public void Start(ThreadPoolExecutor executor)
  100.     {
  101.         executor.QueueTask(this);
  102.     }
  103.     /// <summary>
  104.     /// 执行一组任务并等待所有任务完成。
  105.     /// </summary>
  106.     /// <param name="tasks">一组任务</param>
  107.     /// <returns>所有任务是否都接收到完成的信号。</returns>
  108.     public static bool WaitAll(WorkTask[] tasks)
  109.     {
  110.         var result = true;
  111.         foreach (var task in tasks)
  112.         {
  113.             result = result && task.WaitHandle.WaitOne();
  114.         }
  115.         return result;
  116.     }
  117.     /// <summary>
  118.     /// 执行一组任务并等待任意一个任务完成。
  119.     /// </summary>
  120.     /// <param name="tasks">一组任务</param>
  121.     /// <returns>返回已完成任务的索引</returns>
  122.     public static int WaitAny(WorkTask[] tasks)
  123.     {
  124.         var index = new Random().Next(0, tasks.Length - 1);
  125.         tasks[index].WaitHandle.WaitOne();
  126.         return index;
  127.     }
  128. }
  129. /// <summary>
  130. /// 具有返回类型的任务
  131. /// </summary>
  132. /// <typeparam name="TResult"></typeparam>
  133. public class WorkTask<TResult> : WorkTask
  134. {
  135.     private readonly Func<TResult>? func;
  136.     private readonly Func<object?, TResult>? funcWithParameter;
  137.     protected TResult? _result = default(TResult);
  138.     public TResult? Result
  139.     {
  140.         get
  141.         {
  142.             if (!isSetSignal)
  143.                 WaitHandle.WaitOne();
  144.             return _result;
  145.         }
  146.     }
  147.     public WorkTask(Func<TResult> func)
  148.     {
  149.         this.func = func ?? throw new ArgumentNullException(nameof(func));
  150.     }
  151.     public WorkTask(Func<object?, TResult> func, object? state)
  152.     {
  153.         this.funcWithParameter = func ?? throw new ArgumentNullException(nameof(func));
  154.         this.State = state;
  155.     }
  156.     private bool isSetSignal = false;
  157.     public override void Runsynchronous()
  158.     {
  159.         if (Status != WorkTaskStatus.Created) return;
  160.         Status = WorkTaskStatus.Running;
  161.         try
  162.         {
  163.             if (func != null) _result = func();
  164.             if (funcWithParameter != null) _result = funcWithParameter(State);
  165.         }
  166.         catch (Exception ex)
  167.         {
  168.             Exception = new WorkTaskException(ex.Message, ex);
  169.             IsFaulted = true;
  170.         }
  171.         finally
  172.         {
  173.             OnTaskCompleted(this);
  174.             isSetSignal = WaitHandle.Set();
  175.             Status = WorkTaskStatus.RanToCompleted;
  176.             IsCompleted = true;
  177.         }
  178.     }
  179. }
  180. public class WorkTaskException : Exception
  181. {
  182.     public WorkTaskException()
  183.     {
  184.     }
  185.     public WorkTaskException(string Message)
  186.         : base(Message)
  187.     {
  188.     }
  189.     public WorkTaskException(string Message, Exception InnerException)
  190.         : base(Message, InnerException)
  191.     {
  192.     }
  193. }
  194. public enum WorkTaskStatus
  195. {
  196.     /// <summary>
  197.     /// 已创建
  198.     /// </summary>
  199.     Created = 0,
  200.     /// <summary>
  201.     /// 正在运行
  202.     /// </summary>
  203.     Running = 1,
  204.     /// <summary>
  205.     /// 已完成
  206.     /// </summary>
  207.     RanToCompleted = 2,
  208. }
复制代码
  1. namespace CustomThreadPool;
  2. public class WorkTaskFactory
  3. {
  4.     public ThreadPoolExecutor? ThreadPoolExcutor { get; private set; }
  5.     public WorkTaskFactory(ThreadPoolExecutor excutor)
  6.     {
  7.         ThreadPoolExcutor = excutor;
  8.     }
  9.     public WorkTaskFactory()
  10.     : this(new ThreadPoolExecutor(5, 10))
  11.     {
  12.     }
  13.     public WorkTask StartNew(Action action, ThreadPoolExecutor? executor = null)
  14.     {
  15.         WorkTask task = new WorkTask(action);
  16.         ThreadPoolExcutor = executor ?? ThreadPoolExcutor;
  17.         ThreadPoolExcutor?.QueueTask(task);
  18.         return task;
  19.     }
  20.     public WorkTask<TResult> StartNew<TResult>(Func<object?, TResult> func, object? state, ThreadPoolExecutor? executor = null)
  21.     {
  22.         WorkTask<TResult> task = new WorkTask<TResult>(func, state);
  23.         ThreadPoolExcutor = executor ?? ThreadPoolExcutor;
  24.         ThreadPoolExcutor?.QueueTask(task);
  25.         return task;
  26.     }
  27. }
复制代码
  1. namespace CustomThreadPool;
  2. using System.Threading;
  3. using System.Text;
  4. using System;
  5. using System.Diagnostics;
  6. using System.Reflection.Emit;
  7. class Program
  8. {
  9.     static void Main(string[] args)
  10.     {
  11.         int count = 5;
  12.         ThreadPoolExecutor poolExcutor = new(5, 6, QueueCount: 5, KeepAliveTimeout: 2000);
  13.         WorkTask<int?>[] workTasks = new WorkTask<int?>[count];
  14.         for (int i = 0; i < count; i++) workTasks[i] = WorkTask.Factory.StartNew(t => Action(t), state: i, executor: poolExcutor);
  15.         WorkTask<int> task = WorkTask.Factory.StartNew(t =>
  16.         {
  17.             Thread.Sleep(100);
  18.             Console.WriteLine("start thread");
  19.             return 100;
  20.         }, state: null, executor: poolExcutor);
  21.         Console.WriteLine("start main");
  22.         WorkTask.WaitAll(workTasks);
  23.         Console.WriteLine(task.Result);
  24.         Console.WriteLine(workTasks.Sum(t => t.Result));
  25.     }
  26.     private static int? Action(object? t)
  27.     {
  28.         Thread.Sleep(2000);
  29.         Console.WriteLine($"Task Id:{Environment.CurrentManagedThreadId},Parameter:{t}");
  30.         return t == null ? default(int?) : (int)t + 1;
  31.     }
  32. }
复制代码
调用结果

 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

立山

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表