1.概念介绍
线程池是一种多线程处理形式,它维护着多个线程,这些线程处于等待状态,随时准备担当任务并实行。线程池的主要目的是为了提高系统的性能和资源使用率,避免在处理短时间任务时频仍创建和烧毁线程所带来的开销。
线程池的优点
- 提高性能:避免了频仍创建和烧毁线程的开销,因为线程的创建和烧毁是比力耗时的操作。
- 控制资源:可以限制线程的数目,防止过多的线程竞争系统资源,导致系统性能下降甚至崩溃。
- 提高响应性:可以或许更快地响应新的任务请求,因为线程已经准备好,无需等待线程创建。
- 可管理性:线程池可以统一管理、分配、调优和监控此中的线程。
线程池的应用场景
- 需要大量的线程来完成任务,且完成任务的时间比力短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常符合的。因为单个任务小,而任务数目巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器敏捷响应客户请求。
- 担当突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,固然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。
2.线程池的实现
起首,为了方便使用互斥锁,条件变量和线程,我们需要将这些封装起来。
Mutex.hpp对线程进行封装,代码如下:
封装互斥锁
- #include <iostream>
- #include <pthread.h>
- using namespace std;
- class Mutex
- {
- public:
- Mutex(const Mutex&)=delete;
- const Mutex& operator=(const Mutex&)=delete;
- Mutex()
- {
- pthread_mutex_init(&_lock,nullptr);
- }
- ~Mutex()
- {
- pthread_mutex_destroy(&_lock);
- }
- void Lock()
- {
- pthread_mutex_lock(&_lock);
- }
- pthread_mutex_t * LockPtr()
- {
- return &_lock;
- }
- void Unlock()
- {
- pthread_mutex_unlock(&_lock);
- }
- private:
- pthread_mutex_t _lock;
- };
- class LockGuard
- {
- public:
- LockGuard(Mutex& m)
- :_mutex(m)
- {
- _mutex.Lock();
- }
- ~LockGuard()
- {
- _mutex.Unlock();
- }
- private:
- Mutex& _mutex;
- };
复制代码 封装条件变量
Cond.hpp对线程进行封装,代码如下:
- #include"Mutex.hpp"
- class Cond
- {
- public:
- Cond()
- {
- pthread_cond_init(&_cond,nullptr);
- }
- ~Cond()
- {
- pthread_cond_destroy(&_cond);
- }
- void Wait(Mutex& mutex)
- {
- pthread_cond_wait(&_cond,mutex.LockPtr());
- }
- void Notify()
- {
- pthread_cond_signal(&_cond);
- }
- void NotifyAll()
- {
- pthread_cond_broadcast(&_cond);
- }
- private:
- pthread_cond_t _cond;
- };
复制代码 封装线程
Thread.hpp对线程进行封装,代码如下:
- #include <pthread.h>
- #include <iostream>
- #include <functional>
- #include <string>
- #include <unistd.h>
- using namespace std;
- using func_t = function<void(string)>;
- static int number = 1;
- enum STATUS
- {
- NEW,
- RUNNING,
- STOP
- };
- class Thread
- {
- private:
- static void *Routine(void *arg)
- {
- Thread *t = static_cast<Thread *>(arg);
- t->_func(t->_name);
- return nullptr;
- }
- public:
- Thread(func_t func)
- : _func(func), _status(NEW), _joinable(true)
- {
- _name = "Thread-" + to_string(number++);
- _pid = getpid();
- }
- bool Start()
- {
- if (_status != RUNNING)
- {
- _status = RUNNING;
- int n = pthread_create(&_tid, nullptr, Routine, this);
- if (n != 0)
- {
- return false;
- }
- return true;
- }
- return false;
- }
- bool Stop()
- {
- if (_status == RUNNING)
- {
- _status = STOP;
- int n = pthread_cancel(_tid);
- if (n != 0)
- {
- return false;
- }
- return true;
- }
- return false;
- }
- bool Join()
- {
- if (_joinable)
- {
- _status = STOP;
- int n = pthread_join(_tid, nullptr);
- if (n != 0)
- {
- return false;
- }
- return true;
- }
- return false;
- }
- void Detach()
- {
- _joinable = false;
- pthread_detach(_tid);
- }
- string Name()
- {
- return _name;
- }
- private:
- string _name;
- pthread_t _tid;
- pid_t _pid;
- STATUS _status;
- bool _joinable;
- func_t _func;
- };
复制代码 线程的成员变量
- string _name;
- pthread_t _tid;
- pid_t _pid;
- STATUS _status;
- bool _joinable;
- func_t _func;
复制代码 我们知道创建线程的时间 pthread_create 函数原型如下
- int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
- void *(*start_routine) (void *), void *arg);
复制代码 此中实行的函数 start_routine 类型为void*(* ) (void*),即函数参数和返回值都为 void* 类型,而我们封装类传入的函数_func为 void(string) 类型,以是我们不能直接使用_func,而需要进行二次封装,通过类型为void*(* ) (void*) 的Routine函数封装使用_func函数,而 pthread_create 可以使用Routine函数。
- static void *Routine(void *arg)
- {
- Thread *t = static_cast<Thread *>(arg);
- t->_func(t->_name);
- return nullptr;
- }
- bool Start()
- {
- if (_status != RUNNING)
- {
- _status = RUNNING;
- int n = pthread_create(&_tid, nullptr, Routine, this);
- if (n != 0)
- {
- return false;
- }
- return true;
- }
- return false;
- }
复制代码 但是为什么写代码时我们要将Routine函数定义为静态函数呢?因为在类内定义时,Routine无形中多了一个参数,即this指针,现实上Routine函数类型为void*(* ) (thread<T>*,void*),这样就不满意使用 pthread_create 时需要的函数类型,以是我们需要把Routine函数定义为静态函数从而去掉第一个隐含参数。
但这样Routine函数就不可以访问类内的成员变量_func,以是我们在使用 pthread_create 时需要把 this 参数传已往,从而让Runfunc函数能访问到_func,从而实行_func函数。
封装线程池
线程池的成员变量
- vector<thread_t> _threads;
- int _num;
- int _wait_num;
- std::queue<T> _taskq; // 临界资源
- //控制器
- Mutex _lock;
- Cond _cond;
- bool _isrunning;
- static ThreadPool<T> *instance;
- static Mutex mutex; // 只用来保护单例
复制代码 线程池的主要组件包罗线程数组、任务队列和控制器。线程数组用来存放被创建的线程,任务队列将新任务添加到队列末了,并通知空闲线程可以从队列最前端取用任务实行,控制器管理着一个队列锁,其保护临界资源任务队列,包管线程间的互斥关系,以及一个信号量,包管线程间的同步关系。
线程池的实现通常包罗线程池初始化、任务提交、线程调度和线程烧毁等步调。
线程池初始化
- private:
- bool IsEmpty() { return _taskq.empty(); }
- void HandlerTask(string name)
- {
- cout << "线程: " << name << ", 进入HandlerTask的逻辑";
- while (true)
- {
- // 1. 拿任务
- T t;
- {
- LockGuard lockguard(_lock);
- while (IsEmpty() && _isrunning)
- {
- _wait_num++;
- _cond.Wait(_lock);
- _wait_num--;
- }
- // 2. 任务队列为空 && 线程池退出了
- if (IsEmpty() && !_isrunning)
- break;
- t = _taskq.front();
- _taskq.pop();
- }
- // 2. 处理任务
- t(); // 规定,未来所有的任务处理,全部都是必须提供()方法!
- }
- cout << "线程: " << name << " 退出";
- }
- ThreadPool(const ThreadPool<T> &) = delete;
- ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
- ThreadPool(int num = defaultnum) : _num(num), _wait_num(0), _isrunning(false)
- {
- for (int i = 0; i < _num; i++)
- {
- _threads.push_back(make_shared<Thread>(bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));
- cout << "构建线程" << _threads.back()->Name() << "对象 ... 成功";
- }
- }
- public:
- static ThreadPool<T> *getInstance()
- {
- if (instance == NULL)
- {
- LockGuard lockguard(mutex);
- if (instance == NULL)
- {
- cout << "单例首次被执行,需要加载对象...";
- instance = new ThreadPool<T>();
- instance->Start();
- }
- }
- return instance;
- }
复制代码 任务提交
- void Equeue(T &in)
- {
- LockGuard lockguard(_lock);
- if (!_isrunning)
- return;
- _taskq.push(in);
- if (_wait_num > 0)
- _cond.Notify();
- }
复制代码 线程调度
- void Start()
- {
- if (_isrunning)
- return;
- _isrunning = true;
- for (auto &thread_ptr : _threads)
- {
- cout << "启动线程" << thread_ptr->Name() << " ... 成功";
- thread_ptr->Start();
- }
- }
复制代码 停止调度
- void Stop()
- {
- LockGuard lockguard(_lock);
- if (_isrunning)
- {
- _isrunning = false; // 不工作
- // 1. 让线程自己退出(要唤醒) && // 2. 历史的任务被处理完了
- if (_wait_num > 0)
- _cond.NotifyAll();
- }
- }
复制代码 ThreadPool.hpp完整代码如下
- #include <iostream>#include <string>#include <queue>#include <vector>#include <memory>#include "Mutex.hpp"#include "Cond.hpp"#include "Thread.hpp"using thread_t = std::shared_ptr<Thread>;const static int defaultnum = 5;template <class T>class ThreadPool{private:
- bool IsEmpty() { return _taskq.empty(); }
- void HandlerTask(string name)
- {
- cout << "线程: " << name << ", 进入HandlerTask的逻辑";
- while (true)
- {
- // 1. 拿任务
- T t;
- {
- LockGuard lockguard(_lock);
- while (IsEmpty() && _isrunning)
- {
- _wait_num++;
- _cond.Wait(_lock);
- _wait_num--;
- }
- // 2. 任务队列为空 && 线程池退出了
- if (IsEmpty() && !_isrunning)
- break;
- t = _taskq.front();
- _taskq.pop();
- }
- // 2. 处理任务
- t(); // 规定,未来所有的任务处理,全部都是必须提供()方法!
- }
- cout << "线程: " << name << " 退出";
- }
- ThreadPool(const ThreadPool<T> &) = delete;
- ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
- ThreadPool(int num = defaultnum) : _num(num), _wait_num(0), _isrunning(false)
- {
- for (int i = 0; i < _num; i++)
- {
- _threads.push_back(make_shared<Thread>(bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));
- cout << "构建线程" << _threads.back()->Name() << "对象 ... 成功";
- }
- }
- public:
- static ThreadPool<T> *getInstance()
- {
- if (instance == NULL)
- {
- LockGuard lockguard(mutex);
- if (instance == NULL)
- {
- cout << "单例首次被执行,需要加载对象...";
- instance = new ThreadPool<T>();
- instance->Start();
- }
- }
- return instance;
- } void Equeue(T &in)
- {
- LockGuard lockguard(_lock);
- if (!_isrunning)
- return;
- _taskq.push(in);
- if (_wait_num > 0)
- _cond.Notify();
- } void Start()
- {
- if (_isrunning)
- return;
- _isrunning = true;
- for (auto &thread_ptr : _threads)
- {
- cout << "启动线程" << thread_ptr->Name() << " ... 成功";
- thread_ptr->Start();
- }
- } void Wait() { for (auto &thread_ptr : _threads) { thread_ptr->Join(); cout << "回收线程" << thread_ptr->Name() << " ... 成功"; } } void Stop()
- {
- LockGuard lockguard(_lock);
- if (_isrunning)
- {
- _isrunning = false; // 不工作
- // 1. 让线程自己退出(要唤醒) && // 2. 历史的任务被处理完了
- if (_wait_num > 0)
- _cond.NotifyAll();
- }
- }private: vector<thread_t> _threads; int _num; int _wait_num; std::queue<T> _taskq; // 临界资源 Mutex _lock; Cond _cond; bool _isrunning; static ThreadPool<T> *instance; static Mutex mutex; // 只用来保护单例};template <class T>ThreadPool<T> *ThreadPool<T>::instance = NULL;template <class T>Mutex ThreadPool<T>::mutex; // 只用来保护单例
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |