在多任务并发处理的场景下,如果每来一个任务,就新建一个线程来处理,虽然功能上没问题,但由于线程的创建和销毁会带来很大的开销。线程池就是通过预先创建一定数量的线程,当有任务来时,就将任务分配给一个线程去处理!
线程池模型 下面是线程池的结构!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 struct threadpool_t { pthread_mutex_t lock; pthread_mutex_t thread_counter; pthread_cond_t queue_not_full; pthread_cond_t queue_not_empty; pthread_t *threads; pthread_t adjust_tid; threadpool_task_t *task_queue; int min_thr_num; int max_thr_num; int live_thr_num; int busy_thr_num; int wait_exit_thr_num; int queue_front; int queue_rear; int queue_size; int queue_max_size; bool shutdown; };
线程数组和任务队列 线程池的核心也就是线程数组和任务队列!
1. 线程数组:threads。
注意此处的实现中,线程的数量不是固定的,可在最小线程数和最大线程数之间动态调整。当突发任务数量大于最小线程数时,线程池会创建新线程以处理突发任务,任务处理完之后,若没有其他任务需要处理,管理线程 会定时回收空闲的线程,但保证线程池中线程数不会小于最小线程数。
当突发任务数量大于最大线程数时,线程池创建的线程数会被限制在最大线程数。
基于上述原理,线程池结构中,以下5个成员变量用于辅助管理线程数组:
1 2 3 4 5 6 7 pthread_t *threads; int min_thr_num; int max_thr_num; int live_thr_num; int busy_thr_num; int wait_exit_thr_num;
2. 任务队列:task_queue。
与线程数组不同,任务队列是一个固定大小的循环队列 ,用于存放待处理的任务,线程池需要处理的任务类型 和参数 不一,无法预先定义在线程中,只能通过任务传入。也就是说,任务队列中存储的内容要包括**任务类型(void *(*func)(void*arg))
和 参数(void *arg)
**。
使用线程池时,只需调用函数将任务添加到任务队列尾部即可,线程池会自动为任务分配线程处理。但当任务队列满时,线程池会拒绝添加新任务。
1 2 3 4 5 6 7 8 9 10 11 typedef struct { void *(*function)(void *); void *arg; } threadpool_task_t ; threadpool_task_t *task_queue; int queue_front; int queue_rear; int queue_size; int queue_max_size;
线程池管理线程 线程池中除了专门用于处理任务的线程,还需要有一个管理线程 ,用于管理线程池中的任务线程。线程池的动态扩容和销毁都是通过管理线程来完成的,管理线程定期 根据当前线程数组和任务数组情况,决定是否需要扩容或销毁线程。
条件变量与互斥锁 线程池的任务分配也可看作生产者消费者模型 ,任务队列中的元素是产品 ,线程池中的每一个线程都是消费者 ,向线程池中添加任务的是生产者 (一般为主线程)。也就是说,线程池可以看作单生产者、多消费者模型 。
一个任务只需要也只能分配给一个任务线程处理!因此,需要配合条件变量和互斥锁来实现任务分配。
消费者 :没有任务时,任务线程都阻塞在条件变量queue_not_empty
上,等待新任务到来。
生产者 :任务队列满时,主线程pthreadpool_add
阻塞在条件变量queue_not_full
上,等待任务队列空间可用。
thread_counter
:记录忙状态线程个数的锁!
此处可以先回顾基于条件变量实现的生产者消费者模型!此处的任务线程处理逻辑与消费者一致!
threadpool_create 函数 根据输入的参数创建一个线程池,并返回线程池的指针。
1 threadpool_t *threadpool_create (int min_thr_num, int max_thr_num, int queue_max_size) ;
min_thr_num
:线程池中最少含有线程数
max_thr_num
:线程池中最多最多线程数
queue_max_size
:线程池中任务队列的最大容量
返回值:线程池指针,如果创建失败,返回NULL
threadpool_add 函数 添加一个任务到线程池的任务队列中,如果线程池任务队列已满,则阻塞等待。
1 int threadpool_add (threadpool_t *pool, void *(*function)(void *arg), void *arg) ;
pool
:线程池指针
function
:任务函数
arg
:任务函数参数
返回值:0,添加成功;不会失败,只会阻塞等待!
threadpool_destroy 函数 销毁管理线程、通知所有空闲线程结束、等待忙线程结束,释放线程池所占空间。
1 int threadpool_destroy (threadpool_t *pool) ;
pool
:线程池指针
返回值:0,销毁成功;pool 为NULL 时,返回-1。
实现代码 下面的简单实现可以用来理解线程池的原理,但性能并不理想!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 #include <stdlib.h> #include <pthread.h> #include <unistd.h> #include <assert.h> #include <stdio.h> #include <string.h> #include <signal.h> #include <stdbool.h> #include <errno.h> #include "threadpool.h" #define DEFAULT_TIME 10 #define MIN_WAIT_TASK_NUM 10 #define DEFAULT_THREAD_VARY 10 typedef struct { void *(*function)(void *); void *arg; } threadpool_task_t ; struct threadpool_t { pthread_mutex_t lock; pthread_mutex_t thread_counter; pthread_cond_t queue_not_full; pthread_cond_t queue_not_empty; pthread_t *threads; pthread_t adjust_tid; threadpool_task_t *task_queue; int min_thr_num; int max_thr_num; int live_thr_num; int busy_thr_num; int wait_exit_thr_num; int queue_front; int queue_rear; int queue_size; int queue_max_size; bool shutdown; }; void *threadpool_thread (void *threadpool) ;void *adjust_thread (void *threadpool) ;int is_thread_alive (pthread_t tid) ;int threadpool_free (threadpool_t *pool) ;threadpool_t *threadpool_create (int min_thr_num, int max_thr_num, int queue_max_size) { int i; threadpool_t *pool = NULL ; do { if ((pool = (threadpool_t *)malloc (sizeof (threadpool_t ))) == NULL ) { printf ("malloc threadpool failed!\n" ); break ; } pool->min_thr_num = min_thr_num; pool->max_thr_num = max_thr_num; pool->busy_thr_num = 0 ; pool->live_thr_num = min_thr_num; pool->wait_exit_thr_num = 0 ; pool->queue_size = 0 ; pool->queue_max_size = queue_max_size; pool->queue_front = 0 ; pool->queue_rear = 0 ; pool->shutdown = false ; pool->threads = (pthread_t *)malloc (sizeof (pthread_t )*max_thr_num); if (pool->threads == NULL ) { printf ("malloc threads failed!\n" ); break ; } memset (pool->threads, 0 , sizeof (pthread_t )*max_thr_num); pool->task_queue = (threadpool_task_t *)malloc (sizeof (threadpool_task_t )*queue_max_size); if (pool->task_queue == NULL ) { printf ("malloc task failed\n" ); break ; } if (pthread_mutex_init(&(pool->lock), NULL ) != 0 || pthread_mutex_init(&(pool->thread_counter), NULL ) != 0 || pthread_cond_init(&(pool->queue_not_empty), NULL ) != 0 || pthread_cond_init(&(pool->queue_not_full), NULL ) != 0 ) { printf ("init lock or cond failed!\n" ); break ; } for (i = 0 ; i < min_thr_num; i++) { pthread_create(&pool->threads[i], NULL , threadpool_thread, (void *)pool); printf ("start thread 0x%x ... \n" , (unsigned int )pool->threads[i]); } pthread_create(&(pool->adjust_tid), NULL , adjust_thread, (void *)pool); return pool; } while (0 ); threadpool_free(pool); return NULL ; } int threadpool_add (threadpool_t *pool, void *(*function)(void *arg), void *arg) { pthread_mutex_lock(&(pool->lock)); while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) { pthread_cond_wait(&(pool->queue_not_full), &(pool->lock)); } if (pool->shutdown) { pthread_cond_broadcast(&(pool->queue_not_empty)); pthread_mutex_unlock(&(pool->lock)); return 0 ; } if (pool->task_queue[pool->queue_rear].arg != NULL ) { pool->task_queue[pool->queue_rear].arg = NULL ; } pool->task_queue[pool->queue_rear].function = function; pool->task_queue[pool->queue_rear].arg = arg; pool->queue_rear = (pool->queue_rear + 1 ) % pool->queue_max_size; pool->queue_size++; pthread_cond_signal(&(pool->queue_not_empty)); pthread_mutex_unlock(&(pool->lock)); return 0 ; } void *threadpool_thread (void *threadpool) { threadpool_t *pool = (threadpool_t *)threadpool; threadpool_task_t task; while (true ) { pthread_mutex_lock(&(pool->lock)); while ((pool->queue_size == 0 ) && (!pool->shutdown)) { printf ("thread 0x%x is waiting\n" , (unsigned int )pthread_self()); pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock)); if (pool->wait_exit_thr_num > 0 ) { pool->wait_exit_thr_num--; if (pool->live_thr_num > pool->min_thr_num) { printf ("thread 0x%x is exiting\n" , (unsigned int )pthread_self()); pool->live_thr_num--; pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL ); } } } if (pool->shutdown) { pthread_mutex_unlock(&(pool->lock)); printf ("thread 0x%x is exiting\n" , (unsigned int )pthread_self()); pthread_detach(pthread_self()); pthread_exit(NULL ); } task.function = pool->task_queue[pool->queue_front].function; task.arg = pool->task_queue[pool->queue_front].arg; pool->queue_front = (pool->queue_front + 1 ) % pool->queue_max_size; pool->queue_size--; pthread_mutex_unlock(&(pool->lock)); pthread_cond_broadcast(&(pool->queue_not_full)); printf ("thread 0x%x start working\n" , (unsigned int )pthread_self()); pthread_mutex_lock(&(pool->thread_counter)); pool->busy_thr_num++; pthread_mutex_unlock(&(pool->thread_counter)); (*(task.function))(task.arg); printf ("thread 0x%x end working\n" , (unsigned int )pthread_self()); pthread_mutex_lock(&(pool->thread_counter)); pool->busy_thr_num--; pthread_mutex_unlock(&(pool->thread_counter)); } pthread_exit(NULL ); } void *adjust_thread (void *threadpool) { int i; threadpool_t *pool = (threadpool_t *)threadpool; while (!pool->shutdown) { sleep(DEFAULT_TIME); pthread_mutex_lock(&(pool->lock)); int queue_size = pool->queue_size; int live_thr_num = pool->live_thr_num; pthread_mutex_unlock(&(pool->lock)); pthread_mutex_lock(&(pool->thread_counter)); int busy_thr_num = pool->busy_thr_num; pthread_mutex_unlock(&(pool->thread_counter)); if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) { pthread_mutex_lock(&(pool->lock)); int add = 0 ; for (i = 0 ; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY && pool->live_thr_num < pool->max_thr_num; i++) { if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) { pthread_create(&(pool->threads[i]), NULL , threadpool_thread, (void *)pool); add++; pool->live_thr_num++; } } pthread_mutex_unlock(&(pool->lock)); } if ((busy_thr_num * 2 ) < live_thr_num && live_thr_num > pool->min_thr_num) { pthread_mutex_lock(&(pool->lock)); pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; pthread_mutex_unlock(&(pool->lock)); for (i = 0 ; i < DEFAULT_THREAD_VARY; i++) { pthread_cond_signal(&(pool->queue_not_empty)); } } } return NULL ; } int threadpool_destroy (threadpool_t *pool) { int i; if (pool == NULL ) { return -1 ; } pool->shutdown = true ; pthread_join(pool->adjust_tid, NULL ); for (i = 0 ; i < pool->live_thr_num; i++) { pthread_cond_broadcast(&(pool->queue_not_empty)); } for (i = 0 ; i < pool->live_thr_num; i++) { pthread_join(pool->threads[i], NULL ); } threadpool_free(pool); return 0 ; } int threadpool_free (threadpool_t *pool) { if (pool == NULL ) { return -1 ; } if (pool->task_queue) { free (pool->task_queue); } if (pool->threads) { free (pool->threads); pthread_mutex_lock(&(pool->lock)); pthread_mutex_destroy(&(pool->lock)); pthread_mutex_lock(&(pool->thread_counter)); pthread_mutex_destroy(&(pool->thread_counter)); pthread_cond_destroy(&(pool->queue_not_empty)); pthread_cond_destroy(&(pool->queue_not_full)); } free (pool); pool = NULL ; return 0 ; } int threadpool_all_threadnum (threadpool_t *pool) { int all_threadnum = -1 ; pthread_mutex_lock(&(pool->lock)); all_threadnum = pool->live_thr_num; pthread_mutex_unlock(&(pool->lock)); return all_threadnum; } int threadpool_busy_threadnum (threadpool_t *pool) { int busy_threadnum = -1 ; pthread_mutex_lock(&(pool->thread_counter)); busy_threadnum = pool->busy_thr_num; pthread_mutex_unlock(&(pool->thread_counter)); return busy_threadnum; } int is_thread_alive (pthread_t tid) { int kill_rc = pthread_kill(tid, 0 ); if (kill_rc == ESRCH) { return false ; } return true ; }
相关资料