标题:[多线程]基于单例懒汉模式的线程池的实现
@水墨不写bug
本文我们将要实现一个线程池,其中用到的一些模块如LockGuard,Log等在之前的文章中有所报告,以是本文就不再赘述。
一、锁的RAII计划
- #pragma once
- #include <pthread.h>
- // 使用锁需要频繁调用系统调用,十分麻烦
- // 于是实现锁的RAII设计
- class LockGuard
- {
- private:
- pthread_mutex_t *GetMutex() { return _mutex; }
- public:
- LockGuard(pthread_mutex_t *mutex)
- : _mutex(mutex)
- {
- pthread_mutex_lock(_mutex);
- }
- ~LockGuard()
- {
- pthread_mutex_unlock(_mutex);
- }
- private:
- pthread_mutex_t *_mutex;
- };
- /*
- *之前的理解错误了:不需要init和destroy,因为锁本身就是存在的!
- *锁在一般情况下会内置在类的内部,需要使用(加锁)的时候,把锁的地址传进来就行了
- *在构造函数内加锁,析构函数内解锁。
- *e.g.
- while(ture)
- {
- LockGuard lockguard(td->_mutex);
- if(tickets > 0)
- {
- //抢票
- }
- else
- {
- break;
- }
- }
- *while内每一次进行循环,都需要创建一个新的锁,创建即加锁,if代码块结束即为解锁
- */
- /*
- void UnLock() { pthread_mutex_unlock(&_mutex); }
- void Lock() { pthread_mutex_lock(&_mutex); }
- */
复制代码 以上代码是基于RAII(Resource Acquisition Is Initialization)计划模式的锁管理类LockGuard。
该类用于简化对pthread库的互斥锁(pthread_mutex_t)的利用,确保在加锁息争锁过程中的安全性和轻便性。
二、任务类举例
- #pragma once
- #include <functional>
- #include <iostream>
- using std::cout;
- using std::endl;
- // void DownLoad()
- // {
- // cout<<"I am a DownTask"<<endl;
- // }
- // 任务类,包含各种任务
- class Task
- {
- private:
- using task_t = std::function<void()>;
- void Excute()
- {
- _result = _x + _y;
- }
- public:
- Task()
- : _x(0), _y(0)
- {
- }
- Task(int a, int b)
- : _x(a), _y(b)
- {
- }
- void operator()()
- {
- Excute();
- }
- void Debug()
- {
- cout<<"Debug : "<<_x<<" + "<<_y<<" = "<<_result<<endl;
- }
- private:
- int _x;
- int _y;
- int _result;
- };
复制代码 这个类封装了一个具体的任务:两数相加。
尽管这是一项简单的任务(这样计划是为了便于理解线程池的部分),但是其实任务类可以根据用户的需要来自行计划,可以是下载资源,请求网页html等。
任务类的计划目标就是一个演示作用。
三、日记体系的计划
- #pragma once
- #include <string>
- #include <iostream>
- #include <unistd.h>
- #include <sys/types.h>
- #include <ctime>
- #include <cstdarg>
- #include <fstream>
- #include <cstring>
- #include "LockGuard.hpp"
- namespace log_ddsm
- {
- using std::cout;
- using std::endl;
- // 日志信息管理
- enum LEVEL
- {
- DEBUG = 1, // 调试信息
- INFO = 2, // 提示信息
- WARNING = 3, // 警告
- ERROR = 4, // 错误,但是不影响服务正常运行
- FATAL = 5, // 致命错误,服务无法正常运行
- };
- enum SIZE
- {
- TIMESIZE = 128,
- LOGSIZE = 1024,
- FILE_TYPE_LOG_SIZE = 2048
- };
- enum TYPE
- {
- SCREEN_TYPE = 8,
- FILE_TYPE = 16
- };
- // 默认日志文件名称
- const char *DEFAULT_LOG_NAME = "./log.txt";
- // 全局锁,保护打印日志
- pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
- struct logMessage
- {
- std::string _level; // 日志等级
- int _level_num; // 日志等级的int格式
- pid_t _pid; // 这条日志的进程id
- std::string _file_name; // 文件名
- int _file_number; // 行号
- std::string _cur_time; // 当时的时间
- std::string _log_info; // 日志正文
- };
- // 通过int获取日志等级
- std::string getLevel(int level)
- {
- switch (level)
- {
- case 1:
- return "DEBUG";
- case 2:
- return "INFO";
- case 3:
- return "WARNING";
- case 4:
- return "ERROR";
- case 5:
- return "FATAL";
- default:
- return "UNKNOWN";
- }
- }
- std::string getCurTime()
- {
- time_t cur = time(nullptr);
- struct tm *curtime = localtime(&cur);
- char buf[TIMESIZE] = {0};
- snprintf(buf, sizeof(buf), "%d-%d-%d %d:%d:%d",
- curtime->tm_year + 1900,
- curtime->tm_mon + 1,
- curtime->tm_mday,
- curtime->tm_hour,
- curtime->tm_min,
- curtime->tm_sec);
- return buf;
- }
- class Log
- {
- private:
- void FlushMessage(const logMessage &lg)
- {
- // 互斥锁,保护Print过程
- LockGuard lockguard(&gmutex);
- // 过滤逻辑
- // 致命错误到文件逻辑
- if (lg._level_num >= _ignore_level)
- {
- if (_print_type == SCREEN_TYPE)
- PrintToScreen(lg);
- else if (_print_type == FILE_TYPE)
- PrintToFile(lg);
- else
- std::cerr << __FILE__ << " " << __LINE__ << ":" << " UNKNOWN_TYPE " << std::endl;
- }
- }
- void PrintToScreen(const logMessage &lg)
- {
- printf("[%s][%d][%s][%d][%s] %s", lg._level.c_str(),
- lg._pid,
- lg._file_name.c_str(),
- lg._file_number,
- lg._cur_time.c_str(),
- lg._log_info.c_str());
- }
- void PrintToFile(const logMessage &lg)
- {
- std::ofstream out(_log_file_name, std::ios::app); // 追加打印
- if (!out.is_open())
- {
- std::cerr << __FILE__ << " " << __LINE__ << ":" << " LOG_FILE_OPEN fail " << std::endl;
- return;
- }
- char log_txt[FILE_TYPE_LOG_SIZE] = {0}; // 缓冲区
- snprintf(log_txt, sizeof(log_txt), "[%s][%d][%s][%d][%s] %s", lg._level.c_str(),
- lg._pid,
- lg._file_name.c_str(),
- lg._file_number,
- lg._cur_time.c_str(),
- lg._log_info.c_str());
- out.write(log_txt, strlen(log_txt)); // 写入文件
- }
- public:
- // 打印方式——默认向显示器打印
- // 如果选择文件类型打印,默认的文件名称是当前目录的log.txt
- //
- Log(int print_type = SCREEN_TYPE)
- : _print_type(print_type), _log_file_name(DEFAULT_LOG_NAME), _ignore_level(DEBUG)
- {
- }
- // 只有一个全局对象,多个构造函数多余,不如设计一个构造,添加一个设置log文件名称的接口
- // Log(int print_type, const char *logname = DEFAULT_LOG_NAME)
- // : _print_type(print_type), _log_file_name(logname), _ignore_level(1)
- // {
- // }
- void Enable(int type)
- {
- _print_type = type;
- }
- /// @brief 加载日志信息
- /// @param format 格式化输出
- /// @param 可变参数
- /*区分了本层和商城之后,就容易设置参数了*/
- void load_message(int level, std::string filename, int filenumber, const char *format, ...)
- {
- logMessage lg;
- lg._level = getLevel(level);
- lg._level_num = level;
- lg._pid = getpid();
- lg._file_name = filename;
- lg._file_number = filenumber;
- lg._cur_time = getCurTime();
- // valist + vsnprintf 处理可变参数
- va_list ap;
- va_start(ap, format);
- char log_info[LOGSIZE] = {0};
- vsnprintf(log_info, sizeof(log_info), format, ap);
- va_end(ap);
- lg._log_info = log_info;
- // 打印逻辑
- FlushMessage(lg);
- }
- /// @brief
- /// @param ignorelevel
- /* DEBUG = 1, 调试信息
- *INOF = 2, 提示信息
- *WARNING = 3, 警告
- *ERROR = 4, 错误,但是不影响服务正常运行
- *FATAL = 5, 致命错误,服务无法正常运行 */
- void SetIgnoreLevel(int ignorelevel)
- {
- _ignore_level = ignorelevel;
- }
- void SetLogFileName(const char *newlogfilename)
- {
- _log_file_name = newlogfilename;
- }
- ~Log() {}
- private:
- int _print_type;
- std::string _log_file_name;
- int _ignore_level;
- };
- // 全局的Log对象
- Log lg;
- // 打印日志信息
- // e.g.
- // LOG(DEBUG,"this is a test message %d %f %c", 13 , 9.81 , 'A' );
- //
- // 使用日志的一般格式
- #define LOG(Level, Format, ...) \
- do \
- { \
- lg.load_message(Level, __FILE__, __LINE__, Format, ##__VA_ARGS__); \
- } while (0)
- /// 无法设计为inlne——__VA_ARGS只能出现在宏替换中
- // inline void LOG(int level,const char* format, ...)
- // {
- // lg.load_message(level,__FILE__,__LINE__,format,__VA_ARGS__);
- // }
- // 往文件打印
- #define ENABLE_FILE() \
- do \
- { \
- lg.Enable(FILE_TYPE); \
- } while (0)
- // 往显示器打印
- #define ENABLE_SCREEN() \
- do \
- { \
- lg.Enable(SCREEN_TYPE); \
- } while (0)
- //设置日志忽略等级
- #define SET_IGNORE_LEVEL(Level) \
- do \
- { \
- lg.SetIgnoreLevel(Level); \
- } while (0)
- //设置日志文件名称
- #define SET_LOG_FILENAME(Name) \
- do \
- { \
- lg.SetLogFileName(Name); \
- } while (0)
- }; // namespace log_ddsm
复制代码 以上是日记体系的计划,具体的报告请参考我之前的这篇文章:
《[HTTP协议]应用层协议HTTP从入门到深刻理解并落地部署自己的云服务(2)实操部署》:
https://blog.csdn.net/2301_79465388/article/details/146114051?spm=1001.2014.3001.5501
四、基于POSIX线程封装自己的线程模块
- #pragma once
- #include <pthread.h>
- #include <vector>
- #include <functional>
- #include <iostream>
- #include <cassert>
- #include <cstring>
- #include "Log.hpp"
- using namespace log_ddsm;
- // 理解C++11做的工作,封装自己的原生线程
- // 创建线程,执行任务
- namespace ThreadMoudle
- {
- // 支持模板
- using func_t = std::function<void(const std::string &name)>;
- // 线程控制-自主管理原生线程
- class Thread
- {
- private:
- // 线程创建后例行执行的函数
- static void *Routine(void *args) // 设置为static,防止this指针干扰类型匹配
- {
- Thread *self = static_cast<Thread *>(args);
- self->Excute();
- return nullptr;
- }
- void Excute()
- {
- _isrunning = true;
- _func(_name);
- _isrunning = false;
- }
- public:
- /// @brief 构造线程
- /// @param name 线程名
- /// @param func 线程要执行的任务
- Thread(const std::string &name, func_t func)
- : _name(name), _tid(), _isrunning(false), _func(func)
- {
- }
- bool Start()
- {
- int res = ::pthread_create(&_tid, nullptr, Routine, this); // 要求类型:void*(void*)——有this指针导致类型不匹配
- if (res != 0)
- {
- LOG(ERROR, "pthread_create failed with error: %s\n", strerror(res));
- // std::cerr << __FILE__ << " " << __LINE__ << ":" << "pthread_creat: error!" << std::endl;
- // printf("pthread_create failed with error: %s\n", strerror(res));
- return false;
- }
- else
- {
- return true;
- }
- }
- bool Stop()
- {
- if (_isrunning)
- {
- int res = ::pthread_cancel(_tid);
- if (res != 0)
- {
- // std::cerr << __FILE__ << " " << __LINE__ << ":" << "pthread_cancel: error!" << std::endl;
- // printf("pthread_cancel failed with error: %s\n", strerror(res));
- LOG(ERROR, "pthread_cancel failed with error: %s\n", strerror(res));
- return false;
- }
- else
- {
- return true;
- }
- }
- //走到这里,说明线程已经stop了
- return true;
- }
- // Stop之后join
- bool Join()
- {
- if (!_isrunning)
- {
- int res = ::pthread_join(_tid, nullptr);
- if (res != 0)
- {
- // std::cout<<"Debug res:"<<res<<std::endl;
- // std::cerr << __FILE__ << " " << __LINE__ << ":" << "pthread_join: error!" << std::endl;
- // printf("pthread_join failed with error: %s\n", strerror(res));
- LOG(ERROR, "pthread_join failed with error: %s\n", strerror(res));
- return false;
- }
- else
- {
- return true;
- }
- }
- else
- {
- // Running时join
- LOG(INFO, "join while Running\n");
- // std::cerr << "join while Running" << std::endl;
- return false;
- }
- }
- ~Thread() {}
- bool Status() { return _isrunning; }
- std::string Name() { return _name; }
- private:
- std::string _name; // 线程的名字
- pthread_t _tid; // 线程的类型
- bool _isrunning; // 线程是否在运行
- func_t _func; // 线程将要执行的回调函数
- };
- }; // namespace ThreadMoudle
复制代码 以上代码实现了一个线程管理模块 ThreadMoudle,其中包含一个线程类 Thread。它封装了POSIX线程(pthread)的创建、启动、制止和加入(join)操纵,并提供了一些辅助功能。
计划要点
- 线程管理:
- 该模块封装了POSIX线程的基本操纵,提供了一个易于利用的接口来管理线程的创建、启动、制止和加入操纵。
- 可重用性和可扩展性:
- 通过利用std::function<void(const std::string &name)>,可以将恣意符合该签名的回调函数传递给线程对象,从而实现任务的多样性和机动性。
- 线程安全性:
- 通过内部状态标记(_isrunning)和得当的日记记录,确保线程的状态和操纵的一致性,制止了多次启动或制止同一线程的问题。
主要功能
- 线程创建和启动:
- Thread::Start():调用pthread_create创建一个新线程,并启动实行Routine函数。Routine函数将调用实例的Excute方法实行具体任务。
- Routine函数被声明为静态函数,以制止this指针干扰范例匹配。
- 线程实行具体任务:
- Thread::Excute():设置线程状态为运行中,并调用回调函数_func实行具体任务。
- 线程制止:
- Thread::Stop():调用pthread_cancel取消运行中的线程。对于不符合逻辑的情况:线程未运行,则记录错误日记。
- 线程加入:
- Thread::Join():调用pthread_join等待线程制止。对于不符合逻辑的情况:线程仍在运行,则记录错误日记。
- 日记记录:
- 利用日记模块(log_ddsm)记录线程操纵中的错误信息,便于调试和问题排查。
利用示例
- #include "Thread.hpp"
- void MyTask(const std::string &name)
- {
- std::cout << "Thread " << name << " is running." << std::endl;
- int count = 5;
- while(count--)
- {
- std::cout<< count<<std::endl;
- }
- }
- int main()
- {
- ThreadMoudle::Thread myThread("TestThread", MyTask);
- if (myThread.Start())
- {
- std::cout << "Thread started successfully." << std::endl;
- }
- // 等待线程完成任务
- myThread.Join();
- return 0;
- }
复制代码 结果:
在这个示例中:
- 创建一个名为"TestThread"的线程对象,并传递任务函数MyTask。
- 调用myThread.Start()启动线程。
- 等待线程完成任务后,调用myThread.Join()等待线程制止。
五、基于懒汉模式的线程池主体逻辑计划
- #pragma once
- #include <queue>
- #include <unistd.h>
- #include "Log.hpp"
- #include "Thread.hpp" //using pthread lib
- using std::cout;
- using std::endl;
- using ThreadMoudle::func_t;
- using namespace log_ddsm;
- enum NUM
- {
- DEFAULTNUM = 10
- };
- /// @brief
- /// @tparam T:任务类型
- //懒汉模式设计单例ThreadPool
- template <typename T>
- class ThreadPool
- {
- private:
- void LockQueue() // 加锁
- {
- pthread_mutex_lock(&_taskqueue_mutex);
- }
- void UnLockQueue() // 解锁
- {
- pthread_mutex_unlock(&_taskqueue_mutex);
- }
- void Sleep() // 在条件变量_taskqueue_cond下等待
- {
- pthread_cond_wait(&_taskqueue_cond, &_taskqueue_mutex);
- }
- void WakeUp() // 叫醒一个进程
- {
- pthread_cond_signal(&_taskqueue_cond);
- }
- void WakeUpAll() // 叫醒所有线程
- {
- pthread_cond_broadcast(&_taskqueue_cond);
- }
- bool IsEmpty()
- {
- return _taskqueue.empty();
- }
- // thread将要执行的函数逻辑
- void HandlerTask(const std::string &name)
- {
- // 一直循环:等待任务——执行任务——等待任务,直到运行到break逻辑
- while (true)
- {
- LockQueue();
- /* 优先级上,先考虑任务队列不为空,只有任务队列为空,再考虑是否要线程退出*/
- while (IsEmpty() && _isrunning) // 没有任务则等待,while——在唤醒之后先判断是否队列仍然有任务,否则再次等待
- {
- _sleep_num++;
- Sleep();
- _sleep_num--;
- }
- if (IsEmpty() && !_isrunning)
- {
- UnLockQueue();
- LOG(DEBUG, "thread quit\n");
- // cout << "thread quit" << endl;
- break;
- }
- T t; // 从taskqueue中获取任务
- t = _taskqueue.front();
- _taskqueue.pop();
- UnLockQueue();
- // 执行任务,在解锁之后进行,执行任务要与从队列中获取任务并发
- // 不能在临界区中进行
- t();
- LOG(DEBUG, "%s", name.c_str());
- // cout << name << " : ";
- t.Debug();
- }
- }
- // 缺省个数——10
- ThreadPool(size_t threadnum)
- : _threadnum(threadnum), _taskqueue(), _isrunning(false), _sleep_num(0) // 初始化列中调用has a的构造??
- {
- // 初始化
- pthread_mutex_init(&_taskqueue_mutex, nullptr);
- pthread_cond_init(&_taskqueue_cond, nullptr);
- }
- // 初始化线程池
- void Init()
- {
- func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
- for (int i = 0; i < _threadnum; ++i)
- {
- std::string name = "thread - " + std::to_string(i + 1);
- _threads.emplace_back(name, func); // 减少一次拷贝,直接在vector内构造
- // usleep(10000);
- // cout << "emplaced i+1 :" << i + 1 << endl;
- }
- }
- // 开始运行
- void Start()
- {
- _isrunning = true; // 单进程访问不需要保护
- for (int i = 0; i < _threadnum; ++i)
- {
- _threads[i].Start();
- LOG(DEBUG, "thread - %d started \n", i + 1);
- }
- }
- public:
- static ThreadPool<T> *GetInstance()
- {
- if (_ptp == nullptr)
- {
- LockGuard lockguard(&_sig_mutex);
- if (_ptp == nullptr)
- {
- LOG(INFO, "create threadspool\n");
- _ptp = new ThreadPool<T>(DEFAULTNUM);//线程安全问题
- _ptp->Init();
- _ptp->Start();
- }
- }
- else
- {
- LOG(INFO, "get threadspool\n");
- }
- return _ptp;
- }
- // 停止运行
- void Stop()
- {
- LockQueue();
- _isrunning = false;
- WakeUpAll();
- UnLockQueue();
- }
- // 向任务队列加入任务
- void PushTask(const T &in)
- {
- LockQueue();
- if (_isrunning == true) // 防止出现线程池想要退出但是 却被一直分配任务,导致无法退出
- {
- _taskqueue.push(in); // 加入任务
- if (_sleep_num > 0)
- WakeUp();
- }
- UnLockQueue();
- }
- ~ThreadPool()
- {
- for (int i = 0; i < _threadnum; ++i)
- {
- LOG(INFO, "thread-%d joined\n", i + 1);
- _threads[i].Join();
- }
- pthread_mutex_destroy(&_taskqueue_mutex);
- pthread_cond_destroy(&_taskqueue_cond);
- }
- private:
- size_t _threadnum; // 线程池中线程的个数
- std::vector<ThreadMoudle::Thread> _threads; // vector维护线程池
- std::queue<T> _taskqueue; // 任务队列
- bool _isrunning; // 标识线程池是否在运行
- /*多线程对taskqueue访问需要整体使用——加锁*/
- pthread_mutex_t _taskqueue_mutex; // 用于维护taskqueue的mutex cond
- pthread_cond_t _taskqueue_cond;
- int _sleep_num; // 休眠的线程个数
- static ThreadPool<T> *_ptp; // 单例对象
- static pthread_mutex_t _sig_mutex;
- };
- // 静态成员在类外初始化
- template <class T>
- ThreadPool<T> *ThreadPool<T>::_ptp = nullptr;
- // 第一次保护获取单例的mutex
- template <class T>
- pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;
复制代码 类 ThreadPool 的成员函数详解
- LockQueue() 和 UnLockQueue():
- 这两个函数分别用来加锁息争锁 _taskqueue_mutex 互斥锁。
- 目标是保护任务队列 _taskqueue 的访问,确保在多线程环境下对任务队列的操纵是线程安全的。
- void LockQueue() // 加锁
- {
- pthread_mutex_lock(&_taskqueue_mutex);
- }
- void UnLockQueue() // 解锁
- {
- pthread_mutex_unlock(&_taskqueue_mutex);
- }
复制代码
- Sleep() 和 WakeUp()、WakeUpAll():
- Sleep() 函数使线程在条件变量 _taskqueue_cond 上等待,等待条件满足时被叫醒。
- WakeUp() 函数叫醒一个在 _taskqueue_cond 上等待的线程。
- WakeUpAll() 函数叫醒所有在 _taskqueue_cond 上等待的线程。
- 这些函数用于实现线程休眠和叫醒机制,当任务队列为空时,线程进入休眠状态;当有新任务加入时,叫醒线程实行任务。
- void Sleep() // 在条件变量_taskqueue_cond下等待
- {
- pthread_cond_wait(&_taskqueue_cond, &_taskqueue_mutex);
- }
- void WakeUp() // 叫醒一个进程
- {
- pthread_cond_signal(&_taskqueue_cond);
- }
- void WakeUpAll() // 叫醒所有线程
- {
- pthread_cond_broadcast(&_taskqueue_cond);
- }
复制代码
- IsEmpty():
- 检查任务队列是否为空。
- 返回布尔值,表现任务队列是否为空。
- bool IsEmpty()
- {
- return _taskqueue.empty();
- }
复制代码
- HandlerTask(const std::string &name):
- 线程实行的主要函数逻辑。
- 线程在循环中等待任务队列中的任务,当有任务时,从任务队列中取出任务并实行。
- 当任务队列为空且线程池不再运行时,线程退出循环,竣事实行。
- // thread将要执行的函数逻辑
- void HandlerTask(const std::string &name)
- {
- // 一直循环:等待任务——执行任务——等待任务,直到运行到break逻辑
- while (true)
- {
- LockQueue();
- /* 优先级上,先考虑任务队列不为空,只有任务队列为空,再考虑是否要线程退出*/
- while (IsEmpty() && _isrunning) // 没有任务则等待,while——在唤醒之后先判断是否队列仍然有任务,否则再次等待
- {
- _sleep_num++;
- Sleep();
- _sleep_num--;
- }
- if (IsEmpty() && !_isrunning)
- {
- UnLockQueue();
- LOG(DEBUG, "thread quit\n");
- break;
- }
- T t; // 从taskqueue中获取任务
- t = _taskqueue.front();
- _taskqueue.pop();
- UnLockQueue();
- // 执行任务,在解锁之后进行,执行任务要与从队列中获取任务并发
- // 不能在临界区中进行
- t();
- LOG(DEBUG, "%s", name.c_str());
- // cout << name << " : ";
- t.Debug();
- }
- }
复制代码
- 构造函数 ThreadPool(size_t threadnum):
- 初始化线程池,包括线程数量、任务队列、互斥锁和条件变量。
- 设置 _threadnum 为线程池中的线程数量,初始化其他成员变量。
- ThreadPool(size_t threadnum)
- : _threadnum(threadnum), _taskqueue(), _isrunning(false), _sleep_num(0)
- {
- pthread_mutex_init(&_taskqueue_mutex, nullptr);
- pthread_cond_init(&_taskqueue_cond, nullptr);
- }
复制代码
- Init():
- 初始化线程池中的线程。
- 创建 thread 对象,并将 HandlerTask 绑定为线程实行的任务函数。这一操纵的目标是为了使emplace_back传入Thread构造函数的范例匹配。
- void Init()
- {
- func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
- for (int i = 0; i < _threadnum; ++i)
- {
- std::string name = "thread - " + std::to_string(i + 1);
- _threads.emplace_back(name, func);
- }
- }
复制代码
- Start():
- 启动线程池中的线程。
- 设置 _isrunning 为 true,然后启动线程池中每个线程。
- void Start()
- {
- _isrunning = true;
- for (int i = 0; i < _threadnum; ++i)
- {
- _threads[i].Start();
- LOG(DEBUG, "thread - %d started \n", i + 1);
- }
- }
复制代码
- GetInstance():
- 获取线程池的单例实例。
- 利用双重检查锁定机制确保线程安全。
- static ThreadPool<T> *GetInstance()
- {
- if (_ptp == nullptr)
- {
- LockGuard lockguard(&_sig_mutex);
- if (_ptp == nullptr)
- {
- LOG(INFO, "create threadspool\n");
- _ptp = new ThreadPool<T>(DEFAULTNUM);
- _ptp->Init();
- _ptp->Start();
- }
- }
- else
- {
- LOG(INFO, "get threadspool\n");
- }
- return _ptp;
- }
复制代码
- Stop():
- 制止线程池的运行。
- 设置 _isrunning 为 false,并叫醒所有休眠的线程。
- void Stop()
- {
- LockQueue();
- _isrunning = false;
- WakeUpAll();
- UnLockQueue();
- }
复制代码
- PushTask(const T &in):
- 向任务队列中添加任务。
- 如果线程池正在运行,添加任务到任务队列,并叫醒一个休眠的线程。
- void PushTask(const T &in)
- {
- LockQueue();
- if (_isrunning == true)
- {
- _taskqueue.push(in);
- if (_sleep_num > 0)
- WakeUp();
- }
- UnLockQueue();
- }
复制代码
- 析构函数 ~ThreadPool():
- 销毁线程池,等待所有线程竣事并释放资源。
- 销毁互斥锁和条件变量。
- ~ThreadPool()
- {
- for (int i = 0; i < _threadnum; ++i)
- {
- LOG(INFO, "thread-%d joined\n", i + 1);
- _threads[i].Join();
- }
- pthread_mutex_destroy(&_taskqueue_mutex);
- pthread_cond_destroy(&_taskqueue_cond);
- }
复制代码
静态成员初始化
- 静态成员变量 _ptp 和 _sig_mutex 在类外举行初始化。
- template <class T>
- ThreadPool<T> *ThreadPool<T>::_ptp = nullptr;
- template <class T>
- pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;
复制代码 六、总结
ThreadPool 类实现了一个线程池的基本功能,固然你也可以在这个线程池的根本上扩充实现自己需要的功能。
计划中利用单例模式确保只有一个线程池实例,利用互斥锁和条件变量保证多线程环境下的线程安全,利用日记记录监控线程池的运行状态。
完~
转载请注明出处
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |