【Linux跬步积累】—— 线程池详解(有源代码)

[复制链接]
发表于 2025-10-20 18:23:08 | 显示全部楼层 |阅读模式
🌏博客主页:PH_modest的博客主页
🚩当前专栏:Linux跬步积累
💌其他专栏:
🔴 逐日一题
🟡 C++跬步积累
🟢 C语言跬步积累
🌈座右铭:广积粮,缓称王!
  

一、怎样实现一个线程

1、根本布局

   Thread类是用来形貌一个线程的,以是成员变量中必要有tid和线程名。以是第一个成员变量就是==_tid==;
  一个线程创建出来,肯定是用来实行对于的任务的,那么我们还必要一个成员变量来吸收转达的函数,以是第三个成员变量就是==_func==,是一个void(T&)范例的参数,以是第四个成员变量就是转达过来的参数_data。
  1. template<class T>
  2. using func_t = std::function<void(T&)>;//模版方法
  3. template<class T>
  4. class Thread
  5. {
  6. public:
  7.     Thread(){}
  8.     static void* ThreadRoutinue(){}
  9.     void Start(){}
  10.     void Join(){}
  11.     void Detach(){}
  12.     ~Thread(){}
  13. private:
  14.     pthread_t _tid;          //线程tid
  15.     std::string _threadname; //线程名
  16.     func_t<T> _func;         //线程执行的函数
  17.     T _data;                 //需要处理的数据
  18. };
复制代码
2、实现成员函数

   成员函数中我们必要注意的就是pthread_create()函数,它的第三个参数是一个参数为void *,返回值为void *的一个函数。
  但是我们将这个函数ThreadRoutinue()界说在这个类里,他就是一个成员函数,成员函数的参数,会潜伏this指针,以是假如我们正常写,就会报错,由于这个函数不满足pthread_create()函数的条件。
  但是只要让ThreadRoutinue()的参数中没有this指针就可以了,那么该怎样实现呢?
  在前面加上static就可以,变成静态成员变量,就不会有this指针了。那么标题又来了,没有了this指针,我们又该怎样访问到成员变量呢?
  别忘了这个函数可以转达一个返回值为void*的参数,我们只必要将this指针转达已往,在函数内部强转一下就可以了。
  1. template<class T>
  2. using func_t = std::function<void(T&)>;//模版方法
  3. template<class T>
  4. class Thread
  5. {
  6. public:
  7.     //thread(func,5,"thread-1");
  8.     Thread(func_t<T> func,const T &data,const std::string &name = "none-name")
  9.         :_func(func),_data(data),_threadname(name)
  10.     {}
  11.     //需要设置成static静态成员函数,否则参数会多一个this指针,就不符合pthread_create的要求了
  12.     static void* ThreadRoutinue(void* args)
  13.     {
  14.         //将传过来的this指针强转一下,然后就可以访问到_func和_data了
  15.         Thread<T>* self = static_cast<Thread<T>*>(args);
  16.         self->_func(self->_data);
  17.         return nullptr;
  18.     }
  19.     void Start()
  20.     {
  21.         //创建线程
  22.         int ret = pthread_create(&_tid,nullptr,ThreadRoutinue,this);
  23.         return ret==0;
  24.     }
  25.     void Join()
  26.     {
  27.         pthread_join(_tid,nullptr);
  28.     }
  29.     void Detach()
  30.     {
  31.         pthread_detach(_tid);
  32.     }
  33.     ~Thread(){}
  34. private:
  35.     pthread_t _tid;          //线程tid
  36.     std::string _threadname; //线程名
  37.     func_t<T> _func;         //线程执行的函数
  38.     T _data;                 //需要处理的数据
  39. };
复制代码
3、演示

让我们写一段测试代码,来看一下结果:
  1. #include"Thread.hpp"
  2. void test(int x)
  3. {
  4.     while(true)
  5.     {
  6.         std::cout<<x<<std::endl;
  7.         sleep(1);
  8.     }
  9. }
  10. int main()
  11. {
  12.     MyThread::Thread<int> mt(test,2025,"thread-1");
  13.     if(mt.Start() == true)
  14.     {
  15.         std::cout<<"MyThread start success!\n";
  16.     }
  17.     else
  18.     {
  19.         std::cout<<"MyThread start failed!\n";
  20.     }
  21.     mt.Join();
  22.     return 0;
  23. }
复制代码
运行结果:

让我们使用ps -aL指令来检察一下是否真的创建了线程:

可以看到,步调运行之后,真的创建出了两个名为mythread的线程。
4、代码总汇

Thread.hpp

  1. #pragma once
  2. #include<string>
  3. #include<pthread.h>
  4. #include<unistd.h>
  5. #include<iostream>
  6. #include<functional>
  7. namespace MyThread
  8. {
  9.     template<class T>
  10.     using func_t = std::function<void(T&)>;//模版方法
  11.     template<class T>
  12.     class Thread
  13.     {
  14.     public:
  15.         //thread(func,5,"thread-1");
  16.         Thread(func_t<T> func,const T &data,const std::string &name = "none-name")
  17.             :_func(func),_data(data),_threadname(name)
  18.         {}
  19.         //需要设置成static静态成员函数,否则参数会多一个this指针,就不符合pthread_create的要求了
  20.         static void* ThreadRoutinue(void* args)
  21.         {
  22.             //将传过来的this指针强转一下,然后就可以访问到_func和_data了
  23.             Thread<T>* self = static_cast<Thread<T>*>(args);
  24.             self->_func(self->_data);
  25.             return nullptr;
  26.         }
  27.         bool Start()
  28.         {
  29.             //创建线程
  30.             int ret = pthread_create(&_tid,nullptr,ThreadRoutinue,this);
  31.             return ret==0;
  32.         }
  33.         void Join()
  34.         {
  35.             pthread_join(_tid,nullptr);
  36.         }
  37.         void Detach()
  38.         {
  39.             pthread_detach(_tid);
  40.         }
  41.         ~Thread(){}
  42.     private:
  43.         pthread_t _tid;          //线程tid
  44.         std::string _threadname; //线程名
  45.         func_t<T> _func;         //线程执行的函数
  46.         T _data;                 //需要处理的数据
  47.     };
  48. }
复制代码
Main.cc

  1. #include"Thread.hpp"
  2. void test(int x)
  3. {
  4.     while(true)
  5.     {
  6.         std::cout<<x<<std::endl;
  7.         sleep(1);
  8.     }
  9. }
  10. int main()
  11. {
  12.     MyThread::Thread<int> mt(test,2025,"thread-1");
  13.     if(mt.Start() == true)
  14.     {
  15.         std::cout<<"MyThread start success!\n";
  16.     }
  17.     else
  18.     {
  19.         std::cout<<"MyThread start failed!\n";
  20.     }
  21.     mt.Join();
  22.     return 0;
  23. }
复制代码
二、怎样封装线程池

1、筹划成员变量

   线程池内部维护多个线程和一个任务队列,主线程将任务放入任务队列当中,然后子线程就从任务队列中拿取任务举行处置惩罚。
  以是必要一个数组来管理多个线程:_threads
  以及一个任务队列:_taskQueue
  别的我们还必要知道一共有多少个线程:_threadNum
  然后还可以设置一个变量来检察真在等候任务的线程数量:_waitNum
  末了我们再设置一个变量来判定当火线程池是否运行,假如已经退出了,我们必要将任务队列中的任务处置惩罚完再退出:_isRunning
  界说这些变量就够了吗?
  我们忽略了一个多线程编程中最紧张的标题:线程之间的互斥和同步
  我们的任务是:主线程往任务队列中放入任务,子线程从任务队列中拿取任务。那么我们思索一下下面几个标题:
  

  • 多个线程之间可以同时从任务队列中拿任务吗?
    答:不能,任务队列是临界资源,线程和线程之间要互斥,否则会出现差异的线程拿取同一个任务的情况。
  • 主线程放入任务时,子线程可以同时拿取任务吗?
    答:不能,主线程和子线程之间也必要互斥。
  由于他们都是竞争任务队列这一个资源,以是我们只要定一个一把锁就可以了。如许互斥的标题就办理了。
  那么同步呢?
  是不是只有任务队列中有任务时,子线程才气获取任务,以是必要主线程先放任务,子线程才气拿任务,这就必要一个条件变量来维护。
  综上:
  我们还必要两个成员变量:_mutex和_cond
  1. #include"Thread.hpp"
  2. #include<vector>
  3. template<class T>
  4. class ThreadPool
  5. {   
  6. private:
  7.     std::vector<MyThread::Thread<std::string>> _threads;//用数组管理多个线程
  8.     std::queue<T> _taskQueue;//任务队列
  9.     int _threadNum;//线程数
  10.     int _waitNum;//等待的线程数
  11.     bool _isRunning;//线程池是否在运行
  12.     pthread_mutex_t _mutex;//互斥锁
  13.     pthread_cond_t _cond;//条件变量
  14. };
复制代码
2、构造函数与析构函数

   构造和析构的重要作用就是对_mutex和_cond的初始化和烧毁。
  同时我们还必要知道这个线程池必要创建多少个线程,以是必要外部转达参数来告诉我们。
  然后就是构造函数对其他成员变量举行初始化。
  1. template<class T>
  2. class ThreadPool
  3. {   
  4. public:
  5.     ThreadPool(const int num = 5)
  6.         :_threadNum(num),_waitNum(0),_isRunning(false)
  7.     {
  8.         pthread_mutex_init(&_mutex,nullptr);
  9.         pthread_cond_init(&_cond,nullptr);
  10.     }
  11.     ~ThreadPool()
  12.     {
  13.         pthread_mutex_destroy(&_mutex);
  14.         pthread_cond_destroy(&_cond);
  15.     }
  16. }
复制代码
3、初始化

   上面的构造函数只是创建了锁和条件变量,以及部分变量的初始化,并没有创建出线程对象。
    我们可以界说一个ThreadInit()函数来创建线程。
  让我们先回顾一下Thread的构造函数必要哪些变量:
  1. Thread(func_t<T> func,const T &data,const std::string &name = "none-name")
复制代码
func参数是必要调用的函数,data是这个函数必要处置惩罚的数据,name是线程名。
  在此,我们让线程去实行一个叫做handerTask的函数,这个函数内部实现线程到任务队列中获取任务的过程。
  而handerTask的第一个参数也是线程的名字,以便在handerTask内部检察是哪个线程实行了任务。
  这里我们使用bind函数来将HanderTask函数与this参数绑定在一起,而且将这个参数绑定到 HandleTask 的第一个参数位置。
  1. void HanderTask(std::string)
  2. {
  3.     //执行任务队列的任务
  4. }
  5. void InitThread()
  6. {
  7.     for(int i=0;i<_threadNum;i++)
  8.     {
  9.         auto func = bind(&ThreadPool::HanderTask,this,std::placeholders::_1);
  10.         std::string name = "Thread-"+std::to_string(i);
  11.         //_threads.push_back(HanderTask,name,name);//第一个name是handerTask的参数,第二个name是Thread内部的成员
  12.         _threads.emplace_back(func,name,name);
  13.     }
  14.     _isRunning = true;
  15. }
复制代码
4、启动与接纳

   我们已经创建出来了一批线程,接下来还必要启动这一批线程,而且接纳。
  因此还必要界说成员函数StartAll和JoinAll来启动和等候这批线程。
  1. void StartAll()
  2. {
  3.     for(auto& thread : _threads)
  4.     {
  5.         thread.Start();
  6.     }
  7. }
  8. void JoinAll()
  9. {
  10.     for(auto& thread : _threads)
  11.     {
  12.         thread.Join();
  13.     }
  14. }
复制代码
5、主线程放入任务

   我们可以界说一个EnQueue,用来让主线程往任务队列中投放任务。
  投放任务的要求:
  

  • 访问队列时必要与其他线程互斥,即对_mutex加锁;
  • 添加任务后,就可以唤醒在等候的线程了
  1. void EnQueue(const T& task)
  2. {
  3.     pthread_mutex_lock(&_mutex);
  4.     if(_isRunning)
  5.     {
  6.         _taskQueue.push(task);
  7.         if(_waitNum > 0)
  8.         {
  9.             pthread_cond_signal(&_cond);
  10.         }
  11.     }
  12.     pthread_mutex_unlock(&_mutex);
  13. }
复制代码
6、子线程读取任务

   子线程读取任务的要求如下:
  

  • 保持互斥,从任务队列获取数据前必要加锁,获取竣事后解锁;
  • 保持同步,假如任务队列中没有数据,就去_cond劣等候,等候被唤醒。
  1. void HanderTask(std::string name)
  2. {
  3.     //子线程需要一直处理,所以这里使用死循环
  4.     while(true)
  5.     {
  6.         pthread_mutex_lock(&_mutex);
  7.         while(_taskQueue.empty())//这里是while循环,不是if判断,避免伪唤醒
  8.         {
  9.             _waitNum++;
  10.             pthread_cond_wait(&_cond,&_mutex);
  11.             _waitNum--;
  12.         }
  13.         T task = _taskQueue.front();
  14.         _taskQueue.pop();
  15.         std::cout<<name<<"get a task..."<<std::endl;
  16.         pthread_mutex_unlock(&_mutex);
  17.         task();
  18.     }
  19. }
复制代码
  这里必要注意一点,判定当前任务队列是否为空时,使用的是while循环,而不是if语句,由于当火线程被主线程唤醒之后,大概会发生伪唤醒,着实任务队列中根本没有任务。以是还要进入下一次while判定,确保访问任务队列时,肯定是有任务的。
  但是现在尚有一个标题,假如线程访问任务队列时,线程池被制止了怎么办?
  我们可以通过_isRunning来判定,在实行任务时判定一下_isRunning的值:
  

  • 假如为true:正常运行
  • 假如为false:

    • 假如任务队列中尚有任务:把任务实行完
    • 假如没有任务:当火线程退出

  于是我们的代码改进为:
  1. void HanderTask(std::string name)
  2. {
  3.     //子线程需要一直处理,所以这里使用死循环
  4.     while(true)
  5.     {
  6.         pthread_mutex_lock(&_mutex);
  7.         while(_taskQueue.empty()&&_isRunning)//这里是while循环,不是if判断,避免伪唤醒
  8.         {
  9.             _waitNum++;
  10.             pthread_cond_wait(&_cond,&_mutex);
  11.             _waitNum--;
  12.         }
  13.         //线程池终止了,并且任务队列中没有任务 --> 线程退出
  14.         if(_taskQueue.empty()&&!_isRunning)
  15.         {
  16.             pthread_mutex_unlock(&_mutex);
  17.             std::coud<<name<<" quit..."<<std::endl;
  18.             break;
  19.         }
  20.         //走到这里无论线程池是否终止,都一定还有任务要执行,将任务执行完再退出
  21.         T task = _taskQueue.front();
  22.         _taskQueue.pop();
  23.         std::cout<<name<<" get a task..."<<std::endl;
  24.         pthread_mutex_unlock(&_mutex);
  25.         task();
  26.     }
  27. }
复制代码
7、制止线程池

   制止线程池不光仅是将_isRunning设置为false这么简朴,必要思量以下标题:
  

  • 假如在Stop的时间,有线程正在调用HanderTask函数怎么办?
    答:此时多个线程访问变量_isRunning,就有大概会造成线程安全标题,以是访问_isRunning时也要加锁,由于之前全部的访问_isRunning的操纵,都在_mutex锁中,以是和之前共用同一把锁就行。
  • 假如Stop之后,尚有线程在_cond下面等候怎么办?
    答:假如线程不绝在_cond下面等候,就会导致无法退出,此时在_isRunning = false之后,还要通过pthread_cond_broadcast唤醒全部等候的线程,让他们重新实行HanderTask的逻辑,从而正常退出。
  1. void Stop()
  2. {
  3.     pthread_mutex_lock(&_mutex);
  4.     _isRunning = false;//终止线程池
  5.     pthread_cond_broadcast(&_cond);//唤醒所有等待的线程
  6.     pthread_mutex_unlock(&_mutex);
  7. }
复制代码
三、测试

我们可以用以下代码举行测试:
  1. #include <iostream>
  2. #include <vector>
  3. #include <string>
  4. #include <ctime>
  5. #include <cstdlib>
  6. #include <unistd.h>
  7. #include <pthread.h>
  8. #include"ThreadPool.hpp"
  9. int Add()
  10. {
  11.     int a = rand() % 100 + 1;
  12.     int b = rand() % 100 + 1;
  13.     std::cout<<a<<" + "<<b<<" = "<<a+b<<std::endl;
  14.     return a+b;
  15. }
  16. int main()
  17. {
  18.     srand(static_cast<unsigned int>(time(nullptr)));
  19.     ThreadPool<int(*)(void)> tp(3);
  20.     tp.ThreadInit();
  21.     tp.StartAll();
  22.     for (int i = 0; i < 10; i++)
  23.     {
  24.         tp.EnQueue(Add);
  25.         sleep(1);
  26.     }
  27.     tp.Stop();
  28.     tp.JoinAll();
  29.     return 0;
  30. }
复制代码
  通过ThreadPool<int(*)(void)> tp(3);创建有三个线程的线程池,实行的任务范例为int(void),但是要注意,此处要传入可调用对象,C++的可调用对象有:函数指针,仿函数,lambda表达式。此处我用了函数指针int(*)(void)。
  接着ThreadInit初始化线程池,此时线程对象Thread已经创建出来了,但是尚有没创建线程。随后调用StartAll,此时才真正创建了线程。
  然后进入一个for循环,给任务队列派发任务,统共派发十个任务,都是函数Add,此中天生两个随机数的加法。
  末了调用Stop制止退出线程池,此时线程也会一个个退出,然后调用JoinAll接纳全部线程。
  运行结果:

四、线程池总代码

1、ThreadPool.hpp

  1. #include"Thread.hpp"
  2. #include<vector>
  3. #include<queue>
  4. #include<string>
  5. #include <unistd.h>
  6. #include <pthread.h>
  7. template<class T>
  8. class ThreadPool
  9. {   
  10. public:
  11.     ThreadPool(const int num = 5)
  12.         :_threadNum(num),_waitNum(0),_isRunning(false)
  13.     {
  14.         pthread_mutex_init(&_mutex,nullptr);
  15.         pthread_cond_init(&_cond,nullptr);
  16.     }
  17.     void HanderTask(std::string name)
  18.     {
  19.         //子线程需要一直处理,所以这里使用死循环
  20.         while(true)
  21.         {
  22.             pthread_mutex_lock(&_mutex);
  23.             while(_taskQueue.empty()&&_isRunning)//这里是while循环,不是if判断,避免伪唤醒
  24.             {
  25.                 _waitNum++;
  26.                 pthread_cond_wait(&_cond,&_mutex);
  27.                 _waitNum--;
  28.             }
  29.             //线程池终止了,并且任务队列中没有任务 --> 线程退出
  30.             if(_taskQueue.empty()&&!_isRunning)
  31.             {
  32.                 pthread_mutex_unlock(&_mutex);
  33.                 std::cout<<name<<" quit..."<<std::endl;
  34.                 break;
  35.             }
  36.             //走到这里无论线程池是否终止,都一定还有任务要执行,将任务执行完再退出
  37.             T task = _taskQueue.front();
  38.             _taskQueue.pop();
  39.             std::cout<<name<<" get a task..."<<std::endl;
  40.             pthread_mutex_unlock(&_mutex);
  41.             task();
  42.         }
  43.     }
  44.     void ThreadInit()
  45.     {
  46.         for(int i=0;i<_threadNum;i++)
  47.         {
  48.             auto func = bind(&ThreadPool::HanderTask,this,std::placeholders::_1);
  49.             std::string name = "Thread-"+std::to_string(i);
  50.             //_threads.push_back(HanderTask,name,name);//第一个name是handerTask的参数,第二个name是Thread内部的成员
  51.             _threads.emplace_back(func,name,name);
  52.         }
  53.         _isRunning = true;
  54.     }
  55.     void StartAll()
  56.     {
  57.         for(auto& thread : _threads)
  58.         {
  59.             thread.Start();
  60.         }
  61.     }
  62.     void JoinAll()
  63.     {
  64.         for(auto& thread : _threads)
  65.         {
  66.             thread.Join();
  67.         }
  68.     }
  69.     void EnQueue(const T& task)
  70.     {
  71.         pthread_mutex_lock(&_mutex);
  72.         if(_isRunning)
  73.         {
  74.             _taskQueue.push(task);
  75.             if(_waitNum > 0)
  76.             {
  77.                 pthread_cond_signal(&_cond);
  78.             }
  79.         }
  80.         pthread_mutex_unlock(&_mutex);
  81.     }
  82.     void Stop()
  83.     {
  84.         pthread_mutex_lock(&_mutex);
  85.         _isRunning = false;//终止线程池
  86.         pthread_cond_broadcast(&_cond);//唤醒所有等待的线程
  87.         pthread_mutex_unlock(&_mutex);
  88.     }
  89.     ~ThreadPool()
  90.     {
  91.         pthread_mutex_destroy(&_mutex);
  92.         pthread_cond_destroy(&_cond);
  93.     }
  94. private:
  95.     std::vector<MyThread::Thread<std::string>> _threads;//用数组管理多个线程
  96.     std::queue<T> _taskQueue;//任务队列
  97.     int _threadNum;//线程数
  98.     int _waitNum;//等待的线程数
  99.     bool _isRunning;//线程池是否在运行
  100.     pthread_mutex_t _mutex;//互斥锁
  101.     pthread_cond_t _cond;//条件变量
  102. };
复制代码
2、Main.cc

  1. #include <iostream>
  2. #include <vector>
  3. #include <string>
  4. #include <ctime>
  5. #include <cstdlib>
  6. #include <unistd.h>
  7. #include <pthread.h>
  8. #include"ThreadPool.hpp"
  9. int Add()
  10. {
  11.     int a = rand() % 100 + 1;
  12.     int b = rand() % 100 + 1;
  13.     std::cout<<a<<" + "<<b<<" = "<<a+b<<std::endl;
  14.     return a+b;
  15. }
  16. int main()
  17. {
  18.     srand(static_cast<unsigned int>(time(nullptr)));
  19.     ThreadPool<int(*)(void)> tp(3);
  20.     tp.ThreadInit();
  21.     tp.StartAll();
  22.     for (int i = 0; i < 10; i++)
  23.     {
  24.         tp.EnQueue(Add);
  25.         sleep(1);
  26.     }
  27.     tp.Stop();
  28.     tp.JoinAll();
  29.     return 0;
  30. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

×
登录参与点评抽奖,加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表