这篇博客讲授了历程池的创建过程,并在最后附上了完整代码。
现在有一个父历程,然后提前创建出一批子历程,将来假如父历程master有一些任务要交给子历程去运行,而不消像shell,需要执行命令才回去创建历程,创建历程本身也是有成本的。父历程要把任务派发给一些子历程,注定了要进行通讯,我们可以提前给每一个历程创建管道,由父历程持有写端,子历程持有读端。我们有了管道这种技能,就可以让父历程通过管道将任务传递给子历程,想让哪个历程执行任务,就给哪个管道写入任务,我们把提前创建的这批历程叫做历程池, 这种预先创建的历程就可以大大减少将来执行任务时创建历程的成本。master只负责往管道写任务,子历程只会等候任务的到来,一旦来了就会处置惩罚。假如父历程不往管道里写任务,管道里没数据,管道读写端的文件形貌符也没关,子历程会壅闭等候,等候任务的原理!!master向哪一个管道写入,就是叫醒哪一个子历程来处置惩罚任务!
如许就通过管道实现了历程的协同,可以由父历程定向叫醒一个或多个历程。我们在给子历程分配任务时,不能让一个特殊忙而是让它们均衡一些,父历程在进行任务分别时要做到分别的负载均衡!
我们站在父历程的角度,创建一个信道类Channel,
- //master
- class Channel
- {
- private:
- int _wfd;
- pid_t _subprocessid;
- std::string _name;
- };
复制代码 此中,_wfd是管道的写入端,_subprocessid是对应子历程的id,_name表示管道的名字。
并通过vector来管理管道:
- std::vector<Channel> channels;
复制代码 有一个提示需要交接一下,在C++中,我们的形参定名规范:
const &:输入型参数
& :输入输出型参数
* :输出型参数
我们接下来就是创建信道和子历程,并把它们打印出来,看看我们的代码框架有没有标题:
- #include <iostream>#include <string>#include <vector>#include <sys/types.h>#include <unistd.h>void work(int rfd){ while(true) { sleep(1); }}//masterclass Channel{public: Channel(int wfd,pid_t id,const std::string& name) :_wfd(wfd) ,_subprocessid(id) ,_name(name) { } int GetWfd() {return _wfd ;} pid_t GetProcessId(){return _subprocessid;} std::string GetName(){return _name;} ~Channel() { }private: int _wfd; pid_t _subprocessid; std::string _name;};void CreateChannelAndSub(int num, std::vector<Channel>* channels){ for(int i = 0; i<num ; i++) { //1.创建管道 int pipefd[2] = {0}; int n = pipe(pipefd); if(n < 0) exit(1); //2.创建子历程 pid_t id = fork(); if(id == 0) { //child close(pipefd[1]); work(pipefd[0]); close(pipefd[0]); exit(0); } //father close(pipefd[0]); //a.子历程的pid b.父历程关心的管道的w端 //3.构造一个通道的名字 std::string channel_name = "Channel-" + std::to_string(i); channels->push_back(Channel(pipefd[1],id,channel_name)); }}// ./processpool 5int main(int argc,char* argv[]){ if(argc != 2) { std::cerr << "Usage: " << argv[0] << " processnum" << std::endl; return 1; } std::vector<Channel> channels; int num = std::stoi(argv[1]); //1.创建信道和子历程 CreateChannelAndSub(num, &channels); //for test for(auto& channel : channels) { std::cout<<"==============================="<<std::endl; std::cout<<channel.GetName()<<std::endl; std::cout<<channel.GetWfd()<<std::endl; std::cout<<channel.GetProcessId()<<std::endl; } sleep(100); return 0;}
复制代码
通过运行程序,我们给的命令行参数是10,创建10个子历程,然后打开历程监测窗口:
可以看到我们创建管道和子历程乐成,非常好!
我们搭建好框架后,接下来就要通过channel控制子历程、接纳管道和子历程。
父历程需要给子历程发任务,那任务是什么呢?父历程是没有办法把函数发送到管道里的,而任务其实就是让子历程执行某段代码,而父子历程数据可以写时拷贝,但是代码是共享的,以是,我们要构建任务,我们可以由父历程预先规定一些任务,这些任务本质就是一张表(函数指针数组),保存了各种方法的地址,将来我们就可以往管道里写固定长度的4字节的数组下标(任务码),以是,我们现在要转过头构建一批任务,为了方便,我们创建Task.hpp文件,.hpp允许声明和实现写在一个头文件里,.hpp文件的缺点是无法形成库,只能开源式地给别人,一样平常在开源项目里会用到:
- #pragma once
- #include <iostream>
- #include <stdlib.h>
- #include <ctime>
- #include <sys/types.h>
- #include <unistd.h>
- #define TaskNum 3
- typedef void (*task_t)(); // task_t 函数指针类型
- void Print()
- {
- std::cout << "I am print task" << std::endl;
- }
- void DownLoad()
- {
- std::cout<< "I am a download task" << std::endl;
- }
- void Flush()
- {
- std::cout<< "I am a flush task" << std::endl;
- }
- task_t tasks[TaskNum];
- void LoadTask()
- {
- srand(time(nullptr) ^ getpid() ^ 1642);
- tasks[0] = Print;
- tasks[1] = DownLoad;
- tasks[2] = Flush;
- }
- void ExcuteTask(int number)
- {
- if(number < 0 || number > 2) return;
- tasks[number]();
- }
- int SelectTask()
- {
- return rand() % TaskNum;
- }
复制代码 在我们的代码中,按顺序给各个子历程发送任务,这种叫轮询方案,以下是我们的具体实今世码:
- #include <iostream>#include <string>#include <vector>#include <sys/types.h>#include <sys/wait.h>#include <unistd.h>#include "Task.hpp"void work(int rfd){ while(true) { int command = 0; int n = read(rfd,&command,sizeof(command)); if(n == sizeof(int)) { std::cout << "pid is : " << getpid() << " handler task" << std::endl; ExcuteTask(command); } else if(n == 0) { std::cout << "sub process : " << getpid() << "quit" << std::endl; break; } }}//masterclass Channel{public: Channel(int wfd,pid_t id,const std::string& name) :_wfd(wfd) ,_subprocessid(id) ,_name(name) { } int GetWfd() {return _wfd ;} pid_t GetProcessId(){return _subprocessid;} std::string GetName(){return _name;} void CloseChannel() { close(_wfd); } void Wait() { pid_t rid = waitpid(_subprocessid, nullptr, 0); if(rid > 0) { std::cout << "wait " << rid << " success" << std::endl; } } ~Channel() { }private: int _wfd; pid_t _subprocessid; std::string _name;};void CreateChannelAndSub(int num, std::vector<Channel>* channels){ for(int i = 0; i<num ; i++) { //1.创建管道 int pipefd[2] = {0}; int n = pipe(pipefd); if(n < 0) exit(1); //2.创建子历程 pid_t id = fork(); if(id == 0) { //child close(pipefd[1]); work(pipefd[0]); close(pipefd[0]); exit(0); } //father close(pipefd[0]); //a.子历程的pid b.父历程关心的管道的w端 //3.构造一个通道的名字 std::string channel_name = "Channel-" + std::to_string(i); channels->push_back(Channel(pipefd[1],id,channel_name)); }}int NextChannel(int channelnum){ static int next = 0; int channel = next; next++; next %= channelnum; return channel;}void SendTaskCommand(Channel& channel, int taskcommand){ write(channel.GetWfd(),&taskcommand,sizeof(taskcommand));}void ctrlProcessOnce(std::vector<Channel>& channels){ sleep(1); //a. 选择一个任务 int taskcommand = SelectTask(); //b. 选择一个信道和历程 int channel_index = NextChannel(channels.size()); //c. 发送任务 SendTaskCommand(channels[channel_index],taskcommand); std::cout << std::endl; std::cout << "taskcommand: " << taskcommand << " channel: " \ << channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;}void ctrlProcess(std::vector<Channel>& channels, int times = -1){ if(times > 0) { while(times--) { ctrlProcessOnce(channels); } } else { while(true) { ctrlProcessOnce(channels); } } }void CleanUpChannel(std::vector<Channel>& channels){ for(auto& channel : channels) { channel.CloseChannel(); } for(auto& channel : channels) { channel.Wait(); }}// ./processpool 5int main(int argc,char* argv[]){ if(argc != 2) { std::cerr << "Usage: " << argv[0] << " processnum" << std::endl; return 1; } std::vector<Channel> channels; int num = std::stoi(argv[1]); LoadTask(); //1.创建信道和子历程 CreateChannelAndSub(num, &channels); //2.通过channel控制子历程 ctrlProcess(channels,num); //3.接纳管道和子历程 a.关闭所有的写端 b.接纳子历程 CleanUpChannel(channels); sleep(5); return 0;}
复制代码 此中,上面的代码有两个小细节处置惩罚:
1.在创建子历程时, ,如许就可以让子历程不关心管道读端,只需要从标准输入读就行。
如许就可以将管道的逻辑和子历程执行任务的逻辑解耦。
2.子历程要执行的word本身就是一个任务,可以作为CreateChannelAndSub的参数传入,在然后回调work。此中task_t task叫做回调函数,将来所有子历程都回去调用传入的task。如许之后,历程池本身的代码和任务本身两个文件就彻底解耦了!(把work函数放到Task.hpp )
可是现在我们的代码还存在一个BUG,我们来看,
里面写了两个循环,不可以放到一个循环里吗?
我们发现,随着管道的创建,越来越多的写端指向第一个管道,假如创建了10个子历程,那就有10个写端指向第一个管道。以是,假如两个循环写到一起,就会重新向后关管道的文件形貌符,第一个关完后,照旧有九个文件形貌符指向第一个管道,管道中对文件形貌符有引用计数,此时,这个管道并没有向我们预期的那样退出,写端没有关完,读端什么都读不到,读端仍然壅闭,子历程不退出,历程就壅闭了。
那为什么写成两个循环就可以呢?因为当关掉最后一个管道时,最后一个子历程指向上一个管道的写端就被开释了,类似于递归,从下往上就关掉了。
我们现在意识到了这个标题,那我们就可以倒着先关闭最下面的管道,
- void CleanUpChannel(std::vector<Channel>& channels)
- {
- int num = channels.size()-1;
- while(num >= 0)
- {
- channels[num].CloseChannel();
- channels[num--].Wait();
- }
- }
复制代码 如许做是没有标题的,但是我们并没有从源头上办理这个bug,我们就不应该让这种情况发生,我们的想法是如许的,假如是第二次及之后创建子历程时,*channels数组肯定不为空,里面肯定包罗写端,那就把里面的每一个管道的写端关闭一下就好:
完整的历程池代码附上:
Makefile
- processpool:ProcessPool.cc
- g++ -o $@ $^ -std=c++11
- .PHONY:clean
- clean:
- rm -f processpool
复制代码 Task.hpp
- #pragma once
- #include <iostream>
- #include <stdlib.h>
- #include <ctime>
- #include <sys/types.h>
- #include <unistd.h>
- #define TaskNum 3
- typedef void (*task_t)(); // task_t 函数指针类型
- void Print()
- {
- std::cout << "I am print task" << std::endl;
- }
- void DownLoad()
- {
- std::cout<< "I am a download task" << std::endl;
- }
- void Flush()
- {
- std::cout<< "I am a flush task" << std::endl;
- }
- task_t tasks[TaskNum];
- void LoadTask()
- {
- srand(time(nullptr) ^ getpid() ^ 1642);
- tasks[0] = Print;
- tasks[1] = DownLoad;
- tasks[2] = Flush;
- }
- void ExcuteTask(int number)
- {
- if(number < 0 || number > 2) return;
- tasks[number]();
- }
- int SelectTask()
- {
- return rand() % TaskNum;
- }
- void work(){ while(true) { int command = 0; int n = read(0,&command,sizeof(command)); if(n == sizeof(int)) { std::cout << "pid is : " << getpid() << " handler task" << std::endl; ExcuteTask(command); } else if(n == 0) { std::cout << "sub process : " << getpid() << "quit" << std::endl; break; } }}
复制代码 ProcessPool.cc
- #include <iostream>#include <string>#include <vector>#include <sys/types.h>#include <sys/wait.h>#include <unistd.h>#include "Task.hpp"// void work(int rfd)// {// while(true)// {// int command = 0;// int n = read(rfd,&command,sizeof(command));// if(n == sizeof(int))// {// std::cout << "pid is : " << getpid() << " handler task" << std::endl;// ExcuteTask(command);// }// else if(n == 0)// {// std::cout << "sub process : " << getpid() << "quit" << std::endl;// break;// }// }// }// masterclass Channel{public: Channel(int wfd, pid_t id, const std::string &name) : _wfd(wfd), _subprocessid(id), _name(name) { } int GetWfd() { return _wfd; } pid_t GetProcessId() { return _subprocessid; } std::string GetName() { return _name; } void CloseChannel() { close(_wfd); } void Wait() { pid_t rid = waitpid(_subprocessid, nullptr, 0); if (rid > 0) { std::cout << "wait " << rid << " success" << std::endl; } } ~Channel() { }private: int _wfd; pid_t _subprocessid; std::string _name;};void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task){ for (int i = 0; i < num; i++) { // 1.创建管道 int pipefd[2] = {0}; int n = pipe(pipefd); if (n < 0) exit(1); // 2.创建子历程 pid_t id = fork(); if (id == 0) { // 第二次及之后创建管道 if (!channels->empty()) { for (auto &channel : *channels) { channel.CloseChannel(); } } // child close(pipefd[1]); dup2(pipefd[0], 0); // 将管道的读端,重定向到标准输入 task(); close(pipefd[0]); exit(0); } // father close(pipefd[0]); // a.子历程的pid b.父历程关心的管道的w端 // 3.构造一个通道的名字 std::string channel_name = "Channel-" + std::to_string(i); channels->push_back(Channel(pipefd[1], id, channel_name)); }}int NextChannel(int channelnum){ static int next = 0; int channel = next; next++; next %= channelnum; return channel;}void SendTaskCommand(Channel &channel, int taskcommand){ write(channel.GetWfd(), &taskcommand, sizeof(taskcommand));}void ctrlProcessOnce(std::vector<Channel> &channels){ sleep(1); // a. 选择一个任务 int taskcommand = SelectTask(); // b. 选择一个信道和历程 int channel_index = NextChannel(channels.size()); // c. 发送任务 SendTaskCommand(channels[channel_index], taskcommand); std::cout << std::endl; std::cout << "taskcommand: " << taskcommand << " channel: " << channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;}void ctrlProcess(std::vector<Channel> &channels, int times = -1){ if (times > 0) { while (times--) { ctrlProcessOnce(channels); } } else { while (true) { ctrlProcessOnce(channels); } }}void CleanUpChannel(std::vector<Channel> &channels){ int num = channels.size() - 1; while (num >= 0) { channels[num].CloseChannel(); channels[num--].Wait(); } // for(auto& channel : channels) // { // channel.CloseChannel(); // } // for(auto& channel : channels) // { // channel.Wait(); // }}// ./processpool 5int main(int argc, char *argv[]){ if (argc != 2) { std::cerr << "Usage: " << argv[0] << " processnum" << std::endl; return 1; } std::vector<Channel> channels; int num = std::stoi(argv[1]); LoadTask(); // 1.创建信道和子历程 CreateChannelAndSub(num, &channels, work); // 2.通过channel控制子历程 ctrlProcess(channels, num); // 3.接纳管道和子历程 a.关闭所有的写端 b.接纳子历程 CleanUpChannel(channels); sleep(5); return 0;}
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |