深入探讨C++多线程性能优化

打印 上一主题 下一主题

主题 911|帖子 911|积分 2733

深入探讨C++多线程性能优化

在当代软件开辟中,多线程编程已成为提升应用程序性能和相应速度的关键技术之一。尤其在C++领域,多线程编程不仅能充分利用多核处置处罚器的优势,还能显著提高计算密集型使命的服从。然而,多线程编程也带来了诸多挑战,特别是在性能优化方面。本文将深入探讨影响C++多线程性能的一些关键因素,比较锁机制与原子操作的性能。通过这些内容,盼望能为开辟者提供有代价的见解和实用的优化策略,助力于更高效的多线程编程实践。
先在开头给一个例子,你以为下面这段benchmark代码效果会是怎样的。这里的逻辑很简朴,将0-20000按线程切成n片,每个线程在一个Set里查找这个数字存不存在,存在则计数+1。
  1. #include <benchmark/benchmark.h>
  2. #include <iostream>
  3. #include <memory>
  4. #include <mutex>
  5. #include <unordered_set>
  6. constexpr int kSetSize = 10000;
  7. class MyBenchmark : public benchmark::Fixture {
  8. public:
  9.   void SetUp(const ::benchmark::State& state) override {
  10.     std::call_once(flag, [this]() {
  11.       s = std::make_shared<std::unordered_set<int>>();
  12.       for (int i = 0; i < kSetSize; i++) {
  13.         s->insert(i);
  14.       }
  15.     });
  16.   }
  17.   std::shared_ptr<std::unordered_set<int>> GetSet() { return s; }
  18. private:
  19.   std::shared_ptr<std::unordered_set<int>> s;
  20.   std::once_flag flag;
  21. };
  22. BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWork)(benchmark::State& state) {
  23.   for (auto _ : state) {
  24.     int size_sum = kSetSize * 2;
  25.     int size_per_thread = (size_sum + state.threads() - 1) / state.threads();
  26.     int sum = 0;
  27.     int start = state.thread_index() * size_per_thread;
  28.     int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);
  29.     for (int i = start; i < end; i++) {
  30.       auto inst = GetSet();
  31.       if (inst->count(i) > 0) {
  32.         sum++;
  33.       }
  34.     }
  35.   }
  36. }
  37. // 注册基准测试,并指定线程数
  38. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(1);
  39. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(2);
  40. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(4);
  41. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(8);
  42. BENCHMARK_MAIN();
复制代码
跑出来的效果如下:

将使命切分成多个片断并交由多线程执行后,团体性能不仅没有提升,反而降落了,且性能与线程数成反比。那么,题目来了:导致这种效果的原因是什么?怎样才华实现合理的并行执行,从而低落CPU的执行时间?在接下来的部门,笔者将为你揭示答案。
影响多线程性能的因素

笔者以为,影响多线程性能的重要因素有以下两个:
 1.Lock Contention。
 2.Cache Coherency。
Lock Contention对应使用锁来处置处罚多线程同步题目的场景,而Cache Coherency则对应使用原子操作来处置处罚多线程同步题目的场景。
Lock Contention

在多线程环境中,多个线程同时实验获取同一个锁(Lock)时,会发生竞争征象,这就是所谓的锁竞争(Lock Contention)。锁竞争会导致线程或进程被阻塞,等待锁被释放,从而影响系统的性能和相应时间。大多数情况下,开辟人员会选择使用锁来办理线程间的同步题目,因此锁竞争题目也变得广为人知且轻易明白。由于锁的存在,位于临界区的代码在同一时候只能由一个线程执行。因此,优化的思路就是尽量制止多个线程同时访问同一资源。常见的优化方向有两种:
 1.淘汰临界区大小:临界区越小,这段代码的执行时间就越短,从而在团体程序运行时间中所占的比例也越小,冲突也就越少。
 2.对共享资源进行分桶操作:每个线程只会在某个桶上访问资源,理想情况下,每个线程都会访问不同的桶,这样就不会有冲突。
淘汰临界区大小需要开辟者对自己的代码进行细致思考,将不必要的操作放在临界区外,比方一些初始化和内存分配操作。
对共享资源进行分桶操作在工程实践中也非经常见。比方,LevelDB的LRUCache中,每个Key只会固定在一个桶上。如果hash函数充足优秀且数据分布充足随机,这种方法可以大大提高LRUCache的性能。
Cache Coherency

缓存一致性(Cache Coherency)是指在多处置处罚器系统中,确保各个处置处罚器的缓存中的数据保持一致的机制。由于当代计算机系统通常包罗多个处置处罚器,每个处置处罚器都有自己的缓存(如L1、L2、L3缓存),因此在并发访问共享内存时,可能会出现缓存数据不一致的题目。缓存一致性协议旨在办理这些题目,确保所有处置处罚器在访问共享内存时看到的是一致的数据。
当我们对一个共享变量进行写入操作时,实际上需要通过缓存一致性协议将该变量的更新同步到其他线程的缓存中,否则可能会读到不一致的值。实际上,这个同步过程的单位是一个缓存行(Cache Line),而且同步过程相对较慢,由于涉及到跨核通讯。
由此引申出两个严重影响性能的征象:
 1.Cache Ping-Pong。
 2.False Sharing。
Cache Ping-Pong

缓存乒乓效应(Cache Ping-Pong)是指在多处置处罚器系统中,多个处置处罚器频繁地对同一个缓存行(Cache Line)进行读写操作,导致该缓存行在不同处置处罚器的缓存之间频繁地来回通报。这种征象会导致系统性能降落,由于缓存行的频繁通报会引起大量的缓存一致性流量和处置处罚器间通讯开销。
讲到这里,其实就可以解释为什么开头那段代码会随着线程数量的增长而性能反而降落。代码中的变量 s 是一个共享资源,但它使用了 shared_ptr。在复制 shared_ptr 时,会引起引用计数的增长(计数+1),多个线程频繁对同一个缓存行进行读写操作,从而引发缓存乒乓效应,导致性能降落。最简朴的修改方式就是去掉 shared_ptr,代码如下,同时还可以得到我们预期的效果,即CPU时间随着线程数的增长而低落:
  1. #include <benchmark/benchmark.h>
  2. #include <iostream>
  3. #include <memory>
  4. #include <mutex>
  5. #include <unordered_set>
  6. constexpr int kSetSize = 10000;
  7. class MyBenchmark : public benchmark::Fixture {
  8. public:
  9.   void SetUp(const ::benchmark::State& state) override {
  10.     std::call_once(flag, [this]() {
  11.       for (int i = 0; i < kSetSize; i++) {
  12.         s.insert(i);
  13.       }
  14.     });
  15.   }
  16.   const std::unordered_set<int>& GetSet() { return s; }
  17. private:
  18.   std::unordered_set<int> s;
  19.   std::once_flag flag;
  20. };
  21. BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWork)(benchmark::State& state) {
  22.   for (auto _ : state) {
  23.     int size_sum = kSetSize * 2;
  24.     int size_per_thread = (size_sum + state.threads() - 1) / state.threads();
  25.     int sum = 0;
  26.     int start = state.thread_index() * size_per_thread;
  27.     int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);
  28.     for (int i = start; i < end; i++) {
  29.       const auto& inst = GetSet();
  30.       if (inst.count(i) > 0) {
  31.         benchmark::DoNotOptimize(sum++);
  32.       }
  33.     }
  34.   }
  35. }
  36. // 注册基准测试,并指定线程数
  37. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(1);
  38. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(2);
  39. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(4);
  40. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(8);
  41. BENCHMARK_MAIN();
复制代码

False Sharing

伪共享(False Sharing)实际上是一种特殊的缓存乒乓效应(Cache Ping-Pong)。它指的是在多处置处罚器系统中,多个处置处罚器访问不同的数据,但这些数据恰好位于同一个缓存行中,导致该缓存行在不同处置处罚器的缓存之间频繁通报。尽管处置处罚器访问的是不同的数据,但由于它们共享同一个缓存行,仍旧会引发缓存一致性流量,导致性能降落。
为了更好地明白这一征象,我们可以对上面的代码进行一些修改。假设我们使用一个 vector<atomic> 来记录不同线程的 sum 值,这样固然不同线程修改的是不同的sum,但是照旧在一个缓存行上。使用 atomic 是为了强制触发缓存一致性协议,否则操作系统可能会进行优化,不会立即将修改反映到主存。
  1. #include <benchmark/benchmark.h>
  2. #include <atomic>
  3. #include <iostream>
  4. #include <memory>
  5. #include <mutex>
  6. #include <unordered_set>
  7. constexpr int kSetSize = 10000;
  8. class MyBenchmark : public benchmark::Fixture {
  9. public:
  10.   void SetUp(const ::benchmark::State& state) override {
  11.     std::call_once(flag, [this, &state]() {
  12.       for (int i = 0; i < kSetSize; i++) {
  13.         s.insert(i);
  14.       }
  15.       sums = std::vector<std::atomic<int>>(state.threads());
  16.     });
  17.   }
  18.   const std::unordered_set<int>& GetSet() { return s; }
  19.   std::vector<std::atomic<int>>& GetSums() { return sums; }
  20. private:
  21.   std::unordered_set<int> s;
  22.   std::once_flag flag;
  23.   std::vector<std::atomic<int>> sums;
  24. };
  25. BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWork)(benchmark::State& state) {
  26.   for (auto _ : state) {
  27.     int size_sum = kSetSize;
  28.     int size_per_thread = (size_sum + state.threads() - 1) / state.threads();
  29.     int sum = 0;
  30.     int start = state.thread_index() * size_per_thread;
  31.     int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);
  32.     for (int i = start; i < end; i++) {
  33.       const auto& inst = GetSet();
  34.       if (inst.count(i) > 0) {
  35.         benchmark::DoNotOptimize(GetSums()[state.thread_index()]++);
  36.       }
  37.     }
  38.   }
  39. }
  40. // 注册基准测试,并指定线程数
  41. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(1);
  42. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(2);
  43. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(4);
  44. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(8);
  45. BENCHMARK_MAIN();
复制代码

可以看到,尽管不同线程没有使用同一个变量,但由于 sums 里面的元素共享同一个缓存行(Cache Line),同样会导致性能急剧降落。
针对这种情况,只要我们将 sums 中的元素隔离,使它们不在同一个缓存行上,就不会引发这个题目。一样平常来说,缓存行的大小为64字节,我们可以使用一个类填充到64个字节来实现隔离。优化后的代码如下:
  1. #include <benchmark/benchmark.h>
  2. #include <atomic>
  3. #include <iostream>
  4. #include <memory>
  5. #include <mutex>
  6. #include <unordered_set>
  7. constexpr int kSetSize = 10000;
  8. struct alignas(64) PaddedCounter {
  9.   std::atomic<int> value{0};
  10.   char padding[64 - sizeof(std::atomic<int>)];  // 填充到缓存行大小
  11. };
  12. class MyBenchmark : public benchmark::Fixture {
  13. public:
  14.   void SetUp(const ::benchmark::State& state) override {
  15.     std::call_once(flag, [this, &state]() {
  16.       for (int i = 0; i < kSetSize; i++) {
  17.         s.insert(i);
  18.       }
  19.       sums = std::vector<PaddedCounter>(state.threads());
  20.     });
  21.   }
  22.   const std::unordered_set<int>& GetSet() { return s; }
  23.   std::vector<PaddedCounter>& GetSums() { return sums; }
  24. private:
  25.   std::unordered_set<int> s;
  26.   std::once_flag flag;
  27.   std::vector<PaddedCounter> sums;
  28. };
  29. BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWork)(benchmark::State& state) {
  30.   for (auto _ : state) {
  31.     int size_sum = kSetSize;
  32.     int size_per_thread = (size_sum + state.threads() - 1) / state.threads();
  33.     int sum = 0;
  34.     int start = state.thread_index() * size_per_thread;
  35.     int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);
  36.     for (int i = start; i < end; i++) {
  37.       const auto& inst = GetSet();
  38.       if (inst.count(i) > 0) {
  39.         benchmark::DoNotOptimize(GetSums()[state.thread_index()].value++);
  40.       }
  41.     }
  42.   }
  43. }
  44. // 注册基准测试,并指定线程数
  45. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(1);
  46. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(2);
  47. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(4);
  48. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(8);
  49. BENCHMARK_MAIN();
复制代码

Lock VS Atomic

Lock Atomic Benchmark

很多人都以为锁(lock)比原子操作(atomic)要更慢,那么实际上真的是这样吗?下面我们通过两个测试来进行对比。
公平起见,我们将使用一个基于 atomic 变量实现的自旋锁(SpinLock)与 std::mutex 进行性能对比。自旋锁的实现摘自 Folly 库。其原理是使用一个 atomic 变量来标志是否被占用,并使用 acquire-release 内存序来保证临界区的精确性。在冲突过大时,自旋锁会使用 sleep 让出 CPU。代码如下:
  1. #pragma once
  2. #include <atomic>
  3. #include <cstdint>
  4. class Sleeper {
  5.   static const uint32_t kMaxActiveSpin = 4000;
  6.   uint32_t spin_count_;
  7. public:
  8.   constexpr Sleeper() noexcept : spin_count_(0) {}
  9.   inline __attribute__((always_inline)) static void sleep() noexcept {
  10.     struct timespec ts = {0, 500000};
  11.     nanosleep(&ts, nullptr);
  12.   }
  13.   inline __attribute__((always_inline)) void wait() noexcept {
  14.     if (spin_count_ < kMaxActiveSpin) {
  15.       ++spin_count_;
  16. #ifdef __x86_64__
  17.       asm volatile("pause" ::: "memory");
  18. #elif defined(__aarch64__)
  19.       asm volatile("yield" ::: "memory");
  20. #else
  21.       // Fallback for other architectures
  22. #endif
  23.     } else {
  24.       sleep();
  25.     }
  26.   }
  27. };
  28. class SpinLock {
  29.   enum { FREE = 0, LOCKED = 1 };
  30. public:
  31.   constexpr SpinLock() : lock_(FREE) {}
  32.   inline __attribute__((always_inline)) bool try_lock() noexcept { return cas(FREE, LOCKED); }
  33.   inline __attribute__((always_inline)) void lock() noexcept {
  34.     Sleeper sleeper;
  35.     while (!try_lock()) {
  36.       do {
  37.         sleeper.wait();
  38.       } while (AtomicCast(&lock_)->load(std::memory_order_relaxed) == LOCKED);
  39.     }
  40.   }
  41.   inline __attribute__((always_inline)) void unlock() noexcept {
  42.     AtomicCast(&lock_)->store(FREE, std::memory_order_release);
  43.   }
  44. private:
  45.   inline __attribute__((always_inline)) bool cas(uint8_t compare, uint8_t new_val) noexcept {
  46.     return AtomicCast(&lock_)->compare_exchange_strong(compare, new_val, std::memory_order_acquire,
  47.                                                        std::memory_order_relaxed);
  48.   }
  49.   inline __attribute__((always_inline)) static std::atomic<uint8_t>* AtomicCast(uint8_t* value) {
  50.     return reinterpret_cast<std::atomic<uint8_t>*>(value);
  51.   }
  52. private:
  53.   uint8_t lock_;
  54. };
复制代码
在第一个benchmark中,我们测试了无竞争情况下的性能。也就是说,原子变量的CAS操作只会执行一次,不会进入 sleep 状态。在这种情况下,自旋锁(SpinLock)等价于一次原子 set 操作。
代码如下:
  1. #include <benchmark/benchmark.h>
  2. #include <atomic>
  3. #include <iostream>
  4. #include <memory>
  5. #include <mutex>
  6. #include <unordered_set>
  7. #include "spin_lock.h"
  8. constexpr int kSetSize = 10000;
  9. // struct alignas(64) PaddedCounter {
  10. //   std::atomic<int> value{0};
  11. //   char padding[64 - sizeof(std::atomic<int>)];  // 填充到缓存行大小
  12. // };
  13. struct alignas(64) PaddedCounterLock {
  14.   int value{0};
  15.   char padding[64 - sizeof(std::atomic<int>)];  // 填充到缓存行大小
  16. };
  17. class MyBenchmark : public benchmark::Fixture {
  18. public:
  19.   void SetUp(const ::benchmark::State& state) override {
  20.     std::call_once(flag, [this, &state]() {
  21.       for (int i = 0; i < kSetSize; i++) {
  22.         s.insert(i);
  23.       }
  24.       sums_atomic = std::vector<PaddedCounterLock>(state.threads());
  25.       sum_lock = std::vector<PaddedCounterLock>(state.threads());
  26.     });
  27.   }
  28.   const std::unordered_set<int>& GetSet() { return s; }
  29.   std::vector<PaddedCounterLock>& GetSumsAtomic() { return sums_atomic; }
  30.   std::vector<PaddedCounterLock>& GetSumLock() { return sum_lock; }
  31. private:
  32.   std::unordered_set<int> s;
  33.   std::once_flag flag;
  34.   std::vector<PaddedCounterLock> sums_atomic;
  35.   std::vector<PaddedCounterLock> sum_lock;
  36. };
  37. BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkAtomic)(benchmark::State& state) {
  38.   SpinLock m;
  39.   for (auto _ : state) {
  40.     int size_sum = kSetSize;
  41.     int size_per_thread = (size_sum + state.threads() - 1) / state.threads();
  42.     int sum = 0;
  43.     int start = state.thread_index() * size_per_thread;
  44.     int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);
  45.     for (int i = start; i < end; i++) {
  46.       const auto& inst = GetSet();
  47.       if (inst.count(i) > 0) {
  48.         std::lock_guard<SpinLock> lg(m);
  49.         benchmark::DoNotOptimize(GetSumsAtomic()[state.thread_index()].value++);
  50.       }
  51.     }
  52.   }
  53. }
  54. BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkLock)(benchmark::State& state) {
  55.   std::mutex m;
  56.   for (auto _ : state) {
  57.     int size_sum = kSetSize;
  58.     int size_per_thread = (size_sum + state.threads() - 1) / state.threads();
  59.     int sum = 0;
  60.     int start = state.thread_index() * size_per_thread;
  61.     int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);
  62.     for (int i = start; i < end; i++) {
  63.       const auto& inst = GetSet();
  64.       if (inst.count(i) > 0) {
  65.         std::lock_guard<std::mutex> lg(m);
  66.         benchmark::DoNotOptimize(GetSumLock()[state.thread_index()].value++);
  67.       }
  68.     }
  69.   }
  70. }
  71. // 注册基准测试,并指定线程数
  72. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(1);
  73. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(2);
  74. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(4);
  75. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(8);
  76. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(1);
  77. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(2);
  78. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(4);
  79. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(8);
  80. BENCHMARK_MAIN();
复制代码
benchmark效果:

第二个benchmark是对比竞争激烈时的性能,代码如下:
  1. #include <benchmark/benchmark.h>
  2. #include <atomic>
  3. #include <iostream>
  4. #include <memory>
  5. #include <mutex>
  6. #include <unordered_set>
  7. #include "spin_lock.h"
  8. constexpr int kSetSize = 10000;
  9. class MyBenchmark : public benchmark::Fixture {
  10. public:
  11.   void SetUp(const ::benchmark::State& state) override {
  12.     std::call_once(flag, [this, &state]() {
  13.       for (int i = 0; i < kSetSize; i++) {
  14.         s.insert(i);
  15.       }
  16.       count_ = 0;
  17.     });
  18.   }
  19.   const std::unordered_set<int>& GetSet() { return s; }
  20.   void SpinLockAndAdd() {
  21.     std::lock_guard<SpinLock> lg(m1_);
  22.     count_++;
  23.   }
  24.   void MutexLockAndAdd() {
  25.     std::lock_guard<std::mutex> lg(m2_);
  26.     count_++;
  27.   }
  28. private:
  29.   std::unordered_set<int> s;
  30.   std::once_flag flag;
  31.   uint32_t count_;
  32.   SpinLock m1_;
  33.   std::mutex m2_;
  34. };
  35. BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkAtomic)(benchmark::State& state) {
  36.   for (auto _ : state) {
  37.     int size_sum = kSetSize;
  38.     int size_per_thread = (size_sum + state.threads() - 1) / state.threads();
  39.     int sum = 0;
  40.     int start = state.thread_index() * size_per_thread;
  41.     int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);
  42.     for (int i = start; i < end; i++) {
  43.       const auto& inst = GetSet();
  44.       if (inst.count(i) > 0) {
  45.         SpinLockAndAdd();
  46.       }
  47.     }
  48.   }
  49. }
  50. BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkLock)(benchmark::State& state) {
  51.   for (auto _ : state) {
  52.     int size_sum = kSetSize;
  53.     int size_per_thread = (size_sum + state.threads() - 1) / state.threads();
  54.     int sum = 0;
  55.     int start = state.thread_index() * size_per_thread;
  56.     int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);
  57.     for (int i = start; i < end; i++) {
  58.       const auto& inst = GetSet();
  59.       if (inst.count(i) > 0) {
  60.         MutexLockAndAdd();
  61.       }
  62.     }
  63.   }
  64. }
  65. // 注册基准测试,并指定线程数
  66. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(1);
  67. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(2);
  68. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(4);
  69. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(8);
  70. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(1);
  71. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(2);
  72. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(4);
  73. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(8);
  74. BENCHMARK_MAIN();
复制代码
benchmark效果:

可以看到,无论是哪一种情况,std::mutex 的性能都更优。当然,这个测试效果可能会因不同的操作系统而有所不同,但至少可以得出一个结论:这两者的性能是一个量级的,并不存在 atomic 肯定比 std::mutex 更快的说法。这其实是由于当代 C++ 中的 std::mutex 实现已经高度优化,其实现与上面的自旋锁(SpinLock)非常相似,在低竞争的情况下并不会陷入内核态。
那么,按上面的说法,是不是我们根本不需要 atomic 变量呢?先来分析一下 atomic 的长处。
atomic 的长处有:
 1.可以实现内存占用极小的锁。
 2.当临界区操作可以等价于一个原子操作时,性能会更高。
对于第二个结论,我们可以做个测试。同样,拿前面的例子稍作修改。
case 1如下:
  1. #include <benchmark/benchmark.h>
  2. #include <atomic>
  3. #include <iostream>
  4. #include <memory>
  5. #include <mutex>
  6. #include <unordered_set>
  7. #include "spin_lock.h"
  8. constexpr int kSetSize = 10000;
  9. class MyBenchmark : public benchmark::Fixture {
  10. public:
  11.   void SetUp(const ::benchmark::State& state) override {
  12.     std::call_once(flag, [this, &state]() {
  13.       for (int i = 0; i < kSetSize; i++) {
  14.         s.insert(i);
  15.       }
  16.     });
  17.   }
  18.   const std::unordered_set<int>& GetSet() { return s; }
  19. private:
  20.   std::unordered_set<int> s;
  21.   std::once_flag flag;
  22. };
  23. BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkAtomic)(benchmark::State& state) {
  24.   std::atomic<uint32_t> sum = 0;
  25.   for (auto _ : state) {
  26.     int size_sum = kSetSize;
  27.     int size_per_thread = (size_sum + state.threads() - 1) / state.threads();
  28.     int sum = 0;
  29.     int start = state.thread_index() * size_per_thread;
  30.     int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);
  31.     for (int i = start; i < end; i++) {
  32.       const auto& inst = GetSet();
  33.       if (inst.count(i) > 0) {
  34.         benchmark::DoNotOptimize(sum++);
  35.       }
  36.     }
  37.   }
  38. }
  39. BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkLock)(benchmark::State& state) {
  40.   std::mutex m;
  41.   uint32_t sum = 0;
  42.   for (auto _ : state) {
  43.     int size_sum = kSetSize;
  44.     int size_per_thread = (size_sum + state.threads() - 1) / state.threads();
  45.     int sum = 0;
  46.     int start = state.thread_index() * size_per_thread;
  47.     int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);
  48.     for (int i = start; i < end; i++) {
  49.       const auto& inst = GetSet();
  50.       if (inst.count(i) > 0) {
  51.         std::lock_guard<std::mutex> lg(m);
  52.         sum++;
  53.       }
  54.     }
  55.   }
  56. }
  57. // 注册基准测试,并指定线程数
  58. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(1);
  59. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(2);
  60. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(4);
  61. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(8);
  62. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(1);
  63. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(2);
  64. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(4);
  65. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(8);
  66. BENCHMARK_MAIN();
复制代码
benchmark效果:

case 2如下:
  1. #include <benchmark/benchmark.h>
  2. #include <atomic>
  3. #include <iostream>
  4. #include <memory>
  5. #include <mutex>
  6. #include <unordered_set>
  7. #include "spin_lock.h"
  8. constexpr int kSetSize = 10000;
  9. class MyBenchmark : public benchmark::Fixture {
  10. public:
  11.   void SetUp(const ::benchmark::State& state) override {
  12.     std::call_once(flag, [this, &state]() {
  13.       for (int i = 0; i < kSetSize; i++) {
  14.         s.insert(i);
  15.       }
  16.       count_ = 0;
  17.       atomic_count_ = 0;
  18.     });
  19.   }
  20.   const std::unordered_set<int>& GetSet() { return s; }
  21.   void AtomicAdd() { atomic_count_++; }
  22.   void MutexLockAndAdd() {
  23.     std::lock_guard<std::mutex> lg(m);
  24.     count_++;
  25.   }
  26. private:
  27.   std::unordered_set<int> s;
  28.   std::once_flag flag;
  29.   uint32_t count_;
  30.   std::atomic<uint32_t> atomic_count_;
  31.   std::mutex m;
  32. };
  33. BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkAtomic)(benchmark::State& state) {
  34.   for (auto _ : state) {
  35.     int size_sum = kSetSize;
  36.     int size_per_thread = (size_sum + state.threads() - 1) / state.threads();
  37.     int sum = 0;
  38.     int start = state.thread_index() * size_per_thread;
  39.     int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);
  40.     for (int i = start; i < end; i++) {
  41.       const auto& inst = GetSet();
  42.       if (inst.count(i) > 0) {
  43.         AtomicAdd();
  44.       }
  45.     }
  46.   }
  47. }
  48. BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkLock)(benchmark::State& state) {
  49.   for (auto _ : state) {
  50.     int size_sum = kSetSize;
  51.     int size_per_thread = (size_sum + state.threads() - 1) / state.threads();
  52.     int sum = 0;
  53.     int start = state.thread_index() * size_per_thread;
  54.     int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);
  55.     for (int i = start; i < end; i++) {
  56.       const auto& inst = GetSet();
  57.       if (inst.count(i) > 0) {
  58.         MutexLockAndAdd();
  59.       }
  60.     }
  61.   }
  62. }
  63. // 注册基准测试,并指定线程数
  64. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(1);
  65. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(2);
  66. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(4);
  67. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(8);
  68. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(1);
  69. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(2);
  70. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(4);
  71. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(8);
  72. BENCHMARK_MAIN();
复制代码
benchmark效果:

接下来结合这两个长处来看,链式数据布局的场景非常适合使用 atomic 变量。
 1.内存占用少:即使每个节点都实现一个自旋锁(SpinLock),也不会浪费太多内存。

 2.链式数据布局的临界区通常可以优化成一个指针的 CAS 操作。

Epoch Based Reclamation

固然云云,但要写一个高性能的并发安全的链式数据布局是非常困难的,这重要是由于写操作包罗了删除操作。举个最简朴的例子:
假设有一个链表 A->B->C,一个线程正在读B节点,另一个线程正在删除B节点,怎样保证读线程在读B节点期间不会被另一个线程给删掉?
再举个更复杂的例子:
假设有一个链表 A->B->C。一个线程正在读取 B 节点,另一个线程正在修改 B 节点。显然,最简朴的实现是锁住 B,同时只允许一个操作,但显然这样从各方面来看性能都不是最佳的,这是第一个方法。
第二个方法是类似于 Copy On Write(COW)。写操作时先重新构造一个节点 B1,再修改对应的数据,末了通过 CAS 操作修改指针毗连 A->B1。
我们来分析一下为什么第二个方法远比第一个方法要好。
首先,上锁会触发原子写,意味着即便是你只是为了读数据,也会触发一次 Cache Line 一致性同步的题目。而且在找到 B 节点之前的每一个节点都要依次上锁来保证读取的精确性,这意味着极大概率会发生 Cache Ping-Pong 题目。
再来看写操作,写操作除了上锁以外还需要修改节点的数据。第二个方法需要先构造一个新的节点再修改,意味着这个节点在插入链表之前肯定不在其他线程的 Cache 里(清除刚好有某个变量和这个新节点的内存在同一个 Cache Line 的情况)。而第一个方法修改的节点已经在链表里,这表示在之前肯定有线程已经访问过这个节点,那么它很可能在 Cache 里面,从而触发一次 Cache Line 一致性同步的题目。
然而事情没有这么简朴。试想一下,在修改完指针 A->B1 后,B 节点需要被抛弃释放,这时候其他线程有可能正在访问 B 节点而导致瓦解。
可以看出这些题目都是由于删除操作引起的,这个题目有几个闻名的办理方案,比如 Epoch Based Reclamation 和 Hazard Pointer 等。这里只介绍其中的 Epoch Based Reclamation,感爱好的话请自行搜刮了解其他实现方式。
该算法的思路是删除操作会实验触发版本 +1,但只有当所有线程都是最新版本 e 时才华乐成,乐成后会接纳 e-1 版本的内存。因此,最多会累积 3 个版本未释放节点的内存。是个以空间换时间,轻读重写的方案。
首先,每个线程维护自己的线程变量:
 1.active:标志该线程是否正在读数据。

 2.epoch:标志该线程当前的版本。

全局维护变量:
 1.global_epoch:全局最新的版本。

 2.retire_list:等待释放的节点。

读操作:
 1.首先把线程 active 标志为 true,表示正在读数据。

 2.然后把 global_epoch 赋值给 epoch,记录当前正在读的版本。

 3.如果线程需要删除节点,则把节点放到全局的 retire_list 末尾。

 4.竣事读后,将 active 标志为 false。

写操作:
 1.如果要删除节点,则把节点放到全局的 retire_list 末尾,并且实验增长版本。

 2.增长版本时检查所有线程的状态,当所有线程满意 epoch 即是当前版本 e 大概 active 为 false 时,进行版本 e = e + 1 操作。

 3.清空 e-2 版本的 retire_list。

这里给出一个简朴的实现,代码如下:
  1. #pragma once
  2. #include <array>
  3. #include <atomic>
  4. #include <mutex>
  5. #include <numeric>
  6. #include <vector>
  7. constexpr uint8_t kEpochSize = 3;
  8. constexpr uint8_t kCacheLineSize = 64;
  9. template <uint32_t kReadThreadNum>
  10. class ThreadIDManager;
  11. template <uint32_t kReadThreadNum>
  12. struct ThreadID {
  13.   ThreadID() { tid = ThreadIDManager<kReadThreadNum>::GetInstance().AcquireThreadID(); }
  14.   ~ThreadID() { ThreadIDManager<kReadThreadNum>::GetInstance().ReleaseThreadID(tid); }
  15.   uint32_t tid;
  16. };
  17. template <uint32_t kReadThreadNum>
  18. class ThreadIDManager {
  19. public:
  20.   ThreadIDManager() : tid_list_(kReadThreadNum) { std::iota(tid_list_.begin(), tid_list_.end(), 1); }
  21.   ThreadIDManager(const ThreadIDManager &) = delete;
  22.   ThreadIDManager(ThreadIDManager &&) = delete;
  23.   ThreadIDManager &operator=(const ThreadIDManager &) = delete;
  24.   ~ThreadIDManager() = default;
  25.   static ThreadIDManager &GetInstance() {
  26.     static ThreadIDManager inst;
  27.     return inst;
  28.   }
  29.   uint32_t AcquireThreadID() {
  30.     std::lock_guard<std::mutex> lock(tid_list_mutex_);
  31.     auto tid = tid_list_.back();
  32.     tid_list_.pop_back();
  33.     return tid;
  34.   }
  35.   void ReleaseThreadID(const uint32_t tid) {
  36.     std::lock_guard<std::mutex> lock(tid_list_mutex_);
  37.     tid_list_.emplace_back(tid);
  38.   }
  39. private:
  40.   std::vector<uint32_t> tid_list_;
  41.   std::mutex tid_list_mutex_;
  42. };
  43. struct TLS {
  44.   TLS() : active(false), epoch(0) {}
  45.   TLS(TLS &) = delete;
  46.   TLS(TLS &&) = delete;
  47.   void operator=(const TLS &) = delete;
  48.   ~TLS() = default;
  49.   std::atomic_flag active;
  50.   std::atomic<uint8_t> epoch;
  51. } __attribute__((aligned(kCacheLineSize)));
  52. template <class RCObject, class DestroyClass, uint32_t kReadThreadNum>
  53. class EbrManager {
  54. public:
  55.   EbrManager() : tls_list_(), global_epoch_(0), update_(false), write_cnt_(0) {
  56.     for (int i = 0; i < kEpochSize; i++) {
  57.       retire_list_[i].store(nullptr, std::memory_order_release);
  58.     }
  59.   }
  60.   EbrManager(const EbrManager<RCObject, DestroyClass, kReadThreadNum> &) = delete;
  61.   EbrManager(EbrManager<RCObject, DestroyClass, kReadThreadNum> &&) = delete;
  62.   EbrManager &operator=(const EbrManager<RCObject, DestroyClass, kReadThreadNum> &) = delete;
  63.   ~EbrManager() { ClearAllRetireList(); }
  64.   void ClearAllRetireList() {
  65.     for (int i = 0; i < kEpochSize; i++) {
  66.       ClearRetireList(i);
  67.     }
  68.   }
  69.   inline void StartRead() {
  70.     auto &tls = GetTLS();
  71.     tls.active.test_and_set(std::memory_order_release);
  72.     tls.epoch.store(global_epoch_.load(std::memory_order_acquire), std::memory_order_release);
  73.   }
  74.   inline void EndRead() { GetTLS().active.clear(std::memory_order_release); }
  75.   inline void FreeObject(RCObject *object) {
  76.     auto epoch = global_epoch_.load(std::memory_order_acquire);
  77.     auto *node = new RetireNode;
  78.     node->obj = object;
  79.     do {
  80.       node->next = retire_list_[epoch].load(std::memory_order_acquire);
  81.     } while (!retire_list_[epoch].compare_exchange_weak(node->next, node, std::memory_order_acq_rel));
  82.     auto write_cnt = write_cnt_.fetch_add(1, std::memory_order_relaxed);
  83.     if (write_cnt > kReadThreadNum) {
  84.       if (!update_.test_and_set(std::memory_order_acq_rel)) {
  85.         TryGC();
  86.         update_.clear(std::memory_order_release);
  87.       }
  88.     }
  89.   }
  90. private:
  91.   inline TLS &GetTLS() {
  92.     thread_local ThreadID<kReadThreadNum> thread_id;
  93.     return tls_list_[thread_id.tid];
  94.   }
  95.   inline void TryGC() {
  96.     auto epoch = global_epoch_.load(std::memory_order_acquire);
  97.     // TODO 优化记录上一次搜索到的位置
  98.     for (int i = 0; i < tls_list_.size(); i++) {
  99.       if (tls_list_[i].active.test(std::memory_order::memory_order_acquire) &&
  100.           tls_list_[i].epoch.load(std::memory_order::memory_order_acquire) != epoch) {
  101.         return;
  102.       }
  103.     }
  104.     global_epoch_.store((epoch + 1) % kEpochSize, std::memory_order_release);
  105.     ClearRetireList((epoch + 2) % kEpochSize);
  106.     write_cnt_.store(0, std::memory_order_relaxed);
  107.   }
  108.   inline void ClearRetireList(int index) {
  109.     auto *retire_node = retire_list_[index].load(std::memory_order_acquire);
  110.     while (retire_node != nullptr) {
  111.       DestroyClass destroy(retire_node->obj);
  112.       auto *old_node = retire_node;
  113.       retire_node = retire_node->next;
  114.       delete old_node;
  115.     }
  116.     retire_list_[index].store(nullptr, std::memory_order_release);
  117.   }
  118.   struct RetireNode {
  119.     RCObject *obj;
  120.     RetireNode *next;
  121.   };
  122.   std::array<char, kCacheLineSize> start_padding_;
  123.   std::array<TLS, kReadThreadNum> tls_list_;
  124.   std::atomic<uint8_t> global_epoch_;
  125.   std::array<char, kCacheLineSize> mid_padding_;
  126.   std::atomic_flag update_;
  127.   std::atomic<uint32_t> write_cnt_;
  128.   std::atomic<RetireNode *> retire_list_[kEpochSize];
  129.   std::array<char, kCacheLineSize> end_padding_;
  130. };
复制代码
这里再给出一个benchmark,对比一下使用 Epoch Based Reclamation(EBR)和不使用 EBR 的区别。由于笔者时间有限,只能写一个非常简朴的版本,仅供参考。
  1. #include <benchmark/benchmark.h>
  2. #include <mutex>
  3. #include "ebr.h"
  4. #include "spin_lock.h"
  5. struct Node {
  6.   Node() : lock(), next(nullptr) {}
  7.   int key;
  8.   int value;
  9.   Node *next;
  10.   SpinLock lock;
  11. };
  12. class NodeFree {
  13. public:
  14.   NodeFree(Node *node) { delete node; }
  15. };
  16. /*
  17. * 快速测试起见,简单写了个list版本的kv结构,里面只会有3个元素,然后只支持Get和Modify,Modify也必定会命中key。
  18. * 不是直接把key,value,next设置成atomic变量而是使用SpinLock的原因是模拟复杂情况,真实情况下会存在Add和Remove操作,实现没有如此简单。
  19. */
  20. class MyList {
  21. public:
  22.   MyList() {
  23.     Node *pre_node = nullptr;
  24.     auto *&cur_node = root_;
  25.     // 这里虽然插入了10个元素,但后面的实现会假设第一个key 9作为header是绝对不会被修改或者读到的。
  26.     for (int i = 0; i < 10; i++) {
  27.       cur_node = new Node;
  28.       cur_node->key = i;
  29.       cur_node->value = i;
  30.       cur_node->next = pre_node;
  31.       pre_node = cur_node;
  32.     }
  33.   }
  34.   int Get(int key, int *value) {
  35.     root_->lock.lock();
  36.     auto *cur_node = root_->next;
  37.     auto *pre_node = root_;
  38.     while (cur_node != nullptr) {
  39.       cur_node->lock.lock();
  40.       pre_node->lock.unlock();
  41.       if (key == cur_node->key) {
  42.         *value = cur_node->value;
  43.         cur_node->lock.unlock();
  44.         return 0;
  45.       }
  46.       pre_node = cur_node;
  47.       cur_node = cur_node->next;
  48.     }
  49.     pre_node->lock.unlock();
  50.     return 1;
  51.   }
  52.   int Modify(int key, int value) {
  53.     root_->lock.lock();
  54.     auto *cur_node = root_->next;
  55.     auto *pre_node = root_;
  56.     while (cur_node != nullptr) {
  57.       cur_node->lock.lock();
  58.       pre_node->lock.unlock();
  59.       if (key == cur_node->key) {
  60.         cur_node->value = value;
  61.         cur_node->lock.unlock();
  62.         return 0;
  63.       }
  64.       pre_node = cur_node;
  65.       cur_node = cur_node->next;
  66.     }
  67.     pre_node->lock.unlock();
  68.     return 1;
  69.   }
  70.   int GetUseEbr(int key, int *value) {
  71.     ebr_mgr_.StartRead();
  72.     auto *cur_node = root_->next;
  73.     while (cur_node != nullptr) {
  74.       if (key == cur_node->key) {
  75.         *value = cur_node->value;
  76.         ebr_mgr_.EndRead();
  77.         return 0;
  78.       }
  79.       cur_node = cur_node->next;
  80.     }
  81.     ebr_mgr_.EndRead();
  82.     return 1;
  83.   }
  84.   int ModifyUseEbr(int key, int value) {
  85.     root_->lock.lock();
  86.     auto *cur_node = root_->next;
  87.     auto *pre_node = root_;
  88.     while (cur_node != nullptr) {
  89.       cur_node->lock.lock();
  90.       if (key == cur_node->key) {
  91.         auto *new_node = new Node;
  92.         new_node->key = cur_node->key;
  93.         new_node->value = value;
  94.         new_node->next = cur_node->next;
  95.         pre_node->next = new_node;
  96.         cur_node->lock.unlock();
  97.         pre_node->lock.unlock();
  98.         ebr_mgr_.FreeObject(cur_node);
  99.         return 0;
  100.       }
  101.       auto *next_node = cur_node->next;
  102.       pre_node->lock.unlock();
  103.       pre_node = cur_node;
  104.       cur_node = next_node;
  105.     }
  106.     pre_node->lock.unlock();
  107.     return 1;
  108.   }
  109. private:
  110.   Node *root_;
  111.   EbrManager<Node, NodeFree, 15> ebr_mgr_;
  112. };
  113. class MyBenchmark : public benchmark::Fixture {
  114. public:
  115.   void SetUp(const ::benchmark::State &state) override {}
  116.   MyList &GetMyList() { return l; }
  117. private:
  118.   MyList l;
  119.   std::once_flag flag;
  120. };
  121. constexpr int kKeySize = 10000;
  122. BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkNoUseEbr)(benchmark::State &state) {
  123.   for (auto _ : state) {
  124.     auto &mylist = GetMyList();
  125.     if (0 == state.thread_index()) {
  126.       // modify
  127.       for (int i = 0; i < kKeySize; i++) {
  128.         mylist.Modify(i % 9, i);
  129.       }
  130.     } else {
  131.       // get
  132.       for (int i = 0; i < kKeySize; i++) {
  133.         int value;
  134.         mylist.Get(i % 9, &value);
  135.       }
  136.     }
  137.   }
  138. }
  139. BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkUseEbr)(benchmark::State &state) {
  140.   for (auto _ : state) {
  141.     auto &mylist = GetMyList();
  142.     if (0 == state.thread_index()) {
  143.       // modify
  144.       for (int i = 0; i < kKeySize; i++) {
  145.         mylist.ModifyUseEbr(i % 9, i);
  146.       }
  147.     } else {
  148.       // get
  149.       for (int i = 0; i < kKeySize; i++) {
  150.         int value;
  151.         mylist.GetUseEbr(i % 9, &value);
  152.       }
  153.     }
  154.   }
  155. }
  156. // 注册基准测试,并指定线程数
  157. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkNoUseEbr)->Threads(4);
  158. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkNoUseEbr)->Threads(8);
  159. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkNoUseEbr)->Threads(12);
  160. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkUseEbr)->Threads(4);
  161. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkUseEbr)->Threads(8);
  162. BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkUseEbr)->Threads(12);
  163. BENCHMARK_MAIN();
复制代码
benchmark效果:

结语

以上是笔者的一些个人见解。由于水平和时间有限,测试用例尽可能地简化,可能不够全面。如果内容中有任何错误,欢迎指出。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用户云卷云舒

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表