王國慶 发表于 2025-3-18 06:38:40

嵌入式八股,手撕线程池(C++)

线程池的主要目的是复用线程资源,减少线程创建和销毁的开销,同时提高程序的并发性能。
也就是说,我创建一个线程对象,他可以复用,线程池里有多个线程对象,有任务来了,我调用一个,用完了,我再放回去。
线程池优点

提高线程利用率
提高响应速率
便于同一管理线程对象
可以控制最大并发数
C++实现

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>

class threadpools
{
private:
    /* data */
    std::vector<std::thread> threads;// 一个线程数组,用来加入线程,线程池那个池
    std::queue<std::function<void()>> task_q;//一个任务队列,用来发布任务,等待消费者来取
    std::mutex mtx;//一个互斥量
    std::condition_variable conditon;//条件变量,防止为空后继续取任务,起通知作用,管理多个线程
    bool stop;//控制线程池的开始与结束
public:
    threadpools(int threadnums);
    ~threadpools();
    template <class F, class... Args>
    void add_task(F&& f, Args&&... args);
    std::atomic<int> task_count; // 添加一个任务计数器,防止主程序退出了,线程池里一个任务都没执行,我直接写公有了,不要学我

};

threadpools::threadpools(int threadnums):stop(false), task_count(0) {
      //加线程到池中到最大数目
      for (int i = 0; i < threadnums; i++)
      {
            threads.emplace_back({//加入线程,push会拷贝加入的变量,这个不会
                while (1)
                {
                  std::unique_lock<std::mutex> lock(mtx);//创建一个直接上锁
                  conditon.wait(lock,{
                        return !this->task_q.empty() || stop;});//任务为空或线程池终止就等待结束,继续上锁,线程会卡在这里
                  if(stop&&task_q.empty()){//一个双重检测,防止任务被其它线程取到了,导致任务列表为空从而报错,即竞态条件
                        return;
                  }
                //如果任务队列里有任务,我就取一个任务去完成

                std::function<void()> task(std::move(task_q.front()));
                task_q.pop();
                lock.unlock();//取到了解锁,让其它线程去取
                task();
                task_count--;
                }
               
            });
      }
      
    }

threadpools::~threadpools()
{
    std::unique_lock<std::mutex> lock(mtx);
    stop = true;
    conditon.notify_all();//通知所有线程工作,取完所有任务
    for (auto& t:threads)
    {
      t.join();
    }
   

}

//用函数模版写一个加入任务的函数
template <class F, class... Args>
void threadpools::add_task(F &&f, Args&&... args){
    std::function<void()> task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
    std::unique_lock<std::mutex> lock(mtx);
    task_q.emplace(std::move(task));
    task_count++; // 添加任务,计数器加一
    conditon.notify_one();

}

int main(){
    threadpools pool(4);
    for (int i = 0; i < 10; i++) {
      std::cout << "加入任务: " << i << std::endl;
      pool.add_task( {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            printf("任务%d已完成\n", i);
      });
    }

    // 等待线程池中的任务完成
    while (pool.task_count > 0) {
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    return 0;
}
100行左右的CPP线程池实现,实现结果如下
https://i-blog.csdnimg.cn/direct/ffa42d50b5004c968999c25ee9d1d6c7.png
我在执行add_task函数之前就输出了参加任务,按顺序参加,但输出任务实现的顺序是打乱的,符合线程池结果。
代码里还涉及到一些CPP11的新特性,比如完美转发,需要重点关注。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 嵌入式八股,手撕线程池(C++)