C语言实现线程池功能
发布日期:2021-06-23 04:43:52
浏览次数:5
分类:技术文章
本文共 13443 字,大约阅读时间需要 44 分钟。
1. 线程池基本原理
2. 线程池C语言实现
2.1 线程池的数据结构
#include#include #include #include #include #include #include typedef struct { void *(*function)(void *); /*函数指针,回调函数*/ void *arg; /*上面函数的参数*/} threadpool_task_t; /*各子线程任务结构体*/typedef struct threadpool_s { pthread_mutex_t lock; /*用于锁住本结构体*/ pthread_mutex_t thread_counter; /*记录忙状态线程个数的锁 -- busy_thread_num*/ pthread_cond_t queue_not_full; /*当队列任务满时,添加任务的线程阻塞,等待此条件变量*/ pthread_cond_t queue_not_empty; /*任务队列不为空时,通知等待任务的线程*/ pthread_t *workers_tid; /*存放线程池中每个线程的tid,数组*/ pthread_t manager_tid; /*存管理线程tid*/ threadpool_task_t *task_queue; /*任务队列*/ int min_thread_num; /*线程池最小线程数*/ int max_thread_num; /*线程池最大线程数*/ int live_thread_num; /*当前存活线程个数*/ int busy_thread_num; /*忙线程个数*/ int wait_exit_thr_num; /*要销毁的线程个数*/ int queue_front; /*task_queue队头下表*/ int queue_rear; /*task_queue队尾下表*/ int queue_size; /*task_queue队列中实际任务数*/ int queue_max_size; /*task_queue队列可容纳任务上限*/ int shutdown; /*标志位,线程池使用状态,true或者false*/} threadpool_t;
2.2 线程池的创建
/**************************************************************************** 创建线程池*函数名: * threadpool_create()*参 数:* min_thread_num :线程池中最小线程数量* max_thread_num :线程池中最大线程数量* queue_max_size :任务队列的最大长度*作 用: * 创建一个指定大小的线程池*内 容:* 1)线程池基本参数* 2)工作线程* 3)管理线程* 4)任务队列* 5)互斥锁、条件变量****************************************************************************/threadpool_t *threadpool_create(int min_thread_num, int max_thread_num, int queue_max_size){ int i; threadpool_t *pool = NULL; do { pool = (threadpool_t *)malloc(sizeof(threadpool_t)); if (pool == NULL) { printf("malloc threadpool fail\n"); goto err_1; } pool->min_thread_num = min_thread_num; pool->max_thread_num = max_thread_num; pool->busy_thread_num = 0; pool->live_thread_num = min_thread_num; pool->queue_size = 0; pool->queue_max_size = queue_max_size; pool->queue_front = 0; pool->queue_rear = 0; pool->shutdown = 0; /*根据最大线程上限数,给工作线程数据开辟空间,并清零*/ pool->workers_tid = (pthread_t *)malloc(sizeof(pthread_t) * max_thread_num); if (pool->workers_tid == NULL) { printf("malloc workers_tid fail\n"); goto err_2; } memset(pool->workers_tid, 0, sizeof(pthread_t) * max_thread_num); /* 队列开辟空间 */ pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t) * queue_max_size); if (pool->task_queue == NULL) { printf("malloc task_queue fail\n"); goto err_3; } /* 初始化互斥锁,条件变量 */ if (pthread_mutex_init(&(pool->lock), NULL) != 0 || pthread_mutex_init(&(pool->thread_counter), NULL) != 0 || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0 || pthread_cond_init(&(pool->queue_not_full), NULL) != 0) { printf("init the lock or cond fail\n"); goto err_4; } /* 启动 min_thread_num 个 work thread */ for (i = 0; i < min_thread_num; i++) { pthread_create(&(pool->workers_tid[i]), NULL, workers_thread,(void *)pool); /*pool指向当前线程池*/ printf("start thread 0x%x...\n", (unsigned int)pool->workers_tid[i]); } /*创建管理者线程*/ pthread_create(&(pool->manager_tid), NULL, manager_thread, (void *)pool); } while(0); return pool; //threadpool_free(pool); /*前面代码调用失败,释放poll存储空间*/ err_4: /*需要销毁互斥锁和条件变量*/ free(pool->task_queue); err_3: free(pool->workers_tid);err_2: free(pool);err_1: return NULL;}
2.3 管理线程处理函数
#define DEFAULT_TIME 60#define MIN_WAIT_TASK_NUM 10#define DEFAULT_THREAD_VERY 5/**************************************************************************** 管理者线程*函数名: * manager_thread()*参 数:* threadpool : 使用的线程池*作 用: * 根据任务的数量动态调整线程池大小*内 容:* 1)获取当前线程池中存在的线程和任务队列中积累的任务* 2)根据需求动态调整线程池中的线程的数量*缺 点:* 使用了太多的互斥锁和条件变量,效率上值得商榷****************************************************************************/void *manager_thread(void *threadpool){ threadpool_t *pool = (threadpool_t *)threadpool; int i; while (!pool->shutdown) { sleep(DEFAULT_TIME); /*定时对线程池管理*/ pthread_mutex_lock(&(pool->lock)); int queue_size = pool->queue_size; int live_thread_num = pool->live_thread_num;/*线程池中存在的线程数量*/ pthread_mutex_unlock(&(pool->lock)); pthread_mutex_lock(&(pool->thread_counter)); int busy_thread_num = pool->busy_thread_num; pthread_mutex_unlock(&(pool->thread_counter)); /* 创建新线程算法,任务数大于最小线程池个数, * 且存活的线程数小于最大线程数时 */ if (queue_size >= MIN_WAIT_TASK_NUM && live_thread_num < pool->max_thread_num){ pthread_mutex_lock(&(pool->lock)); int add = 0; /* 一次增加 DEFAULT_THREAD_VERY 个线程*/ for (i = 0; i < pool->max_thread_num && add < DEFAULT_THREAD_VERY && pool->live_thread_num < pool->max_thread_num; i++) { if (pool->workers_tid[i] == 0 || !is_thread_alive(pool->workers_tid[i])) { pthread_create(&(pool->workers_tid[i]), NULL, workers_thread,(void *)pool); add++; pool->live_thread_num++; } } pthread_mutex_unlock(&(pool->lock)); } /* 销毁多余的空闲线程算法,忙线程 x2 小于存活的线程数 且 * 存活的线程数大于最小线程数时 */ if (busy_thread_num * 2 < live_thread_num && live_thread_num > pool->min_thread_num) { /*一次销毁 DEFAULT_THREAD_VERY 个线程*/ pthread_mutex_lock(&(pool->lock)); pool->wait_exit_thr_num = DEFAULT_THREAD_VERY; pthread_mutex_unlock(&(pool->lock)); for (i = 0; i < DEFAULT_THREAD_VERY; i++) { /*通知处在空闲状态的线程,他们会自行终止,线程自杀*/ pthread_cond_signal(&(pool->queue_not_empty)); } } } return NULL;}
2.4 工作者线程处理函数
/**************************************************************************** 工作线程处理函数*函数名: * workers_thread()*参 数:* threadpool :包含线程池中所有的参数*作 用: * 等待分配任务并执行之*内 容:* 1)睡眠等待分配任务* 2)是否终结本线程* 3)从任务队列上取任务,更改线程busy状态* 4)执行任务* 5)恢复为空闲状态*缺 点:* 使用了太多的互斥锁和条件变量,效率上值得商榷****************************************************************************/void *workers_thread(void *threadpool){ threadpool_t *pool = (threadpool_t *)thr eadpool; threadpool_task_t task; while(1) { /* Lock must be taken to wait on condition variable */ /* 刚创建出线程,等待任务队列里面有任务,否则阻塞等待任务队列里有任务再唤醒 * 接收任务 */ pthread_mutex_lock(&(pool->lock)); /* queue_size == 0 说明没有任务,调wait 阻塞在条件变量上,若有任务,跳过该while */ while((pool->queue_size == 0) && (!pool->shutdown)) { printf("Workers'thread ID 0x%x is waiting\n", (unsigned int)pthread_self()); pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock)); /* 清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程 */ if (pool->wait_exit_thr_num > 0) { /* 如果线程池里的线程个数大于最小值时可以结束当前线程 */ if (pool->live_thread_num > pool->min_thread_num) { printf("Workers'thread ID 0x%x is exiting\n", (unsigned int)pthread_self()); pool->live_thread_num--; pool->wait_exit_thr_num--; pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL); } } } /*如果关闭了线程池,自行退出处理*/ if (pool->shutdown == 1) { printf("Workers'thread ID 0x%x is exiting\n", (unsigned int)pthread_self()); pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL); } /*从任务队列里获取任务,是一个出队操作*/ task.function = pool->task_queue[pool->queue_front].function; task.arg = pool->task_queue[pool->queue_front].arg; /*出队,模拟环形队列*/ pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; pool->queue_size--; /*通知可以有新的任务添加进来*/ pthread_cond_broadcast(&(pool->queue_not_full)); /*任务取出后,立即将线程池锁释放*/ pthread_mutex_unlock(&(pool->lock)); /*设置当前线程忙状态*/ pthread_mutex_lock(&(pool->thread_counter)); /*忙状态线程数变量锁*/ pool->busy_thread_num++; /*忙状态线程数+1*/ pthread_mutex_unlock(&(pool->thread_counter)); /*执行任务*/ (*(task.function))(task.arg); /*由忙状态切换为空闲状态*/ pthread_mutex_lock(&(pool->thread_counter)); pool->busy_thread_num--; pthread_mutex_unlock(&(pool->thread_counter)); } return NULL;}
2.5 任务的添加
/**************************************************************************** 向线程池的任务队列中添加一个任务*函数名: * threadpool_add()*参 数:* pool : 使用的线程池* function :任务的执行函数* arg :任务执行参数的参数*作 用: * 向线程池的任务队列中添加一个任务*内 容:* 1)任务队列是否已满* 2)添加任务* 3)唤醒在任务队列上睡眠的线程*缺 点:* 使用了太多的互斥锁和条件变量,效率上值得商榷****************************************************************************/int threadpool_add(threadpool_t *pool, void *function(void *arg), void *arg){ pthread_mutex_lock(&(pool->lock)); /*为真,队列已满,调wait等待*/ while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) { pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock)); } if (pool->shutdown) { pthread_mutex_unlock(&(pool->lock)); return 0; } /*清空工作线程 调用回调函数的参数 arg*/ if (pool->task_queue[pool->queue_rear].arg != NULL) { free(pool->task_queue[pool->queue_rear].arg); pool->task_queue[pool->queue_rear].arg = NULL; } /*添加任务到任务队列里面*/ pool->task_queue[pool->queue_rear].function = function; pool->task_queue[pool->queue_rear].arg = arg; pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; pool->queue_size++; /*添加完任务后,队列不为空,唤醒线程池中等待处理任务的线程*/ pthread_cond_signal(&(pool->queue_not_empty)); pthread_mutex_unlock(&(pool->lock)); return 0;}
2.6 线程池的销毁
/**************************************************************************** 销毁线程池*函数名: * threadpool_distory()*参 数:* threadpool : 要销毁的线程池*作 用: * 销毁线程池*内 容:* 1)发送任务,销毁线程* 2)回收线程资源*缺 点:* 无****************************************************************************/int threadpool_distory(threadpool_t *pool){ int i; if (pool == NULL) { return -1; } pool->shutdown = 1; /*先销毁管理线程*/ pthread_join(pool->manager_tid, NULL); for (i = 0; i < pool->live_thread_num; i++) { /*通知所有空闲线程*/ pthread_cond_broadcast(&(pool->queue_not_empty)); } for (i = 0; i < pool->live_thread_num; i++) { /*回收所有管理者线程资源*/ pthread_join(pool->workers_tid[i], NULL); } threadpool_free(pool); return 0;}/**************************************************************************** 释放线程池资源*函数名: * threadpool_free()*参 数:* threadpool : 要释放的线程池*作 用: * 释放线程池资源*内 容:* 1)释放任务队列* 2)销毁互斥锁和条件变量* 3)释放线程池*缺 点:* 无****************************************************************************/int threadpool_free(threadpool_t *pool){ if (pool == NULL) { printf("thread pool is already free\n"); return -1; } if (pool->task_queue) { free(pool->task_queue); } if (pool->workers_tid) { free(pool->workers_tid); pthread_mutex_lock(&(pool->lock)); pthread_mutex_destroy(&(pool->lock)); pthread_mutex_lock(&(pool->thread_counter)); pthread_mutex_destroy(&(pool->thread_counter)); pthread_cond_destroy(&(pool->queue_not_empty)); pthread_cond_destroy(&(pool->queue_not_full)); } free(pool); pool = NULL; return 0;}
2.7 其他接口
/**************************************************************************** 判断当前线程是否存在*函数名: * is_thread_alive()*参 数:* tid : 待查询的线程PID*作 用: * 判断当前线程是否存在*内 容:* 1)发送0信号来判断*缺 点:* 无****************************************************************************/int is_thread_alive(pthread_t tid){ int kill_rc = pthread_kill(tid, 0); /*发0号信号,测试线程是否存活*/ if (kill_rc == ESRCH) { return 0; } return 1;}
2.9 测试demo
/**************************************************************************** 以下为测试demo*函数名: * process()*参 数:* arg : 任务参数*作 用: * 任务处理函数*内 容:* 1)执行任务*缺 点:* 无****************************************************************************//* 线程池中的线程,模拟处理业务 */void process(void *arg){ printf("thread 0x%x working on task %d\n", (unsigned int)pthread_self(), *(int *)arg); sleep(1); printf("task %d is end\n", *(int *)arg);}int main(int argc, char **argv){ int num[20], i; /*threadpool_t *threadpool_create(int min_thread_num, int max_thread_num, int queue_max_size)*/ /* 创建线程池,池里最小3个线程,最大100,队列最大100 */ threadpool_t *thp = threadpool_create(3, 100, 100); if (thp == NULL) { printf("threadpool_create fail\n"); return 0; } printf("pool init\n"); for (i = 0; i < 20; i++) { num[i] = i; printf("add task %d\n", i); /* 向线程池中添加任务 */ threadpool_add(thp, (void *)&process, (void *)&num[i]); } /*等待子线程完成任务*/ sleep(30); threadpool_distory(thp); return 0;}
代码实现参考文章:
经典文章:
转载地址:https://blog.csdn.net/s2603898260/article/details/106749471 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
能坚持,总会有不一样的收获!
[***.219.124.196]2024年03月21日 15时08分12秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
import java.io后报错_大神求解,IO报错文件名、目录名或卷标语法不正确
2019-04-21
给图片加上水印php视频,php给图片添加水印的实现过程
2019-04-21
php计算时间差js,js 求时间差怎么求实例代码
2019-04-21
php 分布式锁,php 实现Redis分布式锁
2019-04-21
php 替换表情符号,php – 表情符号用“<3”替换数组
2019-04-21
PHP程序中时间戳,PHP 时间戳 与 时间
2019-04-21
php groovy,编程语言Groovy重新崛起
2019-04-21
拆分订单php,订单拆分成子订单问题求教
2019-04-21
php模拟测试,使用模拟进行单元测试(PHP)
2019-04-21
java同步嵌套,更好的解决方案而不是Java中的嵌套同步块?
2019-04-21
实验二信号运算的matlab实现,实验1信号的产生及运算.doc
2019-04-21
php workerman怎么样,workerman怎样完成高并发_PHP开发框架教程
2019-04-21
php中http错误码,php – 哪些HTTP响应代码是错误的?
2019-04-21
字符串 生成a-z php,如何在excel中用函数或vba生成a-z的英文字母序列?
2019-04-21
chinaz php 解密,PHP加密解密
2019-04-21