Linux 线程池

打印 上一主题 下一主题

主题 820|帖子 820|积分 2460

1.概念介绍

线程池是一种多线程处理形式,它维护着多个线程,这些线程处于等待状态,随时准备担当任务并实行。线程池的主要目的是为了提高系统的性能和资源使用率,避免在处理短时间任务时频仍创建和烧毁线程所带来的开销。
线程池的优点

  • 提高性能:避免了频仍创建和烧毁线程的开销,因为线程的创建和烧毁是比力耗时的操作。
  • 控制资源:可以限制线程的数目,防止过多的线程竞争系统资源,导致系统性能下降甚至崩溃。
  • 提高响应性:可以或许更快地响应新的任务请求,因为线程已经准备好,无需等待线程创建。
  • 可管理性:线程池可以统一管理、分配、调优和监控此中的线程。
线程池的应用场景

  • 需要大量的线程来完成任务,且完成任务的时间比力短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常符合的。因为单个任务小,而任务数目巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
  • 对性能要求苛刻的应用,比如要求服务器敏捷响应客户请求。
  • 担当突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,固然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。
2.线程池的实现

起首,为了方便使用互斥锁,条件变量和线程,我们需要将这些封装起来。
Mutex.hpp对线程进行封装,代码如下:
封装互斥锁

  1. #include <iostream>
  2. #include <pthread.h>
  3. using namespace std;
  4. class Mutex
  5. {
  6. public:
  7.     Mutex(const Mutex&)=delete;
  8.     const Mutex& operator=(const Mutex&)=delete;
  9.     Mutex()
  10.     {
  11.         pthread_mutex_init(&_lock,nullptr);
  12.     }
  13.     ~Mutex()
  14.     {
  15.         pthread_mutex_destroy(&_lock);
  16.     }
  17.     void Lock()
  18.     {
  19.         pthread_mutex_lock(&_lock);
  20.     }
  21.     pthread_mutex_t * LockPtr()
  22.     {
  23.         return &_lock;
  24.     }
  25.     void Unlock()
  26.     {
  27.         pthread_mutex_unlock(&_lock);
  28.     }
  29. private:
  30.     pthread_mutex_t _lock;
  31. };
  32. class LockGuard
  33. {
  34.     public:
  35.     LockGuard(Mutex& m)
  36.     :_mutex(m)
  37.     {
  38.         _mutex.Lock();
  39.     }
  40.     ~LockGuard()
  41.     {
  42.         _mutex.Unlock();
  43.     }
  44.     private:
  45.     Mutex& _mutex;
  46. };
复制代码
封装条件变量

Cond.hpp对线程进行封装,代码如下:
  1. #include"Mutex.hpp"
  2. class Cond
  3. {
  4.     public:
  5.     Cond()
  6.     {
  7.         pthread_cond_init(&_cond,nullptr);
  8.     }
  9.     ~Cond()
  10.     {
  11.         pthread_cond_destroy(&_cond);
  12.     }
  13.     void Wait(Mutex& mutex)
  14.     {
  15.         pthread_cond_wait(&_cond,mutex.LockPtr());
  16.     }
  17.     void Notify()
  18.     {
  19.         pthread_cond_signal(&_cond);
  20.     }
  21.     void NotifyAll()
  22.     {
  23.         pthread_cond_broadcast(&_cond);
  24.     }
  25.     private:
  26.     pthread_cond_t _cond;
  27. };
复制代码
封装线程

Thread.hpp对线程进行封装,代码如下:
  1. #include <pthread.h>
  2. #include <iostream>
  3. #include <functional>
  4. #include <string>
  5. #include <unistd.h>
  6. using namespace std;
  7. using func_t = function<void(string)>;
  8. static int number = 1;
  9. enum STATUS
  10. {
  11.     NEW,
  12.     RUNNING,
  13.     STOP
  14. };
  15. class Thread
  16. {
  17. private:
  18.     static void *Routine(void *arg)
  19.     {
  20.         Thread *t = static_cast<Thread *>(arg);
  21.         t->_func(t->_name);
  22.         return nullptr;
  23.     }
  24. public:
  25.     Thread(func_t func)
  26.         : _func(func), _status(NEW), _joinable(true)
  27.     {
  28.         _name = "Thread-" + to_string(number++);
  29.         _pid = getpid();
  30.     }
  31.     bool Start()
  32.     {
  33.         if (_status != RUNNING)
  34.         {
  35.             _status = RUNNING;
  36.             int n = pthread_create(&_tid, nullptr, Routine, this);
  37.             if (n != 0)
  38.             {
  39.                 return false;
  40.             }
  41.             return true;
  42.         }
  43.         return false;
  44.     }
  45.     bool Stop()
  46.     {
  47.         if (_status == RUNNING)
  48.         {
  49.             _status = STOP;
  50.             int n = pthread_cancel(_tid);
  51.             if (n != 0)
  52.             {
  53.                 return false;
  54.             }
  55.             return true;
  56.         }
  57.         return false;
  58.     }
  59.     bool Join()
  60.     {
  61.         if (_joinable)
  62.         {
  63.             _status = STOP;
  64.             int n = pthread_join(_tid, nullptr);
  65.             if (n != 0)
  66.             {
  67.                 return false;
  68.             }
  69.             return true;
  70.         }
  71.         return false;
  72.     }
  73.     void Detach()
  74.     {
  75.         _joinable = false;
  76.         pthread_detach(_tid);
  77.     }
  78.     string Name()
  79.     {
  80.         return _name;
  81.     }
  82. private:
  83.     string _name;
  84.     pthread_t _tid;
  85.     pid_t _pid;
  86.     STATUS _status;
  87.     bool _joinable;
  88.     func_t _func;
  89. };
复制代码
线程的成员变量
  1.     string _name;
  2.     pthread_t _tid;
  3.     pid_t _pid;
  4.     STATUS _status;
  5.     bool _joinable;
  6.     func_t _func;
复制代码
我们知道创建线程的时间 pthread_create 函数原型如下
  1. int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
  2.                    void *(*start_routine) (void *), void *arg);
复制代码
此中实行的函数 start_routine 类型为void*(* ) (void*),即函数参数和返回值都为 void* 类型,而我们封装类传入的函数_func为 void(string) 类型,以是我们不能直接使用_func,而需要进行二次封装,通过类型为void*(* ) (void*) 的Routine函数封装使用_func函数,而 pthread_create 可以使用Routine函数。
  1.     static void *Routine(void *arg)
  2.     {
  3.         Thread *t = static_cast<Thread *>(arg);
  4.         t->_func(t->_name);
  5.         return nullptr;
  6.     }
  7.     bool Start()
  8.     {
  9.         if (_status != RUNNING)
  10.         {
  11.             _status = RUNNING;
  12.             int n = pthread_create(&_tid, nullptr, Routine, this);
  13.             if (n != 0)
  14.             {
  15.                 return false;
  16.             }
  17.             return true;
  18.         }
  19.         return false;
  20.     }
复制代码
但是为什么写代码时我们要将Routine函数定义为静态函数呢?因为在类内定义时,Routine无形中多了一个参数,即this指针,现实上Routine函数类型为void*(* ) (thread<T>*,void*),这样就不满意使用 pthread_create 时需要的函数类型,以是我们需要把Routine函数定义为静态函数从而去掉第一个隐含参数。
但这样Routine函数就不可以访问类内的成员变量_func,以是我们在使用 pthread_create 时需要把 this 参数传已往,从而让Runfunc函数能访问到_func,从而实行_func函数。
封装线程池

线程池的成员变量
  1.     vector<thread_t> _threads;
  2.     int _num;
  3.     int _wait_num;
  4.     std::queue<T> _taskq; // 临界资源
  5.     //控制器
  6.     Mutex _lock;
  7.     Cond _cond;
  8.     bool _isrunning;
  9.     static ThreadPool<T> *instance;
  10.     static Mutex mutex; // 只用来保护单例
复制代码
线程池的主要组件包罗线程数组、任务队列和控制器。线程数组用来存放被创建的线程,任务队列将新任务添加到队列末了,并通知空闲线程可以从队列最前端取用任务实行,控制器管理着一个队列锁,其保护临界资源任务队列,包管线程间的互斥关系,以及一个信号量,包管线程间的同步关系。 
线程池的实现通常包罗线程池初始化、任务提交、线程调度和线程烧毁等步调。
线程池初始化
  1. private:
  2.     bool IsEmpty() { return _taskq.empty(); }
  3.     void HandlerTask(string name)
  4.     {
  5.         cout << "线程: " << name << ", 进入HandlerTask的逻辑";
  6.         while (true)
  7.         {
  8.             // 1. 拿任务
  9.             T t;
  10.             {
  11.                 LockGuard lockguard(_lock);
  12.                 while (IsEmpty() && _isrunning)
  13.                 {
  14.                     _wait_num++;
  15.                     _cond.Wait(_lock);
  16.                     _wait_num--;
  17.                 }
  18.                 // 2. 任务队列为空 && 线程池退出了
  19.                 if (IsEmpty() && !_isrunning)
  20.                     break;
  21.                 t = _taskq.front();
  22.                 _taskq.pop();
  23.             }
  24.             // 2. 处理任务
  25.             t(); // 规定,未来所有的任务处理,全部都是必须提供()方法!
  26.         }
  27.         cout << "线程: " << name << " 退出";
  28.     }
  29.     ThreadPool(const ThreadPool<T> &) = delete;
  30.     ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
  31.     ThreadPool(int num = defaultnum) : _num(num), _wait_num(0), _isrunning(false)
  32.     {
  33.         for (int i = 0; i < _num; i++)
  34.         {
  35.             _threads.push_back(make_shared<Thread>(bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));
  36.             cout << "构建线程" << _threads.back()->Name() << "对象 ... 成功";
  37.         }
  38.     }
  39. public:
  40.     static ThreadPool<T> *getInstance()
  41.     {
  42.         if (instance == NULL)
  43.         {
  44.             LockGuard lockguard(mutex);
  45.             if (instance == NULL)
  46.             {
  47.                cout << "单例首次被执行,需要加载对象...";
  48.                 instance = new ThreadPool<T>();
  49.                 instance->Start();
  50.             }
  51.         }
  52.         return instance;
  53.     }
复制代码
任务提交
  1.     void Equeue(T &in)
  2.     {
  3.         LockGuard lockguard(_lock);
  4.         if (!_isrunning)
  5.             return;
  6.         _taskq.push(in);
  7.         if (_wait_num > 0)
  8.             _cond.Notify();
  9.     }
复制代码
线程调度
  1.     void Start()
  2.     {
  3.         if (_isrunning)
  4.             return;
  5.         _isrunning = true;
  6.         for (auto &thread_ptr : _threads)
  7.         {
  8.             cout << "启动线程" << thread_ptr->Name() << " ... 成功";
  9.             thread_ptr->Start();
  10.         }
  11.     }
复制代码
停止调度
  1.     void Stop()
  2.     {
  3.         LockGuard lockguard(_lock);
  4.         if (_isrunning)
  5.         {
  6.             _isrunning = false; // 不工作
  7.             // 1. 让线程自己退出(要唤醒) && // 2. 历史的任务被处理完了
  8.             if (_wait_num > 0)
  9.                 _cond.NotifyAll();
  10.         }
  11.     }
复制代码
ThreadPool.hpp完整代码如下
  1. #include <iostream>#include <string>#include <queue>#include <vector>#include <memory>#include "Mutex.hpp"#include "Cond.hpp"#include "Thread.hpp"using thread_t = std::shared_ptr<Thread>;const static int defaultnum = 5;template <class T>class ThreadPool{private:
  2.     bool IsEmpty() { return _taskq.empty(); }
  3.     void HandlerTask(string name)
  4.     {
  5.         cout << "线程: " << name << ", 进入HandlerTask的逻辑";
  6.         while (true)
  7.         {
  8.             // 1. 拿任务
  9.             T t;
  10.             {
  11.                 LockGuard lockguard(_lock);
  12.                 while (IsEmpty() && _isrunning)
  13.                 {
  14.                     _wait_num++;
  15.                     _cond.Wait(_lock);
  16.                     _wait_num--;
  17.                 }
  18.                 // 2. 任务队列为空 && 线程池退出了
  19.                 if (IsEmpty() && !_isrunning)
  20.                     break;
  21.                 t = _taskq.front();
  22.                 _taskq.pop();
  23.             }
  24.             // 2. 处理任务
  25.             t(); // 规定,未来所有的任务处理,全部都是必须提供()方法!
  26.         }
  27.         cout << "线程: " << name << " 退出";
  28.     }
  29.     ThreadPool(const ThreadPool<T> &) = delete;
  30.     ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
  31.     ThreadPool(int num = defaultnum) : _num(num), _wait_num(0), _isrunning(false)
  32.     {
  33.         for (int i = 0; i < _num; i++)
  34.         {
  35.             _threads.push_back(make_shared<Thread>(bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));
  36.             cout << "构建线程" << _threads.back()->Name() << "对象 ... 成功";
  37.         }
  38.     }
  39. public:
  40.     static ThreadPool<T> *getInstance()
  41.     {
  42.         if (instance == NULL)
  43.         {
  44.             LockGuard lockguard(mutex);
  45.             if (instance == NULL)
  46.             {
  47.                cout << "单例首次被执行,需要加载对象...";
  48.                 instance = new ThreadPool<T>();
  49.                 instance->Start();
  50.             }
  51.         }
  52.         return instance;
  53.     }    void Equeue(T &in)
  54.     {
  55.         LockGuard lockguard(_lock);
  56.         if (!_isrunning)
  57.             return;
  58.         _taskq.push(in);
  59.         if (_wait_num > 0)
  60.             _cond.Notify();
  61.     }    void Start()
  62.     {
  63.         if (_isrunning)
  64.             return;
  65.         _isrunning = true;
  66.         for (auto &thread_ptr : _threads)
  67.         {
  68.             cout << "启动线程" << thread_ptr->Name() << " ... 成功";
  69.             thread_ptr->Start();
  70.         }
  71.     }    void Wait()    {        for (auto &thread_ptr : _threads)        {            thread_ptr->Join();            cout << "回收线程" << thread_ptr->Name() << " ... 成功";        }    }    void Stop()
  72.     {
  73.         LockGuard lockguard(_lock);
  74.         if (_isrunning)
  75.         {
  76.             _isrunning = false; // 不工作
  77.             // 1. 让线程自己退出(要唤醒) && // 2. 历史的任务被处理完了
  78.             if (_wait_num > 0)
  79.                 _cond.NotifyAll();
  80.         }
  81.     }private:    vector<thread_t> _threads;    int _num;    int _wait_num;    std::queue<T> _taskq; // 临界资源    Mutex _lock;    Cond _cond;    bool _isrunning;    static ThreadPool<T> *instance;    static Mutex mutex; // 只用来保护单例};template <class T>ThreadPool<T> *ThreadPool<T>::instance = NULL;template <class T>Mutex ThreadPool<T>::mutex; // 只用来保护单例
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

滴水恩情

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表