最开始读的时候读的是周全的版本,翻译的真是一点都不周全,耽误了一天,也没看懂什么,后面看到了一些好的文章,这里贴链接(卢瑟国国王真不是盖的
现代C++并发编程教程

并发编程由几个板块组成:

  • 线程管理 (thread)
  • 互斥与同步 (mutex, condition_variable)
  • 异步结果 (future, async, promise, packaged_task)
  • 防死锁策略 (scoped_lock)
    当然这里的列举不全面,接下来希望在一系列文章探讨清楚这件事情。

线程管理

The first

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <iosetream>
#include <thread>
using namespace std;
class BackGround{
public:
BackGround(int id):m_id(id);
void operator()() const
{
do_somthing();
}
int m_id;
}
int main(){
BackGround function(11);
thread my_thread(function);//传入广义函数指针
/**
* 包括多种形式
* 例如常规的function()-> function
* 函数对象,上面所做的重载了()的background类
* lamda表达式
* 指向成员函数的指针,thread member_function_thread(&Class::do_function,&my_class)
*/
//不能写成thread my_thread(BackGround(11)),由于C++的二义性,BackGround(11)不会被解析成一个临时对象,因此如果非要在创建thread同时创建仿函数对象,那么使用{}或者多加一对括号
// std::thread my_thread((background(11)));
// std::thread my_thread{background(11)};
// 另外如果希望thread直接修改类内对象应该传入ref
// #include <functional>
// thread t{ref(background(11))}

}
  • 当前环境的支持并发数
    使用int n = std::thread::hardware_concurrency();可以得到

生命周期管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct func
{
int& _i;
func(int& i) : _i(i) {}
void operator() ()
{
for (unsigned j=0 ; j<1000000 ; ++j)
{
do_something(_i); // 1. 潜在访问隐患:悬空引用
}
}
};
void oops()
{
int some_local_state=0;
func my_func(some_local_state);
std::thread my_thread(my_func);
my_thread.detach(); // 2. 不等待线程结束
} //栈销毁,产生悬空指针

使用join或者detach函数来管理线程是非常粗暴的,尽管通过将join放入catch语句中可以保证其不被异常摧毁(书上给出的第二个例子)。但是维护起来较稳困难,如果引入另外的try-catch机制,可能会因为忘记join导致机制失效。最好的方式肯定是RAII,一般来说对于一个难以管控的生命周期,最好的方式就是设计成类,像智能指针一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class scoped_thread {
std::thread t; // 直接持有线程对象,
public:
// 构造函数接收一个右值引用(移动语义)
explicit scoped_thread(std::thread t_) : t(std::move(t_)) {
if (!t.joinable()) {
throw std::logic_error("No thread");
}
}

~scoped_thread() {
if (t.joinable()) {
t.join();
}
}

// 禁止拷贝
scoped_thread(const scoped_thread&) = delete;
scoped_thread& operator=(const scoped_thread&) = delete;
int main (){
scope_thread t(std::thread([](){ }))
}
};

而对于C++20以后可以直接使用std::jthread代替scoped_thread

线程传参

  • 拷贝
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void print_data(int id, double value, const std::string& message)
{
std::cout << "线程 " << id << ": " << message << " (值为: " << value << ")" << std::endl;
}

int main()
{
int thread_id = 1;
double data_value = 3.14;
std::string msg = "Hello from thread!";

// 启动线程,直接在后面附上参数
std::thread t(print_data, thread_id, data_value, msg);

t.join();

return 0;
}
  • 引用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void modify_value(int& val) {
std::cout << "子线程:开始修改值..." << std::endl;
val = 100;
std::cout << "子线程:修改完成。" << std::endl;
}

int main()
{
int my_value = 10;
std::cout << "主线程:启动前, my_value = " << my_value << std::endl;

// 使用 std::ref 将 my_value 的引用传递给线程
std::thread t(modify_value, std::ref(my_value));

t.join();

std::cout << "主线程:结束后, my_value = " << my_value << std::endl;

return 0;
}
  • 指针
1
2
3
4
5
6
7
8
9
10
void modify_via_pointer(int* p_val) {
if (p_val) {
*p_val = 200;
}
}

int my_value = 10;
std::thread t(modify_via_pointer, &my_value);
t.join();
// 此时 my_value 变为 200
  • 移动语义对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void process_data(std::unique_ptr<int> p)
{
std::cout << "子线程:接收到数据 " << *p << std::endl;
*p += 10;
std::cout << "子线程:处理后数据 " << *p << std::endl;
}

int main()
{
auto my_ptr = std::make_unique<int>(42);

// 使用 std::move 转移 my_ptr 的所有权
std::thread t(process_data, std::move(my_ptr));

// 转移后,main 函数中的 my_ptr 变为空指针
if (!my_ptr) {
std::cout << "主线程:my_ptr 的所有权已被移交。" << std::endl;
}

t.join();
return 0;
}

std::this_thread命名空间

  • get_id()
1
2
3
4
5
6
7
int main(){
cont<<this::thread::get_id()<<endl;
thread t{ []{
cout<<this_thread::get_id()<<endl;
}}
t.join();
}
  • sleep_for
1
2
3
4
using namespace chrono_literals;
int main(){
this_thread::sleep_for(3s);
}
  • yield
1
2
3
4
while (!isDone()){
//将线程由运行态转为就绪态,并放入就绪队列队尾
std::this_thread::yield();
}
  • sleep_until
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int main() {
// 获取当前时间点
auto now = std::chrono::system_clock::now();

// 设置要等待的时间点为当前时间点之后的5秒
auto wakeup_time = now + 5s;

// 输出当前时间
auto now_time = std::chrono::system_clock::to_time_t(now);
std::cout << "Current time:\t\t" << std::put_time(std::localtime(&now_time), "%H:%M:%S") << std::endl;

// 输出等待的时间点
auto wakeup_time_time = std::chrono::system_clock::to_time_t(wakeup_time);
std::cout << "Waiting until:\t\t" << std::put_time(std::localtime(&wakeup_time_time), "%H:%M:%S") << std::endl;

// 等待到指定的时间点
std::this_thread::sleep_until(wakeup_time);

// 输出等待结束后的时间
now = std::chrono::system_clock::now();
now_time = std::chrono::system_clock::to_time_t(now);
std::cout << "Time after waiting:\t" << std::put_time(std::localtime(&now_time), "%H:%M:%S") << std::endl;
}

转移所有权

首先thread是可移动但是不可被拷贝的。其次我认为帖子中两个例子给的很好,见下面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
int main() {
std::thread t{ [] {
std::cout << std::this_thread::get_id() << '\n';
} };
std::cout << t.joinable() << '\n'; // 线程对象 t 当前关联了活跃线程 打印 1
std::thread t2{ std::move(t) }; // 将 t 的线程资源的所有权移交给 t2
std::cout << t.joinable() << '\n'; // 线程对象 t 当前没有关联活跃线程 打印 0
//t.join(); // Error! t 没有线程资源
t2.join(); // t2 当前持有线程资源
}


int main() {
std::thread t; // 默认构造,没有关联活跃线程
std::cout << t.joinable() << '\n'; // 0
std::thread t2{ [] {} };
t = std::move(t2); // 转移线程资源的所有权到 t
std::cout << t.joinable() << '\n'; // 1,joinable代表的是资源是否存在,而不是任务是否存在。
t.join();

t2 = std::thread([] {});//这个临时对象是一个右值,所以移动赋值语句会被自动调用,无需使用move
t2.join();
}

std::thread f(){
std::thread t{ [] {} };
return t;//使用移动构造语句返回所有权
}
int main(){
std::thread rt = f();
rt.join();
}

void f(std::thread t){
t.join();
}
int main(){
std::thread t{ [] {} };
f(std::move(t));
f(std::thread{ [] {} });
}

这里贴上thread源码解析

实现一个joining_thread

即实现自动管理线程析构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class joining_thread{
thread t;
public:
joining_thread() = default;
template<typename Callable, typename... Args>
explicit joining_thread(Callable&& func, Args&&...args) :
t{ std::forward<Callable>(func), std::forward<Args>(args)... } {}
explicit joining_thread(std::thread t_)noexcept : t{ std::move(t_) } {}
joining_thread(joining_thread&& other)noexcept : t{ std::move(other.t) } {}

joining_thread& operator=(std::thread&& other)noexcept {
if (joinable()) { // 如果当前有活跃线程,那就先执行完
join();
}
t = std::move(other);
return *this;
}
~joining_thread(){
if(joinable()){
join();
}
}
void swap(joining_thread& other)noexcept {
t.swap(other.t);
}
std::thread::id get_id()const noexcept {
return t.get_id();
}
bool joinable()const noexcept {
return t.joinable();
}
void join() {
t.join();
}
void detach() {
t.detach();
}
std::thread& data()noexcept {
return t;
}
const std::thread& data()const noexcept {
return t;
}
}

jthread实现

是上面建议实现的joining——thread的增强版。
主要是为了解决 std::thread 的两个痛点:

  1. 忘记 Join 的风险:std::thread 析构时如果还是 joinable 状态,程序会崩溃。std::jthread 在析构时会自动调用 join()。
  2. 缺乏原生停止机制:std::thread 没有标准的“停止”或“中断”线程的方法。std::jthread 内置了协作式停止令牌(Stop Token)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
using namespace std::literals::chrono_literals;
//常规方案使用轮询来检查是否停止
//线程函数第一个参数必须为std::stop_token
void f(std::stop_token stop_token, int value){
while (!stop_token.stop_requested()){ // 检查是否已经收到停止请求
std::cout << value++ << ' ' << std::flush;
std::this_thread::sleep_for(200ms);
}
std::cout << std::endl;
}

int main(){
//打印 1..15 大约 3 秒
//stop_token会自动被传递
std::jthread thread{ f, 1 };

std::this_thread::sleep_for(std::chrono::seconds(2));
t.request_stop();//第一种方法:使用stop_token手动请求停止,需要注意的是,这种唤醒方式不能立刻唤醒沉睡线程
}//第二种方法:或者通过析构RAII对象停止线程

另外还有两种停止线程方案

  1. 立即唤醒沉睡线程并停止
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>

std::mutex mtx;
std::condition_variable_any cv; // 注意必须用 _any
bool ready = false;

void waiter(std::stop_token stoken) {
std::unique_lock lk(mtx);

// 这里的 wait 接受 stoken
// 如果 stoken 收到停止请求,wait 会立即返回 false,而不再死等
bool success = cv.wait(lk, stoken, []{ return ready; });

if (stoken.stop_requested()) {
std::cout << "等待被取消,线程退出。\n";
return;
}

std::cout << "收到数据: " << ready << "\n";
}

int main() {
std::jthread t(waiter);

std::this_thread::sleep_for(std::chrono::seconds(1));

// 我们不设置 ready = true,而是直接请求停止
std::cout << "请求线程停止...\n";
t.request_stop(); // 这会直接打断 cv.wait()

return 0;
}
  1. 为了紧急打断那些听不见stop_token 的阻塞操作(如系统 IO),stop_callback 在 request_stop() 被调用的那一瞬间(或者注册时如果已经 stop 了)立即在发起停止的线程(通常是主线程)中执行。
1
2
3
4
5
6
7
8
9
10
11
12
void worker(std::stop_token stoken) {
// 注册一个回调:一旦 stoken.request_stop() 被调用,这个 lambda 就会执行
std::stop_callback callback(stoken, []{
std::cout << "【回调触发】监测到停止信号!\n";
// 这里可以执行一些强制唤醒操作,比如关闭 socket fd
// close(socket_fd);
});

while (!stoken.stop_requested()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}

另外jthread还有一个API是get_stop_resource,可以获得子线程的std::stop_source 对象


本站由 Edison.Chen 创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。