马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
×
本文是 C++ Concurrency in Action 的总结
通过原子变量实现 spinlock
- class spinlock_mutex {
- std::atomic_flag flag;
- public:
- spinlock_mutex(): flag(ATOMIC_FLAG_INIT) {}
- void lock() {
- while (flag.test_and_set(std::memory_order_acquire));
- }
- void unlock() {
- flag.clear(std::memory_order_release);
- }
- };
复制代码 lock 这里利用 acquire 内存序的缘故因由是 lock 须要确保临界区里的读写须要看到上一个线程在 unlock 之前的修改
unlock 利用 release 表现临界区里的修改须要对下一个持锁线程可见
这里值得留意的是这个着实不是无锁编程,固然没有利用 mutex ,固然它不会被 block (线程进入 D 状态)
lock-free 的意思是如果有多个线程在利用一个共享的数据,在有限的step内里,此中有一个线程肯定能完成这个利用。
从这个界说上看,我们的 spinlock 如果有一个线程 lock 后 panic 了,那其他的线程同样被壅闭无法继续完成相应的利用。因此不是 lock-free
但是这个例子着实用到了无锁编程常常利用到的原子变量和其对应的利用。这也分析了用原子变量不代表就是无锁
实现一个 lock-free stack
一个最简化版的 stack
- template <typename T>
- class LockFreeStack {
- private:
- struct Node {
- T data_;
- Node* next;
- Node(T const& data) : data_(data) {}
- };
- std::atomic<Node*> head{nullptr};
- public:
- void push(T const& data) {
- Node* node = new Node(data);
- node->next = head.load();
- while (!head.compare_exchange_weak(node->next, node));
- }
- bool pop(T& result) {
- Node* old_head = head.load();
- while (old_head && !head.compare_exchange_weak(old_head, old_head->next));
- if (!old_head) {
- return false;
- }
- result = old_head->data_;
- return true;
- }
- };
复制代码 pop 这里不能仅靠 load ,由于 pop 须要确保差别线程拿到的值须要不一样,以是须要利用 compare_exchange_weak 来包管如果节点被其他线程取了以后重新获取节点。
这里尚有一点小标题,pop 出来的节点利用了拷贝数据来返回,但是拷贝过程中大概发生非常,那如许的话这个数据就会丢失。以是可以利用智能指针来处置惩罚这个标题。如下:- template <typename T>
- class LockFreeStack {
- private:
- struct Node {
- std::shared_ptr<T> data_;
- Node* next;
- Node(T const& data) : data_(std::make_shared<T>(data)) {}
- };
- std::atomic<Node*> head{nullptr};
- public:
- void push(T const& data) {
- Node* node = new Node(data);
- node->next = head.load();
- while (!head.compare_exchange_weak(node->next, node));
- }
- std::shared_ptr<T> pop() {
- Node* old_head = head.load();
- while (old_head && !head.compare_exchange_weak(old_head, old_head->next));
- return old_head ? old_head->data_ : std::shared_ptr<T>{};
- }
- };
复制代码 现在的代码仍旧有一个标题,pop 出节点后没有开释内存。由于在多线程环境下,你无法确定这个节点是否被其他线程持有,以是不能安全的删除。shared_ptr 只管理了 data,没有管理 Node
修复内存走漏
通过 pop 的线程数制止竞争
- template <typename T>
- class LockFreeStack {
- private:
- struct Node {
- //...
- };
- std::atomic<Node*> head{nullptr};
- std::atomic<unsigned> threads_in_pop{0};
- std::atomic<Node*> to_be_deleted{nullptr};
- static void delete_nodes(Node* node) {
- while (node) {
- auto next = node->next;
- delete node;
- node = next;
- }
- }
- void chain_pending_nodes(Node* nodes) {
- Node* last = nodes;
- while (Node* const next = last->next) {
- last = next;
- }
- chain_pending_nodes(nodes, last);
- }
- void chain_pending_nodes(Node* first, Node* last) {
- last->next = to_be_deleted.load();
- while (!to_be_deleted.compare_exchange_weak(last->next, first));
- }
- void try_reclaim(Node* node) {
- if (threads_in_pop == 1) {
- Node* nodes_to_delete = to_be_deleted.exchange(nullptr);
- if (!--threads_in_pop) {
- delete_nodes(nodes_to_delete);
- } else if (nodes_to_delete) {
- chain_pending_nodes(nodes_to_delete);
- }
- delete node;
- } else {
- if (node) {
- chain_pending_nodes(node);
- }
- --threads_in_pop;
- }
- }
- public:
- void push(T const& data) {
- //...
- }
- std::shared_ptr<T> pop() {
- threads_in_pop++;
- Node* old_head = head.load();
- while (old_head && !head.compare_exchange_weak(old_head, old_head->next));
- std::shared_ptr<T> res;
- if (old_head) {
- res.swap(old_head->data_);
- }
- try_reclaim(old_head);
- return res;
- }
- };
复制代码 这个实现的焦点头脑是:如果没有其他线程在实行 pop,就可以安全的把节点删掉。
以是引入了 threads_in_pop 来监控 有几个线程,进入 pop 就加一,脱离就减一。
如果 threads_in_pop == 1 分析是只有一个线程,可以安全删除,但须要留意的是在 delete_nodes 前须要再判定一次,由于这中央有大概有其他线程进入 pop 然后利用了。
threads_in_pop == 1 的时间可以安全删掉当前pop出的节点,由于其他线程背面肯定不会访问到这个节点。
这个方案有个标题,在低并发的时间还能工作,但是高并发下面大概会失效。现在的方案是监控 pop 利用是否有其他线程,可以思量监控 节点是否有其他的线程在访问。
Hazard Pointer
hazard pointer 根本思绪是,如果一个线程要访问另一个线程大概想要删除的对象,它起首会设置一个伤害指针来指向该对象,告知另一个线程删除该对象确实会带来风险。一旦不再须要该对象,则扫除伤害指针。
Hazard Pointer 的工作原理可以压缩成三个利用:Protect(掩护)、Retire(退役)、Scan(扫描采取)。
- 掩护:线程声明我正在利用这个节点
- 退役:线程声明我已经把这个节点移除了,但是先不 free
- 扫描采取:查抄待删除链表内里的节点,如果没有线程在利用就开释
手写一个 hazard pointer
改进后的 pop 为例:- std::shared_ptr<T> pop() {
- std::atomic<void*>& hp = get_hazard_pointer_for_current_thread();
- auto old_head = head.load();
- // protect
- do {
- Node* temp;
- do {
- temp = old_head;
- hp.store(old_head);
- old_head = head.load();
- } while (old_head != temp);
- } while (old_head &&
- !head.compare_exchange_strong(old_head, old_head->next));
- hp.store(nullptr);
- std::shared_ptr<T> res;
- if (old_head) {
- res.swap(old_head->data_);
- // retire
- if (outstanding_hazard_pointers_for(old_head)) {
- reclaim_later(old_head);
- } else {
- delete old_head;
- }
- // scan and delete
- delete_nodes_with_no_hazards();
- }
- return res;
- }
复制代码 获取 hazard pointer,hazard pointer 须要将线程和节点关联起来,而且生存到一个全部线程都可以访问到的地方:- unsigned const MAX_HAZARD_POINTERS = 100;
- struct HazardPointer {
- std::atomic<std::thread::id> id;
- std::atomic<void*> pointer;
- };
- HazardPointer hazard_pointers[MAX_HAZARD_POINTERS];
- class HpOwner {
- HazardPointer* hp;
- public:
- HpOwner(HpOwner const&) = delete;
- HpOwner operator=(HpOwner const&) = delete;
- HpOwner() : hp(nullptr) {
- for (auto i = 0; i < MAX_HAZARD_POINTERS; i++) {
- std::thread::id old_id;
- if (hazard_pointers[i].id.compare_exchange_strong(
- old_id, std::this_thread::get_id())) {
- hp = &hazard_pointers[i];
- break;
- }
- }
- if (!hp) {
- throw std::runtime_error("No hazard pointers available");
- }
- }
- std::atomic<void*>& get_pointer() { return hp->pointer; }
- ~HpOwner() {
- hp->pointer.store(nullptr);
- hp->id.store(std::thread::id());
- }
- };
- std::atomic<void*>& get_hazard_pointer_for_current_thread() {
- thread_local static HpOwner hazard;
- return hazard.get_pointer();
- }
复制代码 判定某个节点是否被其他线程持有:- bool outstanding_hazard_pointers_for(void* p) {
- for (auto i = 0; i < MAX_HAZARD_POINTERS; i++) {
- if (hazard_pointers[i].pointer.load() == p) {
- return true;
- }
- }
- return false;
- }
复制代码 retire 这个节点,也就是把这个节点插入一个待删除的链表内里:- template <typename T>
- void do_delete(void* p) {
- delete static_cast<T*>(p);
- }
- struct DataToReclaim {
- void* data;
- std::function<void(void*)> deleter;
- DataToReclaim* next;
- template <typename T>
- DataToReclaim(T* p) : data(p), deleter(&do_delete<T>), next(nullptr) {}
- ~DataToReclaim() { deleter(data); }
- };
- std::atomic<DataToReclaim*> nodes_to_reclaim;
- void add_to_reclaim_list(DataToReclaim* node) {
- node->next = nodes_to_reclaim.load();
- while (!nodes_to_reclaim.compare_exchange_weak(node->next, node));
- }
- template <typename T>
- void reclaim_later(T* data) {
- add_to_reclaim_list(new DataToReclaim(data));
- }
复制代码 扫描采取:- void delete_nodes_with_no_hazards() {
- auto current = nodes_to_reclaim.exchange(nullptr);
- while (current) {
- auto next = current->next;
- if (!outstanding_hazard_pointers_for(current->data)) {
- delete current;
- } else {
- add_to_reclaim_list(current);
- }
- current = next;
- }
- }
复制代码 这段代码是比力须要留意的:- //...
- do {
- Node* temp;
- do {
- temp = old_head;
- hp.store(old_head);
- old_head = head.load();
- } while (old_head != temp);
- } while (old_head &&
- !head.compare_exchange_strong(old_head, old_head->next));
- //...
复制代码 这里有两层循环:1. 内层循环,作用是确保 hp 掩护的节点照旧当前的 head。在获取 old_head (temp = old_head)和 hp.store(old_head) 之间大概存在竞态 2. 外层节点就是一个很正常的 pop 节点所做的利用,如果发现 old_head 被修改了则须要重新去获取头节点。这里由于失败会重新处置惩罚内层循环,以是 spurious failure 的代价比力昂贵,在这里用了 exchange_strong
现在这个版本的实现着实有很大的性能标题。由于每一次 pop 都会去扫描待删除链表,扫描是须要占用 CPU 资源的,而且差别线程去访问一个全局的链表是有竞争的。以是有两种办理办法:1. 空间换时间,不须要每次都去扫描,只有当链表里的节点凌驾肯定命目才须要删除 2. 每个线程有自己的链表,不须要去竞争同一个。
Hazard Pointer 内存上界
Hazard Pointer 是有明白的内存上界的。在恣意时候,体系中未被采取的退役节点数最多为:N * (R + K * N) N 是线程数,K 是每个线程的 hazard pointer 数目, R 是触发 scan 的域值。
这个上界的寄义是:每个线程最多持有 R 个未被 scan 的退役节点,加上最多有 K×N 个节点被其他线程的 hazard pointer 掩护着无法开释。通常 R 被设置为 2×K×N,以是上界约为 N × 3KN = 3KN²。对于 64 线程、K=2 的 M-S queue 场景,上界是 3×2×64² = 24576 个节点。如果每个节点 64 字节,这是约 1.5 MB 的"耽误开释"开销——在绝大多数场景下完全可以担当。
Hazard Pointer 为什么是安全的
Safety Invariant: 节点 P 被开释,当且仅当:(1) P 在某个线程的 retired_list 中;且 (2) 实行 scan 时,P 不在任何线程的 hazard pointer 中。
条件 (1) 要求 P 已经被从数据结构中逻辑删除(否则不会被 retire)。条件 (2) 要求没有线程声明正在利用 P。这两个条件一起包管了:如果一个线程正在利用 P(它的 hazard pointer 中有 P),P 就不会被开释。
这个稳定量的证明焦点在于 protect-then-validate 协议:如果一个线程 T 通过 hp.store(P)+ 验证 P 仍在数据结构中这两步都乐成了,那么在 T 扫除它的 hazard pointer 之前,P 肯定不会被开释——由于任何实行 scan 的线程在网络 hazard pointer 时肯定会看到 T 的声明(acquire-release 语义包管了可见性)。
看一个场景:
场景设定:队列状态 Head → A → B → C,线程 T1 实行 dequeue(目的是摘掉 A),线程 T2 也实行 dequeue 而且还会触发 scan。最伤害的交错是:T1 正在掩护 A 的过程中,T2 已经把 A retire 了而且开始 scan。
时候T1 (dequeuer)T2 (dequeuer + scanner)A 的状态t0head = Head.load()
→ 得到 A-在队列中t1hp[0].store(A)-在队列中,被T1掩护t2-head = Head.load()
→ 得到 A在队列中,被T1掩护t3-hp[0].store(A)被T1和T2掩护t4-验证 head == Head → 通过同上t5-next = A->next
→ 得到 B同上t6-CAS(Head, A, B)
→ 乐成逻辑删除
,仍被T1和T2掩护t7-hp[0].clear(); hp[1].clear()逻辑删除,仅被T1掩护t8-retire(A)
→ A进入retired_list退役,仅被T1掩护t9-scan()
开始:网络全部 hazard pointer退役,仅被T1掩护t10-scan 读取 T1.hp[0] → 发现值为 A退役,scan知道被掩护t11-A 在 protected_set 中,跳过不开释退役,安全生存t12验证 head == Head → 失败-退役,仅被T1掩护t13hp[0].clear()
→ 重新开始-退役,无人掩护t14-(下次 scan 发现 A 无掩护)退役,无人掩护t15-delete A
→ 安全开释已开释t9-t10 是整个协议的安全关键。 在 t9,T2 的 scan 开始网络全部线程的 hazard pointer。它读取 T1 的 hp[0],发现值为 A。这个读取利用 memory_order_acquire,共同 T1 在 t1 时 hp[0].store(A)利用的 memory_order_release,构成了一个 acquire-release 同步——T2 在 t10 看到的 A,肯定是 T1 在 t1 或之后存入的值,不会看到更早的 nullptr。完备的实现
- #include #include #include #include #include unsigned const MAX_HAZARD_POINTERS = 100;
- struct HazardPointer {
- std::atomic<std::thread::id> id;
- std::atomic<void*> pointer;
- };
- HazardPointer hazard_pointers[MAX_HAZARD_POINTERS];
- class HpOwner {
- HazardPointer* hp;
- public:
- HpOwner(HpOwner const&) = delete;
- HpOwner operator=(HpOwner const&) = delete;
- HpOwner() : hp(nullptr) {
- for (auto i = 0; i < MAX_HAZARD_POINTERS; i++) {
- std::thread::id old_id;
- if (hazard_pointers[i].id.compare_exchange_strong(
- old_id, std::this_thread::get_id())) {
- hp = &hazard_pointers[i];
- break;
- }
- }
- if (!hp) {
- throw std::runtime_error("No hazard pointers available");
- }
- }
- std::atomic<void*>& get_pointer() { return hp->pointer; }
- ~HpOwner() {
- hp->pointer.store(nullptr);
- hp->id.store(std::thread::id());
- }
- };
- std::atomic<void*>& get_hazard_pointer_for_current_thread() {
- thread_local static HpOwner hazard;
- return hazard.get_pointer();
- }bool outstanding_hazard_pointers_for(void* p) {
- for (auto i = 0; i < MAX_HAZARD_POINTERS; i++) {
- if (hazard_pointers[i].pointer.load() == p) {
- return true;
- }
- }
- return false;
- }template <typename T>
- void do_delete(void* p) {
- delete static_cast<T*>(p);
- }
- struct DataToReclaim {
- void* data;
- std::function<void(void*)> deleter;
- DataToReclaim* next;
- template <typename T>
- DataToReclaim(T* p) : data(p), deleter(&do_delete<T>), next(nullptr) {}
- ~DataToReclaim() { deleter(data); }
- };
- std::atomic<DataToReclaim*> nodes_to_reclaim;
- void add_to_reclaim_list(DataToReclaim* node) {
- node->next = nodes_to_reclaim.load();
- while (!nodes_to_reclaim.compare_exchange_weak(node->next, node));
- }
- template <typename T>
- void reclaim_later(T* data) {
- add_to_reclaim_list(new DataToReclaim(data));
- }void delete_nodes_with_no_hazards() {
- auto current = nodes_to_reclaim.exchange(nullptr);
- while (current) {
- auto next = current->next;
- if (!outstanding_hazard_pointers_for(current->data)) {
- delete current;
- } else {
- add_to_reclaim_list(current);
- }
- current = next;
- }
- }template class LockFreeStack { private: struct Node { std::shared_ptr data_; Node* next; Node(T const& data) : data_(std::make_shared(data)) {} }; std::atomic head{nullptr}; public: void push(T const& data) { Node* node = new Node(data); node->next = head.load(); while (!head.compare_exchange_weak(node->next, node)); } std::shared_ptr pop() { std::atomic& hp = get_hazard_pointer_for_current_thread(); auto old_head = head.load(); do { Node* temp; do { temp = old_head; hp.store(old_head); old_head = head.load(); } while (old_head != temp); } while (old_head && !head.compare_exchange_strong(old_head, old_head->next)); hp.store(nullptr); std::shared_ptr res; if (old_head) { res.swap(old_head->data_); if (outstanding_hazard_pointers_for(old_head)) { reclaim_later(old_head); } else { delete old_head; } delete_nodes_with_no_hazards(); } return res; }};int main() { LockFreeStack stack; constexpr int count = 10; std::atomic popped{0}; std::thread t([&] { for (int i = 0; i < count; ++i) { stack.push(i); } }); std::thread t1([&] { while (popped.load() < count) { auto value = stack.pop(); if (value) { popped.fetch_add(1); std::cout internal_count.fetch_sub(1) == 1) { delete ptr; } } }};
复制代码 这里利用了两个计数,一个外部计数和一个内部计数。外部计数是和指向节点的指针绑定在一起的,而内部计数是在节点内部。外部计数负责“安全拿到节点”,内部计数负责“节点摘下后耽误采取”
increase_head_count(old_head); 起首取头节点,而且立即给头节点的 external_count 加 1,表现线程正在利用这个指针不能立即 delete。
if (head.compare_exchange_strong(old_head, ptr->next)) 如果当前 head 仍旧是我刚刚增长过引用计数的 old_head,就把 head 改成 ptr->next。
int const count_increase = old_head.external_count - 2; 这里减掉两个引用计数,分别是:1. 当火线程通过 increase_head_count 增长的 2. head 自己就持有的谁人外部引用
ptr->internal_count.fetch_add(count_increase) == -count_increase 剩下的外部引用要转移到内部去,如果加上后便是0就可以删除这个节点。
else if (ptr->internal_count.fetch_sub(1) == 1) 这个分支是节点被其他线程取走了,减去内部计数。 以是如果许多线程都CAS失败了,内部引用就大概为负数。比及末了乐成 pop 的线程把外部计数归并进 internal_count 则会抵消。
以是,内部计数为负数的本质是: 失败线程先把“我不消了”记到 internal_count 里;乐成线程稍后把 external_count 转进来,两边抵消。负数只是中央状态。
Epoch-Based Reclamation
TODO
实现一个 lock-free queue
TODO
lock-free 的利益和弊端
利益:
- 进步并发,如果有锁一些线程会被 block,无锁下每个线程照旧连续运行
- robustness,整个体系不受持锁线程的影响,就算持锁线程 panic 了,体系仍旧可以继续运行
弊端:
- 编程难度大。没有锁的掩护,你没办法制止多个线程同时访问某个数据,你必须自己来确保数据结构的 invariants 在并发下 仍旧创建
- 制止 live lock。死锁是两个线程相互期待卡死的环境,无锁编程不会出现这种环境,但是会出现 live lock. live lock 是两个线程没有卡死,都在积极处置惩罚数据,但是没办法继续往前推进。
- 低沉体系的性能(看起来特别反直觉)。原子变量利用的性能是小于非原子变量的。而且原子变量须要在多个线程之间同步,这对 cpu 缓存非常不友爱(由于线程大概在差别的cpu核上,以是同步就须要让cache invalid)。如果在竞争很低的环境下乃至不如 mutex(mutex 有快路径,可以制止进入 futex wait)
live lock 的例子:- template <typename T>
- class LockFreeStack {
- private:
- struct Node;
- struct CountedNodePtr {
- int external_count;
- Node* ptr;
- };
- struct Node {
- std::shared_ptr<T> data_;
- std::atomic<int> internal_count;
- CountedNodePtr next;
- Node(T const& data) : data_(std::make_shared<T>(data)), internal_count(0) {}
- };
- std::atomic<CountedNodePtr> head;
- void increase_head_count(CountedNodePtr& old_counter) {
- CountedNodePtr new_counter;
- do {
- new_counter = old_counter;
- new_counter.external_count++;
- } while(!head.compare_exchange_strong(old_counter, new_counter));
- old_counter.external_count = new_counter.external_count;
- }
- public:
- LockFreeStack() : head(CountedNodePtr{0, nullptr}) {}
- void push(T const& data) {
- CountedNodePtr new_node;
- new_node.external_count = 1;
- new_node.ptr = new Node(data);
- new_node.ptr->next = head.load();
- while (!head.compare_exchange_weak(new_node.ptr->next, new_node));
- }
- std::shared_ptr<T> pop() {
- auto old_head = head.load();
- while(true) {
- increase_head_count(old_head);
- auto ptr = old_head.ptr;
- if (!ptr) {
- return std::shared_ptr<T>{};
- }
- if (head.compare_exchange_strong(old_head, ptr->next)) {
- std::shared_ptr<T> res;
- res.swap(ptr->data_);
- int const count_increase = old_head.external_count - 2;
- if (ptr->internal_count.fetch_add(count_increase) == -count_increase) {
- delete ptr;
- }
- return res;
- } else if (ptr->internal_count.fetch_sub(1) == 1) {
- delete ptr;
- }
- }
- }
- };
复制代码 会发生:- // thread 1
- mutex1.lock();
- if (!mutex2.try_lock()) {
- mutex1.unlock();
- retry();
- }
- // thread 2
- mutex1.lock();
- if (!mutex2.try_lock()) {
- mutex1.unlock();
- retry();
- }
复制代码 线程没有卡死,但是仍旧不停重试。
免责声明:如果侵犯了您的权益,请联系站长及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金. |