简单实用可线上应用的线程池组件,简单实用线上


0 前言

线程池的组件网上很多,之前我自己也尝试写个一个demo,但这些组件一般都比较简单,没有完整的实现后台线程池组件应用的功能。因此,这里我们实现一个可以用在线上环境的线程池组件,该线程池组件具备线程池应用的特性,如下所示:

1. 伸缩性:即线程池中线程的个数应该是动态变化的。繁忙的时候可以申请更多的线程;空闲的时候则注销一部分线程。

2. 线程状态:线程池中对线程的管理引入睡眠、唤醒机制。当线程没有任务在运行时,使线程处于睡眠状态。

3. 线程管理:对线程池中线程的申请和注销,不是通过创建一个单独线程来管理,而是线程池自动管理。

最终,我们实现的线程池局部一下几个特点:

1. 线程池中线程的个数介于min和max之间;

2. 线程池中线程具有睡眠和唤醒机制;

3. 当线程池中的线程睡眠时间超过1秒钟,则结束一个线程;当线程池中持续1秒没有空闲线程时,则创建一个新线程。

1 关键数据结构

主要的数据结构包括两个CThreadPool和CThreadWorker。

这里,我们先给出数据结构的定义,然后简要描述他们的作用。

相关数据结构定义如下:

 1 class CThreadWorker
 2 {
 3     CThreadWorker *next_;    // 线程池中的线程是单链表结构保存
 4     
 5     int wake_up_;            // 唤醒标志
 6     pthread_cond_t wake_;    
 7     pthread_mutex_t mutex_;
 8     
 9     pthread_t tid_;
10     void (*func)(void *arg); // 函数执行体
11     void *arg;               // 函数执行体参数
12     time_t sleep_when_;      // 线程睡眠时间
13 };
14 
15 class CThreadPool
16 {
17     CThreadWorker *head_;
18     
19     unsigned int min_;     // 最少线程数
20     unsigned int cur_;     // 当前线程数
21     unsigned int max_;     // 最大线程数
22     
23     time_t last_empty_;    // 最后一次线程池中没有线程的时间
24     pthread_mutex_t mutex_;
25 };

其中,CThreadPool用来管理线程池,记录线程池当前线程个数、最小线程个数、最大线程个数等。

CThreadWorker是具体保存线程记录的实体,它里面保存了线程的执行函数体,函数参数、睡眠时间等等。还有一个信号量,用来让线程睡眠或者唤醒。

当我们创建一个线程池时,即默认创建了min个CThreadWorker,每个线程实体默认都是睡眠的,阻塞在pthread_cond_wait处,然后我们具体执行用户函数时,从线程池中获取线程,更改该线程的func,arg等参数,然后唤醒该线程。

中间,线程池会对线程做一些管理,保证线程池的伸缩性。

2 源码

2.1 头文件

 1 #ifndef _THREAD_POOL_H_
 2 #define _THREAD_POOL_H_
 3 
 4 #include <pthread.h>
 5 #include <time.h>
 6 
 7 struct CThreadWorker
 8 {
 9     CThreadWorker *next_;    // 线程池中的线程是单链表结构保存
10     
11     int wake_up_;            // 唤醒标志
12     pthread_cond_t wake_;    
13     pthread_mutex_t mutex_;
14     
15     pthread_t tid_;
16     void (*func)(void *arg); // 函数执行体
17     void *arg;               // 函数执行体参数
18     time_t sleep_when_;      // 线程睡眠时间
19 };
20 
21 struct CThreadPool
22 {
23     CThreadWorker *head_;
24     
25     unsigned int min_;     // 最少线程数
26     unsigned int cur_;     // 当前线程数
27     unsigned int max_;     // 最大线程数
28     
29     time_t last_empty_;    // 最后一次线程池中没有线程的时间
30     pthread_mutex_t mutex_;
31 };
32 
33 CThreadPool *CreateThreadPool(unsigned int min, unsigned int max);
34 int StartWork(CThreadPool *pool, void (*func)(void *arg), void *arg);
35 #endif

2.2 实现文件

实现文件主要完成.h中的CreateThreadPool和StartWork。我们先给个流程图大概描述一下。

2.2.1 CreateThreadPool流程描述

如下图所示为创建线程池的流程,其中最关键的步骤是DoProcess,当执行到DoProcess时,我们默认创建了min个线程,所有线程都因为信号而睡眠。StartWork里面会获取一个线程,然后修改该线程的func,arg参数,最后唤醒线程执行我们的任务。当一个线程执行完的时候,我们需要判断线程池当前的状态,是需要新创建一个线程,还是把该线程重新加入到线程池,还是注销该线程。具体的这些逻辑图示不好描述,我们在下面代码里给出注释。

2.2.2 StartWork流程描述

如下图是启动一个任务的流程,2.2.1我们已经描述到,启动任务时,我们会从线程池中拿出一个线程,修改该线程的func/arg属性,然后唤醒该线程。当线程执行完以后,是需要重新加入线程池,还是注销,则是在2.2.1的DoProcess中处理。

2.3 实现文件

  1 #include <stdlib.h>
  2 #include "pthread_pool.h"
  3 
  4 // 线程池持续1秒没有空闲线程
  5 #define WaitWorkerTimeout(pool)     ((time(NULL) - pool->last_empty_) > 1)
  6 // 线程池中没有线程,所有的线程已经pop出去执行具体的任务去了
  7 #define NoThreadInPool(pool)        (pool->head_ == NULL)
  8 #define CanCreateThread(pool)       (pool->cur_ < pool->max_)
  9 
 10 
 11 static int CreateOneThread(CThreadPool *pool);
 12 static void *DoProcess(void *arg);
 13 static void PushWork(CThreadPool *pool, CThreadWorker *worker);
 14 static void PopWork(CThreadPool *pool, CThreadWorker *worker);
 15 static void InitWorker(CThreadWorker *worker);
 16 static int WorkerIdleTimeout(CThreadPool *pool);
 17 static CThreadWorker *GetWorker(CThreadPool *pool, void (*func)(void *arg), void *arg);
 18 static void WakeupWorkerThread(CThreadWorker *worker);
 19 
 20 int StartWork(CThreadPool *pool, void (*func)(void *arg), void *arg);
 21 CThreadPool *CreateThreadPool(unsigned int min, unsigned int max);
 22 
 23 CThreadPool *CreateThreadPool(unsigned int min, unsigned int max)
 24 {
 25     CThreadPool *poo;
 26     
 27     pool = (CThreadPool *)malloc(sizeof(CThreadPool));
 28     
 29     if (pool == NULL) {
 30         return NULL;
 31     }
 32     
 33     pool->head_ = NULL;
 34     pool->min_  = min;
 35     pool->cur_  = 0;
 36     pool->max_  = max;
 37     pool->last_empty_ = time(NULL);
 38     pthread_mutex_init(&pool->mutex_, NULL);
 39     
 40     int ret = 0;
 41     while (min--) {
 42         ret = CreateOneThread(pool);
 43         if (ret != 0) {
 44             exit(0);
 45         }
 46     }
 47     return pool;
 48 }
 49 
 50 static int CreateOneThread(CThreadPool *pool)
 51 {
 52     pthread_t tid;
 53     return pthread_create(&tid, NULL, DoProcess, pool);
 54 }
 55 
 56 static void *DoProcess(void *arg)
 57 {
 58     CThreadPool *pool = (CThreadPool *)arg;
 59     
 60     CThreadWorker worker;
 61     
 62     InitWorker(&worker);
 63     
 64     pthread_mutex_lock(&pool->mutex_);
 65     pool->cur_ += 1;
 66     
 67     for (;;) {
 68         PushWork(pool, &worker);
 69         worker.sleep_when_ = time(NULL);
 70         pthread_mutex_unlock(&pool->mutex_);
 71         
 72         pthread_mutex_lock(&worker.mutex_);
 73         while (worker.wake_up_ != 1) {
 74             pthread_cond_wait(&worker.wake_, &worker.mutex_);
 75         }
 76         // worker线程已被唤醒,准备开始执行任务,修改wake_up_标志。
 77         worker.wake_up_ = 0;
 78         pthread_mutex_unlock(&worker.mutex_);
 79         
 80         // 执行我们的任务,执行完毕之后,修改worker.func为NULL。
 81         worker.func(arg);
 82         worker.func = NULL;
 83         
 84         // 任务执行完以后,线程池需要根据当前的线程池状态来判断是要把该线程重新加入线程池,还是要创建一个新的线程。
 85         pthread_mutex_lock(&pool->mutex_);
 86         if (WaitWorkerTimeout(pool) && NoThreadInPool(pool) && CanCreateThread(pool)) {
 87             // 在我们执行这个任务的时候,其他任务等待空闲线程的时间超过了1秒,而且线程池中没有线程,且线程池当前线程数没有超过最大允许创建线程数
 88             CreateOneThread(pool);
 89         }
 90         
 91         // 线程池中没有线程了,重新把该线程加入线程池
 92         if (NoThreadInPool(pool)) {
 93             continue;
 94         }
 95         
 96         // 线程池中线程数低于最低阈值,重新把该线程加入线程池
 97         if (pool->curr <= pool->min) {
 98             continue;
 99         }
100         
101         // 线程中睡眠的线程时间超过了1秒,说明线程池不是很繁忙,不需要把该线程重新加回线程池
102         if (WorkerIdleTimeout(pool)) {
103             break;
104         }
105     }
106     
107     pool->cur -= 1;
108     pthread_mutex_unlock(&pool->mutex);
109     
110     pthread_cond_destroy(&worker.wake_);
111     pthread_mutex_destroy(&worker.mutex_);
112 
113     pthread_exit(NULL);
114 }
115 
116 static void InitWorker(CThreadWorker *worker)
117 {
118     worker->next_ = NULL;
119     worker->wake_up_ = 0;
120     pthread_cond_init(&worker->wake_, NULL);
121     pthread_mutex_init(&worker->mutex_, NULL);
122     worker->tid_ = pthread_self();
123     worker->func = NULL;
124     worker->arg = NULL;
125     worker->sleep_when_ = 0;
126 }
127 
128 static void PushWork(CThreadPool *pool, CThreadWorker *worker)
129 {
130     worker->next_ = pool->head_;
131     pool->next_ = worker;
132 }
133 
134 static int WorkerIdleTimeout(CThreadPool *pool)
135 {
136     CThreadWorker *worker;
137     
138     if (NoThreadInPool(pool)) {
139         return 0;
140     }
141     worker = pool->head_;
142     return (time(NULL) > worker->sleep_when_ + 1)? 1 : 0;
143 }
144 
145 int StartWork(CThreadPool *pool, void (*func)(void *arg), void *arg)
146 {
147     if (func == NULL) {
148         return -1;
149     }
150     
151     CThreadWorker *worker;
152     pthread_mutex_lock(&pool->mutex_);
153     worker = GetWorker(pool, func, arg);
154     pthread_mutex_unlock(&pool->mutex_);
155     
156     if (worker == NULL) {
157         return -2;
158     }
159     
160     WakeupWorkerThread(worker);
161     return 0;
162 }
163 
164 static CThreadWorker *GetWorker(CThreadPool *pool, void (*func)(void *arg), void *arg)
165 {
166     CThreadWorker *worker;
167     
168     if (NoThreadInPool(pool)) {
169         return NULL;
170     }
171     
172     worker = pool->head_;
173     PopWork(pool, worker);
174     
175     if (NoThreadInPool(pool)) {
176         pool->last_empty_ = time(NULL);
177     }
178 
179     worker->func = func;
180     worker->arg = arg;
181 
182     return worker;
183 }
184 
185 static void PopWork(CThreadPool *pool, CThreadWorker *worker)
186 {
187     pool->head_ = worker->next_;
188     worker->next_ = NULL;
189 }
190 
191 static void WakeupWorkerThread(CThreadWorker *worker)
192 {
193     pthread_mutex_lock(&worker->mutex_);
194     worker->wake_up_ = 1;
195     pthread_mutex_unlock(&worker->mutex_);
196 
197     pthread_cond_signal(&worker->wake_);
198 }

3 总结

该线程池模型实现了我们线上环境实际的一些应用场景,没有考虑的问题有这么几点:

1. 没有考虑任务类的概念,写一个任务基类,然后具体的任务类继承这个基类,实现自己的功能,具体执行时候,只需要add_task,把任务加进线程池即可,这样做的目的是想着可以给每个任务发信号,是否要终止任务的执行。当前的线程池做不到这点,也做不到判断任务执行时间,是否超时。

2. 每个线程只执行一个任务,而不是搞一个任务队列,去执行任务队列里的任务。关于这点,我也不清楚,搞任务队列的优势是什么。

 

个人能想到的就是这些,不知道诸位平时工作中,具体应用线程池的时候需要考虑一些什么场景,我这里想到的就是1、可以灵活的终止任务;2、可以判断任务是否超时;3、可以对需要执行的任务做到负载均衡(觉得这一点,在这里的线程池里不是问题,极端的情况是来了一堆任务,线程池中线程不够了,那确实会有这个问题)。

关于这几个问题,第3个我个人能解决,可以把上面的线程池稍微修改一下,每个线程结果ThreadWorker里面再加个任务队列,当没有任务时候线程阻塞,否则就取出任务来执行。

关于第一点,怎么修改这个框架,达到可以给任务发信号呢。

还有第二点,判断任务是否超时呢?

欢迎大家一起讨论!


需要一个windows下C++的线程池代码,有测试代码更好

计算机语言的种类非常的多,总的来说可以分成机器语言,汇编语言,高级语言三大类。

电脑每做的一次动作,一个步骤,都是按照以经用计算机语言编好的程序来执行的,程序是计算机要执行的指令的集合,而程序全部都是用我们所掌握的语言来编写的。所以人们要控制计算机一定要通过计算机语言向计算机发出命令。

计算机所能识别的语言只有机器语言,即由0和1构成的代码。但通常人们编程时,不采用机器语言,因为它非常难于记忆和识别。

目前通用的编程语言有两种形式:汇编语言和高级语言。

汇编语言的实质和机器语言是相同的,都是直接对硬件操作,只不过指令采用了英文缩写的标识符,更容易识别和记忆。它同样需要编程者将每一步具体的操作用命令的形式写出来。汇编程序通常由三部分组成:指令、伪指令和宏指令。汇编程序的每一句指令只能对应实际操作过程中的一个很细微的动作,例如移动、自增,因此汇编源程序一般比较冗长、复杂、容易出错,而且使用汇编语言编程需要有更多的计算机专业知识,但汇编语言的优点也是显而易见的,用汇编语言所能完成的操作不是一般高级语言所能实现的,而且源程序经汇编生成的可执行文件不仅比较小,而且执行速度很快。

高级语言是目前绝大多数编程者的选择。和汇编语言相比,它不但将许多相关的机器指令合成为单条指令,并且去掉了与具体操作有关但与完成工作无关的细节,例如使用堆栈、寄存器等,这样就大大简化了程序中的指令。同时,由于省略了很多细节,编程者也就不需要有太多的专业知识。

高级语言主要是相对于汇编语言而言,它并不是特指某一种具体的语言,而是包括了很多编程语言,如目前流行的VB、VC、FoxPro、Delphi等,这些语言的语法、命令格式都各不相同。

高级语言所编制的程序不能直接被计算机识别,必须经过转换才能被执行,按转换方式可将它们分为两类:

解释类:执行方式类似于我们日常生活中的“同声翻译”,应用程序源代码一边由相应语言的解释器“翻译”成目标代码(机器语言),一边执行,因此效率比较低,而且不能生成可独立执行的可执行文件,应用程序不能脱离其解释器,但这种方式比较灵活,可以动态地调整、修改应用程序。

编译类:编译是指在应用源程序执行之前,就将程序源代码“翻译”成目标代码(机器语言),因此其目标程序可以脱离其语言环境独立执行,使用比较方便、效率较高。但应用程序一旦需要修改,必须先修改源代码,再重新编译生成新的目标文件(* .OBJ)才能执行,只有目标文件而没有源代码,修改很不方便。现在大多数的编程语言都是编译型的,例如Visual C++、Visual Foxpro、Delphi等。

[NextPage]

学习编程,从何入手

如果您想学习编程,却又不知从何入手,那么您不妨看看下面的几种学习方案,可能会给您一些启示吧!
==============================================
方案一 Basic语言 & Visual Basic

优点
(1)Basic 简单易学,很容易上手。
(2)Visual Basic 提供了强大的可视化编程能力,可以让你轻松地做出漂亮的程序。
(3)众多的控件让编程变得象垒积木一样简单。
(4)Visual Basic 的全部汉化让我们这些见了English就头大的人喜不自禁。

缺点
(1)Visual Basic 不是真正的面向对象的开发文具。
(2)Visual Basic 的数据类型太少,而且不支持指针,这使得它的表达能力很有限。
(3)Visual Basic 不......余下全文>>
 

易语言怎运用多线程?

在易语言自带多线程的使用中,会发现一些不稳定的现象,如:程序运行不稳定、退出程序时报错、有死机现象、多线程中不能直接销毁组件等。例如运行以下代码,程序会自动退出。.子程序 _按钮1_被单击
启动线程 (&多线程子程序, ).子程序 多线程子程序
按钮1.销毁 () ' 直接用就是不行哦!  现在可以改变一下调用多线程的方式,用标签的反馈事件执行多线程子程序,绝大部分不稳定问题都可以解决了。
  具体执行代码要写在标签反馈事件中,多线程中再用标签的“调用反馈事件”去调用。易语言对这个标签组件的反馈事件做过特殊处理。
  上述代码可写为以下,运行就可以通过了:.子程序 _按钮1_被单击
启动线程 (&多线程子程序, ).子程序 多线程子程序
标签1.调用反馈事件 (0, 0, 假).子程序 _标签1_反馈事件, 整数型
.参数 参数一, 整数型
.参数 参数二, 整数型
按钮1.销毁 ()注意:以下调用是错误的!
启动线程 (&_标签1_反馈事件, )

.子程序 多线程子程序
_标签1_反馈事件()
 

相关内容

    暂无相关文章