马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
BRPC Work Stealing Queue
1 核心功能
无锁工作窃取队列(Lock-Free Work Stealing Queue),用于在 BRPC 的 bthread 用户态线程库中支持高效的任务调理。
设计目的:
- 高并发性能:允许线程本地快速存取任务,淘汰锁竞争。
- 负载均衡:空闲线程可“窃取”其他线程队列中的任务,提升 CPU 利用率。
2 关键数据布局
2.1 队列布局
- template <typename T>
- class WorkStealingQueue {
- private:
- std::atomic<size_t> _bottom; // 本地线程操作的队尾
- std::atomic<size_t> _top; // 其他线程窃取的队头
- T* _array; // 环形缓冲区
- size_t _capacity; // 队列容量
- };
复制代码
- 环形缓冲区:通过 _array 存储任务,容量固定(通常为 2 的幂次,便于位运算优化)。
- 原子指针:_bottom 和 _top 使用原子操作包管线程安全。
2.2 内存布局优化
- 缓存行对齐:_bottom 和 _top 大概被放置在差别缓存行,避免伪共享(False Sharing)。
- alignas(64) std::atomic<size_t> _bottom;
- alignas(64) std::atomic<size_t> _top;
复制代码 3 核心操作
3.1 本地线程操作(非线程安全)
- push(T item):向队尾插入任务。
- bool push(T item) {
- size_t b = _bottom.load(std::memory_order_relaxed);
- size_t t = _top.load(std::memory_order_acquire);
- if (b - t >= _capacity) return false; // 队列满
- _array[b % _capacity] = item;
- _bottom.store(b + 1, std::memory_order_release);
- return true;
- }
复制代码 - pop(T* item):从队尾取出任务(本地线程独占)。
- bool pop(T* item) {
- size_t b = _bottom.load(std::memory_order_relaxed) - 1;
- _bottom.store(b, std::memory_order_relaxed);
- std::atomic_thread_fence(std::memory_order_seq_cst);
- size_t t = _top.load(std::memory_order_relaxed);
- if (t <= b) { // 队列非空
- *item = _array[b % _capacity];
- if (t != b) return true; // 队列仍有元素
- // 最后一个元素,检查竞争
- if (_top.compare_exchange_strong(t, t + 1,
- std::memory_order_seq_cst,
- std::memory_order_relaxed)) {
- _bottom.store(b + 1, std::memory_order_relaxed);
- return true;
- }
- _bottom.store(b + 1, std::memory_order_relaxed);
- return false;
- }
- _bottom.store(b + 1, std::memory_order_relaxed);
- return false;
- }
复制代码 3.2 窃取操作(线程安全)
- steal(T* item):从队头窃取任务(多线程并发安全)。
- bool steal(T* item) {
- size_t t = _top.load(std::memory_order_acquire);
- std::atomic_thread_fence(std::memory_order_seq_cst);
- size_t b = _bottom.load(std::memory_order_acquire);
- if (t >= b) return false; // 队列空
- *item = _array[t % _capacity];
- if (!_top.compare_exchange_strong(t, t + 1,
- std::memory_order_seq_cst,
- std::memory_order_relaxed)) {
- return false; // 竞争失败
- }
- return true;
- }
复制代码 4 设计亮点
4.1 无锁原子操作
- compare_exchange_strong:用于更新 _top 指针,确保窃取操作的原子性。
- 内存序控制:
- push 使用 release 包管写入可见性。
- steal 使用 acquire 和 seq_cst 确保读取次序。
4.2 环形缓冲区优化
- 位运算代替取模:若容量为 2 的幂次,b % _capacity 可简化为 b & (_capacity - 1),提升性能。
- 快速失败查抄:队列满/空时直接返回,避免无效操作。
4.3 线程角色分离
- 本地线程(Owner):仅操作 _bottom,无需原子同步(relaxed 内存序)。
- 窃取线程(Thief):操作 _top,需严格同步(acquire/release)。
5 性能优化
- 低竞争场景高效:本地线程的 push/pop 几乎无原子开销。
- 批量窃取:可扩展为一次窃取多个任务,淘汰竞争频率。
- 动态扩容:当前实现容量固定,实际工程中可结合 std::vector 实现动态扩容。
6 应用场景
- bthread 调理器:每个 Worker 线程维护一个任务队列,空闲线程窃取任务。
- 并行计算:如 Fork-Join 模型中的任务分发。
- 事件驱动框架:处置惩罚异步 I/O 任务的负载均衡。
7 潜在问题与改进
7.1 ABA 问题
- 风险:若 _top 指针被多次修改后回到原值,compare_exchange_strong 大概错误成功。
- 解决方案:使用带版本号的指针(如 (top << 16) | version)。
7.2 容量限定
- 固定容量:队列满时需由调用方处置惩罚(如丢弃任务或动态扩容)。
- 改进方案:实现动态扩容(需原子地替换 _array 和 _capacity)。
7.3 伪共享
- 缓存行竞争:_bottom 和 _top 需对齐到差别缓存行(代码中大概已通过 alignas 实现)。
8 代码示例(简化版)
- template <typename T>
- class WorkStealingQueue {
- public:
- WorkStealingQueue(size_t capacity) :
- _bottom(0), _top(0), _capacity(next_pow2(capacity)) {
- _array = new T[_capacity];
- }
- bool push(T item) {
- size_t b = _bottom.load(std::memory_order_relaxed);
- if (b - _top.load(std::memory_order_acquire) >= _capacity)
- return false;
- _array[b & (_capacity - 1)] = item;
- _bottom.store(b + 1, std::memory_order_release);
- return true;
- }
- bool steal(T* item) {
- size_t t = _top.load(std::memory_order_acquire);
- size_t b = _bottom.load(std::memory_order_acquire);
- if (t >= b) return false;
- *item = _array[t & (_capacity - 1)];
- return _top.compare_exchange_strong(t, t + 1,
- std::memory_order_seq_cst,
- std::memory_order_relaxed);
- }
- };
复制代码 9 总结
work_stealing_queue.h 是 BRPC 高性能任务调理的核心组件,通过无锁环形缓冲区和原子操作实现高效的任务存取与窃取。其设计充分优化了本地线程操作的性能,同时通过精致的内存序控制包管窃取操作的线程安全。实用于高并发场景下的负载均衡,但需注意容量限定和 ABA 问题的潜在风险。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |