Skip to the content.

非阻塞数据结构

lock-free thread-safe stack

#include <atomic>

template <typename T>
class LockFreeStack {
 public:
  void push(const T& x) {
    Node* t = new Node(x);
    t->next = head_.load();
    while (!head_.compare_exchange_weak(t->next, t)) {
    }
  }

 private:
  struct Node {
    T v;
    Node* next = nullptr;
    Node(const T& x) : v(x) {}
  };

 private:
  std::atomic<Node*> head_;
};
template <typename T>
void LockFreeStack<T>::pop(T& res) {
  Node* t = head_.load();  // 未考虑头节点为空指针的情况
  while (!head_.compare_exchange_weak(t, t->next)) {
  }
  res = t->v;
}
#include <atomic>
#include <memory>

template <typename T>
class LockFreeStack {
 public:
  void push(const T& x) {
    Node* t = new Node(x);
    t->next = head_.load();
    while (!head_.compare_exchange_weak(t->next, t)) {
    }
  }

  std::shared_ptr<T> pop() {  // 还未考虑释放原来的头节点指针
    Node* t = head_.load();
    while (t && !head_.compare_exchange_weak(t, t->next)) {
    }
    return t ? t->v : nullptr;
  }

 private:
  struct Node {
    std::shared_ptr<T> v;
    Node* next = nullptr;
    Node(const T& x) : v(std::make_shared<T>(x)) {}
  };

 private:
  std::atomic<Node*> head_;
};
#include <atomic>
#include <memory>

template <typename T>
class LockFreeStack {
 public:
  void push(const T& x) {
    Node* t = new Node(x);
    t->next = head_.load();
    while (!head_.compare_exchange_weak(t->next, t)) {
    }
  }

  std::shared_ptr<T> pop() {
    ++pop_cnt_;
    Node* t = head_.load();
    while (t && !head_.compare_exchange_weak(t, t->next)) {
    }
    std::shared_ptr<T> res;
    if (t) {
      res.swap(t->v);
    }
    try_delete(t);
    return res;
  }

 private:
  struct Node {
    std::shared_ptr<T> v;
    Node* next = nullptr;
    Node(const T& x) : v(std::make_shared<T>(x)) {}
  };

 private:
  static void delete_list(Node* head) {
    while (head) {
      Node* t = head->next;
      delete head;
      head = t;
    }
  }

  void append_to_delete_list(Node* first, Node* last) {
    last->next = to_delete_list_;
    // 确保 last->next 为 to_delete_list_,再设置 first 为新的头节点
    while (!to_delete_list_.compare_exchange_weak(last->next, first)) {
    }
  }

  void append_to_delete_list(Node* head) {
    Node* last = head;
    while (Node* t = last->next) {
      last = t;
    }
    append_to_delete_list(head, last);
  }

  void try_delete(Node* head) {
    if (pop_cnt_ == 0) {
      return;
    }
    if (pop_cnt_ > 1) {
      append_to_delete_list(head, head);
      --pop_cnt_;
      return;
    }
    Node* t = to_delete_list_.exchange(nullptr);
    if (--pop_cnt_ == 0) {
      delete_list(t);
    } else if (t) {
      append_to_delete_list(t);
    }
    delete head;
  }

 private:
  std::atomic<Node*> head_;
  std::atomic<std::size_t> pop_cnt_;
  std::atomic<Node*> to_delete_list_;
};

Hazard Pointer(风险指针)

#include <atomic>
#include <stdexcept>
#include <thread>

static constexpr std::size_t MaxSize = 100;

struct HazardPointer {
  std::atomic<std::thread::id> id;
  std::atomic<void*> p;
};

static HazardPointer HazardPointers[MaxSize];

class HazardPointerHelper {
 public:
  HazardPointerHelper() {
    for (auto& x : HazardPointers) {
      std::thread::id default_id;
      if (x.id.compare_exchange_strong(default_id,
                                       std::this_thread::get_id())) {
        hazard_pointer = &x;  // 取一个未设置过的风险指针
        break;
      }
    }
    if (!hazard_pointer) {
      throw std::runtime_error("No hazard pointers available");
    }
  }

  ~HazardPointerHelper() {
    hazard_pointer->p.store(nullptr);
    hazard_pointer->id.store(std::thread::id{});
  }

  HazardPointerHelper(const HazardPointerHelper&) = delete;

  HazardPointerHelper operator=(const HazardPointerHelper&) = delete;

  std::atomic<void*>& get() { return hazard_pointer->p; }

 private:
  HazardPointer* hazard_pointer = nullptr;
};

std::atomic<void*>& hazard_pointer_for_this_thread() {
  static thread_local HazardPointerHelper t;
  return t.get();
}

bool is_existing(void* p) {
  for (auto& x : HazardPointers) {
    if (x.p.load() == p) {
      return true;
    }
  }
  return false;
}
#include <atomic>
#include <functional>
#include <memory>

#include "hazard_pointer.hpp"

template <typename T>
class LockFreeStack {
 public:
  void push(const T& x) {
    Node* t = new Node(x);
    t->next = head_.load();
    while (!head_.compare_exchange_weak(t->next, t)) {
    }
  }

  std::shared_ptr<T> pop() {
    std::atomic<void*>& hazard_pointer = hazard_pointer_for_this_thread();
    Node* t = head_.load();
    do {  // 外循环确保 t 为最新的头节点,循环结束后将头节点设为下一节点
      Node* t2;
      do {  // 循环至风险指针保存当前最新的头节点
        t2 = t;
        hazard_pointer.store(t);
        t = head_.load();
      } while (t != t2);
    } while (t && !head_.compare_exchange_strong(t, t->next));
    hazard_pointer.store(nullptr);
    std::shared_ptr<T> res;
    if (t) {
      res.swap(t->v);
      if (is_existing(t)) {
        append_to_delete_list(new DataToDelete{t});
      } else {
        delete t;
      }
      try_delete();
    }
    return res;
  }

 private:
  struct Node {
    std::shared_ptr<T> v;
    Node* next = nullptr;
    Node(const T& x) : v(std::make_shared<T>(x)) {}
  };

  struct DataToDelete {
    template <typename T>
    DataToDelete(T* p)
        : data(p), deleter([](void* p) { delete static_cast<T*>(p); }) {}

    ~DataToDelete() { deleter(data); }

    void* data = nullptr;
    std::function<void(void*)> deleter;
    DataToDelete* next = nullptr;
  };

 private:
  void append_to_delete_list(DataToDelete* t) {
    t->next = to_delete_list_.load();
    while (!to_delete_list_.compare_exchange_weak(t->next, t)) {
    }
  }

  void try_delete() {
    DataToDelete* cur = to_delete_list_.exchange(nullptr);
    while (cur) {
      DataToDelete* t = cur->next;
      if (!is_existing(cur->data)) {
        delete cur;
      } else {
        append_to_delete_list(new DataToDelete{cur});
      }
      cur = t;
    }
  }

 private:
  std::atomic<Node*> head_;
  std::atomic<std::size_t> pop_cnt_;
  std::atomic<DataToDelete*> to_delete_list_;
};

引用计数

std::shared_ptr<int> p(new int(42));
assert(std::atomic_is_lock_free(&p));
#include <atomic>
#include <memory>

template <typename T>
class LockFreeStack {
 public:
  ~LockFreeStack() {
    while (pop()) {
    }
  }

  void push(const T& x) {
    auto t = std::make_shared<Node>(x);
    t->next = std::atomic_load(&head_);
    while (!std::atomic_compare_exchange_weak(&head_, &t->next, t)) {
    }
  }

  std::shared_ptr<T> pop() {
    std::shared_ptr<Node> t = std::atomic_load(&head_);
    while (t && !std::atomic_compare_exchange_weak(&head_, &t, t->next)) {
    }
    if (t) {
      std::atomic_store(&t->next, nullptr);
      return t->v;
    }
    return nullptr;
  }

 private:
  struct Node {
    std::shared_ptr<T> v;
    std::shared_ptr<Node> next;
    Node(const T& x) : v(std::make_shared<T>(x)) {}
  };

 private:
  std::shared_ptr<Node> head_;
};
#include <atomic>
#include <memory>

template <typename T>
class LockFreeStack {
 public:
  ~LockFreeStack() {
    while (pop()) {
    }
  }

  void push(const T& x) {
    auto t = std::make_shared<Node>(x);
    t->next = head_.load();
    while (!head_.compare_exchange_weak(t->next, t)) {
    }
  }

  std::shared_ptr<T> pop() {
    std::shared_ptr<Node> t = head_.load();
    while (t && !head_.compare_exchange_weak(t, t->next.load())) {
    }
    if (t) {
      t->next = std::shared_ptr<Node>();
      return t->v;
    }
    return nullptr;
  }

 private:
  struct Node {
    std::shared_ptr<T> v;
    std::atomic<std::shared_ptr<Node>> next;
    Node(const T& x) : v(std::make_shared<T>(x)) {}
  };

 private:
  std::atomic<std::shared_ptr<Node>> head_;
};
assert(!std::atomic<std::shared_ptr<int>>{}.is_lock_free());
#include <atomic>
#include <memory>

template <typename T>
class LockFreeStack {
 public:
  ~LockFreeStack() {
    while (pop()) {
    }
  }

  void push(const T& x) {
    ReferenceCount t;
    t.p = new Node(x);
    t.external_cnt = 1;
    t.p->next = head_.load();
    while (!head_.compare_exchange_weak(t.p->next, t)) {
    }
  }

  std::shared_ptr<T> pop() {
    ReferenceCount t = head_.load();
    while (true) {
      increase_count(t);  // 外部计数递增表示该节点正被使用
      Node* p = t.p;      // 因此可以安全地访问
      if (!p) {
        return nullptr;
      }
      if (head_.compare_exchange_strong(t, p->next)) {
        std::shared_ptr<T> res;
        res.swap(p->v);
        // 将外部计数减 2 后加到内部计数,减 2 是因为,
        // 节点被删除减 1,该线程无法再次访问此节点再减 1
        const int cnt = t.external_cnt - 2;
        if (p->inner_cnt.fetch_add(cnt) == -cnt) {
          delete p;  // 内外部计数和为 0
        }
        return res;
      }
      if (p->inner_cnt.fetch_sub(1) == 1) {
        delete p;  // 内部计数为 0
      }
    }
  }

 private:
  struct Node;

  struct ReferenceCount {
    int external_cnt;
    Node* p;
  };

  struct Node {
    std::shared_ptr<T> v;
    std::atomic<int> inner_cnt = 0;
    ReferenceCount next;
    Node(const T& x) : v(std::make_shared<T>(x)) {}
  };

  void increase_count(ReferenceCount& old_cnt) {
    ReferenceCount new_cnt;
    do {
      new_cnt = old_cnt;
      ++new_cnt.external_cnt;  // 访问 head_ 时递增外部计数,表示该节点正被使用
    } while (!head_.compare_exchange_strong(old_cnt, new_cnt));
    old_cnt.external_cnt = new_cnt.external_cnt;
  }

 private:
  std::atomic<ReferenceCount> head_;
};
#include <atomic>
#include <memory>

template <typename T>
class LockFreeStack {
 public:
  ~LockFreeStack() {
    while (pop()) {
    }
  }

  void push(const T& x) {
    ReferenceCount t;
    t.p = new Node(x);
    t.external_cnt = 1;
    // 下面比较中 release 保证之前的语句都先执行,因此 load 可以使用 relaxed
    t.p->next = head_.load(std::memory_order_relaxed);
    while (!head_.compare_exchange_weak(t.p->next, t, std::memory_order_release,
                                        std::memory_order_relaxed)) {
    }
  }

  std::shared_ptr<T> pop() {
    ReferenceCount t = head_.load(std::memory_order_relaxed);
    while (true) {
      increase_count(t);  // acquire
      Node* p = t.p;
      if (!p) {
        return nullptr;
      }
      if (head_.compare_exchange_strong(t, p->next,
                                        std::memory_order_relaxed)) {
        std::shared_ptr<T> res;
        res.swap(p->v);
        // 将外部计数减 2 后加到内部计数,减 2 是因为,
        // 节点被删除减 1,该线程无法再次访问此节点再减 1
        const int cnt = t.external_cnt - 2;
        // swap 要先于 delete,因此使用 release
        if (p->inner_cnt.fetch_add(cnt, std::memory_order_release) == -cnt) {
          delete p;  // 内外部计数和为 0
        }
        return res;
      }
      if (p->inner_cnt.fetch_sub(1, std::memory_order_relaxed) == 1) {
        p->inner_cnt.load(std::memory_order_acquire);  // 只是用 acquire 来同步
        // acquire 保证 delete 在之后执行
        delete p;  // 内部计数为 0
      }
    }
  }

 private:
  struct Node;

  struct ReferenceCount {
    int external_cnt;
    Node* p = nullptr;
  };

  struct Node {
    std::shared_ptr<T> v;
    std::atomic<int> inner_cnt = 0;
    ReferenceCount next;
    Node(const T& x) : v(std::make_shared<T>(x)) {}
  };

  void increase_count(ReferenceCount& old_cnt) {
    ReferenceCount new_cnt;
    do {  // 比较失败不改变当前值,并可以继续循环,因此可以选择 relaxed
      new_cnt = old_cnt;
      ++new_cnt.external_cnt;  // 访问 head_ 时递增外部计数,表示该节点正被使用
    } while (!head_.compare_exchange_strong(old_cnt, new_cnt,
                                            std::memory_order_acquire,
                                            std::memory_order_relaxed));
    old_cnt.external_cnt = new_cnt.external_cnt;
  }

 private:
  std::atomic<ReferenceCount> head_;
};