最开始读的时候读的是周全的版本,翻译的真是一点都不周全,耽误了一天,也没看懂什么,后面看到了一些好的文章,这里贴链接(卢瑟国国王真不是盖的
现代C++并发编程教程
并发编程由几个板块组成:
- 线程管理 (thread)
- 互斥与同步 (mutex, condition_variable)
- 异步结果 (future, async, promise, packaged_task)
- 防死锁策略 (scoped_lock)
当然这里的列举不全面,接下来希望在一系列文章探讨清楚这件事情。
线程管理
The first
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| #include <iosetream> #include <thread> using namespace std; class BackGround{ public: BackGround(int id):m_id(id); void operator()() const { do_somthing(); } int m_id; } int main(){ BackGround function(11); thread my_thread(function);
}
|
- 当前环境的支持并发数
使用int n = std::thread::hardware_concurrency();可以得到
生命周期管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| struct func { int& _i; func(int& i) : _i(i) {} void operator() () { for (unsigned j=0 ; j<1000000 ; ++j) { do_something(_i); } } }; void oops() { int some_local_state=0; func my_func(some_local_state); std::thread my_thread(my_func); my_thread.detach(); }
|
使用join或者detach函数来管理线程是非常粗暴的,尽管通过将join放入catch语句中可以保证其不被异常摧毁(书上给出的第二个例子)。但是维护起来较稳困难,如果引入另外的try-catch机制,可能会因为忘记join导致机制失效。最好的方式肯定是RAII,一般来说对于一个难以管控的生命周期,最好的方式就是设计成类,像智能指针一样。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| class scoped_thread { std::thread t; public: explicit scoped_thread(std::thread t_) : t(std::move(t_)) { if (!t.joinable()) { throw std::logic_error("No thread"); } }
~scoped_thread() { if (t.joinable()) { t.join(); } }
scoped_thread(const scoped_thread&) = delete; scoped_thread& operator=(const scoped_thread&) = delete; int main (){ scope_thread t(std::thread([](){ })) } };
|
而对于C++20以后可以直接使用std::jthread代替scoped_thread
线程传参
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| void print_data(int id, double value, const std::string& message) { std::cout << "线程 " << id << ": " << message << " (值为: " << value << ")" << std::endl; }
int main() { int thread_id = 1; double data_value = 3.14; std::string msg = "Hello from thread!";
std::thread t(print_data, thread_id, data_value, msg);
t.join();
return 0; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| void modify_value(int& val) { std::cout << "子线程:开始修改值..." << std::endl; val = 100; std::cout << "子线程:修改完成。" << std::endl; }
int main() { int my_value = 10; std::cout << "主线程:启动前, my_value = " << my_value << std::endl; std::thread t(modify_value, std::ref(my_value)); t.join(); std::cout << "主线程:结束后, my_value = " << my_value << std::endl;
return 0; }
|
1 2 3 4 5 6 7 8 9 10
| void modify_via_pointer(int* p_val) { if (p_val) { *p_val = 200; } }
int my_value = 10; std::thread t(modify_via_pointer, &my_value); t.join();
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| void process_data(std::unique_ptr<int> p) { std::cout << "子线程:接收到数据 " << *p << std::endl; *p += 10; std::cout << "子线程:处理后数据 " << *p << std::endl; }
int main() { auto my_ptr = std::make_unique<int>(42);
std::thread t(process_data, std::move(my_ptr));
if (!my_ptr) { std::cout << "主线程:my_ptr 的所有权已被移交。" << std::endl; }
t.join(); return 0; }
|
std::this_thread命名空间
1 2 3 4 5 6 7
| int main(){ cont<<this::thread::get_id()<<endl; thread t{ []{ cout<<this_thread::get_id()<<endl; }} t.join(); }
|
1 2 3 4
| using namespace chrono_literals; int main(){ this_thread::sleep_for(3s); }
|
1 2 3 4
| while (!isDone()){ std::this_thread::yield(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| int main() { auto now = std::chrono::system_clock::now();
auto wakeup_time = now + 5s;
auto now_time = std::chrono::system_clock::to_time_t(now); std::cout << "Current time:\t\t" << std::put_time(std::localtime(&now_time), "%H:%M:%S") << std::endl;
auto wakeup_time_time = std::chrono::system_clock::to_time_t(wakeup_time); std::cout << "Waiting until:\t\t" << std::put_time(std::localtime(&wakeup_time_time), "%H:%M:%S") << std::endl;
std::this_thread::sleep_until(wakeup_time);
now = std::chrono::system_clock::now(); now_time = std::chrono::system_clock::to_time_t(now); std::cout << "Time after waiting:\t" << std::put_time(std::localtime(&now_time), "%H:%M:%S") << std::endl; }
|
转移所有权
首先thread是可移动但是不可被拷贝的。其次我认为帖子中两个例子给的很好,见下面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| int main() { std::thread t{ [] { std::cout << std::this_thread::get_id() << '\n'; } }; std::cout << t.joinable() << '\n'; std::thread t2{ std::move(t) }; std::cout << t.joinable() << '\n'; t2.join(); }
int main() { std::thread t; std::cout << t.joinable() << '\n'; std::thread t2{ [] {} }; t = std::move(t2); std::cout << t.joinable() << '\n'; t.join(); t2 = std::thread([] {}); t2.join(); }
std::thread f(){ std::thread t{ [] {} }; return t; } int main(){ std::thread rt = f(); rt.join(); }
void f(std::thread t){ t.join(); } int main(){ std::thread t{ [] {} }; f(std::move(t)); f(std::thread{ [] {} }); }
|
这里贴上thread源码解析
实现一个joining_thread
即实现自动管理线程析构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| class joining_thread{ thread t; public: joining_thread() = default; template<typename Callable, typename... Args> explicit joining_thread(Callable&& func, Args&&...args) : t{ std::forward<Callable>(func), std::forward<Args>(args)... } {} explicit joining_thread(std::thread t_)noexcept : t{ std::move(t_) } {} joining_thread(joining_thread&& other)noexcept : t{ std::move(other.t) } {}
joining_thread& operator=(std::thread&& other)noexcept { if (joinable()) { join(); } t = std::move(other); return *this; } ~joining_thread(){ if(joinable()){ join(); } } void swap(joining_thread& other)noexcept { t.swap(other.t); } std::thread::id get_id()const noexcept { return t.get_id(); } bool joinable()const noexcept { return t.joinable(); } void join() { t.join(); } void detach() { t.detach(); } std::thread& data()noexcept { return t; } const std::thread& data()const noexcept { return t; } }
|
jthread实现
是上面建议实现的joining——thread的增强版。
主要是为了解决 std::thread 的两个痛点:
- 忘记 Join 的风险:std::thread 析构时如果还是 joinable 状态,程序会崩溃。std::jthread 在析构时会自动调用 join()。
- 缺乏原生停止机制:std::thread 没有标准的“停止”或“中断”线程的方法。std::jthread 内置了协作式停止令牌(Stop Token)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| using namespace std::literals::chrono_literals;
void f(std::stop_token stop_token, int value){ while (!stop_token.stop_requested()){ std::cout << value++ << ' ' << std::flush; std::this_thread::sleep_for(200ms); } std::cout << std::endl; }
int main(){ std::jthread thread{ f, 1 };
std::this_thread::sleep_for(std::chrono::seconds(2)); t.request_stop(); }
|
另外还有两种停止线程方案
- 立即唤醒沉睡线程并停止
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| #include <iostream> #include <thread> #include <condition_variable> #include <mutex>
std::mutex mtx; std::condition_variable_any cv; bool ready = false;
void waiter(std::stop_token stoken) { std::unique_lock lk(mtx); bool success = cv.wait(lk, stoken, []{ return ready; });
if (stoken.stop_requested()) { std::cout << "等待被取消,线程退出。\n"; return; } std::cout << "收到数据: " << ready << "\n"; }
int main() { std::jthread t(waiter);
std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "请求线程停止...\n"; t.request_stop();
return 0; }
|
- 为了紧急打断那些听不见stop_token 的阻塞操作(如系统 IO),stop_callback 在 request_stop() 被调用的那一瞬间(或者注册时如果已经 stop 了)立即在发起停止的线程(通常是主线程)中执行。
1 2 3 4 5 6 7 8 9 10 11 12
| void worker(std::stop_token stoken) { std::stop_callback callback(stoken, []{ std::cout << "【回调触发】监测到停止信号!\n"; });
while (!stoken.stop_requested()) { std::this_thread::sleep_for(std::chrono::seconds(1)); } }
|
另外jthread还有一个API是get_stop_resource,可以获得子线程的std::stop_source 对象