NET 6 环境开发 实现 线程数量,任务队列,非核心线程,及核心线程活跃时间的管理。- namespace CustomThreadPool;
- /// <summary>
- /// 线程池类
- /// </summary>
- public class ThreadPoolExecutor
- {
- /// <summary>
- /// 核心线程的任务队列
- /// </summary>
- private readonly Queue<WorkTask> tasks = new Queue<WorkTask>();
- /// <summary>
- /// 最大核心线程数
- /// </summary>
- private int coreThreadCount;
- /// <summary>
- /// 最大非核心线程数
- /// </summary>
- private int noneCoreThreadCount;
- /// <summary>
- /// 当前运行的核心线程的数量
- /// </summary>
- private int runCoreThreadCount;
- /// <summary>
- /// 当前运行的非核心线程的数量
- /// </summary>
- private int runNoneCoreThreadCount;
- /// <summary>
- /// 核心线程队列的最大数
- /// </summary>
- private int maxQueueCount;
- /// <summary>
- /// 当核心线程空闲时最大活跃时间
- /// </summary>
- private int keepAliveTimeout;
- /// <summary>
- /// 设置是否为后台线程
- /// </summary>
- private bool isBackground;
- private ThreadPoolExecutor() { }
- /// <summary>
- ///
- /// </summary>
- /// <param name="CoreThreadCount">核心线程数</param>
- /// <param name="TotalThreadCount">总线程数</param>
- /// <param name="IsBackground">是否为后台线程</param>
- /// <param name="QueueCount">核心队列的最大数</param>
- /// <param name="KeepAliveTimeout">当核心线程空闲时最大活跃时间</param>
- /// <exception cref="ArgumentOutOfRangeException"></exception>
- /// <exception cref="ArgumentException"></exception>
- public ThreadPoolExecutor(int CoreThreadCount = 5, int TotalThreadCount = 10, bool IsBackground = true, int QueueCount = 200, int KeepAliveTimeout = 0)
- {
- if (CoreThreadCount < 1) throw new ArgumentOutOfRangeException(nameof(CoreThreadCount), CoreThreadCount, null);
- if (TotalThreadCount < CoreThreadCount) throw new ArgumentException($"{nameof(TotalThreadCount)}:{TotalThreadCount} must be greater than {nameof(CoreThreadCount)}:{CoreThreadCount}");
- if (QueueCount < 0) throw new ArgumentOutOfRangeException(nameof(QueueCount), QueueCount, null);
- if (KeepAliveTimeout < 0) throw new ArgumentOutOfRangeException(nameof(KeepAliveTimeout), KeepAliveTimeout, null);
- coreThreadCount = CoreThreadCount;
- noneCoreThreadCount = TotalThreadCount - CoreThreadCount;
- keepAliveTimeout = KeepAliveTimeout;
- maxQueueCount = QueueCount;
- isBackground = IsBackground;
- }
- /// <summary>
- /// 执行任务
- /// </summary>
- /// <param name="task">一个自定义任务</param>
- /// <exception cref="ArgumentNullException">任务为null时,抛出该错误</exception>
- /// <exception cref="NotSupportedException">当核心任务队列已满且非核心线程最大数为0时抛出该错误</exception>
- public void QueueTask(WorkTask task)
- {
- if (task == null) throw new ArgumentNullException(nameof(task));
- lock (tasks)
- {
- tasks.Enqueue(task);
- if (tasks.Count <= maxQueueCount)
- {
- if (runCoreThreadCount < coreThreadCount)
- {
- ++runCoreThreadCount;
- Run(true);
- }
- }
- else
- {
- if (noneCoreThreadCount > 0 && runNoneCoreThreadCount < noneCoreThreadCount)
- {
- ++runNoneCoreThreadCount;
- Run(false);
- }
- }
- }
- }
- private void Run(bool isCore)
- {
- Tuple<int, bool> state = new(keepAliveTimeout, isCore);
- Thread thread = new(t => Excute(t))
- {
- Name = Guid.NewGuid().ToString("D"),
- IsBackground = isBackground
- };
- thread.Start(state);
- }
- private void Excute(object? state)
- {
- if (state == null) return;
- var parameter = (Tuple<int, bool>)state;
- bool first = true;
- DateTime firstTime = DateTime.Now;
- while (true)
- {
- WorkTask? item = null;
- lock (tasks)
- {
- if (tasks.Count > 0)
- {
- first = true;
- item = tasks.Dequeue();
- }
- else
- {
- if (parameter.Item2)
- {
- if (first)
- {
- firstTime = DateTime.Now;
- first = false;
- }
- if ((DateTime.Now - firstTime).TotalMilliseconds > parameter.Item1)
- {
- --runCoreThreadCount;
- break;
- }
- }
- else
- {
- --runNoneCoreThreadCount;
- break;
- }
- }
- }
- item?.Runsynchronous();
- }
- }
- }
复制代码- namespace CustomThreadPool;
- /// <summary>
- /// 包装的任务类
- /// </summary>
- public class WorkTask
- {
- public static WorkTaskFactory Factory { get; private set; } = new WorkTaskFactory();
- /// <summary>
- /// 任务运行结束时触发该事件
- /// </summary>
- public event Action<WorkTask>? TaskCompleted;
- /// <summary>
- /// 任务ID
- /// </summary>
- private static int _id = 0;
- /// <summary>
- /// 委托给任务不带执行参数的代码
- /// </summary>
- private readonly Action? action;
- /// <summary>
- /// 委托给任务执行的带输入参数代码
- /// </summary>
- private readonly Action<object?>? actionWithParamter;
- /// <summary>
- /// 线程间的同步事件
- /// </summary>
- public AutoResetEvent WaitHandle { get; protected set; } = new AutoResetEvent(false);
- /// <summary>
- /// 执行代码的参数
- /// </summary>
- public object? State { get; protected set; }
- /// <summary>
- /// 接收任务抛出的异常
- /// </summary>
- public WorkTaskException? Exception { get; protected set; }
- /// <summary>
- /// 任务是否完成标志
- /// </summary>
- public bool IsCompleted { get; protected set; } = false;
- /// <summary>
- /// 任务知否有异常
- /// </summary>
- public bool IsFaulted { get; protected set; } = false;
- /// <summary>
- /// 任务状态
- /// </summary>
- public WorkTaskStatus Status { get; protected set; } = WorkTaskStatus.Created;
- public int Id { get { return Interlocked.Increment(ref _id); } }
- protected WorkTask() { }
- protected void OnTaskCompleted(WorkTask sender)
- {
- TaskCompleted?.Invoke(sender);
- }
- public WorkTask(Action action)
- {
- this.action = action ?? throw new ArgumentNullException(nameof(action));
- }
- public WorkTask(Action<object?> action, object state)
- {
- actionWithParamter = action ?? throw new ArgumentNullException(nameof(action));
- this.State = state;
- }
- /// <summary>
- /// 任务的同步方法
- /// </summary>
- public virtual void Runsynchronous()
- {
- if (Status != WorkTaskStatus.Created) return;
- Status = WorkTaskStatus.Running;
- try
- {
- action?.Invoke();
- actionWithParamter?.Invoke(State);
- }
- catch (Exception ex)
- {
- Exception = new WorkTaskException(ex.Message, ex);
- IsFaulted = true;
- }
- finally
- {
- OnTaskCompleted(this);
- WaitHandle.Set();
- IsCompleted = true;
- Status = WorkTaskStatus.RanToCompleted;
- }
- }
- /// <summary>
- /// 通过调用线程执行的方法
- /// </summary>
- public void Start()
- {
- Factory.ThreadPoolExcutor?.QueueTask(this);
- }
- /// <summary>
- /// 通过调用线程执行的方法
- /// </summary>
- /// <param name="executor">线程池管理类</param>
- public void Start(ThreadPoolExecutor executor)
- {
- executor.QueueTask(this);
- }
- /// <summary>
- /// 执行一组任务并等待所有任务完成。
- /// </summary>
- /// <param name="tasks">一组任务</param>
- /// <returns>所有任务是否都接收到完成的信号。</returns>
- public static bool WaitAll(WorkTask[] tasks)
- {
- var result = true;
- foreach (var task in tasks)
- {
- result = result && task.WaitHandle.WaitOne();
- }
- return result;
- }
- /// <summary>
- /// 执行一组任务并等待任意一个任务完成。
- /// </summary>
- /// <param name="tasks">一组任务</param>
- /// <returns>返回已完成任务的索引</returns>
- public static int WaitAny(WorkTask[] tasks)
- {
- var index = new Random().Next(0, tasks.Length - 1);
- tasks[index].WaitHandle.WaitOne();
- return index;
- }
- }
- /// <summary>
- /// 具有返回类型的任务
- /// </summary>
- /// <typeparam name="TResult"></typeparam>
- public class WorkTask<TResult> : WorkTask
- {
- private readonly Func<TResult>? func;
- private readonly Func<object?, TResult>? funcWithParameter;
- protected TResult? _result = default(TResult);
- public TResult? Result
- {
- get
- {
- if (!isSetSignal)
- WaitHandle.WaitOne();
- return _result;
- }
- }
- public WorkTask(Func<TResult> func)
- {
- this.func = func ?? throw new ArgumentNullException(nameof(func));
- }
- public WorkTask(Func<object?, TResult> func, object? state)
- {
- this.funcWithParameter = func ?? throw new ArgumentNullException(nameof(func));
- this.State = state;
- }
- private bool isSetSignal = false;
- public override void Runsynchronous()
- {
- if (Status != WorkTaskStatus.Created) return;
- Status = WorkTaskStatus.Running;
- try
- {
- if (func != null) _result = func();
- if (funcWithParameter != null) _result = funcWithParameter(State);
- }
- catch (Exception ex)
- {
- Exception = new WorkTaskException(ex.Message, ex);
- IsFaulted = true;
- }
- finally
- {
- OnTaskCompleted(this);
- isSetSignal = WaitHandle.Set();
- Status = WorkTaskStatus.RanToCompleted;
- IsCompleted = true;
- }
- }
- }
- public class WorkTaskException : Exception
- {
- public WorkTaskException()
- {
- }
- public WorkTaskException(string Message)
- : base(Message)
- {
- }
- public WorkTaskException(string Message, Exception InnerException)
- : base(Message, InnerException)
- {
- }
- }
- public enum WorkTaskStatus
- {
- /// <summary>
- /// 已创建
- /// </summary>
- Created = 0,
- /// <summary>
- /// 正在运行
- /// </summary>
- Running = 1,
- /// <summary>
- /// 已完成
- /// </summary>
- RanToCompleted = 2,
- }
复制代码- namespace CustomThreadPool;
- public class WorkTaskFactory
- {
- public ThreadPoolExecutor? ThreadPoolExcutor { get; private set; }
- public WorkTaskFactory(ThreadPoolExecutor excutor)
- {
- ThreadPoolExcutor = excutor;
- }
- public WorkTaskFactory()
- : this(new ThreadPoolExecutor(5, 10))
- {
- }
- public WorkTask StartNew(Action action, ThreadPoolExecutor? executor = null)
- {
- WorkTask task = new WorkTask(action);
- ThreadPoolExcutor = executor ?? ThreadPoolExcutor;
- ThreadPoolExcutor?.QueueTask(task);
- return task;
- }
- public WorkTask<TResult> StartNew<TResult>(Func<object?, TResult> func, object? state, ThreadPoolExecutor? executor = null)
- {
- WorkTask<TResult> task = new WorkTask<TResult>(func, state);
- ThreadPoolExcutor = executor ?? ThreadPoolExcutor;
- ThreadPoolExcutor?.QueueTask(task);
- return task;
- }
- }
复制代码- namespace CustomThreadPool;
- using System.Threading;
- using System.Text;
- using System;
- using System.Diagnostics;
- using System.Reflection.Emit;
- class Program
- {
- static void Main(string[] args)
- {
- int count = 5;
- ThreadPoolExecutor poolExcutor = new(5, 6, QueueCount: 5, KeepAliveTimeout: 2000);
- WorkTask<int?>[] workTasks = new WorkTask<int?>[count];
- for (int i = 0; i < count; i++) workTasks[i] = WorkTask.Factory.StartNew(t => Action(t), state: i, executor: poolExcutor);
- WorkTask<int> task = WorkTask.Factory.StartNew(t =>
- {
- Thread.Sleep(100);
- Console.WriteLine("start thread");
- return 100;
- }, state: null, executor: poolExcutor);
- Console.WriteLine("start main");
- WorkTask.WaitAll(workTasks);
- Console.WriteLine(task.Result);
- Console.WriteLine(workTasks.Sum(t => t.Result));
- }
- private static int? Action(object? t)
- {
- Thread.Sleep(2000);
- Console.WriteLine($"Task Id:{Environment.CurrentManagedThreadId},Parameter:{t}");
- return t == null ? default(int?) : (int)t + 1;
- }
- }
复制代码 调用结果

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |