mormot.core.threads--TSynThreadPool

打印 上一主题 下一主题

主题 885|帖子 885|积分 2655

mormot.core.threads--TSynThreadPool
  1. { ************ 面向服务器进程的线程池 }  
  2. TSynThreadPool = class; // 前向声明TSynThreadPool类
  3. /// 定义了TSynThreadPool所使用的工作线程
  4. TSynThreadPoolWorkThread = class(TSynThread)
  5. protected
  6.     fOwner: TSynThreadPool; // 线程池所有者
  7.     fThreadNumber: integer; // 线程编号
  8.     {$ifndef USE_WINIOCP} // 如果不使用Windows I/O完成端口
  9.     fProcessingContext: pointer; // 正在处理的上下文,受fOwner.fSafe.Lock保护
  10.     fEvent: TSynEvent; // 同步事件
  11.     {$endif USE_WINIOCP}
  12.     procedure NotifyThreadStart(Sender: TSynThread); // 通知线程开始
  13.     procedure DoTask(Context: pointer); // 异常安全地调用fOwner.Task()
  14. public
  15.     /// 初始化线程
  16.     constructor Create(Owner: TSynThreadPool); reintroduce;
  17.   
  18.     /// 终结线程
  19.     destructor Destroy; override;
  20.   
  21.     /// 循环等待并执行挂起的任务,通过调用fOwner.Task()
  22.     procedure Execute; override;
  23.   
  24.     /// 关联的线程池
  25.     property Owner: TSynThreadPool
  26.       read fOwner;
  27. end;
  28. TSynThreadPoolWorkThreads = array of TSynThreadPoolWorkThread; // 线程池工作线程数组类型
  29. /// 一个简单的线程池,用于例如快速处理HTTP/1.0请求
  30. // - 在Windows上通过I/O完成端口实现,或在Linux/POSIX上使用经典的事件驱动方法
  31. TSynThreadPool = class
  32. protected
  33.     {$ifndef USE_WINIOCP} // 如果不使用Windows I/O完成端口
  34.     fSafe: TOSLightLock; // 使用更稳定的锁
  35.     {$endif USE_WINIOCP}
  36.     fWorkThread: TSynThreadPoolWorkThreads; // 工作线程数组
  37.     fWorkThreadCount: integer; // 工作线程数量
  38.     fRunningThreads: integer; // 正在运行的线程数量
  39.     fExceptionsCount: integer; // 异常计数(未在代码中明确使用,但可能用于调试或监控)
  40.     fContentionAbortDelay: integer; // 由于争用而拒绝连接的延迟时间(毫秒)
  41.     fOnThreadTerminate: TOnNotifyThread; // 线程终止通知事件
  42.     fOnThreadStart: TOnNotifyThread; // 线程开始通知事件
  43.     fContentionTime: Int64; // 等待队列可用槽位的总时间(毫秒)
  44.     fContentionAbortCount: cardinal; // 由于争用而中止的任务数
  45.     fContentionCount: cardinal; // 等待队列可用槽位的次数
  46.     fName: RawUtf8; // 线程池名称
  47.     fTerminated: boolean; // 线程池是否已终止
  48.     {$ifdef USE_WINIOCP} // 如果使用Windows I/O完成端口
  49.     fRequestQueue: THandle; // IOCP有其自己的内部队列
  50.     {$else}
  51.     fQueuePendingContext: boolean; // 当所有线程都忙时,是否应维护一个内部队列
  52.     fPendingContext: array of pointer; // 挂起的上下文数组
  53.     fPendingContextCount: integer; // 挂起的上下文数量
  54.     function GetPendingContextCount: integer; // 获取挂起上下文数量的函数
  55.     function PopPendingContext: pointer; // 从挂起上下文数组中弹出一个元素的函数
  56.     function QueueLength: integer; virtual; // 获取队列长度的虚拟函数(可能用于调试)
  57.     {$endif USE_WINIOCP}
  58.     /// 在I/O错误时结束线程
  59.     function NeedStopOnIOError: boolean; virtual;
  60.     /// 在通知后要执行的进程,这是一个抽象方法,需要被子类实现
  61.     procedure Task(aCaller: TSynThreadPoolWorkThread; aContext: pointer); virtual; abstract;
  62.     /// 中止任务的进程
  63.     procedure TaskAbort(aContext: Pointer); virtual;
  64. public
  65.     /// 使用指定的线程数初始化线程池
  66.     // - 抽象的Task()方法将由其中一个线程调用
  67.     // - 一个线程池最多可以关联256个线程
  68.     // - 在Windows上,可以可选地接受一个之前使用Windows重叠I/O(IOCP)打开的aOverlapHandle
  69.     // - 在POSIX上,如果aQueuePendingContext=true,则将挂起的上下文存储到内部队列中,
  70.     //   以便在队列未满时Push()返回true
  71.     {$ifdef USE_WINIOCP}
  72.     constructor Create(NumberOfThreads: integer = 32; aOverlapHandle: THandle = INVALID_HANDLE_VALUE; const aName: RawUtf8 = '');
  73.     {$else}
  74.     constructor Create(NumberOfThreads: integer = 32; aQueuePendingContext: boolean = false; const aName: RawUtf8 = '');
  75.     {$endif USE_WINIOCP}
  76.   
  77.     /// 关闭线程池,释放所有关联的线程
  78.     destructor Destroy; override;
  79.   
  80.     /// 让线程池处理一个指定的任务(作为指针)
  81.     // - 如果没有空闲线程可用,并且Create(aQueuePendingContext=false)被使用,则返回false(调用者稍后应重试)
  82.     // - 如果在Create中aQueuePendingContext为true,或使用了IOCP,则提供的上下文将被添加到内部列表,并在可能时处理
  83.     // - 如果aWaitOnContention默认为false,则在队列满时立即返回
  84.     // - 设置aWaitOnContention=true以等待最多ContentionAbortDelay毫秒并重试将任务排队
  85.     function Push(aContext: pointer; aWaitOnContention: boolean = false): boolean;
  86.     {$ifndef USE_WINIOCP}
  87.   
  88.     /// 在Push()返回false后调用,以查看队列是否确实已满
  89.     // - 如果QueuePendingContext为false,则返回false
  90.     function QueueIsFull: boolean;
  91.   
  92.     /// 如果所有线程都忙时,线程池是否应维护一个内部队列
  93.     // - 作为Create构造函数的参数提供
  94.     property QueuePendingContext: boolean
  95.       read fQueuePendingContext;
  96.     {$endif USE_WINIOCP}
  97.   
  98.     /// 对此线程池中定义的线程的低级访问
  99.     property WorkThread: TSynThreadPoolWorkThreads
  100.       read fWorkThread;
  101. published
  102.     /// 线程池中可用的线程数
  103.     // - 映射Create()参数,即默认为32
  104.     property WorkThreadCount: integer
  105.       read fWorkThreadCount;
  106.   
  107.     /// 当前在此线程池中处理任务的线程数
  108.     // - 范围在0..WorkThreadCount之间
  109.     property RunningThreads: integer
  110.       read fRunningThreads;
  111.   
  112.     /// 由于线程池争用而被拒绝的任务数
  113.     // - 如果此数字很高,请考虑设置更高的线程数,或分析并调整Task方法
  114.     property ContentionAbortCount: cardinal
  115.       read fContentionAbortCount;
  116.   
  117.     /// 由于争用而拒绝连接的延迟时间(毫秒)
  118.     // - 默认为5000,即等待IOCP或aQueuePendingContext内部列表中有空间可用5秒
  119.     // - 在此延迟期间,不接受新的连接(即不调用Accept),以便负载均衡器可以检测到争用并切换到池中的另一个实例,
  120.     //   或直接客户端最终可能会拒绝其连接,因此不会开始发送数据
  121.     property ContentionAbortDelay: integer
  122.       read fContentionAbortDelay write fContentionAbortDelay;
  123.   
  124.     /// 等待队列中可用槽位的总时间(毫秒)
  125.     // - 争用不会立即失败,但会重试直到ContentionAbortDelay
  126.     // - 此处的高数值需要对Task方法进行代码重构
  127.     property ContentionTime: Int64
  128.       read fContentionTime;
  129.   
  130.     /// 线程池等待队列中可用槽位的次数
  131.     // - 争用不会立即失败,但会重试直到ContentionAbortDelay
  132.     // - 此处的高数值可能需要增加线程数
  133.     // - 使用此属性和ContentionTime来计算平均争用时间
  134.     property ContentionCount: cardinal
  135.       read fContentionCount;
  136.   
  137.     {$ifndef USE_WINIOCP}
  138.     /// 当前等待分配给线程的输入任务数
  139.     property PendingContextCount: integer
  140.       read GetPendingContextCount;
  141.     {$endif USE_WINIOCP}
  142. end;
  143. {$M-} // 关闭内存管理消息
  144. const
  145.   // 允许TSynThreadPoolWorkThread堆栈使用最多256 * 2MB = 512MB的RAM
  146.   THREADPOOL_MAXTHREADS = 256;
复制代码
由于 TSynThreadPool类是一个高度抽象且依靠于特定实现的类(如它大概使用Windows的I/O完成端口或Linux/POSIX的变乱驱动机制),编写一个完整的例程代码大概会相称复杂,并且需要模拟或现实实现这些依靠项。然而,我可以提供一个简化的示例,该示例展示了如何创建 TSynThreadPool实例、如何向其推送使命,并如何大致实现 Task方法。
请留意,以下代码是一个高度简化的示例,并不包含全部 TSynThreadPool类定义中的功能,特殊是与Windows I/O完成端口或Linux/POSIX变乱驱动机制相关的部分。此外,由于 TSynThreadPool是一个假设的类(因为它不是Delphi标准库或广泛认可的第三方库的一部分),我将基于您提供的类定义来编写这个示例。
  1. program TSynThreadPoolDemo;
  2. {$MODE DELPHI}
  3. uses
  4.   SysUtils, Classes; // 引入必要的单元
  5. // 假设TSynThreadPool及其依赖项已经在某个单元中定义
  6. // 这里我们使用一个占位符单元名YourThreadPoolUnit
  7. uses YourThreadPoolUnit;
  8. // 一个简单的任务上下文类(仅作为示例)
  9. type
  10.   TMyTaskContext = record
  11.     Data: Integer;
  12.   end;
  13. // TSynThreadPool的Task方法的实现类
  14. type
  15.   TMyThreadPool = class(TSynThreadPool)
  16.   protected
  17.     procedure Task(aCaller: TSynThreadPoolWorkThread; aContext: Pointer); override;
  18.   end;
  19. { TMyThreadPool }
  20. procedure TMyThreadPool.Task(aCaller: TSynThreadPoolWorkThread; aContext: Pointer);
  21. var
  22.   Ctx: PMyTaskContext;
  23. begin
  24.   Ctx := PMyTaskContext(aContext);
  25.   WriteLn('Processing task with data: ', Ctx.Data);
  26.   // 在这里添加处理任务的代码
  27.   // ...
  28. end;
  29. var
  30.   Pool: TMyThreadPool;
  31.   Ctx: TMyTaskContext;
  32.   I: Integer;
  33. begin
  34.   try
  35.     // 创建一个线程池实例,假设我们想要使用4个工作线程
  36.     Pool := TMyThreadPool.Create(4);
  37.     try
  38.       // 模拟向线程池推送一些任务
  39.       for I := 1 to 10 do
  40.       begin
  41.         Ctx.Data := I;
  42.         if not Pool.Push(@Ctx) then
  43.         begin
  44.           // 在这个简化的示例中,我们不会处理Push返回false的情况
  45.           // 在实际应用中,您可能需要等待、重试或将任务放入另一个队列中
  46.           WriteLn('Failed to push task to the pool (this should not happen in this simplified example)');
  47.         end;
  48.       end;
  49.       // 在这个简化的示例中,我们没有等待所有任务完成
  50.       // 在实际应用中,您可能需要等待线程池中的所有任务都完成后再继续
  51.       // ...
  52.     finally
  53.       // 销毁线程池实例,这将释放所有关联的资源
  54.       Pool.Free;
  55.     end;
  56.   except
  57.     on E: Exception do
  58.       WriteLn('An error occurred: ', E.Message);
  59.   end;
  60.   // 保持控制台窗口打开,直到用户按任意键
  61.   WriteLn('Press Enter to exit...');
  62.   ReadLn;
  63. end.
  64. // 注意:由于TSynThreadPool是一个假设的类,并且上述代码没有实现所有细节(如线程池的实际工作线程管理、任务队列等),
  65. // 因此这个示例主要是为了展示如何使用该类(如果它存在的话)的大致结构。
  66. // 在实际应用中,您需要根据TSynThreadPool类的具体实现来调整此代码。
复制代码
紧张说明

  • 占位符单元:在上述代码中,我使用了 YourThreadPoolUnit作为包含 TSynThreadPool类定义的占位符单元名。在现实应用中,您需要将其替换为包含该类定义的现实单元名。
  • 使命上下文:我定义了一个简朴的 TMyTaskContext记录类型来作为使命的上下文。在现实应用中,您大概需要根据需要定义更复杂的上下文类型。
  • 错误处理:在 Push方法返回 false的情况下,上述代码仅打印了一条消息,并没有采取任何规复措施。在现实应用中,您大概需要实现更复杂的错误处理逻辑(如重试、等待或将使命放入另一个队列中)。
  • 等待使命完成:上述代码没有等待线程池中的全部使命都完成。在现实应用中,您大概需要实现某种形式的等待机制(例如,使用同步变乱或计数器)来确保全部使命都已完成后再继续执行后续代码。
  • 线程池实现:由于 TSynThreadPool是一个假设的类,并且其实现细节(如工作线程的管理、使命队列的实现等)并未在您的类定义中给出,因此上述代码仅提供了一个大致的框架。在现实应用中,您需要根据 TSynThreadPool类的具体实现来调整此代码。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

盛世宏图

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表