线程安全问题
1 2 3 int count = 0 ;count++; count--;
后置++
有多个动作:
取出count的值为tmp
计算count + 1
count + 1
的结果赋给count
返回tmp
对应于多条指令。
传统地解决,可以用加锁的方式
1 2 3 4 { lock_guard<std::mutex> guard (mtx) ; count++; }
但是,互斥锁是比较耗费资源的,如果临界区的代码比较轻量级,那么传统mutex锁相对而言就比较小题大做了。
现在有新机制解决:指令级并发。
如果想让这么多动作在一条指令内做完的话,需要让处理器支持这样的操作,而不用锁机制,因此这种叫做无锁结构。
原子类型
定义于<atomic>
原子类型,封装了一个值,保证其的访问不会导致数据竞争,并且可用于同步不同线程之间的内存访问。
atomic_flag
初始化可以赋值为ATOMIC_FLAG_INIT
,意为无状态,对应false。
test_and_set()
:置位为有状态,对应true,并返回调用之前的状态。这是一个原子操作,不会被其他线程干扰。
clear()
重置flag为无状态
atomic_flag
做自旋锁
以下就是一个用atomic_flag
做忙等待的例子:
初始flag为无状态
调用一次test_and_set()
置位flag为有状态
while中一直调用test_and_set()
,测试它的状态,直到为无状态时,退出循环,结束程序。
1 2 3 4 5 6 7 8 9 10 11 12 #include <atomic> int main () { std::atomic_flag flag = ATOMIC_FLAG_INIT; flag.test_and_set (); while (flag.test_and_set ()) { continue ; } return 0 ; }
给定一个atomic_flag
,初始为无状态,同时启动10个线程,操作之前,调用test_and_set
,如果测试为无状态,说明没有其他线程在操作,就可以进行操作。
操作完毕后,clear
重置flag为无状态。下一个线程就可以探测到无状态,开始它的操作。
此时,atomic_flag
就相当于自旋锁的作用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 std::atomic_flag lock_output = ATOMIC_FLAG_INIT; std::counting_semaphore<10 > sema{ 0 }; void worker (int v) { while (lock_output.test_and_set ()) {} std::cout << "thread #" << v << std::endl; lock_output.clear (); } int main () { std::jthread t[10 ]; for (int i = 0 ; i < 10 ; ++i) { t[i] = std::jthread (&worker, i + 1 ); } sema.release (10 ); using namespace std::chrono_literals; std::this_thread::sleep_for (5 s); return 0 ; }
输出:
1 2 3 4 5 6 7 8 9 10 thread #1 thread #5 thread #4 thread #7 thread #2 thread #10 thread #6 thread #3 thread #9 thread #8
atomic
1 template <class T > struct atomic ;
原子对象的主要特征是,从不同线程访问值不会导致数据竞争(即,这样做是明确定义的行为,访问顺序正确)。
通常,对于所有其他对象,如果同时访问同一对象而导致数据争用,则该操作将被视为未定义行为。
原子对象能够通过指定不同的内存顺序 来同步对其线程中其他非原子对象的访问 。
Relaxed Ordering的问题
1 2 3 4 5 6 r1 = y.load (std::memory_order_relaxed); x.store (r1, std::memory_order_relaxed); r2 = x.load (std::memory_order_relaxed); y.store (42 , std::memory_order_relaxed);
标记为 memory_order_relaxed
的原子操作不是同步操作。它们不会在并发内存访问中强加顺序,它们只保证原子性和修改顺序的一致性。
例如,x 和 y 初始为零。
以上程序就会允许产生 r1 == r2 == 42
,在线程 1 内,A 在 B 之前被排序,并且在线程 2 内,C 在 D 之前被排序。
但是没有什么可以阻止 D 在 A 之前 修改了 y,并且 B 在 C 之前 修改 x 。
D 对 y 的副作用对于线程 1 中的 load A 是可见的,B 对 x 的副作用对于线程 2 中的 load C 是可见的。
特别是,如果在线程 2 中 D 在 C 之前完成,这可能会发生,这可能是在运行时发生的,或者由于编译器重新排序导致的。
实际上,A、B是不能调换的,因为编译器会看到,同在线程1中,B中的 r1
的值依赖于上一句的A对 r1
的操作。
而C、D之间就没有依赖了,因为 D 操作的是 42
,是个常量。
所以,经过线程并发、内存重排,可能的执行顺序有:
A B C D 此时 x、y、r1、r2的值:0、0、0、42
A B D C 此时 x、y、r1、r2的值:0、42、0、0
A C D B 此时 x、y、r1、r2的值:0、42、0、0
A D C B 此时 x、y、r1、r2的值:0、42、0、0
C A D B 此时 x、y、r1、r2的值:0、42、0、0
D A C B 此时 x、y、r1、r2的值:42、42、42、42 => 这时便出现了r1 == r2 == 42
C D A B 此时 x、y、r1、r2的值:42、42、42、0
D C A B 此时 x、y、r1、r2的值:0、42、42、0
怎么解决呢?
见下文
内存顺序
原子操作只是提供了不同线程读写的同步。
但是没有提供操作顺序的同步 。
比如:线程1修改原子值a,线程2读取原子值a。
两个线程各自的读写操作,确实是保证了不会有脏值。
但是线程1、线程2的顺序没有做控制,
如果线程2想要读出旧值,但是线程1在线程2读值前进行写操作,还是会导致线程2读出脏值。
因此需要提供内存顺序机制。
下面举一个类似的例子,怎么通过原子变量+内存顺序控制,让consumer确保producer执行完毕后,再 load 出 b 的值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 void producer (void ) { a.store (true , std::memory_order::relaxed); b.store (true , std::memory_order::release); } void consumer (void ) { while (!b.load (std::memory_order::acquire)) {} if (a.load (std::memory_order::relaxed)) ++c; } int main () { a = false ; b = false ; c = 0 ; using namespace std::chrono::literals; std::jthread th2 (consumer) ; std::this_thread::sleep_for (2 s); std::jthread th (producer) ; th.join (); th2. join (); }
类型
含义
relaxed
CPU和编译器可以重新排序变量顺序。 这是一种松散的内存顺序,不保证不同线程中的内存访问对原子操作进行排序。
consume(废弃)
针对某个原子变量的访问指令(store)重排到此指令(load)前。即自己load排到针对某个变量的release操作后 。(在C++26
中被废弃了,推荐用acquire)
acquire
所有访问指令(store)排到此指令(load)前。即自己load排到所有release操作后 。
release
所有访问指令(load)排到此指令(store)之后。即自己store排到所有consume、acquire操作之前。 扮演了一个同步点(synchronization point)的角色。
acq_rel
The operation loads acquiring and stores releasing。 该操作可能扮演两种角色。 比如std::atomic::exchange
操作,两个变量交换: 需要load值,可能需要等待release; 需要store值,即产生release,需要给其他acquire、consume通知。
seq_cst
sequentially consistent,意思是该操作以顺序一致的方式排序。一旦所有可能对其他线程产生可见副作用的内存访问已经发生,则所有操作使用此内存顺序。 这是最严格的内存顺序,保证了在非原子内存访问中线程交互之间的意外副作用最小。 对于consume和acquire的load,顺序一致的store操作被认为是release操作。
lock_free测试
测试以确定原子模板包含的类型是否支持无锁。
1 2 3 4 5 int main () { std::wcout << std::boolalpha << std::atomic<int >{}.is_lock_free () << std::endl; std::wcout << std::boolalpha << std::atomic<int >{}.is_always_lock_free << std::endl; }
经过测试后发现,不超过8字节(64位)的数据结构,并且没有虚函数表的数据结构,是支持无锁的。
1 2 3 4 5 6 7 8 9 10 struct A {int x, y; A () {}}struct B {int a[2 ];}struct C {int a[3 ];}int main () { std::wcout << std::boolalpha << std::atomic<A>{}.is_lock_free () << std::endl; std::wcout << std::boolalpha << std::atomic<B>{}.is_lock_free () << std::endl; std::wcout << std::boolalpha << std::atomic<C>{}.is_lock_free () << std::endl; }
无锁编程的原理
CAS(Compare-And-Swap)
CAS在执行事务时把整个数据都拷贝到应用中,在数据更新提交的时候比较数据库中的数据与新数据,如果两个数据一模一样则表示没有冲突可以直接提交,如果有冲突就要交给业务逻辑去解决。(具有ABA问题:用版本号或时间戳解决)
现代CPU提供CAS(Compare-And-Swap) 等原子指令,可在单个指令周期内完成类似以下的判断函数:
1 2 3 4 5 6 7 8 9 bool CAS (T* ptr, T old_val, T new_val) { if (*ptr == old_val) { *ptr = new_val; return true ; } return false ; }
以上类似的操作,实际指令执行期间不会被中断,避免了数据竞争。
比如,用这个特性,实现一个自旋锁:
1 2 3 4 5 6 int TestAndSet (int *old_ptr, int new) { int old = *old_ptr; *old_ptr = new; return old; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 typedef struct lock_t { int flag; } lock_t ; void init (lock_t *lock) { lock->flag = 0 ; } void lock (lock_t *lock) { while (TestAndSet(&lock->flag, 1 ) == 1 ) {} } void unlock (lock_t *lock) { lock->flag = 0 ; }
内存顺序
1 2 3 4 5 6 7 8 9 std::atomic<int > flag (0 ) ;data = 42 ; flag.store (1 , std::memory_order_release); while (flag.load (std::memory_order_acquire) != 1 ) ; read (data);
无锁数据结构的实现模式
”读-修改-写“循环(Read-Modify-Write Loop)
这是实现无锁的关键思维之一:乐观并发控制 :先执行操作,提交前,验证数据(tail)未被修改,如果未被修改,再提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 bool CAS (Node* old_ptr, Node* new_ptr) { if (this == old_ptr) { this = new_ptr; return true ; } return false ; } void push (T value) { Node* new_node = new Node (value); Node* old_tail; do { old_tail = tail.load (); new_node->next = old_tail; } while (!tail.CAS (old_tail, new_node)); }
帮助机制(Helping Mechanism)
当线程A发现线程B的操作未完成时,主动协助推进(如无锁队列中帮其他线程移动tail指针)。
分离并发关注点(如头尾指针分离)
将数据结构拆分为多个可独立更新的部分,减少竞争点。
这是实现无锁的关键思维之二:局部性原理(Locality Principle) :通过数据分片(如分槽队列)减少缓存行冲突:
对齐64位。
1 2 3 4 struct alignas (64 ) PaddedAtomic{ std::atomic<int > count; };
ABA问题(乐观锁问题)
乐观锁会出现这种问题。
乐观锁是对于数据冲突保持一种乐观态度,操作数据时不会对操作的数据进行加锁(这使得多个任务可以并行的对数据进行操作),只有到数据提交的时候才通过一种机制来验证数据是否存在冲突(一般实现方式是通过加版本号然后进行版本号的对比方式实现)
特点:乐观锁是一种并发类型的锁,其本身不对数据进行加锁。而是通过业务实现锁的功能。
不对数据进行加锁就意味着允许多个请求同时访问数据,同时也省掉了对数据加锁和解锁的过程。
这种方式节省了悲观锁加锁的操作,所以可以一定程度的的提高操作的性能。
不过在并发非常高的情况下,会导致大量的请求冲突,冲突导致大部分操作无功而返而浪费资源。
所以在高并发的场景下,乐观锁的性能却反而不如悲观锁。
问题场景
比如说线程一从数据库中取出库存数 3,这时候线程二也从数据库中取出库存数 3。
线程二进行了一些操作变成了 2 。但是然后线程二在线程一拿到操作权之前,又将库存数变成了 3 。
这时候:线程一进行 CAS 操作发现数据库中仍然是 3,然后线程一操作成功。
尽管线程一的 CAS 操作成功,但是不代表这个过程就是没有问题的。
解决方案
使用版本戳来对数据进行标记,数据每发生一次修改,版本号就增加1。某条数据在提交的时候,如果数据库中的版本号与自己的一致,就说明数据没有发生修改,否则就认为是脏数据,需要处理。
使用带版本号的指针:
1 2 3 4 5 struct VersionedPtr { Node* ptr; uint64_t version; };
内存回收挑战
方法
原理
适用场景
Hazard Pointers
线程本地记录"危险指针"(有的也叫风险指针),延迟删除其他线程可能访问的内存
Linux内核常用
Epoch-Based Reclaim
将内存标记为待回收,当所有线程都不持有旧epoch时才删除
NVIDIA库常用
RCU(Read-Copy-Update)
读操作无同步,写操作创建副本延迟回收
Linux内核链表
MPMC无锁队列
Node的设计(伪共享预防、ABA问题解决)
伪共享预防 :alignas(64)
和填充确保 头尾 在不同缓存行
ABA问题防护 :通过version
版本号区分指针复用
1 2 3 4 5 6 7 8 9 10 11 12 13 struct Node { T* data; std::atomic<Node*> next; char padding[64 - sizeof (T*) - sizeof (std::atomic<Node*>)]; }; struct VersionedPtr { Node* ptr; uint64_t version; };
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 #include <atomic> #include <memory> template <typename T>class LockFreeMPMCQueue {private : struct Node { Node () : data (nullptr ), next (nullptr ) { } T* data; std::atomic<Node*> next; char padding[64 - sizeof (T*) - sizeof (std::atomic<Node*>)]; }; struct alignas (16 ) VersionedPtr { Node* ptr; uint64_t version; }; struct { alignas (64 ) std::atomic<VersionedPtr> head; char padding[64 ]; }; alignas (64 ) std::atomic<VersionedPtr> tail; public : };
队列初始化(哨兵dummy头节点)
作用 :保证非空队列始终存在头节点,避免头尾指针竞态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 { public : LockFreeMPMCQueue () { Node* dummy = new Node (); VersionedPtr init = {dummy, 0 }; head.store (init, std::memory_order_relaxed); tail.store (init, std::memory_order_relaxed); } };
入队操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 bool Enqueue (T value) { Node* new_node = new Node (); try { new_node->data = new T (std::move (value)); } catch (...) { delete new_node; return false ; } VersionedPtr current_tail; while (true ) { current_tail = tail.load (std::memory_order_acquire); Node* real_tail = current_tail.ptr; Node* next = (real_tail->next).load (std::memory_order_acquire); if (current_tail.ptr != tail.load (std::memory_order_acquire).ptr) continue ; if (next == nullptr ) { VersionedPtr new_ptr{new_node, current_tail.version + 1 }; if (real_tail->next.compare_exchange_weak ( next, new_node, std::memory_order_release, std::memory_order_relaxed)) { break ; } } else { VersionedPtr other_ptr{next, current_tail.version + 1 }; tail.compare_exchange_strong ( current_tail, other_ptr, std::memory_order_release, std::memory_order_relaxed); } } VersionedPtr new_tail{new_node, current_tail.version + 1 }; tail.compare_exchange_strong ( current_tail, new_tail, std::memory_order_release, std::memory_order_relaxed); return true ; }
compare_exchange_weak
和strong
1 2 3 4 5 6 bool compare_exchange_weak (T& expected, T desired, std::memory_order success, std::memory_order failure) noexcept ;bool compare_exchange_strong (T& expected, T desired, std::memory_order success, std::memory_order failure) noexcept ;
两个函数都是:
Atomically compares the value representation (since C++20) of *this
with that of expected. If those are bitwise-equal, replaces the former with desired (performs read-modify-write operation). Otherwise, loads the actual value stored in *this
into expected (performs load operation).
比较*this
和expected
的值,若相等,则把*this
替换为desired
(执行read-modify-write operation
,即:读-修改-写),返回真。否则,把*this
实际的值,存到expected
中(执行load operation
),返回假。
参数:
compare_exchange_weak
和compare_exchange_strong
都在后面有两个内存顺序的参数,第一个内存顺序指的是,成功时,做read‑modify‑write operation
的内存顺序;第二个内存顺序指的是,失败时,做load operation
的内存顺序。
与 compare_exchange_strong 不同的是,这个弱版本允许通过返回 false
来虚假地失败,即使*expected
确实与 obj 中包含的值相等。
对于某些循环算法来说,这可能是可接受的行为,并且可能在某些平台上导致显著更好的性能。
对于这些虚假的失败,函数返回false
,但不修改预期。
出队操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 bool Dequeue (T& result) { VersionedPtr current_head; Node* real_tail; Node* next; while (true ) { current_head = head.load (std::memory_order_acquire); Node* real_head = current_head.ptr; real_tail = tail.load (std::memory_order_acquire).ptr; next = (real_head->next).load (std::memory_order_acquire); if (current_head.ptr != head.load (std::memory_order_acquire).ptr) continue ; if (real_head == real_tail) { if (next == nullptr ) { return false ; } VersionedPtr new_tail{next, tail.load ().version + 1 }; tail.compare_exchange_strong ( tail.load (), new_tail, std::memory_order_release, std::memory_order_relaxed); } else { T* data_ptr = next->data; VersionedPtr new_head{next, current_head.version + 1 }; if (head.compare_exchange_strong ( current_head, new_head, std::memory_order_release, std::memory_order_relaxed)) { result = std::move (*data_ptr); delete data_ptr; delete real_head; return true ; } } } }
析构
1 2 3 4 5 6 7 8 9 10 11 12 ~LockFreeMPMCQueue () { while (Node* node = head.load ().ptr) { head.store ({node->next, 0 }, std::memory_order_relaxed); delete node->data; delete node; } }
完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 #include <atomic> #include <memory> template <typename T>class LockFreeMPMCQueue {private : struct Node { Node () : data (nullptr ), next (nullptr ) {} T* data; std::atomic<Node*> next; char padding[64 - sizeof (T*) - sizeof (std::atomic<Node*>)]; }; struct alignas (16 ) VersionedPtr { Node* ptr; uint64_t version; }; struct { alignas (64 ) std::atomic<VersionedPtr> head; char padding[64 ]; }; alignas (64 ) std::atomic<VersionedPtr> tail; public : LockFreeMPMCQueue () { Node* dummy = new Node (); VersionedPtr init = {dummy, 0 }; head.store (init, std::memory_order_relaxed); tail.store (init, std::memory_order_relaxed); } ~LockFreeMPMCQueue () { while (Node* node = head.load ().ptr) { head.store ({node->next, 0 }, std::memory_order_relaxed); delete node->data; delete node; } } bool Enqueue (T value) { Node* new_node = new Node (); try { new_node->data = new T (std::move (value)); } catch (...) { delete new_node; return false ; } VersionedPtr current_tail; while (true ) { current_tail = tail.load (std::memory_order_acquire); Node* real_tail = current_tail.ptr; Node* next = real_tail->next.load (std::memory_order_acquire); if (current_tail.ptr != tail.load (std::memory_order_acquire).ptr) continue ; if (next == nullptr ) { VersionedPtr new_ptr{new_node, current_tail.version + 1 }; if (real_tail->next.compare_exchange_weak ( next, new_node, std::memory_order_release, std::memory_order_relaxed)) { break ; } } else { VersionedPtr new_ptr{next, current_tail.version + 1 }; tail.compare_exchange_strong ( current_tail, new_ptr, std::memory_order_release, std::memory_order_relaxed); } } VersionedPtr new_tail{new_node, current_tail.version + 1 }; tail.compare_exchange_strong ( current_tail, new_tail, std::memory_order_release, std::memory_order_relaxed); return true ; } bool Dequeue (T& result) { VersionedPtr current_head; Node* real_tail; Node* next; while (true ) { current_head = head.load (std::memory_order_acquire); Node* real_head = current_head.ptr; real_tail = tail.load (std::memory_order_acquire).ptr; next = real_head->next.load (std::memory_order_acquire); if (current_head.ptr != head.load (std::memory_order_acquire).ptr) continue ; if (real_head == real_tail) { if (next == nullptr ) { return false ; } VersionedPtr new_tail{next, tail.load ().version + 1 }; tail.compare_exchange_strong ( tail.load (), new_tail, std::memory_order_release, std::memory_order_relaxed); } else { T* data_ptr = next->data; VersionedPtr new_head{next, current_head.version + 1 }; if (head.compare_exchange_strong ( current_head, new_head, std::memory_order_release, std::memory_order_relaxed)) { result = std::move (*data_ptr); delete data_ptr; delete real_head; return true ; } } } } };
内存回收(危险指针)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 thread_local std::vector<Node*> hp_records (2 ) ; void RetireNode (Node* old) { if (!IsPointerHazard (old)) { delete old; } else { retired_list.push_back (old); } }
无锁是忙等待吗?
无锁编程并不等同于忙等待(Busy-Waiting),其核心在于「非阻塞」,而非具体等待方式。
「非阻塞」的定义
Lock-Free :至少一个线程能前进
Wait-Free :所有线程都能在有限步完成
延迟与吞吐的权衡
忙等待 :牺牲CPU,以降低延迟(高频交易系统)
阻塞等待 :加大延迟,以加大吞吐(Web服务器连接池)
无锁编程可根据竞争强度动态选择等待策略 :
低竞争时短暂自旋(利用CPU流水线)
中竞争时退避+主动让出CPU
高竞争时进入操作系统阻塞队列
核心目标是以最小开销维持「至少一个线程前进」的非阻塞特性 ,而非强制忙等待。
实际无锁设计中可通过多种策略避免忙等:
操作系统级阻塞等待
1 2 3 4 5 6 7 8 while (!atomic_var.compare_exchange_weak (...)){ syscall (SYS_futex, &atomic_var, FUTEX_WAIT, ...); } atomic_var.wait (old_val, std::memory_order_relaxed);
定时退避策略
1 2 3 4 5 6 7 8 9 10 11 12 13 int retries = 0 ;while (!CAS (ptr, old_val, new_val)){ if (retries++ > MAX_SPIN) { std::this_thread::yield (); retries = 0 ; } else { _mm_pause(); } }
队列化竞争机制(排队锁)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 struct Waiter { std::atomic<Waiter*> next; };std::atomic<Waiter*> tail_{nullptr }; void lock () { Waiter w; w.next = nullptr ; Waiter* prev = tail_.exchange (&w, std::memory_order_acq_rel); if (prev) { prev->next = &w; while (w.next.load () != &w) {} } } void unlock () { Waiter* w = ...; if (!w->next) { if (tail_.compare_exchange_strong (w, nullptr )) return ; } while (!w->next.load ()) {} w->next.load ()->next.store (w->next.load ()->next, ...); }
对比
策略
实现方式
适用场景
CPU占用
延迟
忙等待
while(!CAS)
极低延迟场景(<100ns)
100%核心
极低
主动退让
std::this_thread::yield()
用户态竞争适中
< 30%
微秒级
操作系统阻塞
futex
/ atomic::wait
高竞争/长等待
~0%
毫秒级
队列化调度
MCS锁等排队机制
严格公平性要求
随队列转移
亚毫秒级
工业级案例
Linux内核Futex
首次CAS竞争失败后,通过FUTEX_WAIT
系统调用挂起线程
解锁时通过FUTEX_WAKE
唤醒等待线程
Java的AQS(Abstract Queued Synchronizer)
1 2 3 4 5 6 7 8 9 final boolean acquireQueued (...) { for (;;) { if (tryAcquire(arg)) return true ; if (shouldParkAfterFailedAcquire()) LockSupport.park(this ); } }
C++20原子等待
1 2 3 4 5 6 std::atomic<int > flag (0 ) ;while (!flag.wait (1 , std::memory_order_seq_cst)); flag.store (2 ); flag.notify_all ();
总线锁和缓存锁
总线锁(Bus Lock)
当CPU执行带LOCK
前缀的指令(如CMPXCHG
)时,会通过芯片组发出硬件信号 ,独占整个内存总线(Bus) 。此时其他CPU的所有内存访问请求将被阻塞,直到当前操作完成。
1 2 3 4 5 sequenceDiagram CPU核心A->>内存总线: LOCK#信号(总线锁定) 内存总线->>所有CPU: 阻塞其他内存请求 CPU核心A->>内存: 执行原子操作(如CAS) 内存总线->>所有CPU: 解锁
特点
全局性锁定 :锁定期间所有内存操作均被阻塞(包括非竞争数据)
性能开销大 :原子操作串行化,多核性能急剧下降
兼容性强 :早期x86处理器的唯一选择(如80486)
缓存锁(Cache Locking)—— MESI优化
核心:缓存一致性协议(Cache Coherence)
现代CPU通过MESI协议 (Modified/Exclusive/Shared/Invalid)维护多核缓存一致性:
1 2 3 4 5 graph LR Modified -->|写回| Invalid Exclusive -->|其他核读| Shared Shared -->|写| Modified Invalid -->|写| Exclusive
缓存锁的触发条件(以Intel CPU为例)
当原子操作访问的数据满足:
对齐在缓存行内 (通常64字节对齐)
目标地址未跨缓存行 (Non-split Access)
CPU支持缓存锁定技术 (几乎所有现代处理器)
工作流程
1 2 3 4 5 6 7 8 9 10 1. 核0 执行原子操作(如X++): - 发出RFO(Request For Ownership)消息 - 其他核心将X的缓存行置为Invalid - 核0 将缓存行置为Modified状态 - 执行修改(未触发总线锁) 2. 核1 尝试操作X: - 发现缓存失效(Invalid) - 从核0 的缓存中读取最新数据(缓存行状态转为Shared)
避免缓存冲突
内存对齐避免总线锁
1 2 3 4 5 6 7 8 9 10 11 12 struct Unaligned { char padding[62 ]; std::atomic<int > x; }; struct alignas (64 ) Aligned{ std::atomic<int > x; };
检测方法:Linux下perf c2c
可检测缓存行冲突(False Sharing)
写竞争下的性能差异
当两个CPU核心频繁写同一缓存行 时:
缓存锁场景 :缓存行在Modified
↔Invalid
状态间震荡,产生大量RFO消息
解决方案 :伪共享隔离(False Sharing Elimination)
1 2 3 4 5 6 struct PerCoreCounter { alignas (64 ) std::atomic<int > value; }; PerCoreCounter counters[CPU_CORES];
特殊场景总线锁不可避免
1 2 3 LOCK XCHG [mem], reg ; 显式LOCK前缀 CMPXCHG16B m128 ; 128位跨缓存行操作 一个未对齐的LOCK操作 ; 如对跨64字节边界的int操作
对比
特征
总线锁
缓存锁
锁定范围
整个内存总线
单个缓存行
性能影响
全局停顿,性能损失严重
仅影响涉及特定缓存行的操作
触发条件
LOCK
前缀指令
内存对齐且未跨缓存行的原子操作
实现技术
硬件信号硬阻塞
MESI缓存一致性协议
现代应用
仅作兜底(如跨缓存行操作)
99%原子操作的默认实现
能耗
高(总线开关)
低(仅缓存状态切换)