🌏博客主页:PH_modest的博客主页
🚩当前专栏:Linux跬步积累
💌其他专栏:
🔴 逐日一题
🟡 C++跬步积累
🟢 C语言跬步积累
🌈座右铭:广积粮,缓称王!
一、怎样实现一个线程
1、根本布局
Thread类是用来形貌一个线程的,以是成员变量中必要有tid和线程名。以是第一个成员变量就是==_tid==;
一个线程创建出来,肯定是用来实行对于的任务的,那么我们还必要一个成员变量来吸收转达的函数,以是第三个成员变量就是==_func==,是一个void(T&)范例的参数,以是第四个成员变量就是转达过来的参数_data。
- template<class T>
- using func_t = std::function<void(T&)>;//模版方法
- template<class T>
- class Thread
- {
- public:
- Thread(){}
- static void* ThreadRoutinue(){}
- void Start(){}
- void Join(){}
- void Detach(){}
- ~Thread(){}
- private:
- pthread_t _tid; //线程tid
- std::string _threadname; //线程名
- func_t<T> _func; //线程执行的函数
- T _data; //需要处理的数据
- };
复制代码 2、实现成员函数
成员函数中我们必要注意的就是pthread_create()函数,它的第三个参数是一个参数为void *,返回值为void *的一个函数。
但是我们将这个函数ThreadRoutinue()界说在这个类里,他就是一个成员函数,成员函数的参数,会潜伏this指针,以是假如我们正常写,就会报错,由于这个函数不满足pthread_create()函数的条件。
但是只要让ThreadRoutinue()的参数中没有this指针就可以了,那么该怎样实现呢?
在前面加上static就可以,变成静态成员变量,就不会有this指针了。那么标题又来了,没有了this指针,我们又该怎样访问到成员变量呢?
别忘了这个函数可以转达一个返回值为void*的参数,我们只必要将this指针转达已往,在函数内部强转一下就可以了。
- template<class T>
- using func_t = std::function<void(T&)>;//模版方法
- template<class T>
- class Thread
- {
- public:
- //thread(func,5,"thread-1");
- Thread(func_t<T> func,const T &data,const std::string &name = "none-name")
- :_func(func),_data(data),_threadname(name)
- {}
- //需要设置成static静态成员函数,否则参数会多一个this指针,就不符合pthread_create的要求了
- static void* ThreadRoutinue(void* args)
- {
- //将传过来的this指针强转一下,然后就可以访问到_func和_data了
- Thread<T>* self = static_cast<Thread<T>*>(args);
- self->_func(self->_data);
- return nullptr;
- }
- void Start()
- {
- //创建线程
- int ret = pthread_create(&_tid,nullptr,ThreadRoutinue,this);
- return ret==0;
- }
- void Join()
- {
- pthread_join(_tid,nullptr);
- }
- void Detach()
- {
- pthread_detach(_tid);
- }
- ~Thread(){}
- private:
- pthread_t _tid; //线程tid
- std::string _threadname; //线程名
- func_t<T> _func; //线程执行的函数
- T _data; //需要处理的数据
- };
复制代码 3、演示
让我们写一段测试代码,来看一下结果:
- #include"Thread.hpp"
- void test(int x)
- {
- while(true)
- {
- std::cout<<x<<std::endl;
- sleep(1);
- }
- }
- int main()
- {
- MyThread::Thread<int> mt(test,2025,"thread-1");
- if(mt.Start() == true)
- {
- std::cout<<"MyThread start success!\n";
- }
- else
- {
- std::cout<<"MyThread start failed!\n";
- }
- mt.Join();
- return 0;
- }
复制代码 运行结果:
让我们使用ps -aL指令来检察一下是否真的创建了线程:
可以看到,步调运行之后,真的创建出了两个名为mythread的线程。
4、代码总汇
Thread.hpp
- #pragma once
- #include<string>
- #include<pthread.h>
- #include<unistd.h>
- #include<iostream>
- #include<functional>
- namespace MyThread
- {
- template<class T>
- using func_t = std::function<void(T&)>;//模版方法
- template<class T>
- class Thread
- {
- public:
- //thread(func,5,"thread-1");
- Thread(func_t<T> func,const T &data,const std::string &name = "none-name")
- :_func(func),_data(data),_threadname(name)
- {}
- //需要设置成static静态成员函数,否则参数会多一个this指针,就不符合pthread_create的要求了
- static void* ThreadRoutinue(void* args)
- {
- //将传过来的this指针强转一下,然后就可以访问到_func和_data了
- Thread<T>* self = static_cast<Thread<T>*>(args);
- self->_func(self->_data);
- return nullptr;
- }
- bool Start()
- {
- //创建线程
- int ret = pthread_create(&_tid,nullptr,ThreadRoutinue,this);
- return ret==0;
- }
- void Join()
- {
- pthread_join(_tid,nullptr);
- }
- void Detach()
- {
- pthread_detach(_tid);
- }
- ~Thread(){}
- private:
- pthread_t _tid; //线程tid
- std::string _threadname; //线程名
- func_t<T> _func; //线程执行的函数
- T _data; //需要处理的数据
- };
- }
复制代码 Main.cc
- #include"Thread.hpp"
- void test(int x)
- {
- while(true)
- {
- std::cout<<x<<std::endl;
- sleep(1);
- }
- }
- int main()
- {
- MyThread::Thread<int> mt(test,2025,"thread-1");
- if(mt.Start() == true)
- {
- std::cout<<"MyThread start success!\n";
- }
- else
- {
- std::cout<<"MyThread start failed!\n";
- }
- mt.Join();
- return 0;
- }
复制代码 二、怎样封装线程池
1、筹划成员变量
线程池内部维护多个线程和一个任务队列,主线程将任务放入任务队列当中,然后子线程就从任务队列中拿取任务举行处置惩罚。
以是必要一个数组来管理多个线程:_threads
以及一个任务队列:_taskQueue
别的我们还必要知道一共有多少个线程:_threadNum
然后还可以设置一个变量来检察真在等候任务的线程数量:_waitNum
末了我们再设置一个变量来判定当火线程池是否运行,假如已经退出了,我们必要将任务队列中的任务处置惩罚完再退出:_isRunning
界说这些变量就够了吗?
我们忽略了一个多线程编程中最紧张的标题:线程之间的互斥和同步
我们的任务是:主线程往任务队列中放入任务,子线程从任务队列中拿取任务。那么我们思索一下下面几个标题:
- 多个线程之间可以同时从任务队列中拿任务吗?
答:不能,任务队列是临界资源,线程和线程之间要互斥,否则会出现差异的线程拿取同一个任务的情况。
- 主线程放入任务时,子线程可以同时拿取任务吗?
答:不能,主线程和子线程之间也必要互斥。
由于他们都是竞争任务队列这一个资源,以是我们只要定一个一把锁就可以了。如许互斥的标题就办理了。
那么同步呢?
是不是只有任务队列中有任务时,子线程才气获取任务,以是必要主线程先放任务,子线程才气拿任务,这就必要一个条件变量来维护。
综上:
我们还必要两个成员变量:_mutex和_cond
- #include"Thread.hpp"
- #include<vector>
- template<class T>
- class ThreadPool
- {
- private:
- std::vector<MyThread::Thread<std::string>> _threads;//用数组管理多个线程
- std::queue<T> _taskQueue;//任务队列
- int _threadNum;//线程数
- int _waitNum;//等待的线程数
- bool _isRunning;//线程池是否在运行
- pthread_mutex_t _mutex;//互斥锁
- pthread_cond_t _cond;//条件变量
- };
复制代码 2、构造函数与析构函数
构造和析构的重要作用就是对_mutex和_cond的初始化和烧毁。
同时我们还必要知道这个线程池必要创建多少个线程,以是必要外部转达参数来告诉我们。
然后就是构造函数对其他成员变量举行初始化。
- template<class T>
- class ThreadPool
- {
- public:
- ThreadPool(const int num = 5)
- :_threadNum(num),_waitNum(0),_isRunning(false)
- {
- pthread_mutex_init(&_mutex,nullptr);
- pthread_cond_init(&_cond,nullptr);
- }
- ~ThreadPool()
- {
- pthread_mutex_destroy(&_mutex);
- pthread_cond_destroy(&_cond);
- }
- }
复制代码 3、初始化
上面的构造函数只是创建了锁和条件变量,以及部分变量的初始化,并没有创建出线程对象。
我们可以界说一个ThreadInit()函数来创建线程。
让我们先回顾一下Thread的构造函数必要哪些变量:
- Thread(func_t<T> func,const T &data,const std::string &name = "none-name")
复制代码 func参数是必要调用的函数,data是这个函数必要处置惩罚的数据,name是线程名。
在此,我们让线程去实行一个叫做handerTask的函数,这个函数内部实现线程到任务队列中获取任务的过程。
而handerTask的第一个参数也是线程的名字,以便在handerTask内部检察是哪个线程实行了任务。
这里我们使用bind函数来将HanderTask函数与this参数绑定在一起,而且将这个参数绑定到 HandleTask 的第一个参数位置。
- void HanderTask(std::string)
- {
- //执行任务队列的任务
- }
- void InitThread()
- {
- for(int i=0;i<_threadNum;i++)
- {
- auto func = bind(&ThreadPool::HanderTask,this,std::placeholders::_1);
- std::string name = "Thread-"+std::to_string(i);
- //_threads.push_back(HanderTask,name,name);//第一个name是handerTask的参数,第二个name是Thread内部的成员
- _threads.emplace_back(func,name,name);
- }
- _isRunning = true;
- }
复制代码 4、启动与接纳
我们已经创建出来了一批线程,接下来还必要启动这一批线程,而且接纳。
因此还必要界说成员函数StartAll和JoinAll来启动和等候这批线程。
- void StartAll()
- {
- for(auto& thread : _threads)
- {
- thread.Start();
- }
- }
- void JoinAll()
- {
- for(auto& thread : _threads)
- {
- thread.Join();
- }
- }
复制代码 5、主线程放入任务
我们可以界说一个EnQueue,用来让主线程往任务队列中投放任务。
投放任务的要求:
- 访问队列时必要与其他线程互斥,即对_mutex加锁;
- 添加任务后,就可以唤醒在等候的线程了
- void EnQueue(const T& task)
- {
- pthread_mutex_lock(&_mutex);
- if(_isRunning)
- {
- _taskQueue.push(task);
- if(_waitNum > 0)
- {
- pthread_cond_signal(&_cond);
- }
- }
- pthread_mutex_unlock(&_mutex);
- }
复制代码 6、子线程读取任务
子线程读取任务的要求如下:
- 保持互斥,从任务队列获取数据前必要加锁,获取竣事后解锁;
- 保持同步,假如任务队列中没有数据,就去_cond劣等候,等候被唤醒。
- void HanderTask(std::string name)
- {
- //子线程需要一直处理,所以这里使用死循环
- while(true)
- {
- pthread_mutex_lock(&_mutex);
- while(_taskQueue.empty())//这里是while循环,不是if判断,避免伪唤醒
- {
- _waitNum++;
- pthread_cond_wait(&_cond,&_mutex);
- _waitNum--;
- }
- T task = _taskQueue.front();
- _taskQueue.pop();
- std::cout<<name<<"get a task..."<<std::endl;
- pthread_mutex_unlock(&_mutex);
- task();
- }
- }
复制代码 这里必要注意一点,判定当前任务队列是否为空时,使用的是while循环,而不是if语句,由于当火线程被主线程唤醒之后,大概会发生伪唤醒,着实任务队列中根本没有任务。以是还要进入下一次while判定,确保访问任务队列时,肯定是有任务的。
但是现在尚有一个标题,假如线程访问任务队列时,线程池被制止了怎么办?
我们可以通过_isRunning来判定,在实行任务时判定一下_isRunning的值:
- 假如为true:正常运行
- 假如为false:
- 假如任务队列中尚有任务:把任务实行完
- 假如没有任务:当火线程退出
于是我们的代码改进为:
- void HanderTask(std::string name)
- {
- //子线程需要一直处理,所以这里使用死循环
- while(true)
- {
- pthread_mutex_lock(&_mutex);
- while(_taskQueue.empty()&&_isRunning)//这里是while循环,不是if判断,避免伪唤醒
- {
- _waitNum++;
- pthread_cond_wait(&_cond,&_mutex);
- _waitNum--;
- }
- //线程池终止了,并且任务队列中没有任务 --> 线程退出
- if(_taskQueue.empty()&&!_isRunning)
- {
- pthread_mutex_unlock(&_mutex);
- std::coud<<name<<" quit..."<<std::endl;
- break;
- }
- //走到这里无论线程池是否终止,都一定还有任务要执行,将任务执行完再退出
- T task = _taskQueue.front();
- _taskQueue.pop();
- std::cout<<name<<" get a task..."<<std::endl;
- pthread_mutex_unlock(&_mutex);
- task();
- }
- }
复制代码 7、制止线程池
制止线程池不光仅是将_isRunning设置为false这么简朴,必要思量以下标题:
- 假如在Stop的时间,有线程正在调用HanderTask函数怎么办?
答:此时多个线程访问变量_isRunning,就有大概会造成线程安全标题,以是访问_isRunning时也要加锁,由于之前全部的访问_isRunning的操纵,都在_mutex锁中,以是和之前共用同一把锁就行。
- 假如Stop之后,尚有线程在_cond下面等候怎么办?
答:假如线程不绝在_cond下面等候,就会导致无法退出,此时在_isRunning = false之后,还要通过pthread_cond_broadcast唤醒全部等候的线程,让他们重新实行HanderTask的逻辑,从而正常退出。
- void Stop()
- {
- pthread_mutex_lock(&_mutex);
- _isRunning = false;//终止线程池
- pthread_cond_broadcast(&_cond);//唤醒所有等待的线程
- pthread_mutex_unlock(&_mutex);
- }
复制代码 三、测试
我们可以用以下代码举行测试:
- #include <iostream>
- #include <vector>
- #include <string>
- #include <ctime>
- #include <cstdlib>
- #include <unistd.h>
- #include <pthread.h>
- #include"ThreadPool.hpp"
- int Add()
- {
- int a = rand() % 100 + 1;
- int b = rand() % 100 + 1;
- std::cout<<a<<" + "<<b<<" = "<<a+b<<std::endl;
- return a+b;
- }
- int main()
- {
- srand(static_cast<unsigned int>(time(nullptr)));
- ThreadPool<int(*)(void)> tp(3);
- tp.ThreadInit();
- tp.StartAll();
- for (int i = 0; i < 10; i++)
- {
- tp.EnQueue(Add);
- sleep(1);
- }
- tp.Stop();
- tp.JoinAll();
- return 0;
- }
复制代码 通过ThreadPool<int(*)(void)> tp(3);创建有三个线程的线程池,实行的任务范例为int(void),但是要注意,此处要传入可调用对象,C++的可调用对象有:函数指针,仿函数,lambda表达式。此处我用了函数指针int(*)(void)。
接着ThreadInit初始化线程池,此时线程对象Thread已经创建出来了,但是尚有没创建线程。随后调用StartAll,此时才真正创建了线程。
然后进入一个for循环,给任务队列派发任务,统共派发十个任务,都是函数Add,此中天生两个随机数的加法。
末了调用Stop制止退出线程池,此时线程也会一个个退出,然后调用JoinAll接纳全部线程。
运行结果:
四、线程池总代码
1、ThreadPool.hpp
- #include"Thread.hpp"
- #include<vector>
- #include<queue>
- #include<string>
- #include <unistd.h>
- #include <pthread.h>
- template<class T>
- class ThreadPool
- {
- public:
- ThreadPool(const int num = 5)
- :_threadNum(num),_waitNum(0),_isRunning(false)
- {
- pthread_mutex_init(&_mutex,nullptr);
- pthread_cond_init(&_cond,nullptr);
- }
- void HanderTask(std::string name)
- {
- //子线程需要一直处理,所以这里使用死循环
- while(true)
- {
- pthread_mutex_lock(&_mutex);
- while(_taskQueue.empty()&&_isRunning)//这里是while循环,不是if判断,避免伪唤醒
- {
- _waitNum++;
- pthread_cond_wait(&_cond,&_mutex);
- _waitNum--;
- }
- //线程池终止了,并且任务队列中没有任务 --> 线程退出
- if(_taskQueue.empty()&&!_isRunning)
- {
- pthread_mutex_unlock(&_mutex);
- std::cout<<name<<" quit..."<<std::endl;
- break;
- }
- //走到这里无论线程池是否终止,都一定还有任务要执行,将任务执行完再退出
- T task = _taskQueue.front();
- _taskQueue.pop();
- std::cout<<name<<" get a task..."<<std::endl;
- pthread_mutex_unlock(&_mutex);
- task();
- }
- }
- void ThreadInit()
- {
- for(int i=0;i<_threadNum;i++)
- {
- auto func = bind(&ThreadPool::HanderTask,this,std::placeholders::_1);
- std::string name = "Thread-"+std::to_string(i);
- //_threads.push_back(HanderTask,name,name);//第一个name是handerTask的参数,第二个name是Thread内部的成员
- _threads.emplace_back(func,name,name);
- }
- _isRunning = true;
- }
- void StartAll()
- {
- for(auto& thread : _threads)
- {
- thread.Start();
- }
- }
- void JoinAll()
- {
- for(auto& thread : _threads)
- {
- thread.Join();
- }
- }
- void EnQueue(const T& task)
- {
- pthread_mutex_lock(&_mutex);
- if(_isRunning)
- {
- _taskQueue.push(task);
- if(_waitNum > 0)
- {
- pthread_cond_signal(&_cond);
- }
- }
- pthread_mutex_unlock(&_mutex);
- }
- void Stop()
- {
- pthread_mutex_lock(&_mutex);
- _isRunning = false;//终止线程池
- pthread_cond_broadcast(&_cond);//唤醒所有等待的线程
- pthread_mutex_unlock(&_mutex);
- }
- ~ThreadPool()
- {
- pthread_mutex_destroy(&_mutex);
- pthread_cond_destroy(&_cond);
- }
- private:
- std::vector<MyThread::Thread<std::string>> _threads;//用数组管理多个线程
- std::queue<T> _taskQueue;//任务队列
- int _threadNum;//线程数
- int _waitNum;//等待的线程数
- bool _isRunning;//线程池是否在运行
- pthread_mutex_t _mutex;//互斥锁
- pthread_cond_t _cond;//条件变量
- };
复制代码 2、Main.cc
- #include <iostream>
- #include <vector>
- #include <string>
- #include <ctime>
- #include <cstdlib>
- #include <unistd.h>
- #include <pthread.h>
- #include"ThreadPool.hpp"
- int Add()
- {
- int a = rand() % 100 + 1;
- int b = rand() % 100 + 1;
- std::cout<<a<<" + "<<b<<" = "<<a+b<<std::endl;
- return a+b;
- }
- int main()
- {
- srand(static_cast<unsigned int>(time(nullptr)));
- ThreadPool<int(*)(void)> tp(3);
- tp.ThreadInit();
- tp.StartAll();
- for (int i = 0; i < 10; i++)
- {
- tp.EnQueue(Add);
- sleep(1);
- }
- tp.Stop();
- tp.JoinAll();
- return 0;
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
|