高级线程管理_第九章_《C++并发编程实战》笔记

饭宝  金牌会员 | 2025-3-14 22:01:57 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 995|帖子 995|积分 2985

1. 线程池(Thread Pool)

核心目的:避免频繁创建/烧毁线程,复用固定数量的线程处理任务队列。
1.1 线程池布局要素



  • 任务队列:存储待执行的任务(函数对象)
  • 工作线程集合:执行任务的线程
  • 同步机制:互斥锁(std::mutex)、条件变量(std::condition_variable)
  • 克制标志:安全停止池的开关(std::atomic)
1.2 线程池实现步调



  • 步调1:定义任务队列和工作线程
  • 步调2:初始化工作线程,循环等候任务
  • 步调3:提交任务到队列,唤醒线程
  • 步调4:安全关闭池,等候剩余任务完成
    代码示例:
  1. #include <iostream>
  2. #include <vector>
  3. #include <queue>
  4. #include <thread>
  5. #include <mutex>
  6. #include <condition_variable>
  7. #include <future>
  8. #include <functional>
  9. class ThreadPool {
  10. public:
  11.     explicit ThreadPool(size_t num_threads) : stop(false) {
  12.         for(size_t i = 0; i < num_threads; ++i) {
  13.             workers.emplace_back([this] {
  14.                 while(true) {
  15.                     std::function<void()> task;
  16.                     {
  17.                         std::unique_lock<std::mutex> lock(queue_mutex);
  18.                         condition.wait(lock, [this] { return stop || !tasks.empty(); });
  19.                         if (stop && tasks.empty()) return;
  20.                         task = std::move(tasks.front());
  21.                         tasks.pop();
  22.                     }
  23.                     task();
  24.                 }
  25.             });
  26.         }
  27.     }
  28.     template<class F, class... Args>
  29.     auto enqueue(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
  30.         using return_type = decltype(f(args...));
  31.         auto task = std::make_shared<std::packaged_task<return_type()>>(
  32.             std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  33.         );
  34.         std::future<return_type> res = task->get_future();
  35.         {
  36.             std::unique_lock<std::mutex> lock(queue_mutex);
  37.             if(stop) throw std::runtime_error("enqueue on stopped ThreadPool");
  38.             tasks.emplace([task]() { (*task)(); });
  39.         }
  40.         condition.notify_one();
  41.         return res;
  42.     }
  43.     ~ThreadPool() {
  44.         {
  45.             std::unique_lock<std::mutex> lock(queue_mutex);
  46.             stop = true;
  47.         }
  48.         condition.notify_all();
  49.         for(std::thread &worker : workers)
  50.             worker.join();
  51.     }
  52. private:
  53.     std::vector<std::thread> workers;
  54.     std::queue<std::function<void()>> tasks;
  55.     std::mutex queue_mutex;
  56.     std::condition_variable condition;
  57.     std::atomic<bool> stop;
  58. };
  59. // 测试用例
  60. int main() {
  61.     ThreadPool pool(4);
  62.     std::vector<std::future<int>> results;
  63.     for(int i = 0; i < 8; ++i) {
  64.         results.emplace_back(pool.enqueue([i] {
  65.             std::cout << "Task " << i << " executed by thread "
  66.                       << std::this_thread::get_id() << std::endl;
  67.             std::this_thread::sleep_for(std::chrono::seconds(1));
  68.             return i * i;
  69.         }));
  70.     }
  71.     for(auto&& result : results)
  72.         std::cout << "Result: " << result.get() << std::endl;
  73.     return 0;
  74. }
复制代码
2. 线程停止(Interruptible Threads)

核心目标:安全地请求线程停止,而非强制停止(避免资源泄漏)。
2.1 停止机制实现



  • 检查点:线程定期检查停止标志
  • 原子标志:std::atomic 作为停止信号
  • 异常通报:通过抛出特定异常跳出任务
    代码:
  1. #include <iostream>
  2. #include <thread>
  3. #include <atomic>
  4. #include <stdexcept>
  5. #include <mutex>
  6. // 线程安全输出的互斥锁
  7. std::mutex cout_mutex;
  8. class InterruptFlag {
  9. public:
  10.     void set() { flag.store(true); }
  11.     bool is_set() const { return flag.load(); }
  12. private:
  13.     std::atomic<bool> flag{false};
  14. };
  15. class InterruptibleThread {
  16. public:
  17.     template<typename FunctionType>
  18.     InterruptibleThread(FunctionType f, InterruptFlag& flag)
  19.         : flag(flag), thread([this, f, &flag] {
  20.             try {
  21.                 f(flag);
  22.             } catch (...) {
  23.                 std::lock_guard<std::mutex> lock(cout_mutex);
  24.                 std::cerr << "Thread interrupted by an unknown exception." << std::endl;
  25.             }
  26.         })
  27.     {}
  28.     void join() { thread.join(); }
  29.     void interrupt() { flag.set(); }
  30. private:
  31.     InterruptFlag& flag;
  32.     std::thread thread;
  33. };
  34. // 测试用例
  35. void task(InterruptFlag& flag) {
  36.     while (!flag.is_set()) {
  37.         {
  38.             std::lock_guard<std::mutex> lock(cout_mutex);
  39.             std::cout << "Working..." << std::endl;
  40.         }
  41.         std::this_thread::sleep_for(std::chrono::milliseconds(500));
  42.     }
  43.     throw std::runtime_error("Exit by interrupt");
  44. }
  45. int main() {
  46.     InterruptFlag flag;
  47.     InterruptibleThread t(task, flag);
  48.    
  49.     std::this_thread::sleep_for(std::chrono::seconds(2));
  50.     {
  51.         std::lock_guard<std::mutex> lock(cout_mutex);
  52.         std::cout << "Sending interrupt..." << std::endl;
  53.     }
  54.     t.interrupt();
  55.     t.join();
  56.     return 0;
  57. }
  58. /*
  59. - 协作式中断:线程在安全点检查标志,决定是否退出。
  60. - 异常处理:通过抛出异常跳出深层嵌套调用栈。
  61. - 资源释放:确保线程退出前正确释放持有资源。
  62. */
复制代码
多选题

1.(多选)在线程池计划中,以下哪种情况会导致任务无法被正确执行?
A. 工作线程在pop_task时未正确处理条件变量的虚伪唤醒
B. 使用shared_ptr生存任务队列的锁,导致死锁
C. 提交任务时未检查线程池是否已关闭,直接放入队列
D. 任务队列的互斥锁粒度计划过大,影响并发性能
E. 在析构线程池时未优雅关闭所有线程
2.(多选)关于线程停止的正确计划,以下描述错误的是?
A. 可以通过atomic标志位实现线程的强制停止
B. C++标准库中的std::thread原生支持停止操作
C. 线程池中的任务应包含停止检查点以实现协作式停止
D. 停止一个阻塞在I/O操作上的线程可能导致资源泄漏
E. 使用条件变量等候时应联合谓词避免无法唤醒的阻塞
3.(多选)在线程池的任务调度中,以下哪些操作必要同步?
A. 检查任务队列是否为空
B. 多个工作线程同时从队列头部提取任务
C. 提交新任务到队列尾部
D. 调整线程池中的线程数量
E. 修改线程池的运行状态标志
4.(多选)线程池的优雅关闭应包含哪些步调?
A. 克制接受新任务
B. 等候所有已提交任务执行完成
C. 强制停止所有阻塞的工作线程
D. 通过条件变量通知所有工作线程退出循环
E. 重置任务队列中的未完成任务
5.(多选)以下哪些计划可能导致线程池的性能瓶颈?
A. 使用无锁队列实现任务提交
B. 工作线程频繁竞争同一互斥锁访问队列
C. 动态调整线程数量时未考虑负载平衡
D. 任务分发接纳简单的轮询策略
E. 使用线程当地存储缓存高频访问的数据
多选题答案


  • 答案:A、C、E
剖析:
A. 工作线程在弹出任务时未处理条件变量的虚伪唤醒,可能导致线程错误地尝试处理空队列中的任务,从而崩溃或逻辑错误。
C. 提交任务时不检查线程池是否关闭,可能导致任务被添加到已关闭的队列中,无法执行或引发异常。
E. 析构线程池时未优雅关闭线程,可能导致资源泄漏或线程访问已烧毁的数据。
B错误:shared_ptr生存锁不会直接导致死锁,锁本身计划或使用顺序错误才会导致死锁。shared_ptr管理锁可能导致所有权混乱,但题目中“导致死锁”的描述不准确。
D错误:互斥锁粒度过大会降低并发性能,但不会直接导致任务无法执行。

  • 答案:A、B
剖析:
A错误:atomic标志位无法强制停止阻塞中的线程(如等候条件变量或I/O),此为协作式停止,选项描述错误。
B错误:C++标准库的std::thread不原生支持停止,需通过标志位实现协作式停止。
C正确:任务应包含停止检查点。
D正确:停止阻塞在I/O的线程可能引发资源泄漏。
E正确:条件变量需联合谓词避免虚伪唤醒。

  • 答案:A、B、C、D、E
剖析:
A. 检查队列是否为空需同步,以避免数据竞争。
B. 多个线程提取任务需同步,防止并发修改队列。
C. 提交任务到队列尾部需同步,保证队列的线程安全。
D. 调整线程数量需同步,避免中间状态不一致。
E. 修改运行状态标志需同步,确保可见性与原子性。

  • 答案:A、B、D
剖析:
A. 克制接受新任务是优雅关闭的第一步。
B. 等候已提交任务完成保证任务全部处理。
D. 通过条件变量通知线程退出循环是关键步调。
C错误:强制停止线程是暴力方式,不优雅。
E错误:重置未完成任务非必要,取决于计划。

  • 答案:B、C、D
剖析:
B. 频繁竞争同一锁(如任务队列锁)是范例性能瓶颈。
C. 动态调整线程数未负载平衡,可能导致资源浪费或竞争。
D. 轮询策略可能导致负载不均,影响性能。
A正确:无锁队列可提升性能。
E正确:线程当地存储优化高频数据访问,反而是优化点。
计划题目


  • 底子线程池实现
    题目要求:计划一个固定巨细的线程池,支持提交无参数任务。实现任务队列、工作线程启动和任务分发逻辑,并在main函数中测试提交10个任务。
  • 支持任务优先级的线程池
    题目要求:扩展线程池,实现任务优先级(高/低)。高优先级任务优先执行。在main函数中测试混淆优先级任务的执行顺序。
  • 线程池中的任务返回值
    题目要求:实现线程池支持任务返回Future。使用std::packaged_task封装任务,允许调用者获取计算结果。
  • 动态调整线程池巨细
    题目要求:允许运行时动态增加或镌汰工作线程数量。需处理线程的启动和退出同步问题。
  • 线程安全的停止机制
    题目要求:为线程池中的任务添加协作式停止支持,允许外部请求停止长时间运行的任务。
计划题目答案

  1. // 1.
  2. #include <iostream>
  3. #include <vector>
  4. #include <thread>
  5. #include <queue>
  6. #include <mutex>
  7. #include <condition_variable>
  8. #include <functional>
  9. #include <atomic>
  10. class ThreadPool {
  11. public:
  12.     explicit ThreadPool(size_t num_threads) : stop(false) {
  13.         for (size_t i = 0; i < num_threads; ++i) {
  14.             workers.emplace_back([this] {
  15.                 while (true) {
  16.                     std::function<void()> task;
  17.                     {
  18.                         std::unique_lock<std::mutex> lock(queue_mutex);
  19.                         condition.wait(lock, [this] { return stop || !tasks.empty(); });
  20.                         if (stop && tasks.empty()) return;
  21.                         task = std::move(tasks.front());
  22.                         tasks.pop();
  23.                     }
  24.                     task();
  25.                 }
  26.             });
  27.         }
  28.     }
  29.     template<typename F>
  30.     void enqueue(F&& f) {
  31.         {
  32.             std::unique_lock<std::mutex> lock(queue_mutex);
  33.             tasks.emplace(std::forward<F>(f));
  34.         }
  35.         condition.notify_one();
  36.     }
  37.     ~ThreadPool() {
  38.         {
  39.             std::unique_lock<std::mutex> lock(queue_mutex);
  40.             stop = true;
  41.         }
  42.         condition.notify_all();
  43.         for (auto& worker : workers)
  44.             worker.join();
  45.     }
  46. private:
  47.     std::vector<std::thread> workers;
  48.     std::queue<std::function<void()>> tasks;
  49.     std::mutex queue_mutex;
  50.     std::condition_variable condition;
  51.     std::atomic<bool> stop;
  52. };
  53. // 测试代码
  54. int main() {
  55.     ThreadPool pool(4);
  56.     for (int i = 0; i < 10; ++i) {
  57.         pool.enqueue([i] {
  58.             std::this_thread::sleep_for(std::chrono::milliseconds(100));
  59.             std::cout << "Task " << i << " done by thread "
  60.                       << std::this_thread::get_id() << std::endl;
  61.         });
  62.     }
  63.     std::this_thread::sleep_for(std::chrono::seconds(1)); // 等待任务完成
  64.     return 0;
  65. }
  66. // 2.
  67. #include <iostream>
  68. #include <vector>
  69. #include <queue>
  70. #include <thread>
  71. #include <mutex>
  72. #include <condition_variable>
  73. #include <functional>
  74. #include <atomic>
  75. class ThreadPool {
  76. private:
  77.     struct Task {
  78.         std::function<void()> func;
  79.         int priority; // 0:低, 1:高
  80.         bool operator<(const Task& other) const { return priority < other.priority; }
  81.     };
  82.     std::vector<std::thread> workers;
  83.     std::priority_queue<Task> tasks;
  84.     std::mutex queue_mutex;
  85.     std::condition_variable condition;
  86.     std::atomic<bool> stop{ false };
  87.     void worker() {
  88.         while (true) {
  89.             std::function<void()> task;
  90.             {
  91.                 std::unique_lock<std::mutex> lock(this->queue_mutex);
  92.                 this->condition.wait(lock, [this] { return this->stop ||!this->tasks.empty(); });
  93.                 if (this->stop && this->tasks.empty())
  94.                     return;
  95.                 task = std::move(this->tasks.top().func);
  96.                 this->tasks.pop();
  97.             }
  98.             task();
  99.         }
  100.     }
  101. public:
  102.     ThreadPool(size_t numThreads) {
  103.         for (size_t i = 0; i < numThreads; ++i) {
  104.             workers.emplace_back([this] { this->worker(); });
  105.         }
  106.     }
  107.     void enqueue(std::function<void()> f, int priority) {
  108.         std::unique_lock<std::mutex> lock(queue_mutex);
  109.         tasks.push({ std::move(f), priority });
  110.         condition.notify_one();
  111.     }
  112.     ~ThreadPool() {
  113.         {
  114.             std::unique_lock<std::mutex> lock(queue_mutex);
  115.             stop = true;
  116.         }
  117.         condition.notify_all();
  118.         for (std::thread& worker : workers) {
  119.             worker.join();
  120.         }
  121.     }
  122. };
  123. int main() {
  124.     ThreadPool pool(2);
  125.     // 高优先级任务
  126.     pool.enqueue([&] {
  127.         std::cout << "High priority task is running." << std::endl;
  128.     }, 1);
  129.     // 低优先级任务
  130.     pool.enqueue([&] {
  131.         std::cout << "Low priority task is running." << std::endl;
  132.     }, 0);
  133.     // 再添加一个高优先级任务
  134.     pool.enqueue([&] {
  135.         std::cout << "Another high priority task is running." << std::endl;
  136.     }, 1);
  137.     // 让主线程休眠一段时间,确保任务有足够的时间执行
  138.     std::this_thread::sleep_for(std::chrono::seconds(2));
  139.     return 0;
  140. }
  141. // 3.
  142. #include <iostream>
  143. #include <vector>
  144. #include <queue>
  145. #include <thread>
  146. #include <mutex>
  147. #include <condition_variable>
  148. #include <functional>
  149. #include <atomic>
  150. #include <future>
  151. #include <memory>
  152. class ThreadPool {
  153. private:
  154.     std::vector<std::thread> workers;
  155.     std::queue<std::function<void()>> tasks;
  156.     std::mutex queue_mutex;
  157.     std::condition_variable condition;
  158.     std::atomic<bool> stop{ false };
  159.     void worker() {
  160.         while (true) {
  161.             std::function<void()> task;
  162.             {
  163.                 std::unique_lock<std::mutex> lock(this->queue_mutex);
  164.                 this->condition.wait(lock, [this] { return this->stop ||!this->tasks.empty(); });
  165.                 if (this->stop && this->tasks.empty())
  166.                     return;
  167.                 task = std::move(this->tasks.front());
  168.                 this->tasks.pop();
  169.             }
  170.             task();
  171.         }
  172.     }
  173. public:
  174.     ThreadPool(size_t numThreads) {
  175.         for (size_t i = 0; i < numThreads; ++i) {
  176.             workers.emplace_back([this] { this->worker(); });
  177.         }
  178.     }
  179.     template<typename Func>
  180.     auto enqueue(Func&& func) -> std::future<typename std::result_of<Func()>::type> {
  181.         using return_type = typename std::result_of<Func()>::type;
  182.         auto task = std::make_shared< std::packaged_task<return_type()> >(
  183.             std::forward<Func>(func)
  184.         );
  185.         
  186.         std::future<return_type> res = task->get_future();
  187.         {
  188.             std::unique_lock<std::mutex> lock(queue_mutex);
  189.             if (stop)
  190.                 throw std::runtime_error("enqueue on stopped ThreadPool");
  191.             tasks.emplace([task]() { (*task)(); });
  192.         }
  193.         condition.notify_one();
  194.         return res;
  195.     }
  196.     ~ThreadPool() {
  197.         {
  198.             std::unique_lock<std::mutex> lock(queue_mutex);
  199.             stop = true;
  200.         }
  201.         condition.notify_all();
  202.         for (std::thread& worker : workers) {
  203.             worker.join();
  204.         }
  205.     }
  206. };
  207. int main() {
  208.     // 创建一个包含 2 个线程的线程池
  209.     ThreadPool pool(2);
  210.     // 测试用例 1:返回一个整数
  211.     auto fut1 = pool.enqueue([] { return 42; });
  212.     std::cout << "Result 1: " << fut1.get() << std::endl;
  213.     // 测试用例 2:返回一个字符串
  214.     auto fut2 = pool.enqueue([] { return std::string("Hello, ThreadPool!"); });
  215.     std::cout << "Result 2: " << fut2.get() << std::endl;
  216.     // 测试用例 3:执行一个复杂一点的计算
  217.     auto fut3 = pool.enqueue([] {
  218.         int sum = 0;
  219.         for (int i = 1; i <= 10; ++i) {
  220.             sum += i;
  221.         }
  222.         return sum;
  223.     });
  224.     std::cout << "Result 3: " << fut3.get() << std::endl;
  225.     return 0;
  226. }
  227. // 4.
  228. #include <iostream>
  229. #include <vector>
  230. #include <queue>
  231. #include <thread>
  232. #include <mutex>
  233. #include <condition_variable>
  234. #include <functional>
  235. #include <atomic>
  236. #include <future>
  237. #include <memory>
  238. #include <chrono>
  239. // 日志输出控制宏
  240. #define ENABLE_LOGGING 1
  241. #if ENABLE_LOGGING
  242. #define LOG(msg) std::cout << msg << std::endl
  243. #else
  244. #define LOG(msg)
  245. #endif
  246. class ThreadPool {
  247. private:
  248.     struct Task {
  249.         std::function<void()> func;
  250.         int priority;
  251.         bool operator<(const Task& other) const { return priority < other.priority; }
  252.     };
  253.     std::vector<std::thread> workers;
  254.     std::priority_queue<Task> tasks;
  255.     std::mutex queue_mutex;
  256.     std::condition_variable condition;
  257.     std::atomic<bool> stop{ false };
  258.     std::atomic<size_t> current_size{ 0 };
  259.     std::atomic<size_t> threads_to_exit{ 0 };
  260.     void worker() {
  261.         ++current_size;
  262.         LOG("Thread started. Current size: " << current_size.load());
  263.         while (true) {
  264.             std::function<void()> task;
  265.             {
  266.                 std::unique_lock<std::mutex> lock(queue_mutex);
  267.                 // 等待任务或退出信号
  268.                 condition.wait(lock, [this] { return stop || threads_to_exit > 0 ||!tasks.empty(); });
  269.                 // 如果需要退出且任务队列为空,则退出线程
  270.                 if ((threads_to_exit > 0 && tasks.empty()) || stop) {
  271.                     if (threads_to_exit > 0) {
  272.                         --threads_to_exit;
  273.                     }
  274.                     --current_size;
  275.                     LOG("Thread exited. Current size: " << current_size.load());
  276.                     return;
  277.                 }
  278.                 task = std::move(tasks.top().func);
  279.                 tasks.pop();
  280.             }
  281.             try {
  282.                 task();
  283.             } catch (const std::exception& e) {
  284.                 LOG("Exception in task: " << e.what());
  285.             }
  286.         }
  287.     }
  288.     void add_threads(size_t num) {
  289.         for (size_t i = 0; i < num; ++i) {
  290.             workers.emplace_back([this] { worker(); });
  291.         }
  292.     }
  293.     void remove_threads(size_t num) {
  294.         threads_to_exit = num;
  295.         condition.notify_all(); // 通知所有线程检查是否需要退出
  296.         {
  297.             std::unique_lock<std::mutex> lock(queue_mutex);
  298.             // 等待线程退出
  299.             while (current_size > workers.size() - num) {
  300.                 condition.wait(lock);
  301.             }
  302.         }
  303.         // 移除已退出的线程
  304.         workers.erase(workers.begin() + (workers.size() - num), workers.end());
  305.     }
  306. public:
  307.     explicit ThreadPool(size_t initial_size = 4) {
  308.         resize(initial_size);
  309.     }
  310.     void resize(size_t new_size) {
  311.         std::unique_lock<std::mutex> lock(queue_mutex);
  312.         if (new_size > workers.size()) {
  313.             add_threads(new_size - workers.size());
  314.         } else if (new_size < workers.size()) {
  315.             remove_threads(workers.size() - new_size);
  316.         }
  317.         LOG("Thread pool size resized to: " << workers.size());
  318.     }
  319.     template<typename Func>
  320.     auto enqueue(Func&& func, int priority = 0) -> std::future<typename std::invoke_result<Func>::type> {
  321.         using return_type = typename std::invoke_result<Func>::type;
  322.         auto task = std::make_shared< std::packaged_task<return_type()> >(
  323.             std::forward<Func>(func)
  324.         );
  325.         std::future<return_type> res = task->get_future();
  326.         {
  327.             std::unique_lock<std::mutex> lock(queue_mutex);
  328.             if (stop)
  329.                 throw std::runtime_error("Cannot enqueue task on stopped thread pool.");
  330.             tasks.push({ [task]() { (*task)(); }, priority });
  331.         }
  332.         condition.notify_one();
  333.         return res;
  334.     }
  335.     // 添加获取当前线程数量的方法
  336.     size_t get_current_size() const {
  337.         return workers.size();
  338.     }
  339.     // 检查任务队列是否为空,去掉 const 限定符
  340.     bool is_task_queue_empty() {
  341.         std::unique_lock<std::mutex> lock(queue_mutex);
  342.         return tasks.empty();
  343.     }
  344.     ~ThreadPool() {
  345.         {
  346.             std::unique_lock<std::mutex> lock(queue_mutex);
  347.             stop = true;
  348.         }
  349.         condition.notify_all();
  350.         for (auto& worker : workers) {
  351.             if (worker.joinable()) {
  352.                 worker.join();
  353.             }
  354.         }
  355.     }
  356. };
  357. // 任务函数增加中断检查
  358. void long_running_task(std::atomic<bool>& interrupt_flag) {
  359.     int count = 0;
  360.     while (!interrupt_flag.load()) {
  361.         // 模拟分段执行任务
  362.         std::this_thread::sleep_for(std::chrono::milliseconds(200));
  363.         LOG("Task is running, step: " << count++);
  364.     }
  365.     LOG("Task is interrupted.");
  366. }
  367. int main() {
  368.     try {
  369.         ThreadPool pool(2);
  370.         LOG("Initial thread pool size: " << pool.get_current_size());
  371.         // 增大线程池大小
  372.         pool.resize(4);
  373.         LOG("Thread pool size after increasing: " << pool.get_current_size());
  374.         // 缩小线程池大小
  375.         pool.resize(1);
  376.         LOG("Thread pool size after decreasing: " << pool.get_current_size());
  377.         std::atomic<bool> interrupt_flag(false);
  378.         auto fut = pool.enqueue([&interrupt_flag] { long_running_task(interrupt_flag); });
  379.         std::this_thread::sleep_for(std::chrono::seconds(1));
  380.         interrupt_flag = true;
  381.         fut.wait();
  382.         // 检查任务队列是否为空
  383.         if (pool.is_task_queue_empty()) {
  384.             LOG("Task queue is empty.");
  385.         } else {
  386.             LOG("Task queue is not empty.");
  387.         }
  388.     } catch (const std::exception& e) {
  389.         std::cerr << "Exception: " << e.what() << std::endl;
  390.     }
  391.     return 0;
  392. }   
  393. // 5.
  394. #include <iostream>
  395. #include <vector>
  396. #include <queue>
  397. #include <thread>
  398. #include <mutex>
  399. #include <condition_variable>
  400. #include <functional>
  401. #include <atomic>
  402. #include <future>
  403. #include <memory>
  404. #include <chrono>
  405. class ThreadPool {
  406. private:
  407.     struct Task {
  408.         std::function<void()> func;
  409.         int priority;
  410.         bool operator<(const Task& other) const { return priority < other.priority; }
  411.     };
  412.     std::vector<std::thread> workers;
  413.     std::priority_queue<Task> tasks;
  414.     std::mutex queue_mutex;
  415.     std::condition_variable condition;
  416.     std::atomic<bool> stop{ false };
  417.     std::atomic<size_t> current_size{ 0 };
  418.     std::atomic<size_t> threads_to_exit{ 0 };
  419.     void worker() {
  420.         ++current_size;
  421.         while (true) {
  422.             std::function<void()> task;
  423.             {
  424.                 std::unique_lock<std::mutex> lock(queue_mutex);
  425.                 condition.wait(lock, [this] { return stop || threads_to_exit > 0 ||!tasks.empty(); });
  426.                 // 如果需要退出且任务队列为空,则退出线程
  427.                 if ((threads_to_exit > 0 && tasks.empty()) || stop) {
  428.                     if (threads_to_exit > 0) {
  429.                         --threads_to_exit;
  430.                     }
  431.                     --current_size;
  432.                     return;
  433.                 }
  434.                 task = std::move(tasks.top().func);
  435.                 tasks.pop();
  436.             }
  437.             task();
  438.         }
  439.     }
  440. public:
  441.     explicit ThreadPool(size_t initial_size = 4) {
  442.         resize(initial_size);
  443.     }
  444.     void resize(size_t new_size) {
  445.         std::unique_lock<std::mutex> lock(queue_mutex);
  446.         if (new_size > current_size) {
  447.             for (size_t i = current_size; i < new_size; ++i) {
  448.                 workers.emplace_back([this] { worker(); });
  449.             }
  450.         } else if (new_size < current_size) {
  451.             threads_to_exit = current_size - new_size;
  452.             condition.notify_all(); // 通知所有线程检查是否需要退出
  453.             // 等待线程退出
  454.             while (current_size > new_size) {
  455.                 condition.wait(lock);
  456.             }
  457.             // 移除已退出的线程
  458.             workers.erase(workers.begin() + new_size, workers.end());
  459.         }
  460.     }
  461.     template<typename R>
  462.     std::future<R> enqueue(std::function<R()> task, int priority = 0) {
  463.         auto packaged = std::make_shared<std::packaged_task<R()>>(task);
  464.         auto fut = packaged->get_future();
  465.         {
  466.             std::unique_lock<std::mutex> lock(queue_mutex);
  467.             // 修改为使用 push 方法
  468.             tasks.push({ [packaged] { (*packaged)(); }, priority });
  469.         }
  470.         condition.notify_one();
  471.         return fut;
  472.     }
  473.     // 添加获取当前线程数量的方法
  474.     size_t get_current_size() const {
  475.         return current_size;
  476.     }
  477.     ~ThreadPool() {
  478.         {
  479.             std::unique_lock<std::mutex> lock(queue_mutex);
  480.             stop = true;
  481.         }
  482.         condition.notify_all();
  483.         for (auto& worker : workers) {
  484.             if (worker.joinable()) {
  485.                 worker.join();
  486.             }
  487.         }
  488.     }
  489. };
  490. // 任务函数增加中断检查
  491. void long_running_task(std::atomic<bool>& interrupt_flag) {
  492.     int count = 0;
  493.     while (!interrupt_flag.load()) {
  494.         // 模拟分段执行任务
  495.         std::this_thread::sleep_for(std::chrono::milliseconds(200));
  496.         std::cout << "Task is running, step: " << count++ << std::endl;
  497.     }
  498.     std::cout << "Task is interrupted." << std::endl;
  499. }
  500. int main() {
  501.     ThreadPool pool(2);
  502.     // 定义中断标志
  503.     std::atomic<bool> flag(false);
  504.     // 显式转换 lambda 为 std::function
  505.     auto task = std::function<void()>([&flag] { long_running_task(flag); });
  506.     auto fut = pool.enqueue(task);
  507.     // 主线程休眠一段时间,让任务执行一会儿
  508.     std::this_thread::sleep_for(std::chrono::seconds(1));
  509.     // 外部触发中断
  510.     std::cout << "Triggering interruption..." << std::endl;
  511.     flag.store(true);
  512.     // 等待任务完成
  513.     fut.wait();
  514.     return 0;
  515. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

饭宝

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表