简单易用的线程池实现,简单易用线程池


0 前言

最近在写MySQL冷备server的一个模块,稍微接触到一点线程池的东西,自己也就想尝试写一个简单的线程池练练手。

这个线程池在创建时,即按照最大的线程数生成线程。

然后作业任务通过add_task接口往线程池中加入需要运行的任务,再调用线程池的run函数开始运行所有任务,每个线程从任务队列中读取任务,处理完一个任务后再读取新的任务,直到最终任务队列为空。

 

补充:再回头看这个设计,其实算不了线程池,最多算是多线程执行任务。

把所有的任务描述信息先加入到CThreadPool的任务队列里面,然后调用CThreadPool的run函数去创建指定数量的线程,每个线程互斥的从任务队列里面取出任务去执行。

1 线程池设计

简单描述如下(假设任务类名为CTasklet):

1、CThreadPool<CTasklet> thread_pool(MAX_THREAD_NUM);

2、创建任务,并把任务加入到线程池

    CTasklet *pTask1 = new CTasklet();

    CTasklet *pTask2 = new CTasklet();

    ...

    thread_pool.add_task(pTask1);

    thread_pool.add_task(pTask2);

    ...

3、调用线程池的run方法开始执行任务

    thread_pool.run();

4、等待任务执行完成

    thread_pool.join_thread();

2 源码

下面给出完整的线程池代码

1 /* 2 * file: thread_pool.h 3 * desc: 简单的线程池,一次性初始化任务队列和线程池。 4 * 5 */ 6 7 #ifndef _THREAD_POOL_H_ 8 #define _THREAD_POOL_H_ 9 10 #include <pthread.h> 11 #include <vector> 12 13 using namespace std; 14 15 template<typename workType> 16 class CThreadPool 17 { 18 public: 19 typedef void * (thread_func)(void *); 20 21 CThreadPool(int thread_num, size_t stack_size = 10485760); 22 ~CThreadPool(); 23 24 // 向任务队列中添加任务 25 int add_task(workType *pTask); 26 27 // 创建新线程并执行 28 int run(); 29 30 // 等待所有的线程执行结束 31 int join_thread(); 32 33 private: 34 int init_thread_attr(); 35 int destroy_thread_attr(); 36 37 int set_thread_stacksize(size_t stack_size); 38 int set_thread_joinable(); 39 40 protected: 41 // 线程池执行函数,必须为static 42 static void start_routine(void *para); 43 44 private: 45 pthread_attr_t attr_; 46 static pthread_mutex_t mutex_lock_; 47 static list<workType *> list_task_; 48 49 int thread_num_; // 最大线程数 50 vector<pthread_t> thread_id_vec_; 51 }; 52 #endif View Code

 

1 #include "pthread_pool.h" 2 3 template<typename workType> 4 pthread_mutex_t CThreadPool<workType>::mutex_lock_; 5 6 template<typename workType> 7 list<workType*> CThreadPool<workType*>::list_task_; 8 9 template<typename workType> 10 CThreadPool<workType>::CThreadPool(int thread_num, size_t stack_size) 11 { 12 thread_num_ = thread_num; 13 pthread_mutex_init(&mutex_lock_, NULL); 14 15 init_thread_attr(); 16 set_thread_stacksize(stack_size); 17 set_thread_joinable(); 18 } 19 20 template<typename workType> 21 CThreadPool<workType>::~CthreadPool() 22 { 23 destroy_thread_attr(); 24 } 25 26 template <typename workType> 27 int init_thread_attr() 28 { 29 return pthread_attr_init(&m_attr); 30 } 31 32 template <typename workType> 33 int CThreadPool<workType>::destroy_thread_attr() 34 { 35 return pthread_attr_destroy(&attr_); 36 } 37 38 template <typename workType> 39 int CThreadPool<workType>::set_thread_stacksize(size_t stack_size) 40 { 41 return pthread_attr_setstacksize(&attr_, stack_size); 42 } 43 44 template <typename workType> 45 int CThreadPool<workType>::set_thread_joinable() 46 { 47 return pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_JOINABLE); 48 } 49 50 template <typename workType> 51 void CThreadPool<workType>::start_routine(void *para) 52 { 53 workType *pWorkType = NULL; 54 55 while (1) { 56 pthread_mutex_lock(&mutex_lock_); 57 58 if (list_task_.empty()) { 59 pthread_mutex_unlock(mutex_lock_); 60 return; 61 } 62 63 pWorkType = *(list_task_.begin()); 64 list_task_.pop_front(); 65 pthread_mutex_unlock(&mutex_lock_); 66 67 pWorkType->run(); 68 delete pWorkType; 69 pWorkType = NULL; 70 } 71 } 72 73 template <typename workType> 74 int CThreadPool<workType>::add_task(workType *pTask) 75 { 76 pthread_mutex_lock(&mutex_lock_); 77 list_task_.push_back(pTask); 78 pthread_mutex_unlock(&mutex_lock_); 79 return 0; 80 } 81 82 template <typename workType> 83 int CThreadPool<workType>::run() 84 { 85 int rc; 86 pthread_t tid; 87 for (int i = 0; i < thread_num_; ++i) { 88 rc = pthread_create(&tid, &attr_, (thread_func)start_routine, NULL); 89 thread_id_vec_.push_back(tid); 90 } 91 return rc; 92 } 93 94 template <typename workType> 95 int CThreadPool<workType>::join_thread() 96 { 97 int rc = 0; 98 vector<pthread_t>::iterator iter; 99 for (iter = thread_id_vec_.begin(); iter != thread_id_vec_.end(); ++iter) { 100 rc = pthread_join((*iter), NULL); 101 } 102 thread_id_vec_.clear(); 103 return rc; 104 } View Code

 

测试代码如下

1 #include <unistd.h> 2 3 #include <iostream> 4 #include <list> 5 6 using namespace std; 7 8 class CTasklet 9 { 10 public: 11 CTasklet(int num) { 12 num_ = num; 13 cout << "CTasklet ctor create num: " << num_ << endl; 14 } 15 16 ~CTasklet() { 17 cout << "CTasklet dtor delete num: " << num_ << endl; 18 } 19 20 int run() { 21 cout << "CTasklet sleep begin: " << num_ << endl; 22 sleep(num_); 23 cout << "CTasklet sleep end: " << num_ << endl; 24 } 25 26 private: 27 int num_; 28 }; 29 30 #define MAX_THREAD_NUM 3 31 int main(int argc, char **argv) 32 { 33 // Step1. 创建线程池 34 CThreadPool<CTasklet> thread_pool(MAX_THREAD_NUM); 35 36 // Step2. 创建任务,并加入到线程池中 37 for (int i = 0; i < 6; ++i) { 38 CTasklet *pTask = new CTasklet(i); 39 thread_pool.add_task(pTask); 40 } 41 // Step3. 开始执行任务 42 thread_pool.run(); 43 // Step4. 等待任务结束 44 thread_pool.join_thread(); 45 46 return 0; 47 48 } View Code

3 总结

上面的线程池属于最简单的一类线程池,即相当于程序运行时候就开启n个线程来执行任务。真正的线程池需要考虑的方面比较多,比如1、线程池中的线程数应该能动态变化;2、线程池能动态调度线程来运行任务,以达到均衡;3、线程池还应该能记录任务的运行时间,防止超时等等。

不过,起码我们已经开了个头,实现了一个简单的线程池,接下来,让我们在这个基础上一步步调整、完善。

PS:对于线程池的考虑,我能想到的有动态增减线程数、超时机制、负载均衡。不知道大家理解线程池还需要考虑什么场景。


易语言想帮我写一个线程池与多线程操作的例子实现该网站进号功可以如图


 

java 怎实现线程池

最简单的可以利用java.util.concurrent.Executors
调用Executors.newCachedThreadPool()获取缓冲式线程池
Executors.newFixedThreadPool(int nThreads)获取固定大小的线程池
 

相关内容