[多线程]基于单例懒汉模式的线程池的实现

写过一篇  金牌会员 | 2025-3-17 00:31:52 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 937|帖子 937|积分 2811

标题:[多线程]基于单例懒汉模式的线程池的实现
@水墨不写bug




  

   本文我们将要实现一个线程池,其中用到的一些模块如LockGuard,Log等在之前的文章中有所报告,以是本文就不再赘述。
  
一、锁的RAII计划

  1. #pragma once
  2. #include <pthread.h>
  3. // 使用锁需要频繁调用系统调用,十分麻烦
  4. // 于是实现锁的RAII设计
  5. class LockGuard
  6. {
  7. private:
  8.         pthread_mutex_t *GetMutex() { return _mutex; }
  9. public:
  10.         LockGuard(pthread_mutex_t *mutex)
  11.                 : _mutex(mutex)
  12.         {
  13.                 pthread_mutex_lock(_mutex);
  14.         }
  15.         ~LockGuard()
  16.         {
  17.                 pthread_mutex_unlock(_mutex);
  18.         }
  19. private:
  20.         pthread_mutex_t *_mutex;
  21. };
  22. /*
  23. *之前的理解错误了:不需要init和destroy,因为锁本身就是存在的!
  24. *锁在一般情况下会内置在类的内部,需要使用(加锁)的时候,把锁的地址传进来就行了
  25. *在构造函数内加锁,析构函数内解锁。
  26. *e.g.
  27. while(ture)
  28. {
  29.         LockGuard lockguard(td->_mutex);
  30.         if(tickets > 0)
  31.         {
  32.                 //抢票
  33.         }
  34.         else
  35.         {
  36.                 break;
  37.         }
  38. }
  39. *while内每一次进行循环,都需要创建一个新的锁,创建即加锁,if代码块结束即为解锁
  40. */
  41. /*
  42.         void UnLock() { pthread_mutex_unlock(&_mutex); }
  43.         void Lock() { pthread_mutex_lock(&_mutex); }
  44. */
复制代码
以上代码是基于RAII(Resource Acquisition Is Initialization)计划模式的锁管理类LockGuard。
该类用于简化对pthread库的互斥锁(pthread_mutex_t)的利用,确保在加锁息争锁过程中的安全性和轻便性。

二、任务类举例

  1. #pragma once
  2. #include <functional>
  3. #include <iostream>
  4. using std::cout;
  5. using std::endl;
  6. // void DownLoad()
  7. // {
  8. //     cout<<"I am a DownTask"<<endl;
  9. // }
  10. // 任务类,包含各种任务
  11. class Task
  12. {
  13. private:
  14.     using task_t = std::function<void()>;
  15.     void Excute()
  16.     {
  17.         _result = _x + _y;
  18.     }
  19. public:
  20.     Task()
  21.         : _x(0), _y(0)
  22.     {
  23.     }
  24.     Task(int a, int b)
  25.         : _x(a), _y(b)
  26.     {
  27.     }
  28.     void operator()()
  29.     {
  30.         Excute();
  31.     }
  32.     void Debug()
  33.     {
  34.         cout<<"Debug : "<<_x<<" + "<<_y<<" = "<<_result<<endl;
  35.     }
  36. private:
  37.     int _x;
  38.     int _y;
  39.     int _result;
  40. };
复制代码
这个类封装了一个具体的任务:两数相加。
尽管这是一项简单的任务(这样计划是为了便于理解线程池的部分),但是其实任务类可以根据用户的需要来自行计划,可以是下载资源,请求网页html等。
   任务类的计划目标就是一个演示作用。
  
三、日记体系的计划

  1. #pragma once
  2. #include <string>
  3. #include <iostream>
  4. #include <unistd.h>
  5. #include <sys/types.h>
  6. #include <ctime>
  7. #include <cstdarg>
  8. #include <fstream>
  9. #include <cstring>
  10. #include "LockGuard.hpp"
  11. namespace log_ddsm
  12. {
  13.     using std::cout;
  14.     using std::endl;
  15.     // 日志信息管理
  16.     enum LEVEL
  17.     {
  18.         DEBUG = 1,   // 调试信息
  19.         INFO = 2,    // 提示信息
  20.         WARNING = 3, // 警告
  21.         ERROR = 4,   // 错误,但是不影响服务正常运行
  22.         FATAL = 5,   // 致命错误,服务无法正常运行
  23.     };
  24.     enum SIZE
  25.     {
  26.         TIMESIZE = 128,
  27.         LOGSIZE = 1024,
  28.         FILE_TYPE_LOG_SIZE = 2048
  29.     };
  30.     enum TYPE
  31.     {
  32.         SCREEN_TYPE = 8,
  33.         FILE_TYPE = 16
  34.     };
  35.     // 默认日志文件名称
  36.     const char *DEFAULT_LOG_NAME = "./log.txt";
  37.     // 全局锁,保护打印日志
  38.     pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
  39.     struct logMessage
  40.     {
  41.         std::string _level; // 日志等级
  42.         int _level_num;     // 日志等级的int格式
  43.         pid_t _pid;             // 这条日志的进程id
  44.         std::string _file_name; // 文件名
  45.         int _file_number;       // 行号
  46.         std::string _cur_time;  // 当时的时间
  47.         std::string _log_info;  // 日志正文
  48.     };
  49.     // 通过int获取日志等级
  50.     std::string getLevel(int level)
  51.     {
  52.         switch (level)
  53.         {
  54.         case 1:
  55.             return "DEBUG";
  56.         case 2:
  57.             return "INFO";
  58.         case 3:
  59.             return "WARNING";
  60.         case 4:
  61.             return "ERROR";
  62.         case 5:
  63.             return "FATAL";
  64.         default:
  65.             return "UNKNOWN";
  66.         }
  67.     }
  68.     std::string getCurTime()
  69.     {
  70.         time_t cur = time(nullptr);
  71.         struct tm *curtime = localtime(&cur);
  72.         char buf[TIMESIZE] = {0};
  73.         snprintf(buf, sizeof(buf), "%d-%d-%d %d:%d:%d",
  74.                  curtime->tm_year + 1900,
  75.                  curtime->tm_mon + 1,
  76.                  curtime->tm_mday,
  77.                  curtime->tm_hour,
  78.                  curtime->tm_min,
  79.                  curtime->tm_sec);
  80.         return buf;
  81.     }
  82.     class Log
  83.     {
  84.     private:
  85.         void FlushMessage(const logMessage &lg)
  86.         {
  87.             // 互斥锁,保护Print过程
  88.             LockGuard lockguard(&gmutex);
  89.             // 过滤逻辑
  90.             // 致命错误到文件逻辑
  91.             if (lg._level_num >= _ignore_level)
  92.             {
  93.                 if (_print_type == SCREEN_TYPE)
  94.                     PrintToScreen(lg);
  95.                 else if (_print_type == FILE_TYPE)
  96.                     PrintToFile(lg);
  97.                 else
  98.                     std::cerr << __FILE__ << " " << __LINE__ << ":" << " UNKNOWN_TYPE " << std::endl;
  99.             }
  100.         }
  101.         void PrintToScreen(const logMessage &lg)
  102.         {
  103.             printf("[%s][%d][%s][%d][%s] %s", lg._level.c_str(),
  104.                    lg._pid,
  105.                    lg._file_name.c_str(),
  106.                    lg._file_number,
  107.                    lg._cur_time.c_str(),
  108.                    lg._log_info.c_str());
  109.         }
  110.         void PrintToFile(const logMessage &lg)
  111.         {
  112.             std::ofstream out(_log_file_name, std::ios::app); // 追加打印
  113.             if (!out.is_open())
  114.             {
  115.                 std::cerr << __FILE__ << " " << __LINE__ << ":" << " LOG_FILE_OPEN fail " << std::endl;
  116.                 return;
  117.             }
  118.             char log_txt[FILE_TYPE_LOG_SIZE] = {0}; // 缓冲区
  119.             snprintf(log_txt, sizeof(log_txt), "[%s][%d][%s][%d][%s] %s", lg._level.c_str(),
  120.                      lg._pid,
  121.                      lg._file_name.c_str(),
  122.                      lg._file_number,
  123.                      lg._cur_time.c_str(),
  124.                      lg._log_info.c_str());
  125.             out.write(log_txt, strlen(log_txt)); // 写入文件
  126.         }
  127.     public:
  128.         // 打印方式——默认向显示器打印
  129.         // 如果选择文件类型打印,默认的文件名称是当前目录的log.txt
  130.         //
  131.         Log(int print_type = SCREEN_TYPE)
  132.             : _print_type(print_type), _log_file_name(DEFAULT_LOG_NAME), _ignore_level(DEBUG)
  133.         {
  134.         }
  135.         // 只有一个全局对象,多个构造函数多余,不如设计一个构造,添加一个设置log文件名称的接口
  136.         //  Log(int print_type, const char *logname = DEFAULT_LOG_NAME)
  137.         //      : _print_type(print_type), _log_file_name(logname), _ignore_level(1)
  138.         //  {
  139.         //  }
  140.         void Enable(int type)
  141.         {
  142.             _print_type = type;
  143.         }
  144.         /// @brief 加载日志信息
  145.         /// @param format 格式化输出
  146.         /// @param  可变参数
  147.         /*区分了本层和商城之后,就容易设置参数了*/
  148.         void load_message(int level, std::string filename, int filenumber, const char *format, ...)
  149.         {
  150.             logMessage lg;
  151.             lg._level = getLevel(level);
  152.             lg._level_num = level;
  153.             lg._pid = getpid();
  154.             lg._file_name = filename;
  155.             lg._file_number = filenumber;
  156.             lg._cur_time = getCurTime();
  157.             // valist + vsnprintf 处理可变参数
  158.             va_list ap;
  159.             va_start(ap, format);
  160.             char log_info[LOGSIZE] = {0};
  161.             vsnprintf(log_info, sizeof(log_info), format, ap);
  162.             va_end(ap);
  163.             lg._log_info = log_info;
  164.             // 打印逻辑
  165.             FlushMessage(lg);
  166.         }
  167.         /// @brief
  168.         /// @param ignorelevel
  169.         /* DEBUG = 1,   调试信息
  170.          *INOF = 2,     提示信息
  171.          *WARNING = 3,  警告
  172.          *ERROR = 4,    错误,但是不影响服务正常运行
  173.          *FATAL = 5,    致命错误,服务无法正常运行 */
  174.         void SetIgnoreLevel(int ignorelevel)
  175.         {
  176.             _ignore_level = ignorelevel;
  177.         }
  178.         void SetLogFileName(const char *newlogfilename)
  179.         {
  180.             _log_file_name = newlogfilename;
  181.         }
  182.         ~Log() {}
  183.     private:
  184.         int _print_type;
  185.         std::string _log_file_name;
  186.         int _ignore_level;
  187.     };
  188.     // 全局的Log对象
  189.     Log lg;
  190.     // 打印日志信息
  191.     // e.g.
  192.     // LOG(DEBUG,"this is a test message %d %f %c", 13 , 9.81 , 'A' );
  193.     //
  194. // 使用日志的一般格式
  195. #define LOG(Level, Format, ...)                                            \
  196.     do                                                                     \
  197.     {                                                                      \
  198.         lg.load_message(Level, __FILE__, __LINE__, Format, ##__VA_ARGS__); \
  199.     } while (0)
  200.     /// 无法设计为inlne——__VA_ARGS只能出现在宏替换中
  201.     // inline void LOG(int level,const char* format, ...)
  202.     // {
  203.     //     lg.load_message(level,__FILE__,__LINE__,format,__VA_ARGS__);
  204.     // }
  205. // 往文件打印
  206. #define ENABLE_FILE()         \
  207.     do                        \
  208.     {                         \
  209.         lg.Enable(FILE_TYPE); \
  210.     } while (0)
  211. // 往显示器打印
  212. #define ENABLE_SCREEN()         \
  213.     do                          \
  214.     {                           \
  215.         lg.Enable(SCREEN_TYPE); \
  216.     } while (0)
  217. //设置日志忽略等级
  218. #define SET_IGNORE_LEVEL(Level)  \
  219.     do                           \
  220.     {                            \
  221.         lg.SetIgnoreLevel(Level); \
  222.     } while (0)
  223. //设置日志文件名称
  224. #define SET_LOG_FILENAME(Name)  \
  225.     do                          \
  226.     {                           \
  227.         lg.SetLogFileName(Name); \
  228.     } while (0)
  229. }; // namespace log_ddsm
复制代码
以上是日记体系的计划,具体的报告请参考我之前的这篇文章:
   《[HTTP协议]应用层协议HTTP从入门到深刻理解并落地部署自己的云服务(2)实操部署》:
https://blog.csdn.net/2301_79465388/article/details/146114051?spm=1001.2014.3001.5501

  四、基于POSIX线程封装自己的线程模块

  1. #pragma once
  2. #include <pthread.h>
  3. #include <vector>
  4. #include <functional>
  5. #include <iostream>
  6. #include <cassert>
  7. #include <cstring>
  8. #include "Log.hpp"
  9. using namespace log_ddsm;
  10. // 理解C++11做的工作,封装自己的原生线程
  11. // 创建线程,执行任务
  12. namespace ThreadMoudle
  13. {
  14.     // 支持模板
  15.     using func_t = std::function<void(const std::string &name)>;
  16.     // 线程控制-自主管理原生线程
  17.     class Thread
  18.     {
  19.     private:
  20.         // 线程创建后例行执行的函数
  21.         static void *Routine(void *args) // 设置为static,防止this指针干扰类型匹配
  22.         {
  23.             Thread *self = static_cast<Thread *>(args);
  24.             self->Excute();
  25.             return nullptr;
  26.         }
  27.         void Excute()
  28.         {
  29.             _isrunning = true;
  30.             _func(_name);
  31.             _isrunning = false;
  32.         }
  33.     public:
  34.         /// @brief 构造线程
  35.         /// @param name 线程名
  36.         /// @param func 线程要执行的任务
  37.         Thread(const std::string &name, func_t func)
  38.             : _name(name), _tid(), _isrunning(false), _func(func)
  39.         {
  40.         }
  41.         bool Start()
  42.         {
  43.             int res = ::pthread_create(&_tid, nullptr, Routine, this); // 要求类型:void*(void*)——有this指针导致类型不匹配
  44.             if (res != 0)
  45.             {
  46.                 LOG(ERROR, "pthread_create failed with error: %s\n", strerror(res));
  47.                 // std::cerr << __FILE__ << " " << __LINE__ << ":" << "pthread_creat: error!" << std::endl;
  48.                 // printf("pthread_create failed with error: %s\n", strerror(res));
  49.                 return false;
  50.             }
  51.             else
  52.             {
  53.                 return true;
  54.             }
  55.         }
  56.         bool Stop()
  57.         {
  58.             if (_isrunning)
  59.             {
  60.                 int res = ::pthread_cancel(_tid);
  61.                 if (res != 0)
  62.                 {
  63.                     // std::cerr << __FILE__ << " " << __LINE__ << ":" << "pthread_cancel: error!" << std::endl;
  64.                     // printf("pthread_cancel failed with error: %s\n", strerror(res));
  65.                     LOG(ERROR, "pthread_cancel failed with error: %s\n", strerror(res));
  66.                     return false;
  67.                 }
  68.                 else
  69.                 {
  70.                     return true;
  71.                 }
  72.             }
  73.             //走到这里,说明线程已经stop了
  74.             return true;
  75.         }
  76.         // Stop之后join
  77.         bool Join()
  78.         {
  79.             if (!_isrunning)
  80.             {
  81.                 int res = ::pthread_join(_tid, nullptr);
  82.                 if (res != 0)
  83.                 {
  84.                     // std::cout<<"Debug res:"<<res<<std::endl;
  85.                     // std::cerr << __FILE__ << " " << __LINE__ << ":" << "pthread_join: error!" << std::endl;
  86.                     // printf("pthread_join failed with error: %s\n", strerror(res));
  87.                     LOG(ERROR, "pthread_join failed with error: %s\n", strerror(res));
  88.                     return false;
  89.                 }
  90.                 else
  91.                 {
  92.                     return true;
  93.                 }
  94.             }
  95.             else
  96.             {
  97.                 // Running时join
  98.                 LOG(INFO, "join while Running\n");
  99.                 // std::cerr << "join while Running" << std::endl;
  100.                 return false;
  101.             }
  102.         }
  103.         ~Thread() {}
  104.         bool Status() { return _isrunning; }
  105.         std::string Name() { return _name; }
  106.     private:
  107.         std::string _name; // 线程的名字
  108.         pthread_t _tid;    // 线程的类型
  109.         bool _isrunning;   // 线程是否在运行
  110.         func_t _func;      // 线程将要执行的回调函数
  111.     };
  112. }; // namespace ThreadMoudle
复制代码
以上代码实现了一个线程管理模块 ThreadMoudle,其中包含一个线程类 Thread。它封装了POSIX线程(pthread)的创建、启动、制止和加入(join)操纵,并提供了一些辅助功能。
计划要点


  • 线程管理

    • 该模块封装了POSIX线程的基本操纵,提供了一个易于利用的接口来管理线程的创建、启动、制止和加入操纵。

  • 可重用性和可扩展性

    • 通过利用std::function<void(const std::string &name)>,可以将恣意符合该签名的回调函数传递给线程对象,从而实现任务的多样性和机动性。

  • 线程安全性

    • 通过内部状态标记(_isrunning)和得当的日记记录,确保线程的状态和操纵的一致性,制止了多次启动或制止同一线程的问题。

主要功能


  • 线程创建和启动

    • Thread::Start():调用pthread_create创建一个新线程,并启动实行Routine函数。Routine函数将调用实例的Excute方法实行具体任务。
    • Routine函数被声明为静态函数,以制止this指针干扰范例匹配。

  • 线程实行具体任务

    • Thread::Excute():设置线程状态为运行中,并调用回调函数_func实行具体任务。

  • 线程制止

    • Thread::Stop():调用pthread_cancel取消运行中的线程。对于不符合逻辑的情况:线程未运行,则记录错误日记。

  • 线程加入

    • Thread::Join():调用pthread_join等待线程制止。对于不符合逻辑的情况:线程仍在运行,则记录错误日记。

  • 日记记录

    • 利用日记模块(log_ddsm)记录线程操纵中的错误信息,便于调试和问题排查。

利用示例

  1. #include "Thread.hpp"
  2. void MyTask(const std::string &name)
  3. {
  4.     std::cout << "Thread " << name << " is running." << std::endl;
  5.     int count = 5;
  6.     while(count--)
  7.     {
  8.         std::cout<< count<<std::endl;
  9.     }
  10. }
  11. int main()
  12. {
  13.     ThreadMoudle::Thread myThread("TestThread", MyTask);
  14.     if (myThread.Start())
  15.     {
  16.         std::cout << "Thread started successfully." << std::endl;
  17.     }
  18.     // 等待线程完成任务
  19.     myThread.Join();
  20.     return 0;
  21. }
复制代码
结果:

在这个示例中:


  • 创建一个名为"TestThread"的线程对象,并传递任务函数MyTask。
  • 调用myThread.Start()启动线程。
  • 等待线程完成任务后,调用myThread.Join()等待线程制止。

五、基于懒汉模式的线程池主体逻辑计划

  1. #pragma once
  2. #include <queue>
  3. #include <unistd.h>
  4. #include "Log.hpp"
  5. #include "Thread.hpp" //using pthread lib
  6. using std::cout;
  7. using std::endl;
  8. using ThreadMoudle::func_t;
  9. using namespace log_ddsm;
  10. enum NUM
  11. {
  12.     DEFAULTNUM = 10
  13. };
  14. /// @brief
  15. /// @tparam T:任务类型
  16. //懒汉模式设计单例ThreadPool
  17. template <typename T>
  18. class ThreadPool
  19. {
  20. private:
  21.     void LockQueue() // 加锁
  22.     {
  23.         pthread_mutex_lock(&_taskqueue_mutex);
  24.     }
  25.     void UnLockQueue() // 解锁
  26.     {
  27.         pthread_mutex_unlock(&_taskqueue_mutex);
  28.     }
  29.     void Sleep() // 在条件变量_taskqueue_cond下等待
  30.     {
  31.         pthread_cond_wait(&_taskqueue_cond, &_taskqueue_mutex);
  32.     }
  33.     void WakeUp() // 叫醒一个进程
  34.     {
  35.         pthread_cond_signal(&_taskqueue_cond);
  36.     }
  37.     void WakeUpAll() // 叫醒所有线程
  38.     {
  39.         pthread_cond_broadcast(&_taskqueue_cond);
  40.     }
  41.     bool IsEmpty()
  42.     {
  43.         return _taskqueue.empty();
  44.     }
  45.     // thread将要执行的函数逻辑
  46.     void HandlerTask(const std::string &name)
  47.     {
  48.         // 一直循环:等待任务——执行任务——等待任务,直到运行到break逻辑
  49.         while (true)
  50.         {
  51.             LockQueue();
  52.             /*  优先级上,先考虑任务队列不为空,只有任务队列为空,再考虑是否要线程退出*/
  53.             while (IsEmpty() && _isrunning) // 没有任务则等待,while——在唤醒之后先判断是否队列仍然有任务,否则再次等待
  54.             {
  55.                 _sleep_num++;
  56.                 Sleep();
  57.                 _sleep_num--;
  58.             }
  59.             if (IsEmpty() && !_isrunning)
  60.             {
  61.                 UnLockQueue();
  62.                 LOG(DEBUG, "thread quit\n");
  63.                 // cout << "thread quit" << endl;
  64.                 break;
  65.             }
  66.             T t; // 从taskqueue中获取任务
  67.             t = _taskqueue.front();
  68.             _taskqueue.pop();
  69.             UnLockQueue();
  70.             // 执行任务,在解锁之后进行,执行任务要与从队列中获取任务并发
  71.             // 不能在临界区中进行
  72.             t();
  73.             LOG(DEBUG, "%s", name.c_str());
  74.             // cout << name << " : ";
  75.             t.Debug();
  76.         }
  77.     }
  78.     // 缺省个数——10
  79.     ThreadPool(size_t threadnum)
  80.         : _threadnum(threadnum), _taskqueue(), _isrunning(false), _sleep_num(0) // 初始化列中调用has a的构造??
  81.     {
  82.         // 初始化
  83.         pthread_mutex_init(&_taskqueue_mutex, nullptr);
  84.         pthread_cond_init(&_taskqueue_cond, nullptr);
  85.     }
  86.     // 初始化线程池
  87.     void Init()
  88.     {
  89.         func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
  90.         for (int i = 0; i < _threadnum; ++i)
  91.         {
  92.             std::string name = "thread - " + std::to_string(i + 1);
  93.             _threads.emplace_back(name, func); // 减少一次拷贝,直接在vector内构造
  94.             // usleep(10000);
  95.             // cout << "emplaced i+1 :" << i + 1 << endl;
  96.         }
  97.     }
  98.     // 开始运行
  99.     void Start()
  100.     {
  101.         _isrunning = true; // 单进程访问不需要保护
  102.         for (int i = 0; i < _threadnum; ++i)
  103.         {
  104.             _threads[i].Start();
  105.             LOG(DEBUG, "thread - %d started \n", i + 1);
  106.         }
  107.     }
  108. public:
  109.     static ThreadPool<T> *GetInstance()
  110.     {
  111.         if (_ptp == nullptr)
  112.         {
  113.             LockGuard lockguard(&_sig_mutex);
  114.             if (_ptp == nullptr)
  115.             {
  116.                 LOG(INFO, "create threadspool\n");
  117.                 _ptp = new ThreadPool<T>(DEFAULTNUM);//线程安全问题
  118.                 _ptp->Init();
  119.                 _ptp->Start();
  120.             }
  121.         }
  122.         else
  123.         {
  124.             LOG(INFO, "get threadspool\n");
  125.         }
  126.         return _ptp;
  127.     }
  128.     // 停止运行
  129.     void Stop()
  130.     {
  131.         LockQueue();
  132.         _isrunning = false;
  133.         WakeUpAll();
  134.         UnLockQueue();
  135.     }
  136.     // 向任务队列加入任务
  137.     void PushTask(const T &in)
  138.     {
  139.         LockQueue();
  140.         if (_isrunning == true) // 防止出现线程池想要退出但是 却被一直分配任务,导致无法退出
  141.         {
  142.             _taskqueue.push(in); // 加入任务
  143.             if (_sleep_num > 0)
  144.                 WakeUp();
  145.         }
  146.         UnLockQueue();
  147.     }
  148.     ~ThreadPool()
  149.     {
  150.         for (int i = 0; i < _threadnum; ++i)
  151.         {
  152.             LOG(INFO, "thread-%d joined\n", i + 1);
  153.             _threads[i].Join();
  154.         }
  155.         pthread_mutex_destroy(&_taskqueue_mutex);
  156.         pthread_cond_destroy(&_taskqueue_cond);
  157.     }
  158. private:
  159.     size_t _threadnum;                          // 线程池中线程的个数
  160.     std::vector<ThreadMoudle::Thread> _threads; // vector维护线程池
  161.     std::queue<T> _taskqueue; // 任务队列
  162.     bool _isrunning;          // 标识线程池是否在运行
  163.     /*多线程对taskqueue访问需要整体使用——加锁*/
  164.     pthread_mutex_t _taskqueue_mutex; // 用于维护taskqueue的mutex cond
  165.     pthread_cond_t _taskqueue_cond;
  166.     int _sleep_num; // 休眠的线程个数
  167.     static ThreadPool<T> *_ptp; // 单例对象
  168.     static pthread_mutex_t _sig_mutex;
  169. };
  170. // 静态成员在类外初始化
  171. template <class T>
  172. ThreadPool<T> *ThreadPool<T>::_ptp = nullptr;
  173. // 第一次保护获取单例的mutex
  174. template <class T>
  175. pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;
复制代码
类 ThreadPool 的成员函数详解


  • LockQueue()UnLockQueue()

    • 这两个函数分别用来加锁息争锁 _taskqueue_mutex 互斥锁。
    • 目标是保护任务队列 _taskqueue 的访问,确保在多线程环境下对任务队列的操纵是线程安全的。
    1. void LockQueue() // 加锁
    2. {
    3.     pthread_mutex_lock(&_taskqueue_mutex);
    4. }
    5. void UnLockQueue() // 解锁
    6. {
    7.     pthread_mutex_unlock(&_taskqueue_mutex);
    8. }
    复制代码

  • Sleep()WakeUp()WakeUpAll()

    • Sleep() 函数使线程在条件变量 _taskqueue_cond 上等待,等待条件满足时被叫醒。
    • WakeUp() 函数叫醒一个在 _taskqueue_cond 上等待的线程。
    • WakeUpAll() 函数叫醒所有在 _taskqueue_cond 上等待的线程。
    • 这些函数用于实现线程休眠和叫醒机制,当任务队列为空时,线程进入休眠状态;当有新任务加入时,叫醒线程实行任务。
    1. void Sleep() // 在条件变量_taskqueue_cond下等待
    2. {
    3.     pthread_cond_wait(&_taskqueue_cond, &_taskqueue_mutex);
    4. }
    5. void WakeUp() // 叫醒一个进程
    6. {
    7.     pthread_cond_signal(&_taskqueue_cond);
    8. }
    9. void WakeUpAll() // 叫醒所有线程
    10. {
    11.     pthread_cond_broadcast(&_taskqueue_cond);
    12. }
    复制代码

  • IsEmpty()

    • 检查任务队列是否为空。
    • 返回布尔值,表现任务队列是否为空。
    1. bool IsEmpty()
    2. {
    3.     return _taskqueue.empty();
    4. }
    复制代码

  • HandlerTask(const std::string &name)

    • 线程实行的主要函数逻辑。
    • 线程在循环中等待任务队列中的任务,当有任务时,从任务队列中取出任务并实行。
    • 当任务队列为空且线程池不再运行时,线程退出循环,竣事实行。
    1. // thread将要执行的函数逻辑
    2. void HandlerTask(const std::string &name)
    3. {
    4.     // 一直循环:等待任务——执行任务——等待任务,直到运行到break逻辑
    5.     while (true)
    6.     {
    7.         LockQueue();
    8.         /*  优先级上,先考虑任务队列不为空,只有任务队列为空,再考虑是否要线程退出*/
    9.         while (IsEmpty() && _isrunning) // 没有任务则等待,while——在唤醒之后先判断是否队列仍然有任务,否则再次等待
    10.         {
    11.             _sleep_num++;
    12.             Sleep();
    13.             _sleep_num--;
    14.         }
    15.         if (IsEmpty() && !_isrunning)
    16.         {
    17.             UnLockQueue();
    18.             LOG(DEBUG, "thread quit\n");
    19.             break;
    20.         }
    21.         T t; // 从taskqueue中获取任务
    22.         t = _taskqueue.front();
    23.         _taskqueue.pop();
    24.         UnLockQueue();
    25.         // 执行任务,在解锁之后进行,执行任务要与从队列中获取任务并发
    26.         // 不能在临界区中进行
    27.         t();
    28.         LOG(DEBUG, "%s", name.c_str());
    29.         // cout << name << " : ";
    30.         t.Debug();
    31.     }
    32. }
    复制代码

  • 构造函数 ThreadPool(size_t threadnum)

    • 初始化线程池,包括线程数量、任务队列、互斥锁和条件变量。
    • 设置 _threadnum 为线程池中的线程数量,初始化其他成员变量。
    1. ThreadPool(size_t threadnum)
    2.     : _threadnum(threadnum), _taskqueue(), _isrunning(false), _sleep_num(0)
    3. {
    4.     pthread_mutex_init(&_taskqueue_mutex, nullptr);
    5.     pthread_cond_init(&_taskqueue_cond, nullptr);
    6. }
    复制代码

  • Init()

    • 初始化线程池中的线程。
    • 创建 thread 对象,并将 HandlerTask 绑定为线程实行的任务函数。这一操纵的目标是为了使emplace_back传入Thread构造函数的范例匹配。
    1. void Init()
    2. {
    3.     func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
    4.     for (int i = 0; i < _threadnum; ++i)
    5.     {
    6.         std::string name = "thread - " + std::to_string(i + 1);
    7.         _threads.emplace_back(name, func);
    8.     }
    9. }
    复制代码

  • Start()

    • 启动线程池中的线程。
    • 设置 _isrunning 为 true,然后启动线程池中每个线程。
    1. void Start()
    2. {
    3.     _isrunning = true;
    4.     for (int i = 0; i < _threadnum; ++i)
    5.     {
    6.         _threads[i].Start();
    7.         LOG(DEBUG, "thread - %d started \n", i + 1);
    8.     }
    9. }
    复制代码

  • GetInstance()

    • 获取线程池的单例实例。
    • 利用双重检查锁定机制确保线程安全。
    1. static ThreadPool<T> *GetInstance()
    2. {
    3.     if (_ptp == nullptr)
    4.     {
    5.         LockGuard lockguard(&_sig_mutex);
    6.         if (_ptp == nullptr)
    7.         {
    8.             LOG(INFO, "create threadspool\n");
    9.             _ptp = new ThreadPool<T>(DEFAULTNUM);
    10.             _ptp->Init();
    11.             _ptp->Start();
    12.         }
    13.     }
    14.     else
    15.     {
    16.         LOG(INFO, "get threadspool\n");
    17.     }
    18.     return _ptp;
    19. }
    复制代码

  • Stop()

    • 制止线程池的运行。
    • 设置 _isrunning 为 false,并叫醒所有休眠的线程。
    1. void Stop()
    2. {
    3.     LockQueue();
    4.     _isrunning = false;
    5.     WakeUpAll();
    6.     UnLockQueue();
    7. }
    复制代码

  • PushTask(const T &in)

    • 向任务队列中添加任务。
    • 如果线程池正在运行,添加任务到任务队列,并叫醒一个休眠的线程。
    1. void PushTask(const T &in)
    2. {
    3.     LockQueue();
    4.     if (_isrunning == true)
    5.     {
    6.         _taskqueue.push(in);
    7.         if (_sleep_num > 0)
    8.             WakeUp();
    9.     }
    10.     UnLockQueue();
    11. }
    复制代码

  • 析构函数 ~ThreadPool()

    • 销毁线程池,等待所有线程竣事并释放资源。
    • 销毁互斥锁和条件变量。
    1. ~ThreadPool()
    2. {
    3.     for (int i = 0; i < _threadnum; ++i)
    4.     {
    5.         LOG(INFO, "thread-%d joined\n", i + 1);
    6.         _threads[i].Join();
    7.     }
    8.     pthread_mutex_destroy(&_taskqueue_mutex);
    9.     pthread_cond_destroy(&_taskqueue_cond);
    10. }
    复制代码

静态成员初始化



  • 静态成员变量 _ptp 和 _sig_mutex 在类外举行初始化。
  1. template <class T>
  2. ThreadPool<T> *ThreadPool<T>::_ptp = nullptr;
  3. template <class T>
  4. pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;
复制代码
六、总结

   ThreadPool 类实现了一个线程池的基本功能,固然你也可以在这个线程池的根本上扩充实现自己需要的功能。
计划中利用单例模式确保只有一个线程池实例,利用互斥锁和条件变量保证多线程环境下的线程安全,利用日记记录监控线程池的运行状态。
  
完~
转载请注明出处



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

写过一篇

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表