最开始读的时候读的是周全的版本,翻译的真是一点都不周全,耽误了一天,也没看懂什么,后面看到了一些好的文章,这里贴链接(卢瑟国国王真不是盖的
现代C++并发编程教程

线程管理

The first

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iosetream>
#include <thread>
using namespace std;
class BackGround{
public:
void operator()() const
{
do_somthing();
}
}
int main(){
BackGround function;
thread my_thread(function);//传入广义函数指针
/**
* 包括多种形式
* 例如常规的function()-> function
* 函数对象,上面所做的重载了()的background类
* lamda表达式
* 指向成员函数的指针,thread member_function_thread(&Class::do_function,&my_class)
*/
//不能写成thread my_thread(BackGround()),由于C++的二义性,BackGround()不会被解析成一个临时对象,而是一个指向函数的指针,该函数无参数且返回值为 background_task,使用下面两种形式替代。
// std::thread my_thread((background_task())); // 1
// std::thread my_thread{background_task()};
//

}
  • 当前环境的支持并发数
    使用int n = std::thread::hardware_concurrency();可以得到

生命周期管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct func
{
int& i;
func(int& i_) : i(i_) {}
void operator() ()
{
for (unsigned j=0 ; j<1000000 ; ++j)
{
do_something(i); // 1. 潜在访问隐患:悬空引用
}
}
};
void oops()
{
int some_local_state=0;
func my_func(some_local_state);
std::thread my_thread(my_func);
my_thread.detach(); // 2. 不等待线程结束
} //栈销毁,产生悬空指针

使用join或者detach函数来管理线程是非常粗暴的,尽管通过将join放入catch语句中可以保证其不被异常摧毁(书上给出的第二个例子)。但是维护起来较稳困难,如果引入另外的try-catch机制,可能会因为忘记join导致机制失效。最好的方式肯定是RAII,一般来说对于一个难以管控的生命周期,最好的方式就是设计成类,像智能指针一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class thread_guard
{
std::thread& t;
public:
// 构造函数获取线程资源
explicit thread_guard(std::thread& t_) : t(t_) {}

// 析构函数在对象销毁时自动调用
~thread_guard()
{
// 确保线程仍然可以被 join,仅能被调用一次
if (t.joinable())
{
t.join();
}
}

// 禁止拷贝构造和拷贝赋值,防止资源管理权混乱
thread_guard(const thread_guard&) = delete;
thread_guard& operator=(const thread_guard&) = delete;
};

线程传参

  • 拷贝
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void print_data(int id, double value, const std::string& message)
{
std::cout << "线程 " << id << ": " << message << " (值为: " << value << ")" << std::endl;
}

int main()
{
int thread_id = 1;
double data_value = 3.14;
std::string msg = "Hello from thread!";

// 启动线程,直接在后面附上参数
std::thread t(print_data, thread_id, data_value, msg);

t.join();

return 0;
}
  • 引用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void modify_value(int& val) {
std::cout << "子线程:开始修改值..." << std::endl;
val = 100;
std::cout << "子线程:修改完成。" << std::endl;
}

int main()
{
int my_value = 10;
std::cout << "主线程:启动前, my_value = " << my_value << std::endl;

// 使用 std::ref 将 my_value 的引用传递给线程
std::thread t(modify_value, std::ref(my_value));

t.join();

std::cout << "主线程:结束后, my_value = " << my_value << std::endl;

return 0;
}
  • 指针
1
2
3
4
5
6
7
8
9
10
void modify_via_pointer(int* p_val) {
if (p_val) {
*p_val = 200;
}
}

int my_value = 10;
std::thread t(modify_via_pointer, &my_value);
t.join();
// 此时 my_value 变为 200
  • 移动语义对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void process_data(std::unique_ptr<int> p)
{
std::cout << "子线程:接收到数据 " << *p << std::endl;
*p += 10;
std::cout << "子线程:处理后数据 " << *p << std::endl;
}

int main()
{
auto my_ptr = std::make_unique<int>(42);

// 使用 std::move 转移 my_ptr 的所有权
std::thread t(process_data, std::move(my_ptr));

// 转移后,main 函数中的 my_ptr 变为空指针
if (!my_ptr) {
std::cout << "主线程:my_ptr 的所有权已被移交。" << std::endl;
}

t.join();
return 0;
}

std::this_thread命名空间

  • get_id()
1
2
3
4
5
6
7
int main(){
cont<<this::thread::get_id()<<endl;
thread t{ []{
cout<<this_thread::get_id()<<endl;
}}
t.join();
}
  • sleep_for
1
2
3
4
using namespace chrono_literals;
int main(){
this_thread::sleep_for(3s);
}
  • yield
1
2
3
while (!isDone()){
std::this_thread::yield();
}
  • sleep_until
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int main() {
// 获取当前时间点
auto now = std::chrono::system_clock::now();

// 设置要等待的时间点为当前时间点之后的5秒
auto wakeup_time = now + 5s;

// 输出当前时间
auto now_time = std::chrono::system_clock::to_time_t(now);
std::cout << "Current time:\t\t" << std::put_time(std::localtime(&now_time), "%H:%M:%S") << std::endl;

// 输出等待的时间点
auto wakeup_time_time = std::chrono::system_clock::to_time_t(wakeup_time);
std::cout << "Waiting until:\t\t" << std::put_time(std::localtime(&wakeup_time_time), "%H:%M:%S") << std::endl;

// 等待到指定的时间点
std::this_thread::sleep_until(wakeup_time);

// 输出等待结束后的时间
now = std::chrono::system_clock::now();
now_time = std::chrono::system_clock::to_time_t(now);
std::cout << "Time after waiting:\t" << std::put_time(std::localtime(&now_time), "%H:%M:%S") << std::endl;
}

转移所有权

首先thread是可移动但是不可被拷贝的。其次我认为帖子中两个例子给的很好,见下面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
int main() {
std::thread t{ [] {
std::cout << std::this_thread::get_id() << '\n';
} };
std::cout << t.joinable() << '\n'; // 线程对象 t 当前关联了活跃线程 打印 1
std::thread t2{ std::move(t) }; // 将 t 的线程资源的所有权移交给 t2
std::cout << t.joinable() << '\n'; // 线程对象 t 当前没有关联活跃线程 打印 0
//t.join(); // Error! t 没有线程资源
t2.join(); // t2 当前持有线程资源
}


int main() {
std::thread t; // 默认构造,没有关联活跃线程
std::cout << t.joinable() << '\n'; // 0
std::thread t2{ [] {} };
t = std::move(t2); // 转移线程资源的所有权到 t
std::cout << t.joinable() << '\n'; // 1,joinable代表的是资源是否存在,而不是任务是否存在。
t.join();

t2 = std::thread([] {});//这个临时对象是一个右值,所以移动赋值语句会被自动调用,无需使用move
t2.join();
}

std::thread f(){
std::thread t{ [] {} };
return t;//使用移动构造语句返回所有权
}
int main(){
std::thread rt = f();
rt.join();
}

void f(std::thread t){
t.join();
}
int main(){
std::thread t{ [] {} };
f(std::move(t));
f(std::thread{ [] {} });
}

这里贴上thread源码解析

实现一个joining_thread

即实现自动管理线程析构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class joining_thread{
thread t;
public:
joining_thread() = default;
template<typename Callable, typename... Args>
explicit joining_thread(Callable&& func, Args&&...args) :
t{ std::forward<Callable>(func), std::forward<Args>(args)... } {}
explicit joining_thread(std::thread t_)noexcept : t{ std::move(t_) } {}
joining_thread(joining_thread&& other)noexcept : t{ std::move(other.t) } {}

joining_thread& operator=(std::thread&& other)noexcept {
if (joinable()) { // 如果当前有活跃线程,那就先执行完
join();
}
t = std::move(other);
return *this;
}
~joining_thread(){
if(joinable()){
join();
}
}
void swap(joining_thread& other)noexcept {
t.swap(other.t);
}
std::thread::id get_id()const noexcept {
return t.get_id();
}
bool joinable()const noexcept {
return t.joinable();
}
void join() {
t.join();
}
void detach() {
t.detach();
}
std::thread& data()noexcept {
return t;
}
const std::thread& data()const noexcept {
return t;
}
}

jthread实现

通过引入thread和stop_source成员实现,自动调用join函数,停止功能实现。、

对于停止功能,C++ 的 std::jthread 提供的线程停止功能并不同于常见的 POSIX 函数 pthread_cancel。pthread_cancel 是一种发送取消请求的函数,但并不是强制性的线程终止方式。目标线程的可取消性状态和类型决定了取消何时生效。当取消被执行时,进行清理和终止线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
using namespace std::literals::chrono_literals;

void f(std::stop_token stop_token, int value){
while (!stop_token.stop_requested()){ // 检查是否已经收到停止请求
std::cout << value++ << ' ' << std::flush;
std::this_thread::sleep_for(200ms);
}
std::cout << std::endl;
}

int main(){
std::jthread thread{ f, 1 }; // 打印 1..15 大约 3 秒
std::this_thread::sleep_for(3s);
// jthread 的析构函数调用 request_stop() 和 join()。
}

void _Try_cancel_and_join() noexcept {
if (_Impl.joinable()) {
_Ssource.request_stop();
_Impl.join();
}
}
~jthread() {
_Try_cancel_and_join();
}

jthread类提供三个函数用于线程停止

  • get_stop_source:返回与 jthread 对象关联的 std::stop_source,允许从外部请求线程停止。
  • get_stop_token:返回与 jthread 对象停止状态关联的 std::stop_token,允许检查是否有停止请求。
  • request_stop:请求线程停止。

共享数据

  1. 首先是一段常规的互斥锁设计,基于mutex类中的lock函数的同步阻塞机制实现的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <mutex>
std::mutex m;
void f(){
m.lock();
std::cout<<std::this_thread::get_id()<<'\n';
m.unclock();
}
int main(){
std::vector<std::thread> threads;
for (std::size_t i = 0; i < 10; ++i)
threads.emplace_back(f);

for (auto& thread : threads)
thread.join();
}
  1. 接下来是基于lock_gard类
1
2
3
4
void f() {
std::lock_guard<std::mutex> lc{ m };
std::cout << std::this_thread::get_id() << '\n';
}

这是一个RAII类设计,初始化上锁,析构解锁。所以也就可以知道,使用这种方法只有当离开了作用域之后,才会解锁(析构)。很明显如果作用域太大,这样的粒度就会很大,文章中提供了这样的一个实现方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
std::mutex m;

void add_to_list(int n, std::list<int>& list) {
std::vector<int> numbers(n + 1);
std::iota(numbers.begin(), numbers.end(), 0);
int sum = std::accumulate(numbers.begin(), numbers.end(), 0);

{
std::lock_guard<std::mutex> lc{ m };
list.push_back(sum);
}
}
void print_list(const std::list<int>& list){
std::lock_guard<std::mutex> lc{ m };
for(const auto& i : list){
std::cout << i << ' ';
}
std::cout << '\n';
}
  1. 同样的C++自然会提供一个非阻塞方式的获得锁,实例如下
1
2
3
4
5
6
7
8
9
10
std::mutex m;
void thread_fuction(int id){
if(m.try_lock()){
cout<<"获得锁";
std::this_thread::sleep_for(std::chrono::millisecongd(100));
m.unlock();
}else{
cout<<"获得锁失败";
}
}
  1. unique_lock
    这是一个灵活的锁,相较于lock_guard,它并不需要使用{}限定作用域来缩小粒度。这个类内部具备lock函数、try_lock()、unlock()、其中lock函数的解析见下一部分。首先介绍这个类的部分源码(参考文章中贴出来的部分
1
2
3
4
5
6
7
8
9
10
11
private:
_Mutex* _Pmtx = nullptr;//互斥量指针
bool _Owns = false;//所有权
public:
unique_lock(_Mutex& _Mtx, defer_lock_t) noexcept
: _Pmtx(_STD addressof(_Mtx)), _Owns(false) {} // construct but don't lock
~unique_lock() noexcept {
if (_Owns) {//具有所有权,才能进行unlock
_Pmtx->unlock();
}
}

下面是使用

1
2
3
4
5
6
7
8
9
10
11
void swap(X& lhs, X& rhs) {
if (&lhs == &rhs) return;
lhs.m.lock();
rhs.m.lock();
std::unique_lock<std::mutex> lock1{ lhs.m, std::adopt_lock };//defer_lock->不获得互斥体的所有权。
//adopt_lock->不上锁但是具备所有权
std::unique_lock<std::mutex> lock2{ rhs.m, std::adopt_lock };
swap(lhs.object, rhs.object);
lock1.unlock();
lock2.unlock();//灵活解锁,控制粒度
}

unique_lock还能实现在不同的作用域传递互斥量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <mutex>
#include <fmt/core.h>

std::unique_lock<std::mutex>get_lock(){
extern std::mutex some_mutex;
std::unique_lock<std::mutex> lk{ some_mutex };
return lk;
}
void process_data(){
std::unique_lock<std::mutex> lk{ get_lock() };
// 执行一些任务...
}

int main(){
process_data();
fmt::print("End\n");
}
  1. shared_time_mutex和shared_mutex
    普通的,简单的mutex是不能够满足并发需求的,例如读多写少情况,而读操作是线程安全的。
    std::shared_mutex 同样支持 std::lock_guard、std::unique_lock。和 std::mutex 做的一样,保证写线程的独占访问。而那些无需修改数据结构的读线程,可以使用 std::shared_lockstd::shared_mutex 获取访问权,多个线程可以一起读取。shared_time_mutex相较于另一个提供了超时操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Settings {
private:
std::map<std::string, std::string> data_;
mutable std::shared_mutex mutex_; // “M&M 规则”:mutable 与 mutex 一起出现

public:
void set(const std::string& key, const std::string& value) {
std::lock_guard<std::shared_mutex> lock{ mutex_ };
data_[key] = value;
}

std::string get(const std::string& key) const {
std::shared_lock<std::shared_mutex> lock(mutex_);//读操作,共享锁,允许多个线程
//所提的::unique_lock<std::shared_mutex> lock(mutex_)//写操作,独享
auto it = data_.find(key);
return (it != data_.end()) ? it->second : ""; // 如果没有找到键返回空字符串
}
};
  1. recursive_mutex,不良设计
    这是为了解决递归调用过程中重复加锁问题的,需要注意,加了多少次锁,就要节多少次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <thread>
#include <mutex>

std::recursive_mutex mtx;

void recursive_function(int count) {
// 递归函数,每次递归都会锁定互斥量
mtx.lock();
std::cout << "Locked by thread: " << std::this_thread::get_id() << ", count: " << count << std::endl;
if (count > 0) {
recursive_function(count - 1); // 递归调用
}
mtx.unlock(); // 解锁互斥量
}

int main() {
std::thread t1(recursive_function, 3);
std::thread t2(recursive_function, 2);

t1.join();
t2.join();
}
  1. 格外注意应该避免出现暴露锁内数据,见下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class data{
int a[];
string b[];
public:
void do_something();
};
class Data_wrapper{
data d;
mutex m;
public:
template<class Func>
void process_data(Func func){
lock_guard<mutex> lv{m};
func(data);

}
};
data *p = nullptr;
void malicious_function(data& d){
p = &d;
}
Data_wrapper d;

void foo(){
d.process_data(malicious_function); // 传递了一个恶意的函数
p->do_something(); // 在无保护的情况下访问保护数据
}

死锁

  1. 死锁问题发生在多个互斥量同时存在,并且争夺对象的时候。下面是一个简单例子,不同的加锁顺序导致了互相等待。
1
2
3
4
5
6
7
8
9
10
mutex m1;
mutex m2;
void swap_turn1(){
lock_guard<mutex> lc1 {m1};
lock_guard<mutex> lc2 {m2};
}
void swap_turn2(){
lock_guard<mutex> lc1 {m2};
lock_guard<mutex> lc2 {m1};
}
  1. 对此C++提供了std::lock(std::mutex,std::mutex)函数和std::scope_lock(std::mutex,std::mutex)
1
2
3
4
5
6
7
8
9
void swap(CirSource &first,CirSource &second)
{
std::lock(first.dataLock,secode.dataLock);
std::lock_guard<std::mutex> lockf(first.dataLock,std::adopt_lock);
std::lock_guard<std::mutex> locks(second.dataLock,std::adopt_lock);
int temp = first.n1;
first.n1 = second.n1;
second.n1 = temp;
}

对lock的解析来自这篇文章
,文中提到其实现逻辑时首先对一把锁进行加锁,然后对另外一把锁使用try_lock,如果加锁成功则成功返回,如果加锁失败则通过__libcpp_thread_yield进行一次线程切换之后,在通过相反的顺序进行尝试加锁。周而复始对锁进行尝试的加锁。在lock完成上锁后,需要传输adopt_lock参数,guard_lock仅仅时对锁的所有权的转换,而不进行实际加锁操作。

scope_lock则是将上边的部分封装到类,实现RAII。

  1. 避免死锁
  • 避免死锁最基本的原则毫无疑问就是同一个线程智能持有一个锁。

首先,死锁需要具备四个条件:

互斥:资源不能共享(std::mutex 天然满足)。
持有并等待 (Hold and Wait):一个线程至少持有一个资源,并且正在等待获取另一个被其他线程持有的资源。
非抢占:资源不能被强制夺走,只能由持有者自愿释放(std::mutex 满足)。
循环等待 (Circular Wait):存在一个线程等待链,每个线程都在等待下一个线程持有的资源。

不同线程由于互斥量的存在,不可能同时把持一段数据的。考虑下面的场景

线程1:
成功锁住 mutex_A (持有)。
尝试去锁 mutex_B (等待)。

线程2:
成功锁住 mutex_B (持有)。
尝试去锁 mutex_A (等待)。

如果应用我们提到的原则,那么线程1必须释放A才能获取B,自然避免死锁。

但是同样毫无疑问,真实的业务场景怎么可能这么简单,最常见的是需要对数据进行拷贝或者赋值,那么就需要对原来变量以及操作变量同时上锁,这就需要使用上面提供的一些RAII类,他们只会有两个返回,而绝不会只锁住一半。

成功锁住所有的锁
抛出异常并释放原来锁

下面是另外的解决需要持有多个锁的时候,注意事项

  • 避免在持有锁的时候调用外部代码

指的是上面说的格外注意应该避免出现暴露锁内数据

  • 使用固定顺序获取锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class BankAccount {
public:
double balance;
std::mutex m;
// ...
};

// 一个线程安全的转账函数
void transfer(BankAccount& from, BankAccount& to, double amount) {

// 为了避免死锁,我们必须按固定顺序锁住 from.m 和 to.m
// 我们约定:总是先锁地址较小的那个互斥量

if (&from.m < &to.m) {
// from 的地址小,先锁 from,再锁 to
std::lock_guard<std::mutex> lock_from(from.m);
std::lock_guard<std::mutex> lock_to(to.m);

from.balance -= amount;
to.balance += amount;
} else {
// to 的地址小,先锁 to,再锁 from
std::lock_guard<std::mutex> lock_to(to.m);
std::lock_guard<std::mutex> lock_from(from.m);

from.balance -= amount;
to.balance += amount;
}
}

其他保护共享数据方法

保护共享数据并非必须使用互斥量,互斥量只是其中一种常见的方式而已,对于一些特殊的场景,也有专门的保护方式,比如对于共享数据的初始化过程的保护。我们通常就不会用互斥量,这会造成很多的额外开销。

  1. 首先应该提到单例模式的初始化,对于C++11以上,静态变量的初始化是线程安全的因此可以利用static保护共享对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class FormToolBox {
public:
static FormToolBox& GetInstance() {
// 这行代码在多线程环境下是安全的,且只执行一次
static FormToolBox instance;
return instance;
}
FormToolBox(const FormToolBox&) = delete;
FormToolBox& operator=(const FormToolBox&) = delete;
private:
FormToolBox() {
std::cout << "FormToolBox instance created." << std::endl;
}
~FormToolBox() = default;
};
  1. 常规双检锁
    这个应该少用,例如reset函数并不是原子性的,而是分成三步,分配内存,移动指针,实例对象。有可能会导致do_something在移动指针后就执行,会报错。
1
2
3
4
5
6
7
8
9
void f(){
if(!ptr){ // 1
std::lock_guard<std::mutex> lk{ m };
if(!ptr){ // 2
ptr.reset(new some); // 3
}
}
ptr->do_something(); // 4
}
  1. call_once安全方法
1
2
3
4
5
6
7
8
9
10
11
std::shared_ptr<some> ptr;
std::once_flag resource_flag;

void init_resource(){
ptr.reset(new some);
}

void foo(){
std::call_once(resource_flag, init_resource); // 线程安全的一次初始化
ptr->do_something();
}

线程变量

这里推荐一篇文章.

同步

同步实际上就是在任务队列中的处理方式,包括线程池,观察者模式,回调任务队列,都会使用这个技术。而如何实现各个任务的同步,进行相互协调和相互等待,确保数据的准确性就很关键了。

等待

等待分成忙等待,延时等待,以及条件变量

1
2
3
4
5
6
7
8
9
10
11
//忙等待
bool flag = false;
mutex m;
void wait(){
unique_lock<mutex> mutex{ m };
while(!flag){
mutex.unlock();
// this_thread::sleep_for(chrono::milliseconds(100)); 延时等待
mutex.lock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//条件变量
mutex m;
/**
* 存在两套条件变量类型,ocndition_variable和condition_variable_any
* 前者用于unique_lock<mutex>,后者用于可基本锁定的锁,适用广但是性能开销大
*/
condition_variable cv;
bool flag = false;
void wait(){
unique_lock<mutex> lk{m};
/**
* wait有两个版本
* void wiat(std::unique_Lock<mutex>& lock)
* void wait(std::unique_lock<std::mutex>& lock, Predicate pred);,=可以避免虚假唤醒
*/
cv.wait(lk,[]{return wait;});//cv被唤醒,并且wait = ture->cout
cout<<"执行"
}
void simulate_doing(){
this_thread::sleep_for(chrono::seconds(5));
{
lock_guard<mutex> lk(m);
flag = true;
}
cv.notify_one();//唤醒一个等待条件变量的线程
}

线程安全的队列

生产者-消费者模型实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class Thread_Safe_queue{
private:
mutable mutex m;
condition_variable data_condition;
queue<int> safe_queue;
public:
Thread_Safe_queue() = defalut;
void push(int new_value){

unique_lock<mutex> lk{m};
safe_queue.push_back(new_value);
lk.unlock();
data_condition.notify_one()
}
}
void pop(int& old_value){
unique_lock<mutex> lk{m};
data_condition.wait(lk,[this]{return !safe_queue.empty();});
old_value = safe_queue.front();
safe_queue.pop();
std::shared_ptr<int> pop() {
std::unique_lock<std::mutex> lk{ m };
data_cond.wait(lk, [this] {return !safe_queue.empty(); });
std::shared_ptr<int> res { std::make_shared<int>(data_queue.front()) };
data_queue.pop();
return res;
}
bool empty()const {
std::lock_guard<std::mutex> lk (m);
return safe_queue.empty();
}
}
void producer(Thread_Safe_Queue<int>& q) {
for (int i = 0; i < 5; ++i) {
q.push(i);
}
}
void consumer(Thread_Safe_Queue<int>& q) {
for (int i = 0; i < 5; ++i) {
int value{};
q.pop(value);
}
}

异步

future使用

C++中引入future是为了解决异步通信问题,可以将其看成是交换数据基本类型。它可以获得async的返回结果,也可以和promise使用。
future类包括future类和shared_future类,前者只能关联一个事件,后者可以关联多个事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int task(int n){
cout<<this_thread::get_id();
return n*n;
}
int main(){
future<int> future = async(task,10);//启动异步线程返回future对象
cout<<"main:"<<this_thread::get_id();
/**
* .get()函数阻塞,直到返回结果,
* valid函数检查当前的future是否处于关联状态,即是否关联任务,如果还未关联(或者调用了get、set),都会返回false
*/
cout<<boolalpha<<future.valid();//boolalpha使输出true、false。这条语句结果为true
cout<<future.get();
cout<<boolalpha<<future.valid();//false
}

promise和future

线程之间通过promise和future来传递一次性的数据。promise存在于一个线程中,并承诺将会提供一个值或异常交给future处理,在这个过程中,一般promise是生产者,future是消费者,他们都会链接着一个共享状态。核心工作流程是,首先在发起(消费者)线程中创建一个std::promise对象,然后将 这个对象move到生产者线程,接下来消费者线程在需要数据的时候调用future.get(),如果生产者产生了值,那么返回,反之则阻塞。其中,生产者通过promise.set_value()设置值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include <iostream>
#include <thread>
#include <future>
#include <chrono>

// 一个模拟耗时计算的函数
void worker_thread(std::promise<int> promise)
{
try {
std::cout << "工作线程:正在进行复杂的计算..." << std::endl;
// 模拟耗时 2 秒
std::this_thread::sleep_for(std::chrono::seconds(2));

int result = 42;
std::cout << "工作线程:计算完成,结果为 " << result << std::endl;

// 履行承诺:设置值
promise.set_value(result);

} catch (...) {
// 如果发生异常,我们也应该处理(见下一个例子)
// 这里我们简单地设置一个异常
try {
promise.set_exception(std::current_exception());
} catch (...) {} // set_exception 本身也可能抛出...
}
}

int main() {
// 1. 创建一个 promise 对象,承诺提供一个 int
std::promise<int> promise;

// 2. 从 promise 获取 future 对象
std::future<int> future = promise.get_future();

// 3. 创建工作线程,并将 promise "移动" (move) 进去
// std::promise 是不可拷贝的,只能移动
std::thread t(worker_thread, std::move(promise));

std::cout << "主线程:正在等待工作线程的结果..." << std::endl;

// 4. 主线程调用 future.get() 等待结果
// 这一行将会阻塞,直到工作线程调用了 set_value()
try {
int result = future.get(); // 获取值
std::cout << "主线程:从工作线程获取到的结果是 " << result << std::endl;
} catch (const std::exception& e) {
std::cout << "主线程:捕获到工作线程的异常:" << e.what() << std::endl;
}

// 5. 等待线程执行完毕
t.join();

return 0;
}

输出:
主线程:正在等待工作线程的结果...
工作线程:正在进行复杂的计算...
(等待 2 秒)
工作线程:计算完成,结果为 42
主线程:从工作线程获取到的结果是 42

async类

和thread类相同,async类也是通过广义函数指针传入来实现调用,使用std::ref来实现引用。默认情况下传输给async的参数会按照值复制或者移动进入线程内部存储中。

并且它和 std::thread 一样,内部会将保有的参数副本转换为右值表达式进行传递,这是为了那些只支持移动的类型,左值引用没办法引用右值表达式,所以如果不使用 std::ref,这里 void f(int&) 就会导致编译错误,如果是 void f(const int&) 则可以通过编译,不过引用的不是我们传递的局部对象。

  • async的执行还要考虑几种执行方式
1
2
3
4
5
6
7
8
9
10
11
void f(){
std::cout << std::this_thread::get_id() << '\n';
}

int main(){
std::cout << std::this_thread::get_id() << '\n';
auto f1 = std::async(std::launch::deferred, f);
f1.wait(); // 在 wait() 或 get() 调用时执行,不创建线程
auto f2 = std::async(std::launch::async,f); // 创建线程执行异步任务
auto f3 = std::async(std::launch::deferred | std::launch::async, f); // 实现选择的执行方式
}
  • 如果从 std::async 获得的 std::future 没有被移动或绑定到引用,那么在完整表达式结尾, std::future 的析构函数将阻塞,直到到异步任务完成。因为临时对象的生存期就在这一行,而对象生存期结束就会调用调用析构函数。
1
2
std::async(std::launch::async, []{ f(); }); // 临时量的析构函数等待 f()
std::async(std::launch::async, []{ g(); }); // f() 完成前不开始
  • 被移动的 std::future 没有所有权,失去共享状态,不能调用 get、wait 成员函数。
1
2
3
auto t = std::async([] {});
std::future<void> future{ std::move(t) };
t.wait(); // Error! 抛出异常

package_task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <iostream>
#include <future>
#include <thread>
#include <chrono>

// 1. 任务的定义:一个耗时的计算函数
int heavy_computation(int input) {
std::cout << "Worker thread is performing heavy computation...\n";
std::this_thread::sleep_for(std::chrono::seconds(2));
return input * 2;
}

int main() {
// 2. 将任务打包进 packaged_task
// 模板参数 <int(int)> 表示这是一个返回 int,接受一个 int 参数的函数
std::packaged_task<int(int)> task(heavy_computation);

// 3. 从 task 中获取 future,这是我们获取结果的唯一凭证
std::future<int> result_future = task.get_future();

// 4. 决定如何执行这个 task
// 这里我们创建一个新线程来执行它。
// 注意:packaged_task 是 move-only 的,所以必须用 std::move 传给线程。
std::thread worker_thread(std::move(task), 10); // 传递参数 10 给 heavy_computation

// 主线程可以继续做其他事情
std::cout << "Main thread is doing other work...\n";

// 5. 当主线程需要结果时,调用 get() 等待并获取
std::cout << "Main thread is waiting for the result...\n";
int result = result_future.get(); // 线程会在这里阻塞,直到 worker_thread 执行完毕

std::cout << "The result is: " << result << std::endl;

// 6. 回收线程资源
worker_thread.join();

return 0;
}

限时等待

时钟

阻塞调用线程有些时候需要限定线程等待的时间,常见有两种指定超时的方法,时间点和时间段,即sleep_for\sleep_until

C++11 的 库提供了三种主要的时钟类型:system_clock(系统时钟),代表实际世界的挂钟时间;steady_clock(稳定时钟),一个单调递增、不可被调整的计数时钟,常用于测量时间间隔;以及 high_resolution_clock(高精度时钟),提供最高可用精度的时钟,用于需要精确时间度量的场景

每个时钟类型都提供了四种不同的信息。

  • 当前时间
  • 时间类型
  • 时钟节拍
  • 稳定时钟
    当前时钟可以通过静态成员now获取,例如,std::chrono::system_clock::now() 会返回系统的当前时间。特定的时间点则可以通过 time_point 来指定。system_clock::now() 的返回类型就是 time_point。
  1. system_clock使用
1
2
3
4
5
6
7
8
9
10
11
12
#include <iostream>
#include <chrono>
#include <ctime>

int main() {
// 获取当前时间点
std::chrono::system_clock::time_point now_tp = std::chrono::system_clock::now();

// 转换为 time_t 用于打印
std::time_t now_c = std::chrono::system_clock::to_time_t(now_tp);
std::cout << "system_clock: " << std::ctime(&now_c);
}
  1. steady_Clock使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <iostream>
#include <chrono>
#include <thread>

int main() {
auto start = std::chrono::steady_clock::now();

// 模拟一段耗时操作
std::this_thread::sleep_for(std::chrono::seconds(1));

auto end = std::chrono::steady_clock::now();

// 计算时间差
std::chrono::duration<double> elapsed_seconds = end - start;
std::cout << "Elapsed time: " << elapsed_seconds.count() << "s\n";
}
  1. high_resolution_clock使用
    这种时钟提供当前系统中可能的最小时钟节拍周期,但是C++标准中并没有规定其具体实现,一般使用steady_CLock替代

本站由 Edison.Chen 创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。

undefined