C语言实现线程池功能
发布日期:2021-06-23 04:43:52 浏览次数:5 分类:技术文章

本文共 13443 字,大约阅读时间需要 44 分钟。

1. 线程池基本原理

在这里插入图片描述

2. 线程池C语言实现

2.1 线程池的数据结构

#include 
#include
#include
#include
#include
#include
#include
typedef struct { void *(*function)(void *); /*函数指针,回调函数*/ void *arg; /*上面函数的参数*/} threadpool_task_t; /*各子线程任务结构体*/typedef struct threadpool_s { pthread_mutex_t lock; /*用于锁住本结构体*/ pthread_mutex_t thread_counter; /*记录忙状态线程个数的锁 -- busy_thread_num*/ pthread_cond_t queue_not_full; /*当队列任务满时,添加任务的线程阻塞,等待此条件变量*/ pthread_cond_t queue_not_empty; /*任务队列不为空时,通知等待任务的线程*/ pthread_t *workers_tid; /*存放线程池中每个线程的tid,数组*/ pthread_t manager_tid; /*存管理线程tid*/ threadpool_task_t *task_queue; /*任务队列*/ int min_thread_num; /*线程池最小线程数*/ int max_thread_num; /*线程池最大线程数*/ int live_thread_num; /*当前存活线程个数*/ int busy_thread_num; /*忙线程个数*/ int wait_exit_thr_num; /*要销毁的线程个数*/ int queue_front; /*task_queue队头下表*/ int queue_rear; /*task_queue队尾下表*/ int queue_size; /*task_queue队列中实际任务数*/ int queue_max_size; /*task_queue队列可容纳任务上限*/ int shutdown; /*标志位,线程池使用状态,true或者false*/} threadpool_t;

2.2 线程池的创建

/**************************************************************************** 								创建线程池*函数名: *		 threadpool_create()*参  数:*		 min_thread_num	    :线程池中最小线程数量*		 max_thread_num		:线程池中最大线程数量*		 queue_max_size		:任务队列的最大长度*作  用: *		 创建一个指定大小的线程池*内  容:*		1)线程池基本参数*		2)工作线程*		3)管理线程*		4)任务队列*		5)互斥锁、条件变量****************************************************************************/threadpool_t *threadpool_create(int min_thread_num, int max_thread_num, int queue_max_size){
int i; threadpool_t *pool = NULL; do {
pool = (threadpool_t *)malloc(sizeof(threadpool_t)); if (pool == NULL) {
printf("malloc threadpool fail\n"); goto err_1; } pool->min_thread_num = min_thread_num; pool->max_thread_num = max_thread_num; pool->busy_thread_num = 0; pool->live_thread_num = min_thread_num; pool->queue_size = 0; pool->queue_max_size = queue_max_size; pool->queue_front = 0; pool->queue_rear = 0; pool->shutdown = 0; /*根据最大线程上限数,给工作线程数据开辟空间,并清零*/ pool->workers_tid = (pthread_t *)malloc(sizeof(pthread_t) * max_thread_num); if (pool->workers_tid == NULL) {
printf("malloc workers_tid fail\n"); goto err_2; } memset(pool->workers_tid, 0, sizeof(pthread_t) * max_thread_num); /* 队列开辟空间 */ pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t) * queue_max_size); if (pool->task_queue == NULL) {
printf("malloc task_queue fail\n"); goto err_3; } /* 初始化互斥锁,条件变量 */ if (pthread_mutex_init(&(pool->lock), NULL) != 0 || pthread_mutex_init(&(pool->thread_counter), NULL) != 0 || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0 || pthread_cond_init(&(pool->queue_not_full), NULL) != 0) {
printf("init the lock or cond fail\n"); goto err_4; } /* 启动 min_thread_num 个 work thread */ for (i = 0; i < min_thread_num; i++) {
pthread_create(&(pool->workers_tid[i]), NULL, workers_thread,(void *)pool); /*pool指向当前线程池*/ printf("start thread 0x%x...\n", (unsigned int)pool->workers_tid[i]); } /*创建管理者线程*/ pthread_create(&(pool->manager_tid), NULL, manager_thread, (void *)pool); } while(0); return pool; //threadpool_free(pool); /*前面代码调用失败,释放poll存储空间*/ err_4: /*需要销毁互斥锁和条件变量*/ free(pool->task_queue); err_3: free(pool->workers_tid);err_2: free(pool);err_1: return NULL;}

2.3 管理线程处理函数

#define DEFAULT_TIME 			60#define MIN_WAIT_TASK_NUM  		10#define DEFAULT_THREAD_VERY  	5/**************************************************************************** 								管理者线程*函数名: *		manager_thread()*参  数:*		threadpool	: 使用的线程池*作  用: *		 根据任务的数量动态调整线程池大小*内  容:*		1)获取当前线程池中存在的线程和任务队列中积累的任务*		2)根据需求动态调整线程池中的线程的数量*缺  点:*		使用了太多的互斥锁和条件变量,效率上值得商榷****************************************************************************/void *manager_thread(void *threadpool){
threadpool_t *pool = (threadpool_t *)threadpool; int i; while (!pool->shutdown) {
sleep(DEFAULT_TIME); /*定时对线程池管理*/ pthread_mutex_lock(&(pool->lock)); int queue_size = pool->queue_size; int live_thread_num = pool->live_thread_num;/*线程池中存在的线程数量*/ pthread_mutex_unlock(&(pool->lock)); pthread_mutex_lock(&(pool->thread_counter)); int busy_thread_num = pool->busy_thread_num; pthread_mutex_unlock(&(pool->thread_counter)); /* 创建新线程算法,任务数大于最小线程池个数, * 且存活的线程数小于最大线程数时 */ if (queue_size >= MIN_WAIT_TASK_NUM && live_thread_num < pool->max_thread_num){
pthread_mutex_lock(&(pool->lock)); int add = 0; /* 一次增加 DEFAULT_THREAD_VERY 个线程*/ for (i = 0; i < pool->max_thread_num && add < DEFAULT_THREAD_VERY && pool->live_thread_num < pool->max_thread_num; i++) {
if (pool->workers_tid[i] == 0 || !is_thread_alive(pool->workers_tid[i])) {
pthread_create(&(pool->workers_tid[i]), NULL, workers_thread,(void *)pool); add++; pool->live_thread_num++; } } pthread_mutex_unlock(&(pool->lock)); } /* 销毁多余的空闲线程算法,忙线程 x2 小于存活的线程数 且 * 存活的线程数大于最小线程数时 */ if (busy_thread_num * 2 < live_thread_num && live_thread_num > pool->min_thread_num) {
/*一次销毁 DEFAULT_THREAD_VERY 个线程*/ pthread_mutex_lock(&(pool->lock)); pool->wait_exit_thr_num = DEFAULT_THREAD_VERY; pthread_mutex_unlock(&(pool->lock)); for (i = 0; i < DEFAULT_THREAD_VERY; i++) {
/*通知处在空闲状态的线程,他们会自行终止,线程自杀*/ pthread_cond_signal(&(pool->queue_not_empty)); } } } return NULL;}

2.4 工作者线程处理函数

/**************************************************************************** 								工作线程处理函数*函数名: *		 workers_thread()*参  数:*		 threadpool	    :包含线程池中所有的参数*作  用: *		 等待分配任务并执行之*内  容:*		1)睡眠等待分配任务*		2)是否终结本线程*		3)从任务队列上取任务,更改线程busy状态*		4)执行任务*		5)恢复为空闲状态*缺  点:*		使用了太多的互斥锁和条件变量,效率上值得商榷****************************************************************************/void *workers_thread(void *threadpool){
threadpool_t *pool = (threadpool_t *)thr eadpool; threadpool_task_t task; while(1) {
/* Lock must be taken to wait on condition variable */ /* 刚创建出线程,等待任务队列里面有任务,否则阻塞等待任务队列里有任务再唤醒 * 接收任务 */ pthread_mutex_lock(&(pool->lock)); /* queue_size == 0 说明没有任务,调wait 阻塞在条件变量上,若有任务,跳过该while */ while((pool->queue_size == 0) && (!pool->shutdown)) {
printf("Workers'thread ID 0x%x is waiting\n", (unsigned int)pthread_self()); pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock)); /* 清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程 */ if (pool->wait_exit_thr_num > 0) {
/* 如果线程池里的线程个数大于最小值时可以结束当前线程 */ if (pool->live_thread_num > pool->min_thread_num) {
printf("Workers'thread ID 0x%x is exiting\n", (unsigned int)pthread_self()); pool->live_thread_num--; pool->wait_exit_thr_num--; pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL); } } } /*如果关闭了线程池,自行退出处理*/ if (pool->shutdown == 1) {
printf("Workers'thread ID 0x%x is exiting\n", (unsigned int)pthread_self()); pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL); } /*从任务队列里获取任务,是一个出队操作*/ task.function = pool->task_queue[pool->queue_front].function; task.arg = pool->task_queue[pool->queue_front].arg; /*出队,模拟环形队列*/ pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; pool->queue_size--; /*通知可以有新的任务添加进来*/ pthread_cond_broadcast(&(pool->queue_not_full)); /*任务取出后,立即将线程池锁释放*/ pthread_mutex_unlock(&(pool->lock)); /*设置当前线程忙状态*/ pthread_mutex_lock(&(pool->thread_counter)); /*忙状态线程数变量锁*/ pool->busy_thread_num++; /*忙状态线程数+1*/ pthread_mutex_unlock(&(pool->thread_counter)); /*执行任务*/ (*(task.function))(task.arg); /*由忙状态切换为空闲状态*/ pthread_mutex_lock(&(pool->thread_counter)); pool->busy_thread_num--; pthread_mutex_unlock(&(pool->thread_counter)); } return NULL;}

2.5 任务的添加

/**************************************************************************** 						向线程池的任务队列中添加一个任务*函数名: *		threadpool_add()*参  数:*		pool		: 使用的线程池*		function	:任务的执行函数*		arg			:任务执行参数的参数*作  用: *		 向线程池的任务队列中添加一个任务*内  容:*		1)任务队列是否已满*		2)添加任务*		3)唤醒在任务队列上睡眠的线程*缺  点:*		使用了太多的互斥锁和条件变量,效率上值得商榷****************************************************************************/int threadpool_add(threadpool_t *pool, void *function(void *arg), void *arg){
pthread_mutex_lock(&(pool->lock)); /*为真,队列已满,调wait等待*/ while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {
pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock)); } if (pool->shutdown) {
pthread_mutex_unlock(&(pool->lock)); return 0; } /*清空工作线程 调用回调函数的参数 arg*/ if (pool->task_queue[pool->queue_rear].arg != NULL) {
free(pool->task_queue[pool->queue_rear].arg); pool->task_queue[pool->queue_rear].arg = NULL; } /*添加任务到任务队列里面*/ pool->task_queue[pool->queue_rear].function = function; pool->task_queue[pool->queue_rear].arg = arg; pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; pool->queue_size++; /*添加完任务后,队列不为空,唤醒线程池中等待处理任务的线程*/ pthread_cond_signal(&(pool->queue_not_empty)); pthread_mutex_unlock(&(pool->lock)); return 0;}

2.6 线程池的销毁

/**************************************************************************** 								销毁线程池*函数名: *		threadpool_distory()*参  数:*		threadpool	: 要销毁的线程池*作  用: *		 销毁线程池*内  容:*		1)发送任务,销毁线程*		2)回收线程资源*缺  点:*		无****************************************************************************/int threadpool_distory(threadpool_t *pool){
int i; if (pool == NULL) {
return -1; } pool->shutdown = 1; /*先销毁管理线程*/ pthread_join(pool->manager_tid, NULL); for (i = 0; i < pool->live_thread_num; i++) {
/*通知所有空闲线程*/ pthread_cond_broadcast(&(pool->queue_not_empty)); } for (i = 0; i < pool->live_thread_num; i++) {
/*回收所有管理者线程资源*/ pthread_join(pool->workers_tid[i], NULL); } threadpool_free(pool); return 0;}/**************************************************************************** 释放线程池资源*函数名: * threadpool_free()*参 数:* threadpool : 要释放的线程池*作 用: * 释放线程池资源*内 容:* 1)释放任务队列* 2)销毁互斥锁和条件变量* 3)释放线程池*缺 点:* 无****************************************************************************/int threadpool_free(threadpool_t *pool){
if (pool == NULL) {
printf("thread pool is already free\n"); return -1; } if (pool->task_queue) {
free(pool->task_queue); } if (pool->workers_tid) {
free(pool->workers_tid); pthread_mutex_lock(&(pool->lock)); pthread_mutex_destroy(&(pool->lock)); pthread_mutex_lock(&(pool->thread_counter)); pthread_mutex_destroy(&(pool->thread_counter)); pthread_cond_destroy(&(pool->queue_not_empty)); pthread_cond_destroy(&(pool->queue_not_full)); } free(pool); pool = NULL; return 0;}

2.7 其他接口

/**************************************************************************** 							判断当前线程是否存在*函数名: *		is_thread_alive()*参  数:*		tid	: 待查询的线程PID*作  用: *		 判断当前线程是否存在*内  容:*		1)发送0信号来判断*缺  点:*		无****************************************************************************/int is_thread_alive(pthread_t tid){
int kill_rc = pthread_kill(tid, 0); /*发0号信号,测试线程是否存活*/ if (kill_rc == ESRCH) {
return 0; } return 1;}

2.9 测试demo

/**************************************************************************** 							以下为测试demo*函数名: *		process()*参  数:*		arg	: 任务参数*作  用: *		 任务处理函数*内  容:*		1)执行任务*缺  点:*		无****************************************************************************//* 线程池中的线程,模拟处理业务 */void process(void *arg){
printf("thread 0x%x working on task %d\n", (unsigned int)pthread_self(), *(int *)arg); sleep(1); printf("task %d is end\n", *(int *)arg);}int main(int argc, char **argv){
int num[20], i; /*threadpool_t *threadpool_create(int min_thread_num, int max_thread_num, int queue_max_size)*/ /* 创建线程池,池里最小3个线程,最大100,队列最大100 */ threadpool_t *thp = threadpool_create(3, 100, 100); if (thp == NULL) {
printf("threadpool_create fail\n"); return 0; } printf("pool init\n"); for (i = 0; i < 20; i++) {
num[i] = i; printf("add task %d\n", i); /* 向线程池中添加任务 */ threadpool_add(thp, (void *)&process, (void *)&num[i]); } /*等待子线程完成任务*/ sleep(30); threadpool_distory(thp); return 0;}

代码实现参考文章:

经典文章:

转载地址:https://blog.csdn.net/s2603898260/article/details/106749471 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:adb devices offline_关于adb devices连不上手机的几种情况
下一篇:柬埔寨之旅——穿越千年,感受震撼

发表评论

最新留言

能坚持,总会有不一样的收获!
[***.219.124.196]2024年03月21日 15时08分12秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

import java.io后报错_大神求解,IO报错文件名、目录名或卷标语法不正确 2019-04-21
java error discription_动易java script error description缺少对象的完美解决方案 2019-04-21
flexpaper java 例子_Flexpaper二次开发入门教程》(十) Flexpaper简单使用-第一个Flexpaper例子... 2019-04-21
给图片加上水印php视频,php给图片添加水印的实现过程 2019-04-21
php计算时间差js,js 求时间差怎么求实例代码 2019-04-21
php 分布式锁,php 实现Redis分布式锁 2019-04-21
php 替换表情符号,php – 表情符号用“<3”替换数组 2019-04-21
PHP程序中时间戳,PHP 时间戳 与 时间 2019-04-21
php groovy,编程语言Groovy重新崛起 2019-04-21
拆分订单php,订单拆分成子订单问题求教 2019-04-21
php模拟测试,使用模拟进行单元测试(PHP) 2019-04-21
java extends e,java泛型中的? extend E 和 ? super E 2019-04-21
php 上传头像 裁剪插件,PHP+ajaxfileupload+jcrop插件完美实现头像上传剪裁_PHP教程 2019-04-21
java script 与java,为什么JavaScript调用JavaScript,因为它与Java无关? 2019-04-21
java同步嵌套,更好的解决方案而不是Java中的嵌套同步块? 2019-04-21
实验二信号运算的matlab实现,实验1信号的产生及运算.doc 2019-04-21
php workerman怎么样,workerman怎样完成高并发_PHP开发框架教程 2019-04-21
php中http错误码,php – 哪些HTTP响应代码是错误的? 2019-04-21
字符串 生成a-z php,如何在excel中用函数或vba生成a-z的英文字母序列? 2019-04-21
chinaz php 解密,PHP加密解密 2019-04-21