共享数据
锁
首先解释锁的基本原理,m.lock() 内部是如何保证原子性的,这涉及到硬件指令和操作系统调用。
- 用户态:原子指令 (CAS)
std::mutex 内部通常有一个 std::atomic 或类似的标志位(比如 0 代表空闲,1 代表占用)。 lock() 不会简单地写 if (flag == 0) flag = 1;,因为这两步不是原子的。
它会使用 CPU 提供的原子指令,如 CAS (Compare-And-Swap) 或 Test-And-Set。
指令逻辑:“查看内存地址 X 的值,如果是 0,就把它改成 1。告诉我修改成功没有。”
这一整套操作在硬件层面是不可打断的。
- 内核态:系统调用 (Syscall)
如果 CAS 操作失败(说明锁被别人占了),std::mutex 不会一直傻傻地空转(那是自旋锁 spinlock 的行为)。
它会发起一个系统调用(System Call):
Linux 下:通常使用 futex (Fast Userspace Mutex)。
Windows 下:通常使用 SrwLock 或 CriticalSection。
这个系统调用会对操作系统说:“我没抢到锁,请把我这个线程挂起(Sleep/Block),并放入等待队列。” 此时,线程进入休眠状态,不再占用 CPU。
当持有锁的线程调用 unlock() 时,它会重置原子标志位,并再次发起系统调用:唤醒队列的下一个任务。
- 以下是一段常规的互斥锁设计,基于mutex类中的lock函数的同步阻塞机制实现的。
lock函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| #include <mutex> std::mutex m; void f(){ m.lock(); std::cout<<std::this_thread::get_id()<<'\n'; m.unclock(); } int main(){ std::vector<std::thread> threads; for (std::size_t i = 0; i < 10; ++i) threads.emplace_back(f);
for (auto& thread : threads) thread.join(); }
|
lock_guard类
1 2 3 4
| void f() { std::lock_guard<std::mutex> lc{ m }; std::cout << std::this_thread::get_id() << '\n'; }
|
这是一个RAII类设计,初始化上锁,析构解锁。所以也就可以知道,使用这种方法只有当离开了作用域之后,才会解锁(析构)。很明显如果作用域太大,这样的粒度就会很大,文章中提供了这样的一个实现方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| std::mutex m;
void add_to_list(int n, std::list<int>& list) { std::vector<int> numbers(n + 1); std::iota(numbers.begin(), numbers.end(), 0); int sum = std::accumulate(numbers.begin(), numbers.end(), 0);
{ std::lock_guard<std::mutex> lc{ m }; list.push_back(sum); } } void print_list(const std::list<int>& list){ std::lock_guard<std::mutex> lc{ m }; for(const auto& i : list){ std::cout << i << ' '; } std::cout << '\n'; }
|
try_lock函数
1 2 3 4 5 6 7 8 9 10
| std::mutex m; void thread_fuction(int id){ if(m.try_lock()){ cout<<"获得锁"; std::this_thread::sleep_for(std::chrono::millisecongd(100)); m.unlock(); }else{ cout<<"获得锁失败"; } }
|
unique_lock类
这是一个灵活的锁,相较于lock_guard,它并不需要使用{}限定作用域来缩小粒度。这个类内部具备lock函数、try_lock()、unlock()、其中lock函数的解析见下一部分。首先介绍这个类的部分源码(参考文章中贴出来的部分
1 2 3 4 5 6 7 8 9 10 11
| private: _Mutex* _Pmtx = nullptr; bool _Owns = false; public: unique_lock(_Mutex& _Mtx, defer_lock_t) noexcept : _Pmtx(_STD addressof(_Mtx)), _Owns(false) {} ~unique_lock() noexcept { if (_Owns) { _Pmtx->unlock(); } }
|
下面是使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| void swap(X& lhs, X& rhs) { if (&lhs == &rhs) return; lhs.m.lock(); rhs.m.lock(); > > > > > > > > > > > > > > > std::unique_lock<std::mutex> lock1{ lhs.m, std::adopt_lock }; std::unique_lock<std::mutex> lock2{ rhs.m, std::adopt_lock }; swap(lhs.object, rhs.object); lock1.unlock(); lock2.unlock(); }
|
unique_lock是不可拷贝,但是可移动的,因此可以转移其所有权
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| #include <mutex> #include <fmt/core.h>
std::unique_lock<std::mutex>get_lock(){ extern std::mutex some_mutex; std::unique_lock<std::mutex> lk{ some_mutex }; return lk; } void process_data(){ std::unique_lock<std::mutex> my_lock = get_lock(); }
int main(){ process_data(); fmt::print("End\n"); }
|
shared_time_mutex和shared_mutex锁
普通的,简单的mutex是不能够满足并发需求的,例如读多写少情况,而读操作是线程安全的。
std::shared_mutex 同样支持 std::lock_guard、std::unique_lock。和 std::mutex 做的一样,保证写线程的独占访问。而那些无需修改数据结构的读线程,可以使用 std::shared_lockstd::shared_mutex 获取访问权,多个线程可以一起读取。shared_time_mutex相较于另一个提供了超时操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| class Settings { private: std::map<std::string, std::string> data_; mutable std::shared_mutex mutex_;
public: void set(const std::string& key, const std::string& value) { std::lock_guard<std::shared_mutex> lock{ mutex_ }; data_[key] = value; }
std::string get(const std::string& key) const { std::shared_lock<std::shared_mutex> lock(mutex_); auto it = data_.find(key); return (it != data_.end()) ? it->second : ""; } };
|
recursive_mutex锁
这是为了解决递归调用过程中重复加锁问题的,需要注意,加了多少次锁,就要节多少次
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| #include <iostream> #include <thread> #include <mutex>
std::recursive_mutex mtx;
void recursive_function(int count) { mtx.lock(); std::cout << "Locked by thread: " << std::this_thread::get_id() << ", count: " << count << std::endl; if (count > 0) { recursive_function(count - 1); } mtx.unlock(); }
int main() { std::thread t1(recursive_function, 3); std::thread t2(recursive_function, 2);
t1.join(); t2.join(); }
|
格外注意应该避免出现暴露锁内数据,见下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| class data{ int a[]; string b[]; public: void do_something(); }; class Data_wrapper{ data d; mutex m; public: template<class Func> void process_data(Func func){ lock_guard<mutex> lv{m}; func(data);
} }; data *p = nullptr; void malicious_function(data& d){ p = &d; } Data_wrapper d;
void foo(){ d.process_data(malicious_function); p->do_something(); }
|
死锁
- 死锁问题发生在多个互斥量同时存在,并且争夺对象的时候。下面是一个简单例子,不同的加锁顺序导致了互相等待。
1 2 3 4 5 6 7 8 9 10
| mutex m1; mutex m2; void swap_turn1(){ lock_guard<mutex> lc1 {m1}; lock_guard<mutex> lc2 {m2}; } void swap_turn2(){ lock_guard<mutex> lc1 {m2}; lock_guard<mutex> lc2 {m1}; }
|
对此C++提供了std::lock(std::mutex,std::mutex)函数和std::scope_lock(std::mutex,std::mutex)
lock函数
1 2 3 4 5 6 7 8 9
| void swap(CirSource &first,CirSource &second) { std::lock(first.dataLock,secode.dataLock); std::lock_guard<std::mutex> lockf(first.dataLock,std::adopt_lock); std::lock_guard<std::mutex> locks(second.dataLock,std::adopt_lock); int temp = first.n1; first.n1 = second.n1; second.n1 = temp; }
|
对lock的解析来自这篇文章
,文中提到其实现逻辑时首先对一把锁进行加锁,然后对另外一把锁使用try_lock,如果加锁成功则成功返回,如果加锁失败则通过__libcpp_thread_yield进行一次线程切换之后,在通过相反的顺序进行尝试加锁。周而复始对锁进行尝试的加锁。在lock完成上锁后,需要传输adopt_lock参数,guard_lock仅仅时对锁的所有权的转换,而不进行实际加锁操作。
scoped_lock
scope_lock则是将上边的部分封装到类,实现RAII。std::scoped_lock = std::lock_guard (RAII 自动解锁) + std::lock() (多锁防死锁算法) + 语法糖 (自动推导类型)。
1 2 3 4 5 6 7 8
| void transfer_new(Account& from, Account& to, int amount) { std::scoped_lock lock(from.m, to.m); do_something(&from,&to,amount); }
|
- 避免死锁
- 避免死锁最基本的原则毫无疑问就是同一个线程智能持有一个锁。
首先,死锁需要具备四个条件:
互斥:资源不能共享(std::mutex 天然满足)。
持有并等待 (Hold and Wait):一个线程至少持有一个资源,并且正在等待获取另一个被其他线程持有的资源。
非抢占:资源不能被强制夺走,只能由持有者自愿释放(std::mutex 满足)。
循环等待 (Circular Wait):存在一个线程等待链,每个线程都在等待下一个线程持有的资源。
不同线程由于互斥量的存在,不可能同时把持一段数据的。考虑下面的场景
线程1:
成功锁住 mutex_A (持有)。
尝试去锁 mutex_B (等待)。
线程2:
成功锁住 mutex_B (持有)。
尝试去锁 mutex_A (等待)。
如果应用我们提到的原则,那么线程1必须释放A才能获取B,自然避免死锁。
但是同样毫无疑问,真实的业务场景怎么可能这么简单,最常见的是需要对数据进行拷贝或者赋值,那么就需要对原来变量以及操作变量同时上锁,这就需要使用上面提供的一些RAII类,他们只会有两个返回,而绝不会只锁住一半。
成功锁住所有的锁
抛出异常并释放原来锁
下面是另外的解决需要持有多个锁的时候,注意事项
- 避免在持有锁的时候调用外部代码
指用户传入的回调函数(Callback)、虚函数(Virtual Function)或者第三方库的函数。主要是你不知道这些代码内部会干什么。
也许它内部试图去锁你手里已经拿住的这个锁 -> 自死锁(Self-Deadlock,除非用 recursive_mutex)。也许它内部试图去锁另一个锁 B,而另一个线程正好拿着 B 等你的锁 -> ABBA 死锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| class Server { std::mutex m; std::vector<std::function<void()>> callbacks;
public: void add_callback(std::function<void()> cb) { std::lock_guard<std::mutex> lk(m); callbacks.push_back(cb); }
void do_work() { std::lock_guard<std::mutex> lk(m);
for (auto& cb : callbacks) cb(); } };
|
- 使用固定顺序获取锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| class BankAccount { public: double balance; std::mutex m; };
void transfer(BankAccount& from, BankAccount& to, double amount) { if (&from.m < &to.m) { std::lock_guard<std::mutex> lock_from(from.m); std::lock_guard<std::mutex> lock_to(to.m); from.balance -= amount; to.balance += amount; } else { std::lock_guard<std::mutex> lock_to(to.m); std::lock_guard<std::mutex> lock_from(from.m); from.balance -= amount; to.balance += amount; } }
|
还有其他的策略,比如
3. 使用try_lock->back_off->try_lock策略
4. 尽量避免嵌套锁
5. 锁的粒度控制,粒度大不容易死锁,但是并行差;反之亦然
其他保护共享数据方法
保护共享数据并非必须使用互斥量,互斥量只是其中一种常见的方式而已,对于一些特殊的场景,也有专门的保护方式,比如对于共享数据的初始化过程的保护。我们通常就不会用互斥量,这会造成很多的额外开销。
- 首先应该提到单例模式的初始化,对于C++11以上,静态变量的初始化是线程安全的因此可以利用static保护共享对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| class FormToolBox { public: static FormToolBox& GetInstance() { static FormToolBox instance; return instance; } FormToolBox(const FormToolBox&) = delete; FormToolBox& operator=(const FormToolBox&) = delete; private: FormToolBox() { std::cout << "FormToolBox instance created." << std::endl; } ~FormToolBox() = default; };
|
但是对于单例要少用,这是一个设计模式上面的问题,如果将各个状态都存在但李重,那么这一个类就会变成一锅粥,遑论什么模式了,如果想要做单元测试,一旦设计到单例,那么很可能还要开启功能B、C、D的调用。常规编码中还是传指针或者使用依赖注入
2. 常规双检锁
这个应该少用,例如reset函数并不是原子性的,而是分成三步,分配内存,移动指针,实例对象。有可能会导致do_something在移动指针后就执行,会报错。
1 2 3 4 5 6 7 8 9
| void f(){ if(!ptr){ std::lock_guard<std::mutex> lk{ m }; if(!ptr){ ptr.reset(new some); } } ptr->do_something(); }
|
- call_once安全方法
1 2 3 4 5 6 7 8 9 10 11
| std::shared_ptr<some> ptr; std::once_flag resource_flag;
void init_resource(){ ptr.reset(new some); }
void foo(){ std::call_once(resource_flag, init_resource); ptr->do_something(); }
|
线程变量
这里推荐一篇文章.文章介绍了多线程变量模型,以及TLS变量
另外注意使用operator new替换new,使用operator delete替换delete
thread_local
这是一个在C++11中提出的,类似文章中使用_thread定义线程局部变量的关键字。
除了C++11,各个编译器,系统也有不同的接口定义这样一个TLS变量
POSIX:使用 pthread_key_t 和相关的函数( pthread_key_create、pthread_setspecific、pthread_getspecific 和pthread_key_delete)>来管理线程局部存储。
Win32:使用 TLS(Thread Local Storage)机制,通过函数 TlsAlloc、TlsSetValue、TlsGetValue 和 TlsFree 来实现线程局部存储。
GCC:使用 __thread 。
MSVC:使用 __declspec(thread)。
CPU变量
除了推荐文章中提出的静态(全局)变量、线程变量等,还有一类CPU变量:
它在标准 C++ 中无对应抽象实现,是操作系统内核功能。它主要依赖于当前系统内核来进行使用,也无法跨平台。基本概念与线程变量类似:CPU 变量是为每个处理器单独分配的变量副本。
CPU 变量的使用是少见的,主要用于内核开发和追求极致性能的高并发场景,减少 CPU 同步开销。