⑴问题形貌
在多线程环境下,原始队列会出现各种不可预料的问题。以两个线程同时写入为例,假设线程 A 和线程 B 同时对原始队枚举行写入操作。首先看原始队列的入队伪代码:void Enqueue(Node *node){m_Tail->next = node;m_Tail = node;},这个操作分为两步。当两个线程同时执行时,可能出现这样的环境:线程 A 执行完第一步m_Tail->next = nodeC后,线程 B 开始执行并完成了整个入队操作,接着线程 A 继续执行第二步m_Tail = nodeB,这就导致了 Tail 指针失去与队列的链接,后加的节点从 Head 开始就访问不到了。这种环境会使得队列的状态变得混乱,无法包管数据的正确存储和读取。 ⑵解决方法
为了解决上述问题,可以使用原子操作实现无锁同步。原子操作是不可分割的操作,CPU 的一个线程在执行原子操作时,不会被其他线程中断或抢占。其中,典范的原子操作有 Load / Store(读取与生存)、Test and Set(针对 bool 变量,假如为 true 则返回 true,假如为 false,则将变量置为 true 并返回 false)、Clear(将 bool 变量设为 false)、Exchange(将指定位置的值设置为传入值,并返回其旧值)等。
而 CAS(Compare And Swap)在实现无锁同步中起着关键作用。CAS 操作包含三个参数:一个内存地址 V、一个期望值 A 和一个新值 B。当执行 CAS 操作时,假如当前内存地址 V 中存储的值等于期望值 A,则将新值 B 写入该内存地址,并返回 true;否则,不做任何修改,并返回 false。在无锁队列中,可以使用 CAS 操作来确保对 Head 或 Tail 指针的读写操作是原子性的,从而制止多线程同时写入或读取时出现的指针混乱问题。比方,在入队操作中,可以使用 CAS 来确保在更新 Tail 指针时,不会被其他线程干扰。假如当前 Tail 指针指向的节点的_next指针与期望值不同等,说明有其他线程举行了写入操作,此时可以重新实行 CAS 操作,直到成功为止。这样就可以实现无锁队列的安全写入和读取操作。
2.3原子操作详解
原子操作是无锁队列实现的基础。在计算机科学中,原子操作是指不会被线程调度机制打断的操作,一旦开始,就会不绝运行到竣事,中间不会发生上下文切换到另一个线程的环境 。这意味着在多线程环境下,多个线程对共享资源的原子操作是互斥的,不会出现数据竞争或不同等的问题。
常见的原子操作类型包括原子读(Atomic Read)、原子写(Atomic Write)、原子加(Atomic Add)、原子减(Atomic Subtract)以及原子比较交换(Atomic Compare and Swap,即 CAS)等 。原子读和原子写操作包管了对内存中数据的读取和写入是原子性的,不会出现读取或写入一半数据的环境。而原子加和原子减操作则常用于对共享计数器等资源的操作,确保在多线程环境下计数器的增减操作是安全的。
在 C++ 中,原子操作可以通过<atomic>头文件来实现。<atomic>头文件提供了一系列的原子类型和原子操作函数,比方std::atomic<int>表现一个原子整型,std::atomic<bool>表现一个原子布尔型等。通过这些原子类型,我们可以方便地在多线程环境下举行原子操作,比方:
#include <iostream>
#include <atomic>
#include <thread>
std::atomic<int> counter(0);
void increment() {
for (int i = 0; i < 1000; ++i) {
counter++;
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "Final counter value: " << counter << std::endl;
return 0;
}
复制代码
在上述代码中,counter是一个原子整型变量,counter++操作是一个原子操作,因此在多线程环境下,两个线程对counter的递增操作不会出现数据竞争的问题,最终输出的结果是正确的。
2.4Compare - And - Swap(CAS)操作
CAS 操作是无锁队列实现的核心技能之一。它是一种原子的比较并交换操作,包含三个参数:内存位置(V)、预期原值(A)和新值(B) 。其工作原理是:首先检查内存位置 V 的值是否等于预期原值 A,假如相称,则将内存位置 V 的值更新为新值 B,否则不举行任何操作。整个过程是原子性的,即不会被其他线程干扰。
在无锁队列中,CAS 操作起着关键作用。以入队操作为例,假设当前有两个线程同时实行将元素入队。每个线程都会先获取队列尾指针的当前值(即预期原值 A),然后实行将新元素链接到尾指针的背面,并将尾指针更新为新元素的位置(即新值 B) 。在这个过程中,只有一个线程的 CAS 操作会成功,由于只有当尾指针的当前值等于该线程获取的预期原值 A 时,CAS 操作才会将尾指针更新为新值 B。假如另一个线程在第一个线程执行 CAS 操作之前已经更新了尾指针,那么第二个线程的 CAS 操作就会失败,由于此时尾指针的当前值已经不等于它获取的预期原值 A 了。
在 C++ 中,<atomic>头文件中的compare_exchange_weak和compare_exchange_strong函数提供了 CAS 操作的实现。compare_exchange_weak函数在某些环境下可能会出现 “伪失败”,即固然内存位置的值与预期原值相称,但函数仍旧返回失败,这是由于硬件实现的限制 。而compare_exchange_strong函数则包管不会出现 “伪失败”,但在某些平台上可能效率稍低。下面是一个使用compare_exchange_weak函数实现的简单示例:
#include <iostream>
#include <atomic>
std::atomic<int> value(5);
int main() {
int expected = 5;
int new_value = 10;
if (value.compare_exchange_weak(expected, new_value)) {
std::cout << "Value was not updated. Current value is " << value << std::endl;
}
return 0;
}
复制代码
在上述代码中,value.compare_exchange_weak(expected, new_value)实行将value的值从expected(初始值为 5)更新为new_value(值为 10)。假如value的值当前确实为 5,那么更新操作会成功,输出 “Value updated successfully to 10”;否则,输出 “Value was not updated. Current value is ” 加上当前value的值。
(1)GGC对CAS支持,GCC4.1+版本中支持CAS原子操作。
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...);
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);
复制代码
(2)Windows对CAS支持,Windows中使用Windows API支持CAS。
LONG InterlockedCompareExchange(
LONG volatile *Destination,
LONG ExChange,
LONG Comperand
);
复制代码
(3)C11对CAS支持,C11 STL中atomic函数支持CAS并可以跨平台。
template< class T >
bool atomic_compare_exchange_weak( std::atomic* obj,T* expected, T desired );
template< class T >
bool atomic_compare_exchange_weak( volatile std::atomic* obj,T* expected, T desired );
复制代码
别的原子操作如下:
Fetch-And-Add:一样平常用来对变量做+1的原子操作
Test-and-set:写值到某个内存位置并传回其旧值
2.5无锁队列的实现思路
无锁队列的实现通常基于链表或数组结构,并团结 CAS 操作来实现线程安全的入队和出队操作。
基于链表的无锁队列,每个节点包含数据和指向下一个节点的指针。入队操作时,线程首先创建一个新节点,然后通过 CAS 操作将新节点链接到队列的尾部。详细来说,线程先获取尾指针的当前值,实行将新节点的指针指向尾指针的下一个节点(初始时为nullptr),并通过 CAS 操作将尾指针更新为新节点 。假如 CAS 操作失败,说明尾指针在这期间被其他线程更新了,线程需要重新获取尾指针并再次实行。
出队操作时,线程首先检查头指针的下一个节点是否存在(由于头指针通常是一个哑节点,不存储现实数据)。假如存在,线程实行通过 CAS 操作将头指针更新为下一个节点,从而实现出队 。假如 CAS 操作失败,说明头指针在这期间被其他线程更新了,线程需要重新检查并实行。
下面是一个简化的基于链表的无锁队列的 C++ 实现示例:
} while (!head.compare_exchange_weak(oldHead, next));
result = std::move(next->data);
delete oldHead;
return true;
}
};
复制代码
在上述代码中,LockFreeQueue类实现了一个基于链表的无锁队列。enqueue方法用于将元素入队,dequeue方法用于将元素出队。在enqueue方法中,通过循环和 CAS 操作确保新节点能够正确地添加到队列尾部;在dequeue方法中,同样通过循环和 CAS 操作确保能够安全地从队列头部取出元素。
基于数组的无锁队列,通常采用循环缓冲区(Circular Buffer)的方式实现。数组被视为一个环形结构,通过两个指针(头指针和尾指针)来表现队列的开始和竣事位置 。入队时,线程通过 CAS 操作更新尾指针,并将元素放入相应位置;出队时,线程通过 CAS 操作更新头指针,并取出对应位置的元素。
下面是一个简化的基于数组的无锁队列的 C++ 实现示例:
在上述代码中,CircularLockFreeQueue类实现了一个基于数组的环形无锁队列。enqueue方法用于将元素入队,通过检查尾指针的下一个位置是否为头指针来判断队列是否已满,假如未满则将元素放入相应位置并更新尾指针;dequeue方法用于将元素出队,通过检查头指针是否等于尾指针来判断队列是否为空,假如不为空则取出对应位置的元素并更新头指针。
无论是基于链表还是数组的无锁队列,其核心头脑都是使用原子操作和 CAS 机制来确保在多线程环境下的安全和高效。但在现实应用中,还需要考虑诸如 ABA 问题(即一个值从 A 变为 B,再变回 A,CAS 操作可能会误判)等复杂环境,通常可以通过引入版本号或其他机制来解决 。
三、C++中无锁队列的实现方式
} while (!head.compare_exchange_weak(oldHead, next));
result = std::move(next->data);
delete oldHead;
return true;
}
};
复制代码
在上述代码中,LockFreeQueue类实现了一个基于链表的无锁队列。enqueue方法用于将元素入队,dequeue方法用于将元素出队。在enqueue方法中,通过循环和 CAS 操作确保新节点能够正确地添加到队列尾部;在dequeue方法中,同样通过循环和 CAS 操作确保能够安全地从队列头部取出元素。
3.3基于数组的无锁队列实现
基于数组的无锁队列通常采用循环缓冲区(Circular Buffer)的方式实现。这种方式将数组视为一个环形结构,通过两个指针(头指针和尾指针)来表现队列的开始和竣事位置,使用原子操作实现多线程安全的入队和出队操作。以下是一个简化的基于数组的无锁队列的 C++ 实现示例: