前言
偶然发现github上有个ThreadPool项目(https://github.com/progschj/ThreadPool ),star数居然3k+,里面也就两个文件,一个ThreadPool.h,一个example.cpp。
看了一下,项目代码是cpp11写的。老实说,代码极其简洁又难懂。
下面是ThreadPool.h可以看看,有个直观印象。
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include
#include
#include
#include
#include
#include
#include
#include
#include
class ThreadPool {
public:
ThreadPool(size_t);
template
auto enqueue(F&& f, Args&&... args)
-> std::future
~ThreadPool();
private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
std::queue< std::function
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i workers.emplace_back( [this] { for(;;) { std::function { std::unique_lock this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); }); if(this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); } task(); } } ); } // add new work item to the pool template auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future { using return_type = typename std::result_of auto task = std::make_shared< std::packaged_task std::bind(std::forward ); std::future { std::unique_lock // don't allow enqueueing after stopping the pool if(stop) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks.emplace([task](){ (*task)(); }); } condition.notify_one(); return res; } // the destructor joins all threads inline ThreadPool::~ThreadPool() { { std::unique_lock stop = true; } condition.notify_all(); for(std::thread &worker: workers) worker.join(); } #endif 而example.cpp是对线程池ThreadPool.h的调用。 #include #include #include #include "ThreadPool.h" int main() { ThreadPool pool(4); std::vector< std::future for(int i = 0; i < 8; ++i) { results.emplace_back( pool.enqueue([i] { std::cout << "hello " << i << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "world " << i << std::endl; return i*i; }) ); } for(auto && result: results) std::cout << result.get() << ' '; std::cout << std::endl; return 0; } 看了以上代码应该是要劝退不少人了,反正我的第一感觉是这样的。 ThreadPool分析 ThreadPool类中有: 5个成员变量 std::vector< std::thread > workers 用于存放线程的数组,用vector容器保存 std::queue< std::function std::mutex queue_mutex 一个访问任务队列的互斥锁,在插入任务或者线程取出任务都需要借助互斥锁进行安全访问 std::condition_variable condition 一个用于通知线程任务队列状态的条件变量,若有任务则通知线程可以执行,否则进入wait状态 bool stop 标识线程池的状态,用于构造与析构中对线程池状态的了解 3个成员函数 ThreadPool(size_t) 线程池的构造函数 auto enqueue(F&& f, Args&&... args) 将任务添加到线程池的任务队列中 ~ThreadPool() 线程池的析构函数 class ThreadPool { public: ThreadPool(size_t); template auto enqueue(F&& f, Args&&... args) -> std::future ~ThreadPool(); private: // need to keep track of threads so we can join them std::vector< std::thread > workers; // the task queue std::queue< std::function // synchronization std::mutex queue_mutex; std::condition_variable condition; bool stop; }; 构造函数解析 inline ThreadPool::ThreadPool(size_t threads) : stop(false) { for(size_t i = 0;i workers.emplace_back( [this] { for(;;) { std::function { std::unique_lock this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); }); if(this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); } task(); } } ); } 构造函数定义为inline。 接收参数threads表示线程池中要创建多少个线程。 初始化成员变量stop为false,即表示线程池启动着。 然后进入for循环,依次创建threads个线程,并放入线程数组workers中。 在vector中,emplace_back()成员函数的作用是在容器尾部插入一个对象,作用效果与push_back()一样,但是两者有略微差异,即emplace_back(args)中放入的对象的参数,而push_back(OBJ(args))中放入的是对象。即emplace_back()直接在容器中以传入的参数直接调用对象的构造函数构造新的对象,而push_back()中先调用对象的构造函数构造一个临时对象,再将临时对象拷贝到容器内存中。 我们知道,在C++11中,创建线程的方式为: std::thread t(fun); //fun为线程的执行函数 所以,上述workers.emplace_back()中,我们传入的lambda表达式就是创建线程的fun()函数。 下面来分析下该lambda表达式: [this]{ for(;;) { std::function { std::unique_lock this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); }); if(this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); } task(); } } lambda表达式的格式为: [ 捕获 ] ( 形参 ) 说明符(可选) 异常说明 attr -> 返回类型 { 函数体 } 所以上述lambda表达式为 [ 捕获 ] { 函数体 } 类型。 该lambda表达式捕获线程池指针this用于在函数体中使用(调用线程池成员变量stop、tasks等) 分析函数体,for(;;)为一个死循环,表示每个线程都会反复这样执行,这其实每个线程池中的线程都会这样。 在循环中,,先创建一个封装void()函数的std::function对象task,用于接收后续从任务队列中弹出的真实任务。 在C++11中, std::unique_lock 可以在退出作用区域时自动解锁,无需显式解锁。所以,{}起的作用就是在退出 } 时自动回释放线程池的queue_mutex。 在{}中,我们先对任务队列加锁,然后根据条件变量判断条件是否满足。 void wait(unique_lock { while (!p()) wait(lock); } 为条件标量wait的运行机制, wait在p 为false的状态下,才会进入wait(lock)状态。当前线程阻塞直至条件变量被通知。 this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); }); 所以p表示上述代码中的lambda表达式[this]{ return this->stop || !this->tasks.empty(); },其中this->stop为false, !this->tasks.empty()也为false。即其表示若线程池已停止或者任务队列中不为空,则不会进入到wait状态。 由于刚开始创建线程池,线程池表示未停止,且任务队列为空,所以每个线程都会进入到wait状态。 (借用 https://blog.csdn.net/shichao1470/article/details/89856443 一张图便于说明wait的过程) 在线程池刚刚创建,所有的线程都阻塞在了此处,即wait处。 若后续条件变量来了通知,线程就会继续往下进行: if(this->stop && this->tasks.empty()) return; 若线程池已经停止且任务队列为空,则线程返回,没必要进行死循环。 task = std::move(this->tasks.front()); this->tasks.pop(); 这样,将任务队列中的第一个任务用task标记,然后将任务队列中该任务弹出。(此处线程实在获得了任务队列中的互斥锁的情况下进行的,从上图可以看出,在条件标量唤醒线程后,线程在wait周期内得到了任务队列的互斥锁才会继续往下执行。所以最终只会有一个线程拿到任务,不会发生惊群效应) 在退出了{ },我们队任务队列的所加的锁也释放了,然后我们的线程就可以执行我们拿到的任务task了,执行完毕之后,线程又进入了死循环。 至此,我们分析了ThreadPool的构造函数。 添加任务函数解析 template auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future { using return_type = typename std::result_of auto task = std::make_shared< std::packaged_task std::bind(std::forward ); std::future { std::unique_lock // don't allow enqueueing after stopping the pool if(stop) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks.emplace([task](){ (*task)(); }); } condition.notify_one(); return res; } 添加任务的函数本来不难理解,但是作者增加了许多新的C++11特性,这样就变得难以理解了。 template auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future equeue是一个模板函数,其类型形参为F与Args。其中class... Args表示多个类型形参。 auto用于自动推导出equeue的返回类型,函数的形参为(F&& f, Args&&... args),其中&&表示右值引用。表示接受一个F类型的f,与若干个Args类型的args。 -> std::future 表示返回类型,与lambda表达式中的表示方法一样。 返回的是什么类型呢? typename std::result_of std::future //std::future用来访问异步操作的结果 所以,最终返回的是放在std::future中的F(Args…)返回类型的异步执行结果。 举个简单的例子来理解吧: // 来自 packaged_task 的 future std::packaged_task std::future std::thread(std::move(task)).detach(); // 将task函数挂载在线程上运行 f1.wait(); //f1等待异步结果的输入 f1.get(); //f1获取到的异步结果 struct S { double operator()(char, int&); float operator()(int) { return 1.0;} }; std::result_of std::result_of 经过上述两个简单的小例子可以知道: -> std::future //等价于 //F(Args...) 为 int f(args) //std::result_of //std::future //return f1 //在后续我们根据f1.get就可以取出存放在里面的int值 //最终返回了一个F(Args...)类型的值,而这个值是存储在std::future中,因为线程是异步处理的 接着分析: using return_type = typename std::result_of 表示使用return_type表示F(Args...)的返回类型。 auto task = std::make_shared< std::packaged_task std::bind(std::forward ); 由上述小例子,我们已经知道std::packaged_task是一个包装函数,所以 auto sp = std::make_shared std::packaged_task auto task = std::make_shared< std::packaged_task //即 std::packaged_task //然后task指向了t1,即task指向了返回值为return_type的f(args) std::packaged_task 所以最终,task指向了传递进来的函数。 std::future //res中保存了类型为return_type的变量,有task异步执行完毕才可以将值保存进去 所以,res会在异步执行完毕后即可获得所求。 { std::unique_lock // don't allow enqueueing after stopping the pool if(stop) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks.emplace([task](){ (*task)(); }); //(*task)() ---> f(args) } 在新的作用于内加锁,若线程池已经停止,则抛出异常。 否则,将task所指向的f(args)插入到tasks任务队列中。需要指出,这儿的emplace中传递的是构造函数的参数。 condition.notify_one(); //任务加入任务队列后,需要去唤醒一个线程 return res; //待线程执行完毕,将异步执行的结果返回 经过上述分析,这样将每个人物插入到任务队列中的过程就完成了。 析构函数解析 inline ThreadPool::~ThreadPool() { { std::unique_lock stop = true; } condition.notify_all(); for(std::thread &worker: workers) worker.join(); } 在析构函数中,先对任务队列中加锁,将停止标记设置为true,这样后续即使有新的插入任务操作也会执行失败。 使用条件变量唤醒所有线程,所有线程都会往下执行: if(this->stop && this->tasks.empty()) return; 在stop设置为true且任务队列中为空时,对应的线程进而跳出循环结束。 for(std::thread &worker: workers) worker.join(); 将每个线程设置为join,等到每个线程结束完毕后,主线程再退出。 主函数解析 ThreadPool pool(4); //创建一个线程池,池中线程为4 std::vector< std::future for(int i = 0; i < 8; ++i) { //创建8个任务 results.emplace_back( //一次保存每个异步结果 pool.enqueue([i] { //将每个任务插入到任务队列中,每个任务的功能均为“打印+睡眠1s+打印+返回结果” std::cout << "hello " << i << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "world " << i << std::endl; return i*i; }) ); } for(auto && result: results) //一次取出保存在results中的异步结果 std::cout << result.get() << ' '; std::cout << std::endl; 需要对主函数中的任务函数进行说明: [i] { //将每个任务插入到任务队列中,每个任务的功能均为“打印+睡眠1s+打印+返回结果” std::cout << "hello " << i << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "world " << i << std::endl; return i*i; } 这个lambda表达式用来表示一个匿名函数,该函数分写执行 打印-睡眠-打印-返回结果。 pool.enqueue(fun); 对应于类中的 auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future 其中,F&& f 是lambda表达式(或者说fun)的形参,而参数为0。 而 std::future 则用来保存 i*i 。 对应的 std::result_of 上述是简要的分析。::type d = 3.14; // d 拥有 double 类型,等价于double d = 3.14::type x = 3.14; // x 拥有 float 类型,等价于float x = 3.14