共享数据

首先解释锁的基本原理,m.lock() 内部是如何保证原子性的,这涉及到硬件指令和操作系统调用。

  1. 用户态:原子指令 (CAS)
    std::mutex 内部通常有一个 std::atomic 或类似的标志位(比如 0 代表空闲,1 代表占用)。 lock() 不会简单地写 if (flag == 0) flag = 1;,因为这两步不是原子的。

它会使用 CPU 提供的原子指令,如 CAS (Compare-And-Swap) 或 Test-And-Set。

指令逻辑:“查看内存地址 X 的值,如果是 0,就把它改成 1。告诉我修改成功没有。”

这一整套操作在硬件层面是不可打断的。

  1. 内核态:系统调用 (Syscall)
    如果 CAS 操作失败(说明锁被别人占了),std::mutex 不会一直傻傻地空转(那是自旋锁 spinlock 的行为)。

它会发起一个系统调用(System Call):

Linux 下:通常使用 futex (Fast Userspace Mutex)。

Windows 下:通常使用 SrwLock 或 CriticalSection。

这个系统调用会对操作系统说:“我没抢到锁,请把我这个线程挂起(Sleep/Block),并放入等待队列。” 此时,线程进入休眠状态,不再占用 CPU。

当持有锁的线程调用 unlock() 时,它会重置原子标志位,并再次发起系统调用:唤醒队列的下一个任务。

  1. 以下是一段常规的互斥锁设计,基于mutex类中的lock函数的同步阻塞机制实现的。

lock函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <mutex>
std::mutex m;
void f(){
m.lock();
std::cout<<std::this_thread::get_id()<<'\n';
m.unclock();
}
int main(){
std::vector<std::thread> threads;
for (std::size_t i = 0; i < 10; ++i)
threads.emplace_back(f);

for (auto& thread : threads)
thread.join();
}

lock_guard类

1
2
3
4
void f() {
std::lock_guard<std::mutex> lc{ m };
std::cout << std::this_thread::get_id() << '\n';
}

这是一个RAII类设计,初始化上锁,析构解锁。所以也就可以知道,使用这种方法只有当离开了作用域之后,才会解锁(析构)。很明显如果作用域太大,这样的粒度就会很大,文章中提供了这样的一个实现方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
std::mutex m;

void add_to_list(int n, std::list<int>& list) {
std::vector<int> numbers(n + 1);
std::iota(numbers.begin(), numbers.end(), 0);
int sum = std::accumulate(numbers.begin(), numbers.end(), 0);

{
std::lock_guard<std::mutex> lc{ m };
list.push_back(sum);
}
}
void print_list(const std::list<int>& list){
std::lock_guard<std::mutex> lc{ m };
for(const auto& i : list){
std::cout << i << ' ';
}
std::cout << '\n';
}

try_lock函数

1
2
3
4
5
6
7
8
9
10
std::mutex m;
void thread_fuction(int id){
if(m.try_lock()){
cout<<"获得锁";//不能再次加锁!!
std::this_thread::sleep_for(std::chrono::millisecongd(100));
m.unlock();
}else{
cout<<"获得锁失败";
}
}

unique_lock类

这是一个灵活的锁,相较于lock_guard,它并不需要使用{}限定作用域来缩小粒度。这个类内部具备lock函数、try_lock()、unlock()、其中lock函数的解析见下一部分。首先介绍这个类的部分源码(参考文章中贴出来的部分

1
2
3
4
5
6
7
8
9
10
11
private:
_Mutex* _Pmtx = nullptr;//互斥量指针
bool _Owns = false;//所有权
public:
unique_lock(_Mutex& _Mtx, defer_lock_t) noexcept
: _Pmtx(_STD addressof(_Mtx)), _Owns(false) {} // construct but don't lock
~unique_lock() noexcept {
if (_Owns) {//具有所有权,才能进行unlock
_Pmtx->unlock();
}
}

下面是使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void swap(X& lhs, X& rhs) {
if (&lhs == &rhs) return;
lhs.m.lock();
rhs.m.lock();
//unique_lock可以在构造时传入第二个参数,因此采取不同的策略具体如下
//1. adopt_lock构造函数不会去调用 m.lock(),它默认你已经锁好了,直接把状态设为“已拥有”。
//如果传入的 mutex 没被锁住,或者被别的线程锁住,程序会崩溃或产生未定义行为。
//主要用于将裸锁交给 unique_lock 管理,以便利用 RAII 自动释放
//2. defer_lock不调用 lock(),初始化状态为“未拥有”。
//构造函数什么都不做,下面是一个简要例程
>// void conditional_task(bool need_lock) {
>// // 先创建一个“未上锁”的管家
>// std::unique_lock<std::mutex> lock(m, std::defer_lock);
>
>// // 做一些不需要锁的初始化工作...
>
>// if (need_lock) {
>// lock.lock(); // 手动上锁
>// // ... 受保护操作 ...
>// }
>
>// // 退出作用域:
>// // 如果之前 lock() 了,析构时会 unlock()
>// // 如果没 lock(),析构时啥也不干
>// }
//3. try_lock尝试上锁,非阻塞。
//这与我们之前谈论的try_lock是相同的,不过是移入unique_lcok中了
std::unique_lock<std::mutex> lock1{ lhs.m, std::adopt_lock };
std::unique_lock<std::mutex> lock2{ rhs.m, std::adopt_lock };
swap(lhs.object, rhs.object);
lock1.unlock();
lock2.unlock();//灵活解锁,控制粒度
}

unique_lock是不可拷贝,但是可移动的,因此可以转移其所有权

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <mutex>
#include <fmt/core.h>

std::unique_lock<std::mutex>get_lock(){
extern std::mutex some_mutex;
std::unique_lock<std::mutex> lk{ some_mutex };
return lk;// 移动构造:锁的所有权从函数内转移到了函数外
}
void process_data(){
std::unique_lock<std::mutex> my_lock = get_lock();
// 执行一些任务...
}

int main(){
process_data();
fmt::print("End\n");
}

shared_time_mutex和shared_mutex锁

普通的,简单的mutex是不能够满足并发需求的,例如读多写少情况,而读操作是线程安全的。
std::shared_mutex 同样支持 std::lock_guard、std::unique_lock。和 std::mutex 做的一样,保证写线程的独占访问。而那些无需修改数据结构的读线程,可以使用 std::shared_lockstd::shared_mutex 获取访问权,多个线程可以一起读取。shared_time_mutex相较于另一个提供了超时操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Settings {
private:
std::map<std::string, std::string> data_;
mutable std::shared_mutex mutex_; // “M&M 规则”:mutable 与 mutex 一起出现

public:
void set(const std::string& key, const std::string& value) {
std::lock_guard<std::shared_mutex> lock{ mutex_ };
data_[key] = value;
}

std::string get(const std::string& key) const {
std::shared_lock<std::shared_mutex> lock(mutex_);//读操作,共享锁,允许多个线程
//所提的::unique_lock<std::shared_mutex> lock(mutex_)//写操作,独享
auto it = data_.find(key);
return (it != data_.end()) ? it->second : ""; // 如果没有找到键返回空字符串
}
};

recursive_mutex锁

这是为了解决递归调用过程中重复加锁问题的,需要注意,加了多少次锁,就要节多少次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <thread>
#include <mutex>

std::recursive_mutex mtx;

void recursive_function(int count) {
// 递归函数,每次递归都会锁定互斥量
mtx.lock();
std::cout << "Locked by thread: " << std::this_thread::get_id() << ", count: " << count << std::endl;
if (count > 0) {
recursive_function(count - 1); // 递归调用
}
mtx.unlock(); // 解锁互斥量
}

int main() {
std::thread t1(recursive_function, 3);
std::thread t2(recursive_function, 2);

t1.join();
t2.join();
}

格外注意应该避免出现暴露锁内数据,见下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class data{
int a[];
string b[];
public:
void do_something();
};
class Data_wrapper{
data d;
mutex m;
public:
template<class Func>
void process_data(Func func){
lock_guard<mutex> lv{m};
func(data);

}
};
data *p = nullptr;
void malicious_function(data& d){
p = &d;
}
Data_wrapper d;

void foo(){
d.process_data(malicious_function); // 传递了一个恶意的函数
p->do_something(); // 在无保护的情况下访问保护数据
}

死锁

  1. 死锁问题发生在多个互斥量同时存在,并且争夺对象的时候。下面是一个简单例子,不同的加锁顺序导致了互相等待。
1
2
3
4
5
6
7
8
9
10
mutex m1;
mutex m2;
void swap_turn1(){
lock_guard<mutex> lc1 {m1};
lock_guard<mutex> lc2 {m2};
}
void swap_turn2(){
lock_guard<mutex> lc1 {m2};
lock_guard<mutex> lc2 {m1};
}

对此C++提供了std::lock(std::mutex,std::mutex)函数和std::scope_lock(std::mutex,std::mutex)

lock函数

1
2
3
4
5
6
7
8
9
void swap(CirSource &first,CirSource &second)
{
std::lock(first.dataLock,secode.dataLock);
std::lock_guard<std::mutex> lockf(first.dataLock,std::adopt_lock);
std::lock_guard<std::mutex> locks(second.dataLock,std::adopt_lock);
int temp = first.n1;
first.n1 = second.n1;
second.n1 = temp;
}

对lock的解析来自这篇文章
,文中提到其实现逻辑时首先对一把锁进行加锁,然后对另外一把锁使用try_lock,如果加锁成功则成功返回,如果加锁失败则通过__libcpp_thread_yield进行一次线程切换之后,在通过相反的顺序进行尝试加锁。周而复始对锁进行尝试的加锁。在lock完成上锁后,需要传输adopt_lock参数,guard_lock仅仅时对锁的所有权的转换,而不进行实际加锁操作。

scoped_lock

scope_lock则是将上边的部分封装到类,实现RAII。std::scoped_lock = std::lock_guard (RAII 自动解锁) + std::lock() (多锁防死锁算法) + 语法糖 (自动推导类型)。

1
2
3
4
5
6
7
8
void transfer_new(Account& from, Account& to, int amount) {
// 一行代码搞定:
// 1. 自动推导类型 (不需要写 <std::mutex>)
// 2. 内部使用死锁避免算法,安全锁住 from.m 和 to.m
// 3. 退出作用域自动解锁
std::scoped_lock lock(from.m, to.m);
do_something(&from,&to,amount);
}
  1. 避免死锁
  • 避免死锁最基本的原则毫无疑问就是同一个线程智能持有一个锁。

首先,死锁需要具备四个条件:

互斥:资源不能共享(std::mutex 天然满足)。
持有并等待 (Hold and Wait):一个线程至少持有一个资源,并且正在等待获取另一个被其他线程持有的资源。
非抢占:资源不能被强制夺走,只能由持有者自愿释放(std::mutex 满足)。
循环等待 (Circular Wait):存在一个线程等待链,每个线程都在等待下一个线程持有的资源。

不同线程由于互斥量的存在,不可能同时把持一段数据的。考虑下面的场景

线程1:
成功锁住 mutex_A (持有)。
尝试去锁 mutex_B (等待)。

线程2:
成功锁住 mutex_B (持有)。
尝试去锁 mutex_A (等待)。

如果应用我们提到的原则,那么线程1必须释放A才能获取B,自然避免死锁。

但是同样毫无疑问,真实的业务场景怎么可能这么简单,最常见的是需要对数据进行拷贝或者赋值,那么就需要对原来变量以及操作变量同时上锁,这就需要使用上面提供的一些RAII类,他们只会有两个返回,而绝不会只锁住一半。

成功锁住所有的锁
抛出异常并释放原来锁

下面是另外的解决需要持有多个锁的时候,注意事项

  1. 避免在持有锁的时候调用外部代码

指用户传入的回调函数(Callback)、虚函数(Virtual Function)或者第三方库的函数。主要是你不知道这些代码内部会干什么。
也许它内部试图去锁你手里已经拿住的这个锁 -> 自死锁(Self-Deadlock,除非用 recursive_mutex)。也许它内部试图去锁另一个锁 B,而另一个线程正好拿着 B 等你的锁 -> ABBA 死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Server {
std::mutex m;
std::vector<std::function<void()>> callbacks;

public:
void add_callback(std::function<void()> cb) {
std::lock_guard<std::mutex> lk(m);
callbacks.push_back(cb);
}

void do_work() {
std::lock_guard<std::mutex> lk(m); // 1. 拿锁

// 2. 调用回调
//如果这个 callback 内部调用了 add_callback() 或者其他需要锁 m 的函数,
// 就会立刻死锁!
for (auto& cb : callbacks) cb();
}
};
  1. 使用固定顺序获取锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class BankAccount {
public:
double balance;
std::mutex m;
// ...
};

// 一个线程安全的转账函数
void transfer(BankAccount& from, BankAccount& to, double amount) {

// 为了避免死锁,我们必须按固定顺序锁住 from.m 和 to.m
// 我们约定:总是先锁地址较小的那个互斥量

if (&from.m < &to.m) {
// from 的地址小,先锁 from,再锁 to
std::lock_guard<std::mutex> lock_from(from.m);
std::lock_guard<std::mutex> lock_to(to.m);

from.balance -= amount;
to.balance += amount;
} else {
// to 的地址小,先锁 to,再锁 from
std::lock_guard<std::mutex> lock_to(to.m);
std::lock_guard<std::mutex> lock_from(from.m);

from.balance -= amount;
to.balance += amount;
}
}

还有其他的策略,比如
3. 使用try_lock->back_off->try_lock策略
4. 尽量避免嵌套锁
5. 锁的粒度控制,粒度大不容易死锁,但是并行差;反之亦然

其他保护共享数据方法

保护共享数据并非必须使用互斥量,互斥量只是其中一种常见的方式而已,对于一些特殊的场景,也有专门的保护方式,比如对于共享数据的初始化过程的保护。我们通常就不会用互斥量,这会造成很多的额外开销。

  1. 首先应该提到单例模式的初始化,对于C++11以上,静态变量的初始化是线程安全的因此可以利用static保护共享对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class FormToolBox {
public:
static FormToolBox& GetInstance() {
// 这行代码在多线程环境下是安全的,且只执行一次
static FormToolBox instance;
return instance;
}
FormToolBox(const FormToolBox&) = delete;
FormToolBox& operator=(const FormToolBox&) = delete;
private:
FormToolBox() {
std::cout << "FormToolBox instance created." << std::endl;
}
~FormToolBox() = default;
};

但是对于单例要少用,这是一个设计模式上面的问题,如果将各个状态都存在但李重,那么这一个类就会变成一锅粥,遑论什么模式了,如果想要做单元测试,一旦设计到单例,那么很可能还要开启功能B、C、D的调用。常规编码中还是传指针或者使用依赖注入
2. 常规双检锁
这个应该少用,例如reset函数并不是原子性的,而是分成三步,分配内存,移动指针,实例对象。有可能会导致do_something在移动指针后就执行,会报错。

1
2
3
4
5
6
7
8
9
void f(){
if(!ptr){ // 1
std::lock_guard<std::mutex> lk{ m };
if(!ptr){ // 2
ptr.reset(new some); // 3
}
}
ptr->do_something(); // 4
}
  1. call_once安全方法
1
2
3
4
5
6
7
8
9
10
11
std::shared_ptr<some> ptr;
std::once_flag resource_flag;

void init_resource(){
ptr.reset(new some);
}

void foo(){
std::call_once(resource_flag, init_resource); // 线程安全的一次初始化
ptr->do_something();
}

线程变量

这里推荐一篇文章.文章介绍了多线程变量模型,以及TLS变量
另外注意使用operator new替换new,使用operator delete替换delete

thread_local

这是一个在C++11中提出的,类似文章中使用_thread定义线程局部变量的关键字。
除了C++11,各个编译器,系统也有不同的接口定义这样一个TLS变量

POSIX:使用 pthread_key_t 和相关的函数( pthread_key_create、pthread_setspecific、pthread_getspecific 和pthread_key_delete)>来管理线程局部存储。
Win32:使用 TLS(Thread Local Storage)机制,通过函数 TlsAlloc、TlsSetValue、TlsGetValue 和 TlsFree 来实现线程局部存储。
GCC:使用 __thread 。
MSVC:使用 __declspec(thread)。

CPU变量

除了推荐文章中提出的静态(全局)变量、线程变量等,还有一类CPU变量:

它在标准 C++ 中无对应抽象实现,是操作系统内核功能。它主要依赖于当前系统内核来进行使用,也无法跨平台。基本概念与线程变量类似:CPU 变量是为每个处理器单独分配的变量副本。
CPU 变量的使用是少见的,主要用于内核开发和追求极致性能的高并发场景,减少 CPU 同步开销。


本站由 Edison.Chen 创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。