同步 同步实际上就是在任务队列中的处理方式,包括线程池,观察者模式,回调任务队列,都会使用这个技术。而如何实现各个任务的同步,进行相互协调和相互等待,确保数据的准确性就很关键了。
等待 如何监听该线程所需要的变量何时解锁呢?那么就是等待嘛,忙等待即不断空转查询,也成为自旋锁;延时等待即每次延时一段时间然后查询结果,以上两种情况的弊端都是显而易见的,另外就是条件变量实现
1 2 3 4 5 6 7 8 9 10 11 bool flag = false ;mutex m; void wait () { unique_lock<mutex> mutex{ m }; while (!flag){ mutex.unlock (); mutex.lock (); } }
条件变量 标准库在< condition_variable >中提供了两套条件变量,分别是std::condition_variable,std::condition_variable_any。其中condition_variable主要用于只支持一种锁:它只能配合 std::unique_lock< std::mutex > 使用。如果你传给它一个 std::lock_guard,或者 std::shared_lock,或者你自己写的锁,代码直接编译报错。底层的 mutex 必须是标准的 std::mutex。也正因如此,这个条件变量的性能很好,是零封装开销
另外是std::condition_variable_any,支持任意锁:它可以配合任何满足锁协议(BasicLockable)的锁使用。
它可以配合 std::unique_lock< std::mutex > 。它可以配合 std::shared_lock< std::shared_mutex >(核心用途 1:读写锁)。它可以配合 std::unique_lock< std::recursive_mutex >(递归锁)。它可以配合你自己写的一个类,只要这个类有 lock() 和 unlock() 方法。
下面以一个生产者-消费者模型实现来说明:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 class Thread_Safe_queue {private : mutable mutex m; condition_variable data_condition; queue<int > safe_queue; public : Thread_Safe_queue () = defalut; void push (int new_value) { unique_lock<mutex> lk{m}; safe_queue.push_back (new_value); lk.unlock (); data_condition.notify_one () } void pop (int & old_value) { unique_lock<mutex> lk{m}; data_condition.wait (lk,[this ]{return !safe_queue.empty ();}); old_value = safe_queue.front (); safe_queue.pop (); std::shared_ptr<int > pop () { std::unique_lock<std::mutex> lk{ m }; data_cond.wait (lk, [this ] {return !safe_queue.empty (); }); std::shared_ptr<int > res { std::make_shared <int >(data_queue.front ()) }; data_queue.pop (); return res; } bool empty () const { std::lock_guard<std::mutex> lk (m) ; return safe_queue.empty (); } } } void producer (Thread_Safe_Queue<int >& q) { for (int i = 0 ; i < 5 ; ++i) { q.push (i); } } void consumer (Thread_Safe_Queue<int >& q) { for (int i = 0 ; i < 5 ; ++i) { int value{}; q.pop (value); } } int main () { threadsafe_queue<int > q; std::thread producer_thread (producer, std::ref(q)) ; std::thread consumer_thread (consumer, std::ref(q)) ; producer_thread.join (); consumer_thread.join (); }
异步 同步通过锁(原子操作)机制实现了线程运行过程中的数据通信。而异步则通过future,promise等实现了获得线程运行的结果
future使用 C++中引入future是为了解决异步通信问题,可以将其看成是交换数据基本类型。它可以获得async的返回结果,也可以和promise使用。 future类包括future类和shared_future类,前者只能关联一个事件,后者可以关联多个事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 int task (int n) { cout<<this_thread::get_id (); return n*n; } int main () { future<int > future = async (task,10 ); cout<<"main:" <<this_thread::get_id (); cout<<std::boolalpha<<future.valid (); cout<<future.get (); cout<<std::boolalpha<<future.valid (); }
async类 和thread类相同,async类也是通过广义函数指针传入来实现调用,使用std::ref来实现引用。默认情况下传输给async的参数会按照值复制或者移动进入线程内部存储中。
并且它和 std::thread 一样,内部会将保有的参数副本转换为右值表达式进行传递,这是为了那些只支持移动的类型,左值引用没办法引用右值表达式,所以如果不使用 std::ref,这里 void f(int&) 就会导致编译错误,如果是 void f(const int&) 则可以通过编译,不过引用的不是我们传递的局部对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 struct X { int operator () (int n) const { return n * n; } }; struct Y { int f (int n) const { return n * n; } }; void f1 (int & p) { std::cout << &p << '\n' ; }void f2 (const int & p) { std::cout << &p << '\n' ; }int main () { Y y; int n = 0 ; auto t1 = std::async (X{}, 10 ); auto t2 = std::async (&Y::f,&y,10 ); auto t3 = std::async ([] {}); auto t4 = std::async (f1, std::ref (n)); std::async (f1, n); std::async (f2, n); std::cout << &n << '\n' ; }
另外async还引入了创建锁的执行策略 ,包括
std::launch::async 在不同线程上执行异步任务。
std::launch::deferred 惰性求值,不创建线程,等待 future 对象调用 wait 或 get 成员函数的时候在本线程执行任务。
而如果不指定这个参数,那么由系统决定是否立刻创建线程执行任务
1 2 3 4 5 6 7 8 9 10 11 void f () { std::cout << std::this_thread::get_id () << '\n' ; } int main () { std::cout << std::this_thread::get_id () << '\n' ; auto f1 = std::async (std::launch::deferred, f); f1.wait (); auto f2 = std::async (std::launch::async,f); auto f3 = std::async (std::launch::deferred | std::launch::async, f); }
最后,如果不使用future接收async返回值,那么异步程序将变为同步,严格表述如下: 如果从 std::async 获得的 std::future 没有被移动或绑定到引用,那么在完整表达式结尾, std::future 的析构函数将阻塞,直到到异步任务完成 。因为临时对象的生存期就在这一行,而对象生存期结束就会调用调用析构函数。
1 2 std::async (std::launch::async, []{ f (); }); std::async (std::launch::async, []{ g (); });
package_task 在async中,强硬的绑定了任务的设定以及开始,对于一些业务场景是不完美的,有时我们需要决定什么时候打包任务到新的线程,什么时候启动这个线程。(需要注意,std::launch::deferred虽然决定了启动时刻,但是其是在本线程执行任务,是阻塞的)
而使用std::package_task,步骤:
在主线程把任务打包好。
拿到 future。
把这个包(task)扔进线程池的任务队列(Queue)。
线程池里的某个空闲线程抢到了这个包,执行 task()。
结果自动弹回到你的 future 里。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #include <iostream> #include <future> #include <thread> #include <chrono> int heavy_computation (int input) { std::cout << "Worker thread is performing heavy computation...\n" ; std::this_thread::sleep_for (std::chrono::seconds (2 )); return input * 2 ; } int main () { std::packaged_task<int (int ) > task (heavy_computation) ; std::future<int > result_future = task.get_future (); std::thread worker_thread (std::move(task), 10 ) ; std::cout << "Main thread is doing other work...\n" ; std::cout << "Main thread is waiting for the result...\n" ; int result = result_future.get (); std::cout << "The result is: " << result << std::endl; worker_thread.join (); return 0 ; }
promise 线程之间通过promise和future来传递一次性的数据。promise存在于一个线程中,并承诺将会提供一个值或异常交给future处理,在这个过程中,一般promise是生产者,future是消费者,他们都会链接着一个共享状态。核心工作流程是,首先在发起(消费者)线程中创建一个std::promise对象,然后将 这个对象move到生产者线程,接下来消费者线程在需要数据的时候调用future.get(),如果生产者产生了值,那么返回,反之则阻塞。其中,生产者通过promise.set_value()设置值
promise是一个纯手动管理的过程。你需要自己在线程函数里显式调用 set_value。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 #include <iostream> #include <thread> #include <future> #include <chrono> void worker_thread (std::promise<int > promise) { try { std::cout << "工作线程:正在进行复杂的计算..." << std::endl; std::this_thread::sleep_for (std::chrono::seconds (2 )); int result = 42 ; std::cout << "工作线程:计算完成,结果为 " << result << std::endl; promise.set_value (result); } catch (...) { try { promise.set_exception (std::current_exception ()); } catch (...) {} } } int main () { std::promise<int > promise; std::future<int > future = promise.get_future (); std::thread t (worker_thread, std::move(promise)) ; std::cout << "主线程:正在等待工作线程的结果..." << std::endl; try { int result = future.get (); std::cout << "主线程:从工作线程获取到的结果是 " << result << std::endl; } catch (const std::exception& e) { std::cout << "主线程:捕获到工作线程的异常:" << e.what () << std::endl; } t.join (); return 0 ; } 输出: 主线程:正在等待工作线程的结果... 工作线程:正在进行复杂的计算... (等待 2 秒) 工作线程:计算完成,结果为 42 主线程:从工作线程获取到的结果是 42
信号量 信号量在C++20中引入,可以用来限制并发数量。相比之下,信号量是一个更加轻量级的实现,更纯粹的信号通知,没有所有权的限制,提供了计数资源。但是灵活性差,等待条件仅有信号量。下面详细介绍操作
std::binary_semaphore C++ 提供了两个信号量类型:std::counting_semaphore 与 std::binary_semaphore,定义在 < semaphore > 中其中std::binary_semaphore只是std::counting_semaphore<1>的一个别名,也就是计数器只有0,1两个状态,但还是不同于mutex,但是首先Mutex具备所有权,谁上锁谁解锁,其次Mutex关注的是互斥 (Mutual Exclusion)。保护共享数据不被同时修改。Semaphore关注的是信号 (Signaling) 或 资源计数 (Resource Counting)。控制执行顺序或并发数量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 std::binary_semaphore smph_signal_main_to_thread{ 0 }; std::binary_semaphore smph_signal_thread_to_main{ 0 }; void thread_proc () { smph_signal_main_to_thread.acquire (); std::cout << "[线程] 获得信号" << std::endl; std::this_thread::sleep_for (3 s); std::cout << "[线程] 发送信号\n" ; smph_signal_thread_to_main.release (); } int main () { std::jthread thr_worker{ thread_proc }; std::cout << "[主] 发送信号\n" ; smph_signal_main_to_thread.release (); smph_signal_thread_to_main.acquire (); std::cout << "[主] 获得信号\n" ; }
acquire 函数就是我们先前说的“等待”(原子地减少计数),release 函数就是”释放”(原子地增加计数)。
std::counting_semaphore 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 #include <semaphore> #include <thread> #include <vector> #include <iostream> std::counting_semaphore<3> parking_lot (3 ) ; void car (int id) { std::cout << "车 " << id << " 在排队...\n" ; parking_lot.acquire (); std::cout << "车 " << id << " 进场停车 (占用资源)\n" ; std::this_thread::sleep_for (std::chrono::seconds (1 )); std::cout << "车 " << id << " 离开 (释放资源)\n" ; parking_lot.release (); } int main () { std::vector<std::thread> threads; for (int i = 0 ; i < 5 ; ++i) threads.emplace_back (car, i); for (auto & t : threads) t.join (); }
闩与屏障 闩 (latch) 与屏障 (barrier) 是线程协调机制,允许任何数量的线程阻塞直至期待数量的线程到达。闩不能重复使用,而屏障则可以。 std::latch:单次使用的线程屏障 std::barrier:可复用的线程屏障 它们定义在标头 < latch > 与 < barrier >。 我的理解通过latch和barrier的引入可以使单次运行的线程尽可能多,提高Cpu利用率 以下是实例,实际很简单,和信号量很像
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 std::latch work_start{ 3 }; void work () { std::cout << "等待其它线程执行\n" ; work_start.wait (); std::cout << "任务开始执行\n" ; } int main () { std::jthread thread{ work }; std::this_thread::sleep_for (3 s); std::cout << "休眠结束\n" ; work_start.count_down (); work_start.count_down (2 ); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 std::barrier barrier{ 10 , [n = 1 ]()mutable noexcept {std::cout << "\t第" << n++ << "轮结束\n" ; } }; void f (int start, int end) { for (int i = start; i <= end; ++i) { std::osyncstream{ std::cout } << i << ' ' ; barrier.arrive_and_wait (); std::this_thread::sleep_for (300 ms); } } int main () { std::vector<std::jthread> threads; for (int i = 0 ; i < 10 ; ++i) { threads.emplace_back (f, i * 10 + 1 , (i + 1 ) * 10 ); } }
std::osyncstream ,它是 C++20 引入的,此处是确保输出流在多线程环境中同步,避免除数据竞争,而且将不以任何方式穿插或截断