Skip to the content.

内存模型基础

原子操作和原子类型

标准原子类型

struct A {
  int a[100];
};

struct B {
  int x, y;
};

assert(!std::atomic<A>{}.is_lock_free());
assert(std::atomic<B>{}.is_lock_free());
assert(std::atomic<int>{}.is_always_lock_free);
// LOCK-FREE PROPERTY
#define ATOMIC_BOOL_LOCK_FREE 2
#define ATOMIC_CHAR_LOCK_FREE 2
#ifdef __cpp_lib_char8_t
#define ATOMIC_CHAR8_T_LOCK_FREE 2
#endif // __cpp_lib_char8_t
#define ATOMIC_CHAR16_T_LOCK_FREE 2
#define ATOMIC_CHAR32_T_LOCK_FREE 2
#define ATOMIC_WCHAR_T_LOCK_FREE  2
#define ATOMIC_SHORT_LOCK_FREE  2
#define ATOMIC_INT_LOCK_FREE    2
#define ATOMIC_LONG_LOCK_FREE   2
#define ATOMIC_LLONG_LOCK_FREE  2
#define ATOMIC_POINTER_LOCK_FREE  2
namespace std {
using atomic_bool = atomic<bool>;
using atomic_char = std::atomic<char>;
}  // namespace std
namespace std {
using atomic_schar = std::atomic<signed char>;
using atomic_uchar = std::atomic<unsigned char>;
using atomic_uint = std::atomic<unsigned>;
using atomic_ushort = std::atomic<unsigned short>;
using atomic_ulong = std::atomic<unsigned long>;
using atomic_llong = std::atomic<long long>;
using atomic_ullong = std::atomic<unsigned long long>;
}  // namespace std
T operator=(T desired) noexcept;
T operator=(T desired) volatile noexcept;
atomic& operator=(const atomic&) = delete;
atomic& operator=(const atomic&) volatile = delete;
std::atomic<T>::store     // 替换当前值
std::atomic<T>::load      // 返回当前值
std::atomic<T>::exchange  // 替换值,并返回被替换前的值

// 与期望值比较,不等则将期望值设为原子值并返回 false
// 相等则将原子值设为目标值并返回 true
// 在缺少 CAS(compare-and-exchange)指令的机器上,weak 版本在相等时可能替换失败并返回 false
// 因此 weak 版本通常要求循环,而 strong 版本返回 false 就能确保不相等
std::atomic<T>::compare_exchange_weak
std::atomic<T>::compare_exchange_strong

std::atomic<T>::fetch_add        // 原子加法,返回相加前的值
std::atomic<T>::fetch_sub        // 原子减法,返回相减前的值
std::atomic<T>::fetch_and
std::atomic<T>::fetch_or
std::atomic<T>::fetch_xor
std::atomic<T>::operator++       // 前自增等价于 fetch_add(1) + 1
std::atomic<T>::operator++(int)  // 后自增等价于 fetch_add(1)
std::atomic<T>::operator--       // 前自减等价于 fetch_sub(1) - 1
std::atomic<T>::operator--(int)  // 后自减等价于 fetch_sub(1)
std::atomic<T>::operator+=       // fetch_add(x) + x
std::atomic<T>::operator-=       // fetch_sub(x) - x
std::atomic<T>::operator&=       // fetch_and(x) & x
std::atomic<T>::operator|=       // fetch_or(x) | x
std::atomic<T>::operator^=       // fetch_xor(x) ^ x
typedef enum memory_order {
  memory_order_relaxed,
  memory_order_consume,
  memory_order_acquire,
  memory_order_release,
  memory_order_acq_rel,
  memory_order_seq_cst
} memory_order;

void store(T desired, std::memory_order order = std::memory_order_seq_cst);
// store 的内存序只能是
// memory_order_relaxed、memory_order_release、memory_order_seq_cst
T load(std::memory_order order = std::memory_order_seq_cst);
// load 的内存序只能是
// memory_order_relaxed、memory_order_consume、memory_order_acquire、memory_order_seq_cst

std::atomic_flag

std::atomic_flag x = ATOMIC_FLAG_INIT;

x.clear(std::memory_order_release);  // 将状态设为 false
// 不能为读操作语义:memory_order_consume、memory_order_acquire、memory_order_acq_rel

bool y = x.test_and_set();  // 将状态设为 true 且返回之前的值
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

class Spinlock {
 public:
  void lock() {
    while (flag_.test_and_set(std::memory_order_acquire)) {
    }
  }

  void unlock() { flag_.clear(std::memory_order_release); }

 private:
  std::atomic_flag flag_ = ATOMIC_FLAG_INIT;
};

Spinlock m;

void f(int n) {
  for (int i = 0; i < 100; ++i) {
    m.lock();
    std::cout << "Output from thread " << n << '\n';
    m.unlock();
  }
}

int main() {
  std::vector<std::thread> v;
  for (int i = 0; i < 10; ++i) {
    v.emplace_back(f, i);
  }
  for (auto& x : v) {
    x.join();
  }
}

其他原子类型

std::atomic<bool> x(true);
x = false;
bool y = x.load(std::memory_order_acquire);  // 读取 x 值返回给 y
x.store(true);                               // x 写为 true
y = x.exchange(false,
               std::memory_order_acq_rel);  // x 用 false 替换,并返回旧值给 y
bool expected = false;                      // 期望值
// 不等则将期望值设为 x 并返回 false,相等则将 x 设为目标值 true 并返回 true
// weak 版本在相等时也可能替换失败而返回 false,因此一般用于循环
while (!x.compare_exchange_weak(expected, true) && !expected) {
}
// 对于只有两种值的 std::atomic<bool> 来说显得有些繁琐
// 但对其他原子类型来说,这个影响就大了
class A {};
A a[5];
std::atomic<A*> p(a);   // p 为 &a[0]
A* x = p.fetch_add(2);  // p 为 &a[2],并返回原始值 a[0]
assert(x == a);
assert(p.load() == &a[2]);
x = (p -= 1);  // p 为 &a[1],并返回给 x,相当于 x = p.fetch_sub(1) - 1
assert(x == &a[1]);
assert(p.load() == &a[1]);
std::atomic<int> i(5);
int j = i.fetch_and(3);  // 101 & 011 = 001
assert(i == 1);
assert(j == 5);
#include <atomic>

class Spinlock {
 public:
  void lock() {
    int expected = 0;
    while (!flag_.compare_exchange_weak(expected, 1, std::memory_order_release,
                                        std::memory_order_relaxed)) {
      expected = 0;
    }
  }

  void unlock() { flag_.store(0, std::memory_order_release); }

 private:
  std::atomic<int> flag_ = 0;
};
#include <atomic>

class SharedSpinlock {
 public:
  void lock() {
    int expected = 0;
    while (!flag_.compare_exchange_weak(expected, 1, std::memory_order_release,
                                        std::memory_order_relaxed)) {
      expected = 0;
    }
  }

  void unlock() { flag_.store(0, std::memory_order_release); }

  void lock_shared() {
    int expected = 0;
    while (!flag_.compare_exchange_weak(expected, 2, std::memory_order_release,
                                        std::memory_order_acquire) &&
           expected == 2) {
      expected = 0;
    }
    count_.fetch_add(1, std::memory_order_release);
  }

  void unlock_shared() {
    if (count_.fetch_sub(1, std::memory_order_release) == 1) {
      flag_.store(0, std::memory_order_release);
    }
  }

 private:
  std::atomic<int> flag_ = 0;
  std::atomic<int> count_ = 0;
};
#include <atomic>
#include <thread>

class Barrier {
 public:
  explicit Barrier(unsigned n) : count_(n), spaces_(n), generation_(0) {}

  void wait() {
    unsigned gen = generation_.load();
    if (--spaces_ == 0) {
      spaces_ = count_.load();
      ++generation_;
      return;
    }
    while (generation_.load() == gen) {
      std::this_thread::yield();
    }
  }

  void arrive() {
    --count_;
    if (--spaces_ == 0) {
      spaces_ = count_.load();
      ++generation_;
    }
  }

 private:
  std::atomic<unsigned> count_;       // 需要同步的线程数
  std::atomic<unsigned> spaces_;      // 剩余未到达 Barrier 的线程数
  std::atomic<unsigned> generation_;  // 所有线程到达 Barrier 的总次数
};
class A {
 public:
  virtual void f() {}
};

assert(!std::is_trivially_copyable_v<A>);
std::atomic<A> a;                 // 错误:A 不满足 trivially copyable
std::atomic<std::vector<int>> v;  // 错误
std::atomic<std::string> s;       // 错误
std::atomic<int> i(42);
int j = std::atomic_load(&i);  // 等价于 i.load()
std::atomic<int> i(42);
std::atomic_load_explicit(&i, std::memory_order_acquire);  // i.load(std::memory_order_acquire)
bool compare_exchange_weak(T& expected, T desired, std::memory_order success,
                           std::memory_order failure);

template <class T>
bool atomic_compare_exchange_weak(std::atomic<T>* obj,
                                  typename std::atomic<T>::value_type* expected,
                                  typename std::atomic<T>::value_type desired);

template <class T>
bool atomic_compare_exchange_weak_explicit(
    std::atomic<T>* obj, typename std::atomic<T>::value_type* expected,
    typename std::atomic<T>::value_type desired, std::memory_order succ,
    std::memory_order fail);
std::atomic_flag x = ATOMIC_FLAG_INIT;
bool y = std::atomic_flag_test_and_set_explicit(&x, std::memory_order_acquire);
std::atomic_flag_clear_explicit(&x, std::memory_order_release);
std::atomic<std::shared_ptr<int>> x;

同步操作和强制排序(enforced ordering)

std::vector<int> data;
std::atomic<bool> data_ready(false);

void read_thread() {
  while (!data_ready.load()) {  // 1 happens-before 2
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  std::cout << data[0];  // 2
}

void write_thread() {
  data.emplace_back(42);  // 3 happens-before 4
  data_ready = true;      // 4 inter-thread happens-before 1
}

synchronizes-with

happens-before

#include <iostream>

void f(int x, int y) { std::cout << x << y; }

int g() {
  static int i = 0;
  return ++i;
}

int main() {
  f(g(), g());  // 无序调用 g,可能是 21 也可能是 12
  // 一般 C++ 默认使用 __cdecl 调用模式,参数从右往左入栈,就是21
}

inter-thread happens-before

strongly-happens-before

std::memory_order

typedef enum memory_order {
  memory_order_relaxed,  // 无同步或顺序限制,只保证当前操作原子性
  memory_order_consume,  // 标记读操作,依赖于该值的读写不能重排到此操作前
  memory_order_acquire,  // 标记读操作,之后的读写不能重排到此操作前
  memory_order_release,  // 标记写操作,之前的读写不能重排到此操作后
  memory_order_acq_rel,  // 仅标记读改写操作,读操作相当于 acquire,写操作相当于 release
  memory_order_seq_cst   // sequential consistency:顺序一致性,不允许重排,所有原子操作的默认选项
} memory_order;

Relaxed ordering

#include <atomic>
#include <thread>

std::atomic<int> x = 0;
std::atomic<int> y = 0;

void f() {
  int i = y.load(std::memory_order_relaxed);  // 1
  x.store(i, std::memory_order_relaxed);      // 2
}

void g() {
  int j = x.load(std::memory_order_relaxed);  // 3
  y.store(42, std::memory_order_relaxed);     // 4
}

int main() {
  std::thread t1(f);
  std::thread t2(g);
  t1.join();
  t2.join();
  // 可能执行顺序为 4123,结果 i == 42, j == 42
}
#include <atomic>
#include <thread>

std::atomic<int> x = 0;
std::atomic<int> y = 0;

void f() {
  i = y.load(std::memory_order_relaxed);  // 1
  if (i == 42) {
    x.store(i, std::memory_order_relaxed);  // 2
  }
}

void g() {
  j = x.load(std::memory_order_relaxed);  // 3
  if (j == 42) {
    y.store(42, std::memory_order_relaxed);  // 4
  }
}

int main() {
  std::thread t1(f);
  std::thread t2(g);
  t1.join();
  t2.join();
  // 结果不允许为i == 42, j == 42
  // 因为要产生这个结果,1 依赖 4,4 依赖 3,3 依赖 2,2 依赖 1
}
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

std::atomic<int> x = 0;

void f() {
  for (int i = 0; i < 1000; ++i) {
    x.fetch_add(1, std::memory_order_relaxed);
  }
}

int main() {
  std::vector<std::thread> v;
  for (int i = 0; i < 10; ++i) {
    v.emplace_back(f);
  }
  for (auto& x : v) {
    x.join();
  }
  std::cout << x;  // 10000
}

Release-Consume ordering

#include <atomic>
#include <cassert>
#include <thread>

std::atomic<int*> x;
int i;

void producer() {
  int* p = new int(42);
  i = 42;
  x.store(p, std::memory_order_release);
}

void consumer() {
  int* q;
  while (!(q = x.load(std::memory_order_consume))) {
  }
  assert(*q == 42);  // 一定不出错:*q 带有 x 的依赖
  assert(i == 42);   // 可能出错也可能不出错:i 不依赖于 x
}

int main() {
  std::thread t1(producer);
  std::thread t2(consumer);
  t1.join();
  t2.join();
}

Release-Acquire ordering

#include <atomic>
#include <cassert>
#include <thread>

std::atomic<int*> x;
int i;

void producer() {
  int* p = new int(42);
  i = 42;
  x.store(p, std::memory_order_release);
}

void consumer() {
  int* q;
  while (!(q = x.load(std::memory_order_acquire))) {
  }
  assert(*q == 42);  // 一定不出错
  assert(i == 42);   // 一定不出错
}

int main() {
  std::thread t1(producer);
  std::thread t2(consumer);
  t1.join();
  t2.join();
}
#include <atomic>
#include <thread>

std::atomic<bool> x = false;
std::atomic<bool> y = false;
std::atomic<int> z = 0;

void write_x() {
  x.store(true,
          std::memory_order_release);  // 1 happens-before 3(由于 3 的循环)
}

void write_y() {
  y.store(true,
          std::memory_order_release);  // 2 happens-before 5(由于 5 的循环)
}

void read_x_then_y() {
  while (!x.load(std::memory_order_acquire)) {  // 3 happens-before 4
  }
  if (y.load(std::memory_order_acquire)) {  // 4
    ++z;
  }
}

void read_y_then_x() {
  while (!y.load(std::memory_order_acquire)) {  // 5 happens-before 6
  }
  if (x.load(std::memory_order_acquire)) {  // 6
    ++z;
  }
}

int main() {
  std::thread t1(write_x);
  std::thread t2(write_y);
  std::thread t3(read_x_then_y);
  std::thread t4(read_y_then_x);
  t1.join();
  t2.join();
  t3.join();
  t4.join();
  // z 可能为 0,134 y 为 false,256 x 为 false,但 12 之间没有关系
}

#include <atomic>
#include <cassert>
#include <thread>

std::atomic<bool> x = false;
std::atomic<bool> y = false;
std::atomic<int> z = 0;

void write_x_then_y() {
  x.store(true, std::memory_order_relaxed);  // 1 happens-before 2
  y.store(true,
          std::memory_order_release);  // 2 happens-before 3(由于 3 的循环)
}

void read_y_then_x() {
  while (!y.load(std::memory_order_acquire)) {  // 3 happens-before 4
  }
  if (x.load(std::memory_order_relaxed)) {  // 4
    ++z;
  }
}

int main() {
  std::thread t1(write_x_then_y);
  std::thread t2(read_y_then_x);
  t1.join();
  t2.join();
  assert(z.load() != 0);  // 顺序一定为 1234,z 一定不为 0
}
#include <atomic>
#include <cassert>

std::atomic<bool> x = false;
std::atomic<bool> y = false;
std::atomic<int> v[2];

void f() {
  // v[0]、v[1] 的设置没有先后顺序,但都 happens-before 1
  v[0].store(1, std::memory_order_relaxed);
  v[1].store(2, std::memory_order_relaxed);
  x.store(true,
          std::memory_order_release);  // 1 happens-before 2(由于 2 的循环)
}

void g() {
  while (!x.load(std::memory_order_acquire)) {  // 2:happens-before 3
  }
  y.store(true,
          std::memory_order_release);  // 3 happens-before 4(由于 4 的循环)
}

void h() {
  while (!y.load(
      std::memory_order_acquire)) {  // 4 happens-before v[0]、v[1] 的读取
  }
  assert(v[0].load(std::memory_order_relaxed) == 1);
  assert(v[1].load(std::memory_order_relaxed) == 2);
}
#include <atomic>
#include <cassert>

std::atomic<int> x = 0;
std::atomic<int> v[2];

void f() {
  v[0].store(1, std::memory_order_relaxed);
  v[1].store(2, std::memory_order_relaxed);
  x.store(1, std::memory_order_release);  // 1 happens-before 2(由于 2 的循环)
}

void g() {
  int i = 1;
  while (!x.compare_exchange_strong(
      i, 2,
      std::memory_order_acq_rel)) {  // 2 happens-before 3(由于 3 的循环)
    // x 为 1 时,将 x 替换为 2,返回 true
    // x 为 0 时,将 i 替换为 x,返回 false
    i = 1;  // 返回 false 时,x 未被替换,i 被替换为 0,因此将 i 重新设为 1
  }
}

void h() {
  while (x.load(std::memory_order_acquire) < 2) {  // 3
  }
  assert(v[0].load(std::memory_order_relaxed) == 1);
  assert(v[1].load(std::memory_order_relaxed) == 2);
}

Sequentially-consistent ordering

#include <atomic>
#include <cassert>
#include <thread>

std::atomic<bool> x = false;
std::atomic<bool> y = false;
std::atomic<int> z = 0;

// 要么 1 happens-before 2,要么 2 happens-before 1
void write_x() {
  x.store(true);  // 1 happens-before 3(由于 3 的循环)
}

void write_y() {
  y.store(true);  // 2 happens-before 5(由于 5 的循环)
}

void read_x_then_y() {
  while (!x.load()) {  // 3 happens-before 4
  }
  if (y.load()) {  // 4 为 false 则 1 happens-before 2
    ++z;
  }
}

void read_y_then_x() {
  while (!y.load()) {  // 5 happens-before 6
  }
  if (x.load()) {  // 6 如果返回 false 则一定是 2 happens-before 1
    ++z;
  }
}

int main() {
  std::thread t1(write_x);
  std::thread t2(write_y);
  std::thread t3(read_x_then_y);
  std::thread t4(read_y_then_x);
  t1.join();
  t2.join();
  t3.join();
  t4.join();
  assert(z.load() != 0);  // z 一定不为 0
  // z 可能为 1 或 2,12 之间必定存在 happens-before 关系
}

std::atomic_thread_fence

#include <atomic>
#include <cassert>
#include <thread>

std::atomic<bool> x, y;
std::atomic<int> z;

void f() {
  x.store(true, std::memory_order_relaxed);             // 1 happens-before 2
  std::atomic_thread_fence(std::memory_order_release);  // 2 synchronizes-with 3
  y.store(true, std::memory_order_relaxed);
}

void g() {
  while (!y.load(std::memory_order_relaxed)) {
  }
  std::atomic_thread_fence(std::memory_order_acquire);  // 3 happens-before 4
  if (x.load(std::memory_order_relaxed)) {              // 4
    ++z;
  }
}

int main() {
  x = false;
  y = false;
  z = 0;
  std::thread t1(f);
  std::thread t2(g);
  t1.join();
  t2.join();
  assert(z.load() != 0);  // 1 happens-before 4
}
#include <atomic>
#include <cassert>
#include <thread>

bool x = false;
std::atomic<bool> y;
std::atomic<int> z;

void f() {
  x = true;                                             // 1 happens-before 2
  std::atomic_thread_fence(std::memory_order_release);  // 2 synchronizes-with 3
  y.store(true, std::memory_order_relaxed);
}

void g() {
  while (!y.load(std::memory_order_relaxed)) {
  }
  std::atomic_thread_fence(std::memory_order_acquire);  // 3 happens-before 4
  if (x) {                                              // 4
    ++z;
  }
}

int main() {
  x = false;
  y = false;
  z = 0;
  std::thread t1(f);
  std::thread t2(g);
  t1.join();
  t2.join();
  assert(z.load() != 0);  // 1 happens-before 4
}