在多线程中一个线程的执行可能需要依赖另一个线程任务的完成,就像大家都熟知的生产者消费者模型一样,消费者消费的前提是生产者生产出来了产品。所以多线程需要某种机制来同步线程之间的操作,让他们更好地合作来完成任务。
等待事件或条件
因为线程同步是很常规的操作,所以C++里提供了条件变量来实现线程间的同步,尽可能的提高操作效率。C++的头文件里提供了std::condition_variable和std::condition_varable_any来实现线程同步,但是他们都必须和mutex一起工作。condition_variable只能和C++提供的标准std::mutex一起使用,而condition_variable_any则可以和自定义的mutex一起工作,除非需要额外的灵活性,首选应该是condition_variable,因为他的开销相比后者更小。下面是一个使用条件变量来等待数据队列数据到来并进行处理的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| using namespace std; mutex mut; queue<Data> data_queue; condition_variable data_cond; void data_preparation_thread() { while(more_data_to_prepare()) { Data const data = prepare_data(); lock_guard<mutex> lk(mut); data_queue.push(data); data_cond.notify_one(); } } void data_process_thread() { while(true) { unique_lock<mutex>lk(mut); data_cond.wait(lk,[]{return !data_queue.empty();}); Data data = data_queue.front(); data_queue.pop(); lk.unlock(); process(data); if(is_last_data(data)) break; } }
|
线程安全队列
以queue实现线程安全队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| #include<memory> template<typename T> class threadsafe_queue { public: threadsafe_queue(){} threadsafe_queue(const threadsafe_queue& other) { lock_guard<mutex> lk(other.mut); data_queue = other.data_queue; } threadsafe_queue& operator=(const threadsafe_queue&)=delete; void push(T new_value) { lock_guard <mutex> lk(mut); data_queue.push(new_value); data_cond.notify_one(); } bool try_pop(T& value); { lock_guard<mutex> lk(mut); if(data_queue.empty()) return false; value = data_queue.front(); data_queue.pop(); return true; } std::shared_ptr<T> try_pop() { lock_guard<mutex> lk(mut); if(data_queue.empty()) return shared_ptr<T>(); shared_ptr<T> res(make_shared<T>(data_queue.front())); data_queue.pop(); return res; } void wait_and_pop(T& value) { unique_lock<mutex>lk(mut); data_cond.wait(lk,[this]{return !data_queue.empty();}); value = data_queue.front(); data_queue.pop(); } std::shared_ptr<T> wait_and_pop() { unique_lock<mutex> lk(mut); data_cond.wait(lk,[this]{return !data_queue.empty();}); shared_ptr<T> res(make_shared<T>(data_queue.front())); data_queue.pop(); return res; } bool empty() const { lock_guard<mutex> lk(mut); return data_queue.empty(); } private: mutable std::mutex mut; std::queue<T> data_queue; std::condition_varable data_cond; }
|
当调用notify_one的时候只有一个线程会被唤醒,但不知道到底哪一个会被唤醒。notify_all则会唤醒所有的线程去检查自己当前的操作满不满足条件,如果不满足,则继续被阻塞。
future的使用
C++提供future来代表一次性事件,线程可以在future上等待一小段时间检查事件是否发生,而在检查的间隙执行其他任务。它还可以去执行其他任务,直到自己想所需要的事件已经发生才继续进行。 一旦状态已经发生,future就无法复位。C++里有两种future,unique_future:std::future和std::shared_future,语义和共享指针一样。虽然future被用于线程通讯,但是其本身并不提供同步访问,所以需要程序员手工提供同步或互斥保护。如果多个线程公用shared_future可以直接访问自己的shared_future而无需进一步的同步操作。如果你有一个耗时的计算任务,你需要该任务的运行结果来进行后续的计算可以使用std::async来启动一个异步任务,它返回一个future对象,当需要获取该值时,只需要在future对象上调用get(),便会一直阻塞线程直到获取到计算的返回值。
1 2 3 4 5 6 7 8 9
| #include <future> int find_the_answer(); void do_some_other(); int main() { future<int> the_answer = async(find_the_answer); do_some_other(); cout<<"The answer is "<<the_answer.get()<<endl; }
|
std::launch::async std::launch::async::deferred一个是在另一个线程上执行函数,另一个则是推迟函数的执行,直到调用future对象的wait或者get为止。async允许穿衣参数来为即将调用的函数传入参数,如果参数是右值则通过移动操作转移参数创建副本。
async并不是将任务和future对象绑定的唯一方式,package_task<>与promise<>也可以实现package_task是比promise更高层次的抽象,package_task将一个future绑定到可调用对象或者函数上,当package_task被调用时,他会调用函数或者可调用对象,并且等待future就绪,将返回值作为关联数据进行存储。使用package_task在线程之间传递任务的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| mutex mut; deque<package_task<void()>> tasks; bool is_shutdown_message(); void get_and_process_message(); void gui_thread() { while(!is_shutdown_message()) { get_and_process_message(); package_task<void()> task; { lock_guard<mutex> lk(mut); if(tasks.empty()) contune; task = move(tasks.front()); tasks.pop_front(); } task(); } }
thread bg_thread(gui_thread); template<typename Func> future<void> post_task_for_thread(Func f) { package_task<void()> task(f); future<void()> res = task.get_future(); lock_guard<mutex> lk(mut); tasks.push_back(move(task)); return res; }
|
Author:
Hongtao Zhang
Permalink:
https://zhootoo.github.io/2019/07/07/C-Concurrency/
License:
Copyright (c) 2019 CC-BY-NC-4.0 LICENSE
Slogan:
真正重要的东西,用眼睛是看不到的,必须用心灵去寻找.