在相识了线程的根本概念和线程互斥与同步之后,我们可以以此计划一个简朴的线程池。【Linux】线程-CSDN博客
【Linux】线程同步与互斥-CSDN博客
线程池也是一种池化技能。提前申请一些线程,等待有任务时就直接让线程去实行,不消再收到任务之后再创建线程。
一.日志 计划
以往,我们多线程在向表现器打印信息时,会出现信息稠浊的征象。这是由于多线程向表现器打印信息时,表现器是一种临界资源。访问临界资源应该对其举行保护,否则就会出现数据不同等。为了办理该征象,我们可以计划处一个日志 类,打印信息时都使用该类,以是,我们得包管该类打印信息是原子的。
1.计谋模式
计谋模式(Strategy Pattern)是一种活动计划模式,它使你能在运行时改变对象的活动。其紧张头脑是将算法或活动封装到独立的类中,这些类称为计谋类。上下文类(Context)使用计谋类来实行特定的算法或活动,而客户端可以根据必要选择差别的计谋。
我们可以根据计谋模式计划出差别的革新计谋,好比向表现器革新,大概向指定路径的指定文件革新。
而计谋模式的具体实现方式就是先实现一个计谋类,内里包罗了一个虚函数,该虚函数是将来要实行的活动大概算法。
然后我们再通过继续的方式具体的实现某一个种计谋。
- namespace MyLog
- {
- using namespace MutexModule;
- #define gap "\r\n"
- // 策略模式——刷新策略
- // 虚基类
- class logstrategy
- {
- public:
- ~logstrategy() = default;
- virtual void synclog(const std::string &message) = 0;
- };
- // 刷新策略1--->向显示器刷新
- class consolelogstrategy : public logstrategy
- {
- public:
- ~consolelogstrategy() {}
- void synclog(const std::string &message) override
- {
- // 向显示器刷新需要加锁
- mutexguard lock(_mutex);
- std::cout << message << gap;
- }
- private:
- Mutex _mutex;
- };
- // 刷新策略2--->向指定文件里刷新
- const std::string defaultPath = "./log";
- const std::string defaultName = "log.log";
- class filelogstrategy : public logstrategy
- {
- public:
- filelogstrategy(const std::string &path = defaultPath, const std::string &name = defaultName)
- : _path(path),
- _file(name)
- {
- // 指定路径存在,直接返回;不存在,创建路径
- if (std::filesystem::exists(_path))
- {
- return;
- }
- try
- {
- std::filesystem::create_directories(_path);
- }
- catch (const std::filesystem::filesystem_error &e)
- {
- std::cerr << e.what() << '\n';
- }
- }
- ~filelogstrategy() {}
- void synclog(const std::string &message) override
- {
- // 向指定文件里打印, 向指定文件里面打印也得是原子的,得加锁
- mutexguard lock(_mutex);
- // 拼接路径+文件名
- std::string filename = _path + (_path.back() == '/' ? "" : "/") + _file;
- // 向指定文件里面以追加方式写入
- std::ofstream out(filename, std::ios::app);
- out << message << gap;
- out.close();
- }
- private:
- std::string _path;
- std::string _file;
- Mutex _mutex;
- };
- }
复制代码 阐明:我们实现了两种革新计谋,向表现器革新、向指定路径的指定文件革新。但岂论哪种革新方式,我们都得包管是原子的,即任意时间只能有一个线程革新,如许就不会产生数据稠浊的情况。以是,这里我们实现原子性的方法是借助互斥锁。
2.日志 类
有了革新计谋之后,下一步便是处理处罚日志的具体内容了。这里我们渴望打印出来的日志包罗以下信息:
[时间][日志品级][历程pid][文件名][行号] - 日志正文
[2025-5-4 10:05:48][INFO][828670][thread.hpp][38]- create newthread-1 success
[2025-5-4 10:05:48][INFO][828670][thread.hpp][38]- create newthread-2 success
[2025-5-4 10:05:48][INFO][828670][thread.hpp][38]- create newthread-3 success
[2025-5-4 10:05:48][INFO][828670][thread.hpp][38]- create newthread-4 success
[2025-5-4 10:05:48][INFO][828670][thread.hpp][38]- create newthread-5 success
对于日志类来说,他起首得有本身的革新计谋,以是日志类包罗一个成员那就是革新计谋,并且我们得指定默认的革新计谋:
- class logger
- {
- public:
- logger()
- {
- // 默认使用显示器刷新策略
- UseConsoleStrategy();
- }
- ~logger() {}
- void UseConsoleStrategy() { _fflush_strategy = std::make_unique<consolelogstrategy>(); }
- void UseFileLogStrategy() { _fflush_strategy = std::make_unique<filelogstrategy>(); }
- private:
- std::unique_ptr<logstrategy> _fflush_strategy;
- }
复制代码 有了革新方式之后,我们下一步便是处理处罚日志内容了。这里我们接纳内部类的方式,实现日志内容的计划:
- // 获取时间
- std::string GetTime()
- {
- // 1.获取当前的时间戳
- time_t cur_time = time(nullptr);
- // 2.将时间戳转化为年月日-时分秒
- struct tm format_time;
- localtime_r(&cur_time, &format_time);
- char time_buffer[128] = {0};
- snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%02d:%02d",
- format_time.tm_year + 1900,
- format_time.tm_mon + 1,
- format_time.tm_mday,
- format_time.tm_hour,
- format_time.tm_min,
- format_time.tm_sec);
- return time_buffer;
- }
- // 日志等级
- enum class loglevel
- {
- DEBUG,
- INFO,
- WARINING,
- ERROR,
- FATAL
- };
- // 获取日志等级
- std::string loglevelToString(loglevel level)
- {
- switch (level)
- {
- case loglevel::DEBUG:
- return "DEBUG";
- case loglevel::INFO:
- return "INFO";
- case loglevel::WARINING:
- return "WARNING";
- case loglevel::ERROR:
- return "ERROR";
- case loglevel::FATAL:
- return "FATAL";
- default:
- return "UNKNOEN";
- }
- }
- // 内部类
- // 用来描述日志具体内容
- class logmessage
- {
- public:
- logmessage(loglevel &level, const std::string &name, int number, logger &logger)
- : _cur_time(GetTime()),
- _log_level(level),
- _file(name),
- _line_number(number),
- _pid(getpid()),
- _logger(logger)
- {
- // 将格式化信息写入ss字符串流中
- std::stringstream ss;
- ss << "[" << _cur_time << "]"
- << "[" << loglevelToString(_log_level) << "]"
- << "[" << _pid << "]"
- << "[" << _file << "]"
- << "[" << _line_number << "]"
- << "- ";
- // 从字符串中获取字符串
- _format_info = ss.str();
- }
- ~logmessage()
- {
- // 如果有刷新策略,在对象析构的时候进行刷新
- if (_logger._fflush_strategy)
- {
- _logger._fflush_strategy->synclog(_format_info);
- }
- }
- // 日志的主要内容
- template <typename T>
- logmessage &operator<<(const T &message)
- {
- std::stringstream ss;
- ss << message;
- _format_info += ss.str();
- return *this;
- }
- private:
- std::string _cur_time;
- loglevel _log_level;
- pid_t _pid;
- std::string _file;
- int _line_number;
- std::string _format_info;
- logger &_logger;
- }; // end of logmessage 内部类,用来处理日志的格式化内容以及主要内容
复制代码 有了以上内容,我们的日志类已经根本上实现了,但是我们还得再日志类中实现一个仿函数,该仿函数的返回值是内部类范例,有了内部类范例,我们就可以根据内部重载的<<运算符制作日志消息,末了在该内部类对象析构的时间举行革新即可。以是我们在返回内部类对象时返回暂时对象,并且不要接收,接纳匿名的方式,如许它的声明周期就只有1行,该行竣事就会自动革新了。
- // logger类内成员
- public:
- logmessage operator()(loglevel level, const std::string &file, int line)
- {
- return logmessage(level, file, line, *this);
- }
复制代码 为了方便使用,我们直接在命名空间中,界说一个全局的logger对象,使用日志类的时间,直接使用该全局对象。全局对象访问仿函数来实现日志的构成和打印。以是我们的调用方式就变为:
- Glogger(loglevel, filename, linenumber) << "xxx" << "xxx" << ...;
复制代码 但是如许还是不太优雅,我们还得手动设置文件名和行号。我们可以使用宏来简化使用。
- #define LOG(level) Glogger(level, __FILE__, __LINE__)
- #define USE_CONSOLE_STARATEGY Glogger.UseConsoleStrategy()
- #define USE_FILE_LOG_STARATRGY Glogger.UseFileLogStrategy()
复制代码 二.线程池
线程池作为一种池化技能,可以提前申请好资源,当数据大概任务到来时,直接行止理,不消在创建线程了。
计划方案:
- 线程池要在创建的时间创建出多个线程,我们用数组将全部的线程管理起来。
- 除了线程外,还得有任务,以是我们还得有一个任务队列。
- 在处理处罚任务时,和添加任务时都得是原子的,以是还得有互斥锁。
- 当任务队列为空时,但线程池还没有竣事,以是我们得让全部的线程等待,以是还得有条件变量。
在创建线程池的时间,直接在构造函数创建n个线程即可,由于创建线程必要指定实行的方法,以是我们实现一个handler方法,用来让创建出的线程去实行。
- ThreadPool(const int threads = defaultThreadSize)
- :_num(threads), _isRunning(true), _sleepernumber(0)
- {
- // 创建_num个线程
- for(int i=0; i<_num; i++){
- _threads.emplace_back(
- [this](){
- Handler();
- }
- );
- }
- }
复制代码 而对于handler方法来说,全部的线程都用从任务队列中获取任务,但任务队列作为临界资源,同一时间只能有一个线程访问,以是我们必须得加锁。
但是另有一个标题,当线程池竣事的时间,如果此时另有线程在等待,我们就应该叫醒它们,否则就会导致内存走漏标题。以是,在判断线程必要等待时,需满意两个条件,线程池没有竣事,并且没有任务,才必要等待,否则直接实行反面的代码。
在实行反面的代码时,我们必要判断线程池是否竣事,如果竣事了,并且没有任务,则直接让线程退出,否则实行完任务,在退出。
当线程拿到任务之后,就可以开释锁了,由于此时该任务已经属于该线程私有的了,如果再持有锁,就得等任务实行完才气获取下一个任务,导致服从底下。
- void Handler()
- {
- // 获取线程名字
- char name[128] = { 0 };
- pthread_getname_np(pthread_self(), name, sizeof(name));
- // 从任务队列中获取任务
- while(true){
- T t;
- {
- // 加锁访问任务队列,任意时刻只能有一个线程访问任务队列
- mutexguard lock(_mutex);
- // 当线程池终止了,但有可能还有线程再等待,此时已经没有任务,其他的线程都已经被回收了,这些线程会导致内存泄露
- // 但是如果直接叫醒所有线程,它们不会退出循环,而是继续等待
- // 所以在进行等待的时候,要判断线程池是否还在运行,如果已经结束,并且任务队列为空,则不需要等待
- // 一个不满足,就必须等待
- while(_isRunning && _taskManager.empty()){
- // 任务队列为空,线程进行等待
- _sleepernumber++;
- _cond.Wait(_mutex);
- _sleepernumber--;
- }
- // 当线程池已经终止了&&任务队列为空,就让线程结束
- if(!_isRunning && _taskManager.empty()){
- LOG(loglevel::INFO) << name << "退出";
- break;
- }
- // 获取任务
- t = _taskManager.front();
- _taskManager.pop();
- }
- // 执行任务
- // 当一个线程加锁拿出任务后,这个任务已经从任务队列中消失了,只属于该线程私有,所以先解锁,再执行,提高效率。
- t();
- }
- }
复制代码 添加任务也会访问临界资源任务队列,以是也得加锁,当然也得包管线程池还在运行,否则就不添加。并且,添加之后,就有任务了,我们判断此时是否有线程再等待,如果有,则叫醒,让其获取任务。
- // 向任务队列中新增任务
- bool emplace(const T& task)
- {
- // 任意时刻,都只允许只有一个线程插入任务
- mutexguard lock(_mutex);
- if(!_isRunning) return false;
- _taskManager.emplace(task);
- if(_sleepernumber){
- WakeUpOne();
- }
- return true;
- }
- void WakeUpOne()
- {
- LOG(loglevel::INFO) << "唤醒一个线程";
- _cond.signal();
- }
复制代码 我们还得有接口,让线程池克制。克制运行之后,如果另有任务就继续实行,没有任务了,就让线程退出。但由于有大概另有线程再等待,它们收不到任务了,如果还等待的化,就会导致内存泄漏标题,以是,再克制线程池之后,我们必要叫醒全部的线程。
- void WakeUpAll()
- {
- if(_sleepernumber){
- LOG(loglevel::INFO) << "唤醒所有线程";
- _cond.broadcast();
- }
- }
- // 让线程池终止
- void Stop()
- {
- if(!_isRunning) return;
- _isRunning = false;
- LOG(loglevel::INFO) << "线程池已经被终止";
- // 线程池结束就让所有等待的线程苏醒,否则它们不会退出
- WakeUpAll();
- }
- // 回收线程
- void Join()
- {
- if(_isRunning) return;
- for(auto& thread : _threads){
- thread.Join();
- }
- }
复制代码 有了以上接口,我们的线程池就可以运行起来了。但是,如果在内存中同时存在多个线程池的话,就会导致资源提前被申请,导致反面来的任务申请不到线程了。也有大概线程池许多,但处理处罚的热任务很少,就会导致资源浪费标题。
以是,我们渴望,线程池只能被实例化出一份,即内存中只答应有一个线程池。借此,我们来引出,单例模式线程池。
1.单例模式线程池
所谓单例模式,实在就是一个类只能实例化出一个对象。
而实现单例模式有两中方案:饿汉模式和懒汉模式。
- 饿汉模式:在将代码加载到内存中时,就已经初始化了该对象
- 懒汉模式:在代码加载到内存中时,只初始化一个该类对象的指针,并不具体实例化。认真正使用的时间,在举行实例化
在一个类比较大的时间,在加载的时间直接创建对象比较耗时
懒汉模式接纳延时创建技能,就可以加速启动历程的时间
在内核中,我们使用malloc申请内存空间,实在就使用了懒汉模式,先给你虚拟地点空间,当你使用该虚拟地点空间的时间,再给你从内存中开发,并构建映射关系
我们这里接纳懒汉模式实现单例:
起首,单例模式只能实例化一个对象,以是我们不应该将构造、拷贝构造,赋值函数等袒暴露来。我们在类内界说一个静态的该类对象的指针。由于静态对象是全局的,以是在代码加载到内存中时,他就已经被创建了,但由于我们创建的是指针,以是还没有真正意义上创建对象。
- static ThreadPool<T>* _inc; // 未来实例化出的对象
- static Mutex _sm; // 用来实现单例模式
复制代码 我们提供一个静态函数,用来初始化静态对象,初始化该静态对象肯定得是原子的,要否则如果该函数被多线程同时访问,就有大概创建多个对象。
- static ThreadPool<T>* Getinstance(int threadsize = defaultThreadSize)
- {
- LOG(loglevel::DEBUG) << "获取线程池单例...";
- if(!_inc){
- mutexguard lock(_sm);
- if(!_inc){
- LOG(loglevel::INFO) << "线程池单例创建....";
- _inc = new ThreadPool<T>(threadsize);
- }
- }
- return _inc;
- }
复制代码 我们这里接纳双if判断,来进步获取单例的运行服从。如果没有外层的if,每一个线程都得先申请锁,然后再判断,申请锁的时间什么都做不了。就算我们单例创建好了,下一次还得申请锁,在判断。
以是我们额外添加一个if判断,单例还没有创建的时间确实没有厘革,但对有已经有了单例来说,就可以让其他线程提前退出,获取到单例。
- #ifndef __ThreadPool__HPP__#define __ThreadPool__HPP__#include <iostream>#include <queue>#include "thread.hpp"#include "log.hpp"#include "mutex.hpp"#include "cond.hpp"namespace ThreadPoolModule{ using namespace MyThread; using namespace MutexModule; using namespace MyCond; using namespace MyLog; // 默认使用5个线程的线程池 const int defaultThreadSize = 5; template <typename T> class ThreadPool { private: void WakeUpAll() { if(_sleepernumber){ LOG(loglevel::INFO) << "叫醒全部线程"; _cond.broadcast(); } } void WakeUpOne() { LOG(loglevel::INFO) << "叫醒一个线程"; _cond.signal(); } // 同一时间,内存中不必要存在多个线程池 // 使用单例模式来控制该历程池只能实例化出一个对象:单例模子即一个类只能实例化一个对象 // 单例模式有两种实现方式:饿汉模子和懒汉模式 // 饿汉模式:在将代码加载到内存中时,就已经初始化了该对象 // 懒汉模式:在代码加载到内存中时,只初始化一个该类对象的指针,并不具体实例化。认真正使用的时间,在举行实例化 // 在一个类比较大的时间,在加载的时间直接创建对象比较耗时 // 懒汉模式接纳延时创建技能,就可以加速启动历程的时间 // 在内核中,我们使用malloc申请内存空间,实在就使用了懒汉模式,先给你虚拟地点空间,当你使用该虚拟地点空间的时间,再给你从内存中开发,并构建映射关系 // 由于单例模式只能创建一个对象,以是不应该将类的构造,拷贝构造,赋值重载函数公开 ThreadPool(const int threads = defaultThreadSize)
- :_num(threads), _isRunning(true), _sleepernumber(0)
- {
- // 创建_num个线程
- for(int i=0; i<_num; i++){
- _threads.emplace_back(
- [this](){
- Handler();
- }
- );
- }
- } ThreadPool(const ThreadPool& tp) = delete; ThreadPool operator=(const ThreadPool& tp) = delete; public: // 有大概有多个实行流进入该函数,但是只能创建一个对象 static ThreadPool<T>* Getinstance(int threadsize = defaultThreadSize)
- {
- LOG(loglevel::DEBUG) << "获取线程池单例...";
- if(!_inc){
- mutexguard lock(_sm);
- if(!_inc){
- LOG(loglevel::INFO) << "线程池单例创建....";
- _inc = new ThreadPool<T>(threadsize);
- }
- }
- return _inc;
- } ~ThreadPool(){} void Handler()
- {
- // 获取线程名字
- char name[128] = { 0 };
- pthread_getname_np(pthread_self(), name, sizeof(name));
- // 从任务队列中获取任务
- while(true){
- T t;
- {
- // 加锁访问任务队列,任意时刻只能有一个线程访问任务队列
- mutexguard lock(_mutex);
- // 当线程池终止了,但有可能还有线程再等待,此时已经没有任务,其他的线程都已经被回收了,这些线程会导致内存泄露
- // 但是如果直接叫醒所有线程,它们不会退出循环,而是继续等待
- // 所以在进行等待的时候,要判断线程池是否还在运行,如果已经结束,并且任务队列为空,则不需要等待
- // 一个不满足,就必须等待
- while(_isRunning && _taskManager.empty()){
- // 任务队列为空,线程进行等待
- _sleepernumber++;
- _cond.Wait(_mutex);
- _sleepernumber--;
- }
- // 当线程池已经终止了&&任务队列为空,就让线程结束
- if(!_isRunning && _taskManager.empty()){
- LOG(loglevel::INFO) << name << "退出";
- break;
- }
- // 获取任务
- t = _taskManager.front();
- _taskManager.pop();
- }
- // 执行任务
- // 当一个线程加锁拿出任务后,这个任务已经从任务队列中消失了,只属于该线程私有,所以先解锁,再执行,提高效率。
- t();
- }
- } // 让线程池克制 void Stop() { if(!_isRunning) return; _isRunning = false; LOG(loglevel::INFO) << "线程池已经被克制"; // 线程池竣事就让全部等待的线程清醒,否则它们不会退出 WakeUpAll(); } // 回收线程 void Join() { if(_isRunning) return; for(auto& thread : _threads){ thread.Join(); } } // 向任务队列中新增任务 bool emplace(const T& task) { // 任意时间,都只答应只有一个线程插入任务 mutexguard lock(_mutex); if(!_isRunning) return false; _taskManager.emplace(task); if(_sleepernumber){ WakeUpOne(); } return true; } private: std::vector<Thread> _threads; // 线程池 int _num; // 线程个数 std::queue<T> _taskManager; // 任务队列 Mutex _mutex; // 互斥锁 cond _cond; // 信号量 bool _isRunning; // 线程池是否运行 int _sleepernumber; // 当前等待的线程个数 static ThreadPool<T>* _inc; // 将来实例化出的对象 static Mutex _sm; // 用来实现单例模式 }; // 初始化静态成员 template <typename T> ThreadPool<T>* ThreadPool<T>::_inc = nullptr; template <typename T> Mutex ThreadPool<T>::_sm;}#endif
复制代码 三.重入和线程安全
如果函数是可重入的,那么它就是线程安全的。
线程安全不肯定是可重入的,而可重入的肯定是线程安全的。
四.死锁
死锁是指一组线程中,都占据本身的资源,同时又向使用对方的资源,如许导致线程相互申请无法推进线程运行的征象就叫做死锁。
简朴来说,线程A想要访问的资源必须同时持有锁1和锁2,线程B也一样。但此时线程A持有锁1,线程B持有锁2.而它们又同时访问对方的锁,如许就导致谁都申请不到锁,导致壅闭挂起。
1.死锁的四个须要条件
- 0x1.互斥条件:一个临界资源只能被一个实行流访问
- 0x2.哀求与保持条件:一个实行流由于哀求而导致壅闭时,对已有的资源不开释
- 0x3.不剥夺条件:一个实行流已得到的资源,在未使用完前,不可被劫掠
- 0x4.循环等待条件:多少个实行流,接纳循环的申请对方的资源,导致了头尾衔接的等待资源关系。
2.克制死锁
死锁产生上面四种条件必须同时具有,以是我们只必要粉碎此中的条件即可,死锁就不会建立!!!!
办理方案1:我们可以使用trylock来申请锁,在申请另一个锁时,发现申请失败,就可以开释掉当前的锁,来让其他人获取。
当然另有其他方法来克制死锁,可以自行相识。
以上,便是单例线程池的全部内容!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |