同步

同步实际上就是在任务队列中的处理方式,包括线程池,观察者模式,回调任务队列,都会使用这个技术。而如何实现各个任务的同步,进行相互协调和相互等待,确保数据的准确性就很关键了。

等待

如何监听该线程所需要的变量何时解锁呢?那么就是等待嘛,忙等待即不断空转查询,也成为自旋锁;延时等待即每次延时一段时间然后查询结果,以上两种情况的弊端都是显而易见的,另外就是条件变量实现

1
2
3
4
5
6
7
8
9
10
11
//忙等待
bool flag = false;
mutex m;
void wait(){
unique_lock<mutex> mutex{ m };
while(!flag){
mutex.unlock();
// this_thread::sleep_for(chrono::milliseconds(100)); 延时等待
mutex.lock();
}
}

条件变量

标准库在< condition_variable >中提供了两套条件变量,分别是std::condition_variable,std::condition_variable_any。其中condition_variable主要用于只支持一种锁:它只能配合 std::unique_lock< std::mutex > 使用。如果你传给它一个 std::lock_guard,或者 std::shared_lock,或者你自己写的锁,代码直接编译报错。底层的 mutex 必须是标准的 std::mutex。也正因如此,这个条件变量的性能很好,是零封装开销

另外是std::condition_variable_any,支持任意锁:它可以配合任何满足锁协议(BasicLockable)的锁使用。

它可以配合 std::unique_lock< std::mutex > 。它可以配合 std::shared_lock< std::shared_mutex >(核心用途 1:读写锁)。它可以配合 std::unique_lock< std::recursive_mutex >(递归锁)。它可以配合你自己写的一个类,只要这个类有 lock() 和 unlock() 方法。

下面以一个生产者-消费者模型实现来说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class Thread_Safe_queue{
private:
mutable mutex m;
condition_variable data_condition;
queue<int> safe_queue;
public:
Thread_Safe_queue() = defalut;
void push(int new_value){
unique_lock<mutex> lk{m};
safe_queue.push_back(new_value);
lk.unlock();//必须先解锁,然后通知
data_condition.notify_one()
}
//消费者
void pop(int& old_value){
// 1. 必须使用 unique_lock,因为它允许中途解锁(wait需要这个能力)
unique_lock<mutex> lk{m};
/**
* wait有两个版本
* void wiat(std::unique_Lock<mutex>& lock)
* void wait(std::unique_lock<std::mutex>& lock, Predicate pred);,=可以避免虚假唤醒
*/
// 2. 调用 wait
// 第二个参数是 predicate(谓词),等价于 while (!condition) { wait(); }
// 它的逻辑是:
// A. 先检查 lambda 返回值。如果不为空,直接继续(不用睡)。
// B. 如果为空,执行 wait():
// -> 释放锁 mtx (Unlock)
// -> 睡觉 (Sleep)
// -> 被唤醒后,重新抢锁 mtx (Relock)
// -> 抢到后,再次检查 lambda。如果还不为空,继续睡;如果不为空,往下走。
data_condition.wait(lk,[this]{return !safe_queue.empty();});//添加谓词可以避免操作系统的虚假唤醒
// 3. 能走到这里,说明:
// a. 线程被唤醒了
// b. 锁 mtx 已经被当前线程重新拿到了(数据安全!)
// c. 队列不为空(或者任务结束)
old_value = safe_queue.front();
safe_queue.pop();
//使用智能指针对pop封装一下
std::shared_ptr<int> pop() {
std::unique_lock<std::mutex> lk{ m };
data_cond.wait(lk, [this] {return !safe_queue.empty(); });
std::shared_ptr<int> res { std::make_shared<int>(data_queue.front()) };
data_queue.pop();
return res;
}
bool empty()const {
std::lock_guard<std::mutex> lk (m);
return safe_queue.empty();
}
}
}
void producer(Thread_Safe_Queue<int>& q) {
for (int i = 0; i < 5; ++i) {
q.push(i);
}
}
void consumer(Thread_Safe_Queue<int>& q) {
for (int i = 0; i < 5; ++i) {
int value{};
q.pop(value);
}
}
int main() {
threadsafe_queue<int> q;

std::thread producer_thread(producer, std::ref(q));
std::thread consumer_thread(consumer, std::ref(q));

producer_thread.join();
consumer_thread.join();
}

异步

同步通过锁(原子操作)机制实现了线程运行过程中的数据通信。而异步则通过future,promise等实现了获得线程运行的结果

future使用

C++中引入future是为了解决异步通信问题,可以将其看成是交换数据基本类型。它可以获得async的返回结果,也可以和promise使用。
future类包括future类和shared_future类,前者只能关联一个事件,后者可以关联多个事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int task(int n){
cout<<this_thread::get_id();
return n*n;
}
int main(){
// 一行代码启动任务,没有任何锁,也没有共享变量
// std::async 负责创建线程并运行函数
future<int> future = async(task,10);//启动异步线程返回future对象
cout<<"main:"<<this_thread::get_id();
/**
* .get()函数阻塞,直到返回结果,
* valid函数检查当前的future是否处于关联状态,即是否关联任务,如果还未关联(或者调用了get、set),都会返回false
*/
cout<<std::boolalpha<<future.valid();//boolalpha使future.valid()输出true、false。这条语句结果为true
cout<<future.get();
cout<<std::boolalpha<<future.valid();//false
}

async类

和thread类相同,async类也是通过广义函数指针传入来实现调用,使用std::ref来实现引用。默认情况下传输给async的参数会按照值复制或者移动进入线程内部存储中。

并且它和 std::thread 一样,内部会将保有的参数副本转换为右值表达式进行传递,这是为了那些只支持移动的类型,左值引用没办法引用右值表达式,所以如果不使用 std::ref,这里 void f(int&) 就会导致编译错误,如果是 void f(const int&) 则可以通过编译,不过引用的不是我们传递的局部对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
struct X{
int operator()(int n)const{
return n * n;
}
};
struct Y{
int f(int n)const{
return n * n;
}
};
void f1(int& p) { std::cout << &p << '\n'; }
void f2(const int& p){ std::cout << &p << '\n'; }
int main(){
Y y;
int n = 0;
auto t1 = std::async(X{}, 10);
auto t2 = std::async(&Y::f,&y,10);
auto t3 = std::async([] {});
//std::async 会先将参数“衰变(decay)”并拷贝到内部存储中。
//假设你在主线程有一个变量 n,而 std::async 在内部创建了一个副本,我们叫它 internal_n。
auto t4 = std::async(f1, std::ref(n));
std::async(f1, n); // Error! 无法通过编译,async如果想要传引用,必须使用ref,否则仍然拷贝n,
//这与传引用的目的不符。async不应该拷贝,应该报错
std::async(f2, n); // OK! 可以通过编译,不过引用的并非是局部的n,形参基于const保证不改变,因此async内部可以拷贝。
std::cout << &n << '\n';
}

另外async还引入了创建锁的执行策略,包括

  1. std::launch::async 在不同线程上执行异步任务。
  2. std::launch::deferred 惰性求值,不创建线程,等待 future 对象调用 wait 或 get 成员函数的时候在本线程执行任务。
  3. 而如果不指定这个参数,那么由系统决定是否立刻创建线程执行任务
1
2
3
4
5
6
7
8
9
10
11
void f(){
std::cout << std::this_thread::get_id() << '\n';
}

int main(){
std::cout << std::this_thread::get_id() << '\n';
auto f1 = std::async(std::launch::deferred, f);
f1.wait(); // 在 wait() 或 get() 调用时执行,不创建线程,在此线程执行任务
auto f2 = std::async(std::launch::async,f); // 创建线程执行异步任务
auto f3 = std::async(std::launch::deferred | std::launch::async, f); // 实现选择的执行方式
}

最后,如果不使用future接收async返回值,那么异步程序将变为同步,严格表述如下:
如果从 std::async 获得的 std::future 没有被移动或绑定到引用,那么在完整表达式结尾, std::future 的析构函数将阻塞,直到到异步任务完成。因为临时对象的生存期就在这一行,而对象生存期结束就会调用调用析构函数。

1
2
std::async(std::launch::async, []{ f(); }); // 临时量的析构函数等待 f()
std::async(std::launch::async, []{ g(); }); // f() 完成前不开始

package_task

在async中,强硬的绑定了任务的设定以及开始,对于一些业务场景是不完美的,有时我们需要决定什么时候打包任务到新的线程,什么时候启动这个线程。(需要注意,std::launch::deferred虽然决定了启动时刻,但是其是在本线程执行任务,是阻塞的)

而使用std::package_task,步骤:

  1. 在主线程把任务打包好。
  2. 拿到 future。
  3. 把这个包(task)扔进线程池的任务队列(Queue)。
  4. 线程池里的某个空闲线程抢到了这个包,执行 task()。
  5. 结果自动弹回到你的 future 里。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <iostream>
#include <future>
#include <thread>
#include <chrono>

// 1. 任务的定义:一个耗时的计算函数
int heavy_computation(int input) {
std::cout << "Worker thread is performing heavy computation...\n";
std::this_thread::sleep_for(std::chrono::seconds(2));
return input * 2;
}

int main() {
// 2. 将任务打包进 packaged_task
// 模板参数 <int(int)> 表示这是一个返回 int,接受一个 int 参数的函数
std::packaged_task<int(int)> task(heavy_computation);

// 3. 从 task 中获取 future,这是我们获取结果的唯一凭证
std::future<int> result_future = task.get_future();

// 4. 决定如何执行这个 task
// 这里我们创建一个新线程来执行它。
// 注意:packaged_task 是 move-only 的,所以必须用 std::move 传给线程。
std::thread worker_thread(std::move(task), 10); // 传递参数 10 给 heavy_computation

// 主线程可以继续做其他事情
std::cout << "Main thread is doing other work...\n";

// 5. 当主线程需要结果时,调用 get() 等待并获取
std::cout << "Main thread is waiting for the result...\n";
int result = result_future.get(); // 线程会在这里阻塞,直到 worker_thread 执行完毕

std::cout << "The result is: " << result << std::endl;

// 6. 回收线程资源
worker_thread.join();

return 0;
}

promise

线程之间通过promise和future来传递一次性的数据。promise存在于一个线程中,并承诺将会提供一个值或异常交给future处理,在这个过程中,一般promise是生产者,future是消费者,他们都会链接着一个共享状态。核心工作流程是,首先在发起(消费者)线程中创建一个std::promise对象,然后将 这个对象move到生产者线程,接下来消费者线程在需要数据的时候调用future.get(),如果生产者产生了值,那么返回,反之则阻塞。其中,生产者通过promise.set_value()设置值

promise是一个纯手动管理的过程。你需要自己在线程函数里显式调用 set_value。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include <iostream>
#include <thread>
#include <future>
#include <chrono>

// 一个模拟耗时计算的函数
void worker_thread(std::promise<int> promise)
{
try {
std::cout << "工作线程:正在进行复杂的计算..." << std::endl;
// 模拟耗时 2 秒
std::this_thread::sleep_for(std::chrono::seconds(2));

int result = 42;
std::cout << "工作线程:计算完成,结果为 " << result << std::endl;

// 履行承诺:设置值
promise.set_value(result);

} catch (...) {
// 如果发生异常,我们也应该处理(见下一个例子)
// 这里我们简单地设置一个异常
try {
promise.set_exception(std::current_exception());
} catch (...) {} // set_exception 本身也可能抛出...
}
}

int main() {
// 1. 创建一个 promise 对象,承诺提供一个 int
std::promise<int> promise;

// 2. 从 promise 获取 future 对象
std::future<int> future = promise.get_future();

// 3. 创建工作线程,并将 promise "移动" (move) 进去
// std::promise 是不可拷贝的,只能移动
std::thread t(worker_thread, std::move(promise));

std::cout << "主线程:正在等待工作线程的结果..." << std::endl;

// 4. 主线程调用 future.get() 等待结果
// 这一行将会阻塞,直到工作线程调用了 set_value()
try {
int result = future.get(); // 获取值
std::cout << "主线程:从工作线程获取到的结果是 " << result << std::endl;
} catch (const std::exception& e) {
std::cout << "主线程:捕获到工作线程的异常:" << e.what() << std::endl;
}

// 5. 等待线程执行完毕
t.join();

return 0;
}

输出:
主线程:正在等待工作线程的结果...
工作线程:正在进行复杂的计算...
(等待 2 秒)
工作线程:计算完成,结果为 42
主线程:从工作线程获取到的结果是 42

信号量

信号量在C++20中引入,可以用来限制并发数量。相比之下,信号量是一个更加轻量级的实现,更纯粹的信号通知,没有所有权的限制,提供了计数资源。但是灵活性差,等待条件仅有信号量。下面详细介绍操作

std::binary_semaphore

C++ 提供了两个信号量类型:std::counting_semaphore 与 std::binary_semaphore,定义在 < semaphore > 中其中std::binary_semaphore只是std::counting_semaphore<1>的一个别名,也就是计数器只有0,1两个状态,但还是不同于mutex,但是首先Mutex具备所有权,谁上锁谁解锁,其次Mutex关注的是互斥 (Mutual Exclusion)。保护共享数据不被同时修改。Semaphore关注的是信号 (Signaling) 或 资源计数 (Resource Counting)。控制执行顺序或并发数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 全局二元信号量对象
// 设置对象初始计数为 0
std::binary_semaphore smph_signal_main_to_thread{ 0 };
std::binary_semaphore smph_signal_thread_to_main{ 0 };

void thread_proc() {
smph_signal_main_to_thread.acquire();// smph_signal_main_to_thread-1
std::cout << "[线程] 获得信号" << std::endl;

std::this_thread::sleep_for(3s);

std::cout << "[线程] 发送信号\n";
smph_signal_thread_to_main.release();//smph_signal_thread_to_main-1
}

int main() {
std::jthread thr_worker{ thread_proc };

std::cout << "[主] 发送信号\n";
smph_signal_main_to_thread.release(); //smph_signal_main_to_thread+1

smph_signal_thread_to_main.acquire(); //smph_signal_thread_to_main-1
std::cout << "[主] 获得信号\n";
}

acquire 函数就是我们先前说的“等待”(原子地减少计数),release 函数就是”释放”(原子地增加计数)。

std::counting_semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <semaphore>
#include <thread>
#include <vector>
#include <iostream>

// 停车场只有 3 个车位
std::counting_semaphore<3> parking_lot(3);

void car(int id) {
std::cout << "车 " << id << " 在排队...\n";

parking_lot.acquire();

std::cout << "车 " << id << " 进场停车 (占用资源)\n";
std::this_thread::sleep_for(std::chrono::seconds(1));

std::cout << "车 " << id << " 离开 (释放资源)\n";

parking_lot.release();
}

int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) threads.emplace_back(car, i);
for (auto& t : threads) t.join();
}

闩与屏障

闩 (latch) 与屏障 (barrier) 是线程协调机制,允许任何数量的线程阻塞直至期待数量的线程到达。闩不能重复使用,而屏障则可以。
std::latch:单次使用的线程屏障
std::barrier:可复用的线程屏障
它们定义在标头 < latch > 与 < barrier >。
我的理解通过latch和barrier的引入可以使单次运行的线程尽可能多,提高Cpu利用率
以下是实例,实际很简单,和信号量很像

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
std::latch work_start{ 3 };//只减少

void work(){
std::cout << "等待其它线程执行\n";
work_start.wait(); // 等待计数为 0
std::cout << "任务开始执行\n";
}

int main(){
std::jthread thread{ work };
std::this_thread::sleep_for(3s);
std::cout << "休眠结束\n";
work_start.count_down(); // 默认值是 1 减少计数 1
work_start.count_down(2); // 传递参数 2 减少计数 2
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
std::barrier barrier{ 10,
[n = 1]()mutable noexcept {std::cout << "\t第" << n++ << "轮结束\n"; }
};

void f(int start, int end){
for (int i = start; i <= end; ++i) {
std::osyncstream{ std::cout } << i << ' ';
barrier.arrive_and_wait(); // 减少计数并等待 解除阻塞时就重置计数并调用函数对象

std::this_thread::sleep_for(300ms);
}
}

int main(){
std::vector<std::jthread> threads;
for (int i = 0; i < 10; ++i) {
threads.emplace_back(f, i * 10 + 1, (i + 1) * 10);
}
}

std::osyncstream ,它是 C++20 引入的,此处是确保输出流在多线程环境中同步,避免除数据竞争,而且将不以任何方式穿插或截断


本站由 Edison.Chen 创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。