Linux下c++实现进程池
发布日期:2022-02-17 02:39:59 浏览次数:29 分类:技术文章

本文共 7170 字,大约阅读时间需要 23 分钟。

代码主要来自《linux高性能服务器编程》,我主要是复现相关功能供自己以后参考(滑稽

初次见到使用进程的进程池有点没看懂,所以记录一下自己的理解。

我们知道在线程池中工作线程是共享进程的全局变量的,工作线程的执行流程是个死循环,所有空闲线程都睡眠在条件变量(queue_notfull)上。当任务队列中有任务产生时,主控线程(或管理线程)便通知工作线程从任务队列中取任务执行(直接从条件变量上唤醒即可)。

在进程池中,由于子进程是父进程复制(fork)而来,所以两者有很大的相似性。在fork函数执行之前, 注意,此时仅有一个父进程并没有产生子进程,fork函数之前执行过的代码所产生的一些影响(比如对全局变量的修改)会在父子进程共有(并不是共享同一变量,而是变量值在父子进程中相同)。管道就是用这个实现父子进程通信。在fork语句之后的代码,子进程和父进程执行流程会分叉开来,通过在子进程的执行流程中做个标识可以区分父子进程。 总的来说,虽然子进程和父进程代码是一样的,但是共有的变量值可能不一样,一定注意哪些是一样的哪些是不一样的。

其次,在此代码实现的进程池中,父进程和子进程各自维护自己的epollfd变量,父进程和子进程们均阻塞于epoll_wait()函数上。

父进程在epoll_wait()上等待的事件有:

  1. 监听描述符listenfd,当有新连接到达时,父进程会从epoll_wait()返回。此时,父进程需要做的就是通过sub_process[i].m_pipefd[0]向子进程发送通知:·send(sub_process[i].m_pipefd[0], (char *)new_conn, sizeof(new_conn), 0);这时阻塞在epoll_wait()上的子进程sub_process[i]会被唤醒并处理连接请求。
  2. 通过setup_sig_pipe()函数注册的各种信号。当父进程的信号处理函数sig_handler(int sig)被信号触发时,sig_handler()会向sig_pipefd[1]写入被触发的信号值,父进程会从epoll_wait()醒来并捕捉sig_pipefd[0]上的读事件,从而在主流程中处理各种信号。
template
void processpool
::run_parent(){
setup_sig_pipe();//???? addfd(epollfd, listenfd, EPOLLIN | EPOLLET); epoll_event events[MAX_EVENT_NUM]; int sub_process_counter = 9; int new_conn = 1; int ret = -1; while(!stop){
int readnum = epoll_wait(epollfd, events, MAX_EVENT_NUM, -1); if(readnum < 0 && errno != EINTR){
printf("epoll fail\n"); break; } for(int i = 0; i < readnum; ++i){
int socketfd = events[i].data.fd; if(socket == listenfd){
//有新连接到达,以Round Robin方式配分一个子进程处理 } else if(socketfd == sig_pipefd[0] && (events[i].events & EPOLLIN)){
//父进程处理接收到的信号 int sig; char signals[1024]; ret = recv(sig_pipefd[0], signals, sizeof(signals), 0); if(ret <= 0) continue; else{
fo(int i = 0; i < ret; ++i){
} } } } }}

子进程在epoll_wait()上等待的事件有:

  1. sub_process[index].m_pipefd[1]描述符读事件,此描述符可读时说明有新连接到达,子进程需要接受新连接。
  2. 通过setup_sig_pipe()函数注册的各种信号。
  3. 客户端发送的数据处理请求,每个子进程维护的epollfd注册了多个客户端连接,当其监听的描述符有可读事件时子进程从epoll_wait()醒来并处理请求。
//每个子进程单独维护一个epollfd,需要在此epollfd上接受新连接并且处理每个连接请求template
void processpool
::run_child(){
setup_sig_pipe();//????? //父进程通过此fd与子进程通信 int pipefd = sub_process[index].m_pipefd[1]; // addfd(epollfd, pipefd, EPOLLET | EPOLLIN); epoll_event events[MAX_EVENT_NUM]; T* users = new T[USER_PRE_PROCESS]; while(!stop){
//所有子进程都会阻塞在这里 //当子进程被唤醒时(有文件描述符就绪时),子进程需要处理以下几种事件 // int readnum = epoll_wait(epollfd, events, MAX_EVENT_NUM, -1); if((readnum < 0) && (errno != EINTR)){
//(readnum < 0 && errno == EINTR)说明epoll_wait被信号中断 printf("epoll failure\n"); break; } for(int i = 0; i < readnum; ++i){
int socketfd = events[i].data.fd; //父进程向子进程发送信息,通知子进程有新连接到达 if(socketfd == pipefd && (events[i].events & EPOLLIN)){
int client = 0; int ret = recv(socketfd, &client, sizeof(client), 0); if((ret < 0 && errno != EINTR) || ret == 0) continue; else{
//有新连接到达 struct scokaddr_in clientaddr; socklen_t clientaddrlen; int confd = accept(listenfd, (struct sockaddr *)&clientaddr, &clientaddrlen); addfd(epollfd, confd, EPOLLET | EPOLLIN); // //users[confd].init(epollfd, confd, clientaddr); } } else if(socketfd == sig_pipefd[0] && (events[i].events & EPOLLIN)){
//子进程接收到信号,信号处理函数通过管道通知子进程 int sig; char signals[1024]; int ret = recv(sig_pipefd[0], signals, sizeof(signals), 0); if(ret <= 0) continue; else{
for(int i = 0; i < ret; ++i){
switch(signals[i]){
case SIGCHLD: pid_t pid; int stat; while((pid = waitpid(-1, &stat, WNOHANG)) > 0) continue; break; case SIGTERM: case SIGINT: stop = true; break; default: break; } } } } else if(events[i].events & EPOLLIN){
//其他可读数据,客户端请求 //users[socketfd].process(); } } } delete[] users; users = NULL; close(pipefd); close(epollfd);}
/*************************************************************************	> File Name: processpool.cpp	> Author: ggboypc12138	> Mail: lc1030244043@outlook.com 	> Created Time: 2020年10月31日 星期六 09时53分04秒 ************************************************************************/#ifndef PROCESSPOOL_H#define PROCESSPOOL_H#include 
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
static int sig_pipefd[2];static int setnonblocking(int fd){ int flag = fcntl(fd, F_GETFL); flag |= O_NONBLOCK; fcntl(fd, F_SETFL, flag); return flag;}//向epoll注册事件static void addfd(int epfd, int fd, int events){ epoll_event event; event.data.fd = fd; event.events = events; epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event);}static void removefd(int epfd, int fd){ epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); close(fd);}static void sig_handler(int sig){ //统一事件源,接收到信号并不做处理,而是向进程发送通知 int save_errno = errno; send(sig_pipefd[1], (char *)&sig, 1, 0); errno = save_errno; //????}static void add_sighandler(int sig, void (*handler)(int), bool restart = true){ struct sigaction act; bzero(&act, sizeof(act)); act.sa_handler = handler; if(restart) act.sa_flags |= restart; //阻塞处理其他信号 sigfillset(&act.sa_mask); sigaction(sig, &act, NULL);}class process{ public: process() : pid(-1){ } private: pid_t pid; int m_pipefd[2];};//模板参数是处理逻辑任务的类template
class processpool{ public: processpool(const processpool&) = delete; processpool(const processpool&&) = delete; private: processpool(int listenfd, int process_num = 8); public: //单例模式 processpool *create(int listenfd, int process_num = 8); //启动进程池 void run(); void setup_sig_pipe(); private: void run_parent(); void run_child(); private: static constexpr int MAX_PROCESS_NUM = 16; static constexpr int USER_PRE_PROCESS = 65535; static constexpr int MAX_EVENT_NUM = 1000; int process_num; //子进程序号,父进程index为-1 int index; //子进程和父进程中不同, int epollfd; int listenfd; bool stop; //子进程描述信息 std::vector
sub_process;};template
processpool
::processpool(int listenfd, int process_num) :listenfd(listenfd), process_num(process_num),index(-1), stop(false){ assert(process_num > 0 && process_num <= MAX_PROCESS_NUM); sub_process.resize(process_num); //创建子进程 for(int i = 0; i < process_num; ++i){ //父进程与每个子进程都会由一个管道来保持联系 socketpair(PF_UNIX, SOCK_STREAM, 0, sub_process[i].pipefd); sub_process[i].pid = fork(); if(sub_process[i].pid > 0){ //父进程中 close(sub_process[i].pipefd[1]); continue; } else{ //子进程中 close(sub_process[i].pipefd[0]); //每个被创建的子进程中index均不一样!!! //由此可以在之后的共有接口(void run()函数)作为区分执行流程的依据 index = i; return; } }}template
processpool
*processpool
::create(int listenfd, int process_num){ static processpool instance(listenfd, process_num); return instance; }template
void processpool
::setup_sig_pipe(){ epollfd = epoll_create(5); assert(epollfd != -1); socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd); setnonblocking(sig_pipefd[1]); addfd(epollfd, sig_pipefd[0], EPOLLIN | EPOLLET); add_sighandler(SIGCHLD, sig_handler); add_sighandler(SIGTERM, sig_handler); add_sighandler(SIGINT, sig_handler); add_sighandler(SIGPIPE, sig_handler);}template
void processpool
::run(){ if(index < 0){ //在父进程中 run_parent(); } else{ //在子进程中 run_child(); }}#endif

转载地址:https://blog.csdn.net/qq_39815320/article/details/109434957 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:c++万能引用的理解与使用
下一篇:2021-01-12

发表评论

最新留言

做的很好,不错不错
[***.243.131.199]2024年02月29日 11时54分36秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

mysql行级锁升级_mysql innodb 行级锁升级 2019-04-21
c 调用mysql密码为空_C语言连MySQL - osc_srnunz15的个人空间 - OSCHINA - 中文开源技术交流社区... 2019-04-21
mysql怎么分组查询所有数据库_Mysql-4 分组查询与子查询 2019-04-21
mysql 多列union_Mysql联合查询UNION和UNION ALL的使用介绍 2019-04-21
mysql导数据出指定数量_mysql导出指定数据或部份数据的方法 2019-04-21
java thread 多线程_java用Thread方式创建多线程 2021-06-24
java 注解与反射_Java注解与反射直接上手 2021-06-24
java按钮退出_java – 如何在此程序中添加退出按钮?怎么样“清楚”? 2019-04-21
python土味情话_Python 将土味情话语录设置为桌面壁纸 2019-04-21
java ip 范围内打卡_定位地理位置PHP判断员工打卡签到经纬度是否在打卡之内 2019-04-21
与java线程有关的,线程多少和什么有关?大神们表示有话要说! 2019-04-21
php正则表达式 匹配数字,正则表达式之匹配数字范围 2019-04-21
php中带?错误,参考-此错误在PHP中意味着什么? 2019-04-21
php生成链接列表,根据URL链接和抛文本生成链接<a>标签的PHP函数 2019-04-21
matlab里inline定义矩阵,Matlab中的inline函数_matlab中inline函数 2019-04-21
php html标签自定义属性,浅谈JS读取DOM对象(标签)的自定义属性 2019-04-21
如何使用matlab的siso,利用Matlab内建程式SISODesignTool完成系统分析(Matlab61)开启.PDF... 2019-04-21
php 实现 model层,Thinkhphp5控制器调用的Model层的方法总结 2019-04-21
matlab6.0序列号,MFC软件获取USB设备的制造商、产品、序列号 2019-04-21
matlab中多边形滤波器,几种常见空间滤波器MATLAB实现 2019-04-21