lnux下c语言实现线程池
发布日期:2022-02-17 02:39:59 浏览次数:26 分类:技术文章

本文共 7904 字,大约阅读时间需要 26 分钟。

线程池状态:

线程池中一些主要用到的变量:

typedef struct{

pthread_mutex_t lock;
pthread_mutex_t work_lock;
pthread_cond_t taskqueue_empty;
pthread_cond_t taskqueue_full;
pthread_t manage_thread; //管理线程id
pthread_t *threads_ID; //保存线程id的动态数组
int min_thread_num;
int max_thread_num;
int cur_thread_num;
int cur_work_num;
int wait_exit_thread_num; //等待销毁的线程个数
threadpool_task *taskqueue; //任务队列,用数组表示
int taskqueue_front;
int taskqueue_tail;
int taskqueue_cur_size;
int taskqueue_max_size;
int shutdown; //是否关闭线程池
}threadpool;

其中解释一下pthread_mutex_t lock;pthread_mutex_t work_lock;lock用于互斥保护主线程、管理线程和工作线程三者中共享的全局变量,而work_lock则是只用于互斥保护工作线程之间共享的pool_->cur_work_num

先上代码

/*************************************************************************	> File Name: thread_pool.h	> Author: ggboypc12138	> Mail: lc1030244043@outlook.com 	> Created Time: 2020年10月21日 星期三 18时32分56秒 ************************************************************************/#include 
//任务队列typedef struct{
void *(*func)(void *); void *arg;}threadpool_task;//线程池对象typedef struct{
pthread_mutex_t lock; pthread_mutex_t work_lock; pthread_cond_t taskqueue_empty; pthread_cond_t taskqueue_full; threadpool_task *taskqueue; //任务队列,用数组表示 pthread_t manage_thread; //管理线程id pthread_t *threads_ID; //保存线程id的动态数组 int min_thread_num; int max_thread_num; int cur_thread_num; int cur_work_num; int wait_exit_thread_num; // int taskqueue_front; int taskqueue_tail; int taskqueue_cur_size; int taskqueue_max_size; int shutdown; //是否关闭线程池 }threadpool; threadpool* threadpool_create(int , int , int);void threadpool_addtask(threadpool *pool, void *(*func)(void *arg), void *arg);void *threadpool_runtask(void *pool);void *threadpool_manage(void *pool);void threadpool_destroy(void *pool);void threadpool_free(void *pool);int is_thread_alive(pthread_t tid);
/*************************************************************************	> File Name: thread_pool.c 	> Author: ggboypc12138 	> Mail: lc1030244043@outlook.com 	> Created Time: 2020年10月21日 星期三 18时30分26秒 ************************************************************************/#include 
#include
#include
#include
#include
#include
#include
#include
#include "thread_pool.h"//管理线程每20s检查一遍是否#define DEFAULT_CHECK_TIME 20//任务队列累积任务,taskqueue_cur_size > MIN_WAIT_TAST_NUM 时需要添加线程到线程池#define MIN_WAIT_TAST_NUM 10//每次增加和销毁线程个数#define DERAULT_ADD_DEL_THREAD 4//线程池初始化threadpool *threadpool_create(int min_thr_num, int max_thr_num, int taskqueue_max_size){ threadpool *pool = malloc(sizeof(threadpool)); pool->min_thread_num = min_thr_num; pool->max_thread_num = max_thr_num; pool->cur_thread_num = min_thr_num; pool->cur_work_num = 0; pool->threads_ID = malloc(sizeof(pthread_t) * max_thr_num); bzero(pool->threads_ID, max_thr_num * sizeof(pthread_t)); //任务队列 pool->taskqueue_front = 0; pool->taskqueue_tail = 0; pool->taskqueue_cur_size = 0; pool->taskqueue_max_size = taskqueue_max_size; pool->taskqueue = malloc(taskqueue_max_size * sizeof(threadpool_task)); bzero(pool->taskqueue, taskqueue_max_size * sizeof(threadpool_task)); //创建工作线程 for(int i = 0; i < min_thr_num; ++i){ pthread_create(&pool->threads_ID[i], NULL, threadpool_runtask, (void *)pool); } //创建管理线程 pthread_create(&pool->manage_thread, NULL, threadpool_manage, (void *)pool); //初始化锁和条件变量 pthread_mutex_init(&pool->lock, NULL); pthread_mutex_init(&pool->work_lock, NULL); pthread_cond_init(&pool->taskqueue_empty, NULL); pthread_cond_init(&pool->taskqueue_full, NULL); pool->shutdown = 0; return pool;}//主线程调用,用于向任务队列中添加任务void threadpool_addtask(threadpool *pool, void *(*func)(void *), void *arg){ pthread_mutex_lock(&pool->lock); while(pool->taskqueue_cur_size == pool->taskqueue_max_size && pool->shutdown == 0){ pthread_cond_wait(&pool->taskqueue_empty, &pool->lock); } if(pool->shutdown == 1){ pthread_mutex_unlock(&pool->lock); return; } threadpool_task task; task.func = func; task.arg = arg; pool->taskqueue[pool->taskqueue_tail] = task; pool->taskqueue_tail = ++pool->taskqueue_tail % pool->taskqueue_max_size; pthread_mutex_unlock(&pool->lock); pthread_cond_signal(&pool->taskqueue_full);}//工作线程运行任务void *threadpool_runtask(void *pool){ threadpool *pool_ = (threadpool *)pool; threadpool_task task; while(1){ pthread_mutex_lock(&pool_->lock); if(pool_->shutdown == 0){ while(pool_->taskqueue_cur_size == 0){ pthread_cond_wait(&pool_->taskqueue_full, &pool_->lock); //被唤醒后,若线程池被要求关闭 if(pool_->shutdown == 1){ pthread_mutex_unlock(&pool_->lock); printf("thread 0x%x is exiting\n", (unsigned int)pthread_self()); pthread_exit(NULL); } //被唤醒后,若wait_exit_thread_num > 0,自行退出 if(pool_->wait_exit_thread_num > 0){ --pool_->wait_exit_thread_num; pthread_mutex_unlock(&pool_->lock); printf("thread 0x%x is exiting\n", (unsigned int)pthread_self()); pthread_exit(NULL); } } //从任务队列中取任务执行 task = pool_->taskqueue[pool_->taskqueue_front]; pool_->taskqueue_front = ++pool_->taskqueue_front % pool_->taskqueue_max_size; --pool_->taskqueue_cur_size; pthread_mutex_unlock(&pool_->lock); pthread_cond_signal(&pool_->taskqueue_empty); } else { pthread_mutex_unlock(&pool_->lock); return NULL; } pthread_mutex_lock(&pool_->work_lock); ++pool_->cur_work_num; pthread_mutex_unlock(&pool_->work_lock); task.func(task.arg); pthread_mutex_lock(&pool_->work_lock); --pool_->cur_work_num; pthread_mutex_unlock(&pool_->work_lock); }}//管理线程调用,管理线程主要向线程池中添加线程()和销毁多余线程()void *threadpool_manage(void *pool){ threadpool *pool_ = (threadpool *)pool; while(1){ pthread_mutex_lock(&pool_->lock); if(pool_->shutdown == 1){ pthread_mutex_unlock(&pool_->lock); return NULL; } //需要添加线程? if(pool_->cur_work_num == pool_->cur_thread_num && pool_->taskqueue_cur_size > 0 && pool_->cur_thread_num < pool_->max_thread_num){ int cur_num = pool_->cur_thread_num; pthread_mutex_unlock(&pool_->lock); int accumulate = 0; //pool_->taskqueue_max_size,DERAULT_ADD_DEL_THREAD,pool_->max_thread_num值一般是只读的,不需要加锁 for(int i = 0; i < pool_->taskqueue_max_size && accumulate < DERAULT_ADD_DEL_THREAD && (cur_num + accumulate) < pool_->max_thread_num; ++i){ if(pool_->threads_ID[i] == 0){ pthread_create(&pool_->threads_ID[i], NULL, threadpool_runtask, pool_); ++accumulate; } } pthread_mutex_lock(&pool_->lock); pool_->cur_thread_num = cur_num + accumulate; pthread_mutex_unlock(&pool_->lock); } else if(pool_->cur_work_num * 2 < pool_->cur_thread_num && pool_->cur_thread_num > pool_->min_thread_num){ //需要销毁多余线程? pool_->wait_exit_thread_num = DERAULT_ADD_DEL_THREAD; pthread_mutex_unlock(&pool_->lock); for(int i = 0; i < DERAULT_ADD_DEL_THREAD; ++i) pthread_cond_signal(&pool_->taskqueue_full); } }}//销毁线程池中所有线程并释放线程池相关资源void threadpool_destroy(void *pool){ threadpool *pool_ = (threadpool *)pool; pthread_mutex_lock(&pool_->lock); pool_->shutdown = 1; int thr_num = pool_->cur_thread_num; //必须先销毁管理线程 pthread_mutex_unlock(&pool_->lock); pthread_join(pool_->manage_thread, NULL); for(int i = 0; i < thr_num; ++i){ pthread_cond_broadcast(&pool_->taskqueue_full); pthread_join(pool_->threads_ID[i], NULL);//无需加锁,管理线程已退出 } threadpool_free(pool);}//释放线程池中相关资源void threadpool_free(void *pool){ }int is_thread_alive(pthread_t tid){ return pthread_kill(tid, 0) == ESRCH ? 0 : 1;}

处理流程中,主要有三种并行的处理流:主线程管理线程工作线程

主线程:负责正常的处理流程之外,还负责向线程池中的任务队列添加任务。
管理线程:当线程池中 空闲线程为零任务队列有任务存在 负责向线程池中添加和销毁多余的工作线程。
工作线程:从任务队列中取一个任务并执行。

具体而言:

在使用threadpool_create()函数初始化线程池时,主线程会创建若干工作线程和一个管理线程。起始情况下工作线程会直接等待阻塞在pool_->taskqueue_full条件变量上,管理线程则是循环判断是否需要添加或销毁多余线程。再具体一点,工作线程执行流也是一个死循环,每次在从任务队列取任务执行前,工作线程首先会判断一下pool_->shutdown == 1以及pool_->wait_exit_thread_num > 0条件是否满足,若满足则调用pthread_exit(NULL)终止自身执行,否则从任务队列中取一个任务并执行。管理线程在向池中添加线程时,其判断依据为:pool_->cur_work_num == pool_->cur_thread_num && pool_->taskqueue_cur_size > 0 && pool_->cur_thread_num < pool_->max_thread_num。销毁多余空闲工作线程的判断依据为:pool_->cur_work_num * 2 < pool_->cur_thread_num && pool_->cur_thread_num > pool_->min_thread_num(处于忙状态的工作线程个数少于当前线程池中线程数的二分之一)。

转载地址:https://blog.csdn.net/qq_39815320/article/details/115218822 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:2021-01-12
下一篇:2021-7-7

发表评论

最新留言

留言是一种美德,欢迎回访!
[***.207.175.100]2024年04月06日 15时21分22秒