ToB企服应用市场:ToB评测及商务社交产业平台
标题:
Linux:进程池制作(匿名管道版本 & 定名管道版本)
[打印本页]
作者:
王海鱼
时间:
2024-8-12 02:56
标题:
Linux:进程池制作(匿名管道版本 & 定名管道版本)
前言
创建进程是有时间成本的。当计算机要实行任务时才创建进程,势必会影响实行任务的性能。所以我们可以通过提前创建一批进程,当有任务需要被实行时直接喂给这些进程即可。我们把这些提前创建好的进程称为进程池!!
下面我们会通过一个主进程(父进程)通过匿名管道和一批工作进程(子进程)举行通讯。父进程通过不停派发任务给子进程,子进程通过读取管道文件中的任务码来实行对应的任务,从而模拟进程池的整个行为!!
一、匿名管道制作进程池
一、进程池框架
父进程创建一批子进程,并创建单向通讯通道。我们这里规定父进程向匿名管道中一次只能写4字节数据,子进程一次只能从管道文件中读取4字节数据。由于匿名管道的性质,父进程只需向目的子进程所对应的管道中发送任务码即可。如果管道文件中存在数据,子进程会不停读取任务码,实行对应的任务;否则子进程进入阻塞状态,等待主进程向管道中写入数据!
但主进程需要知道向那个管道中发送任务码,向那个匿名管道中举行写入,以及子进程信息。即我们需要对管道举行管理!
先描述,在组织!这里我们通过一个类对管道举行描述,其中生存着:主进程控制写端文件描述符、工作进程id和名字!由于我们需要实行快速随机访问,所以选择vector举行组织管理!
所以这个进程池的制作大致框架如下:
【描述结构体】:
#define NUM 5//任务进程个数
int number = 1;//管道编号
class channel
{
public:
channel(int fd, pid_t id)
:ctrlfd(fd)
,workerid(id)
{
name = "channle-" + std::to_string(number++);
}
public:
int ctrlfd;
pid_t workerid;
std::string name;
};
复制代码
【框架】:
int main()
{
std::vector<channel> channels;//用于管理管道
//1、创建管道,创建进程,工作进程工作方式
CreaterChannel(&channels);
//2、主进程向工作进程发送任务
// 这里特殊设计,我们通过g_always_loop来判断主进程是一直发送任务,还是发送指定次后就结束退出
const bool g_always_loop = true;
SendTask(channels, !g_always_loop, 10);
// 回收资源: 关闭写端,子进程自动退出
ReleaseChannels(channels);
return 0;
}
复制代码
二、创建管道、创建进程、工作进程实行任务
2.1 创建管道、创建进程
我们可以通过父进程循环NUM次,每次先创建管道,然后创建子进程。此时关闭父进程和子进程中不需要的读写段,创建单向通讯通道。此时就可以创建如下关系:
但上述简单关闭管道文件的读写段会存在题目的!我们需要特殊处理:
上述这些多余的指向管道读端是fork创建子进程时,子进程继承父进程的信息之一(红线)!所以我们每次创建出的子进程还需将全部继承父进程多余的读端全部关闭,否则无法回收子进程导致内存走漏!!
其中最简单的解决办法就是:
我们将父进程的写端文件描述符全部记录下来,每次创建出子进程时,子进程所继承的多余读写信息已经全部生存。我们只需依次将其关闭即可!!
void CreaterChannel(std::vector<channel> *channels)
{
std::vector<int> old;//记录子进程继承的多余读端
for(int i = 0; i < NUM; i++)//创建NUM个工作进程
{
// 1. 创建管道
int pipefd[2];
int n = pipe(pipefd);
assert(n == 0);
(void)n;//防止编译器报警
//2. 创建子进程
pid_t id = fork();
if(id < 0)
{
perror("fork");
return;
}
// 3. 形成单向通信
if(id == 0)//子进程,工作进程
{
if(!old.empty())//将子进程继承父进程的多余读端全部关闭
{
for(auto rfd : old)
{
close(rfd);
}
}
close(pipefd[1]);
dup2(pipefd[0], 0);//将读端重定向到标准输入
work();//子进程执行任务
exit(0);
}
//父进程,主进程
close(pipefd[0]);
channels->push_back(channel(pipefd[1], id));//主进程对应写端和工作进程信息
old.push_back(pipefd[1]);
}
}
复制代码
2.2 工作进程实行任务
当管道中有数据时,子进程只需读取相干任务码,然后不停实行即可!但如果此时父进程退出时,由于匿名管道的特性,read的返回值会被设为0,此时子进程在举行读取就没有任何意义了,子进程退出!!(查抄任务码和实行任务码实现后续会同一分析)
void work()
{
while(1)
{
int code = 0;
ssize_t n = read(0, &code, sizeof(code));
if(n == 0)
{//主进程写端退出,子进程也退出
break;
}
else if(n > 0)
{
if(!init.CheckSafe(code))//检查任务码是否合法
continue;
init.RunTask(code);//执行任务码
}
else
{
//nothing
}
}
}
复制代码
三、主进程向子进程发送任务
3.1 任务封装
下面我们仅仅通过一些打印数据来子进程待实行的全部模拟任务
【待实行任务】:
void Download()
{
std::cout << "我是一个下载任务" << std::endl;
}
void Printflog()
{
std::cout << "我是一个打印日志任务" << std::endl;
}
void PushVideoStream()
{
std::cout << "我是一个推送视频流任务" << std::endl;
}
复制代码
【任务封装】:
我们通过包装器function将上述指针函数举行同一。同时我们向管道中读取和写入的是任务码,所以下面我们给出了相应的任务码,并将上述怎样通过vector容器举行管理,下标对应任务码信息,并封装成了类!
除此之外,还提供选择任务接口(随机选择)、查抄任务码是否公道、任务码转对应任务名、运行特定任务码对应任务等接口!
详细如下:
using task_t = std::function<void()>;
class Init
{
public:
//任务码
static const int g_Download_code = 0;
static const int g_Printflog_code = 1;
static const int g_PushVideoStream_code = 2;
std::vector<task_t> tasks;
public:
Init()
{
tasks.push_back(Download);
tasks.push_back(Printflog);
tasks.push_back(PushVideoStream);
srand(time(nullptr));
}
int SelectTask()//选择任务接口
{
return rand() % tasks.size();
}
std::string CodeToName(int code)//任务码转对应任务名
{
switch(code)
{
case 0:
return "Download";
case 1:
return "Printflog";
case 2:
return "PushVideoStream";
default:
return "Nothing";
}
}
bool CheckSafe(int code)//检查任务码是否合理
{
return code >= 0 && code < tasks.size();
}
void RunTask(int code)//行特定任务码对应任务
{
tasks[code]();
}
};
Init init;
复制代码
3.2 主进程向子进程发送任务
思量到子进程完成任务的负载均衡,我们通过循环依次向每一个子进程发送任务码,较为均匀的将任务分配给子进程!
向子进程发送任务码,起主要确定待发送的任务,信道。然后将任务码写入信道!
前面已经提到过,我们这里所设计的发送任务接口支持:死循环不停发送任务、发送指定次数任务后退出!所以最后我们需要判定是否发送任务需求竣事,退出!详细如下:
void SendTask(const std::vector<channel> &channels, bool flag, int num)
{
int pos = 0;//所选择信道所在数组位置下标
while(true)
{
// 1. 选择任务
int commit = init.SelectTask();
if(init.CheckSafe(commit))
{
// 2. 选择信道, 发送任务码
channel c = channels[pos++];
pos %= channels.size();
write(c.ctrlfd, &commit, sizeof(commit));
}
//判断是否需要退出
if(!flag)
{
if(--num == 0)
break;
}
}
}
复制代码
四、回收资源
当父进程发送任务信息退出后,我们仅需将
写端关闭即可
此时子进程实行read的返回值为0。子进程此时就能辨认到写端已经退出,此时子进程也退出!
最后让父进程等待子进程,防止子进程僵尸导致内存走漏即可!!
void ReleaseChannels(std::vector<channel> &channels)
{
for(auto & c : channels)
{
close(c.ctrlfd);
pid_t rid = waitpid(c.workerid, nullptr, 0);
if(rid == -1)
{
perror("waitpid");
return;
}
std::cout << "wait " << c.name << " " << c.workerid << " success" << std::endl;
}
}
复制代码
五、全部源代码
emsp;到处为止,进程池的简单制作到处就竣事了,下面是全部源码,其中还包含调试代码!
5.1源码
【ProcessPool.chh】
#pragma once #include <iostream>#include <functional>#include <vector>#include <ctime>using task_t = std::function<void()>;void Download()
{
std::cout << "我是一个下载任务" << std::endl;
}
void Printflog()
{
std::cout << "我是一个打印日志任务" << std::endl;
}
void PushVideoStream()
{
std::cout << "我是一个推送视频流任务" << std::endl;
}
class Init{public: //任务码 static const int g_Download_code = 0; static const int g_Printflog_code = 1; static const int g_PushVideoStream_code = 2; std::vector<task_t> tasks;public: Init() { tasks.push_back(Download); tasks.push_back(Printflog); tasks.push_back(PushVideoStream); srand(time(nullptr)); } int SelectTask() { return rand() % tasks.size(); } std::string CodeToName(int code) { switch(code) { case 0: return "Download"; case 1: return "Printflog"; case 2: return "PushVideoStream"; default: return "Nothing"; } } bool CheckSafe(int code) { return code >= 0 && code < tasks.size(); } void RunTask(int code) { tasks[code](); }};Init init;
复制代码
【processPool.cpp】
#include <iostream>#include <unistd.h>#include <cassert>#include <cerrno>#include <cstdlib>#include <string>#include <vector>#include <cstdio> #include <sys/wait.h>#include "ProcessPool.chh"#define NUM 5//任务进程个数
int number = 1;//管道编号
class channel
{
public:
channel(int fd, pid_t id)
:ctrlfd(fd)
,workerid(id)
{
name = "channle-" + std::to_string(number++);
}
public:
int ctrlfd;
pid_t workerid;
std::string name;
};
void work(){ while(1) { int code = 0; ssize_t n = read(0, &code, sizeof(code)); if(n == 0) {//主进程写端退出,子进程也退出 break; } else if(n > 0) { if(!init.CheckSafe(code)) continue; init.RunTask(code); } else { //nothing } }}void CreaterChannel(std::vector<channel> *channels){ std::vector<int> old;//记录主进程的读端 for(int i = 0; i < NUM; i++) { // 1. 创建管道 int pipefd[2]; int n = pipe(pipefd); assert(n == 0); (void)n;//防止编译器报警 //2. 创建子进程 pid_t id = fork(); if(id < 0) { perror("fork"); return; } // 3. 形成单向通讯 if(id == 0)//子进程,工作进程 { if(!old.empty())//将子进程继承父进程多余读端全部关闭 { for(auto rfd : old) { close(rfd); } for(auto id : old) { std::cout << "creater quit id:" << id << " " << std::endl; } } close(pipefd[1]); dup2(pipefd[0], 0);//将读端重定向到标准输入 work(); exit(0); } //父进程,主进程 close(pipefd[0]); channels->push_back(channel(pipefd[1], id)); old.push_back(pipefd[1]); }}//Debugvoid PrintChannel(const std::vector<channel> &channels){ std::cout << channels.size() << std::endl; for(int i = 0; i < channels.size(); i++) { std::cout << "name:" << channels[i].name << ", 写端描述符:" << channels[i].ctrlfd <<", 工作进程id:" << channels[i].workerid << std::endl; } std::cout << "Creater success" << std::endl;}void SendTask(const std::vector<channel> &channels, bool flag, int num){ int pos = 0;//所选择信道地点数组位置下标 while(true) { // 1. 选择任务,返回任务码 int commit = init.SelectTask(); if(init.CheckSafe(commit)) { // 2. 选择信道, 发送任务码 channel c = channels[pos++]; pos %= channels.size(); write(c.ctrlfd, &commit, sizeof(commit)); // Debug std::cout << "select channel: " << c.name << ", send task:" << init.CodeToName(commit) << "[" << commit << "]" << ", workerid:" << c.workerid << std::endl; } //判定是否需要退出 if(!flag) { if(--num == 0) break; } }}void ReleaseChannels(std::vector<channel> &channels)
{
for(auto & c : channels)
{
close(c.ctrlfd);
pid_t rid = waitpid(c.workerid, nullptr, 0);
if(rid == -1)
{
perror("waitpid");
return;
}
std::cout << "wait " << c.name << " " << c.workerid << " success" << std::endl;
}
}
int main(){ std::vector<channel> channels; //创建管道,创建进程 CreaterChannel(&channels); std::cout << "Creater end" << std::endl; PrintChannel(channels); //主进程向工作进程发送任务 const bool g_always_loop = true; SendTask(channels, !g_always_loop, 10); // 回收资源: 关闭写端,子进程主动退出 //sleep(3); ReleaseChannels(channels); return 0;}
复制代码
5.2 运行结果(包含调试信息)
六、定名管道实现进程池
6.1 实现思绪
定名管道实现进程池,我们可以创建多个定名管道,同时父进程打开写端,子进程打开读端!(注意此时创建的子进程还是会出现基础多余的读端文件描述符。所以子进程在打开定名管道读端前,同样需要先将多余的写端描述符关闭)
发生任务码,和子进程实行任务和匿名管道实现方式一样,就不多说了。
至于回收资源,那就更简单了。我们只需将定名管道写端描述符关闭,此时子进程会获取的读端退出信息(即read返回值为0),将管道读端也关闭!!最后父进程等待回收子进程,防止子进程僵尸即可!!
6.2 源码
【ProcessPool.cpp】
#include <iostream>
#include <unistd.h>
#include <cassert>
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>
#include <cstdio>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "ProcessPool.chh"
#define NUM 5//任务进程个数
class fifo
{
public:
fifo(std::string str, pid_t id, int fd)
:FILENAME(str.c_str())
,workerid(id)
,ctrlfd(fd)
{ }
public:
const char *FILENAME;
pid_t workerid;
int ctrlfd;
};
int Work()
{
int code = 0;
while(true)
{
ssize_t n = read(0, &code, sizeof(code));
std::cout << "read size:" << n << std::endl;
if(n == 0)
{
std::cout << "write quit, mee to!" << std::endl;
break;
}
else if(n > 0)
{
init.RunTask(code);
}
else
{
std::cerr << "errno: " << errno << "errstring: " << strerror(errno) << std::endl;
return 1;
}
}
return 0;
}
bool Createrfifo(std::vector<fifo> *fifos)
{
std::vector<int> old;
for(int i = 1; i <= NUM; i++)
{
std::string filename = "fifo-" + std::to_string(i);
int s = mkfifo(filename.c_str(), 0644);
if(s == -1)
{
std::cout << "errno:" << errno << " strerror" << strerror(errno) << std::endl;
return false;
}
pid_t id = fork();
if(id == 0)//child
{
if(!old.empty())
{
for(auto& fd : old)
close(fd);
}
int rfd = open(filename.c_str(), O_RDONLY);
dup2(rfd, 0);
Work();
std::cout << "read close!!!!!!!!!!!!!!!!!!!!!" << std::endl;
close(rfd);
exit(0);
}
//parent
int wfd = open(filename.c_str(), O_WRONLY);
fifos->push_back(fifo(filename, id, wfd));
old.push_back(wfd);
}
return true;
}
void SendCommit(const std::vector<fifo> &fifos, int flag, int num)
{
int pos = 0;
while(true)
{
// 1. 选择任务
int code = init.SelectTask();
if(init.CheckSafe(code))
{
//2. 发送任务码
write(fifos[pos].ctrlfd, &code, sizeof(code));
pos = (pos + 1) % fifos.size();
}
// 3.判断是否退出
if(!flag)
{
if(--num == 0)
{
break;
}
}
}
std::cout << "write finish!!!" << std::endl;
}
void Releasefifo(std::vector<fifo> &fifos)
{
std::cout << "start release" << std::endl;
for(auto& fifo : fifos)
{
std::cout << "colse wfd: " << fifo.ctrlfd << std::endl;
close(fifo.ctrlfd);
pid_t id = waitpid(fifo.workerid, nullptr, 0);
}
std::cout << "release success!" << std::endl;
}
int main()
{
std::vector<fifo> fifos;
// 1.创建命名管道,进程
if(!Createrfifo(&fifos))
return 0;
sleep(2);
for(const auto& fifo : fifos)
std::cout << fifo.FILENAME <<":" << fifo.workerid << ":" << fifo.ctrlfd << std::endl;
// 2.发送任务
sleep(2);
bool g_always_loop = true;
SendCommit(fifos, !g_always_loop, 10);
// 3. 回收资源
sleep(2);
Releasefifo(fifos);
return 0;
}
复制代码
【ProcessPool.chh】
#pragma once #include <iostream>#include <functional>#include <vector>#include <ctime>using task_t = std::function<void()>;void Download()
{
std::cout << "我是一个下载任务" << std::endl;
}
void Printflog()
{
std::cout << "我是一个打印日志任务" << std::endl;
}
void PushVideoStream()
{
std::cout << "我是一个推送视频流任务" << std::endl;
}
class Init{public: //任务码 static const int g_Download_code = 0; static const int g_Printflog_code = 1; static const int g_PushVideoStream_code = 2; std::vector<task_t> tasks;public: Init() { tasks.push_back(Download); tasks.push_back(Printflog); tasks.push_back(PushVideoStream); srand(time(nullptr)); } int SelectTask() { return rand() % tasks.size(); } std::string CodeToName(int code) { switch(code) { case 0: return "Download"; case 1: return "Printflog"; case 2: return "PushVideoStream"; default: return "Nothing"; } } bool CheckSafe(int code) { return code >= 0 && code < tasks.size(); } void RunTask(int code) { tasks[code](); }};Init init;
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4