[Linux#43][线程] 死锁 | 同步 | 基于 BlockingQueue 的生产者消费者模子 ...

打印 上一主题 下一主题

主题 1794|帖子 1794|积分 5382

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
目次
1. 死锁
解决死锁题目
2. 同步
2.1 条件变量函数 cond
2.2 条件变量的使用:
3.CP 题目--理论
4. 基于 BlockingQueue 的生产者消费者模子
1. 基本概念
2.BlockQueue.hpp
基本设置:
生产关系控制:
消费关系的控制
⭕思考点
test 函数:
进化执行 Task.hpp
3. 注意点


1. 死锁

• 死锁是指在一组历程中的各个历程均占有不会开释的资源,但因相互申请被其他历程所占用不会开释的资源,而处于的一种永久等待状态。
   死锁四个必要条件 (必须同时满意)
  

  • 互斥条件:一个资源每次只能被一个执行流使用--前提
  • 请求与保持条件:一个执行流因请求资源而阻塞时,对已得到的资源保持不放--原则
  • 不剥夺条件:一个执行流已得到的资源,在末使用完之前,不能强行剥夺--原则
  • 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系--紧张条件


解决死锁题目

理念:粉碎四个必要条件--只需要一个不满意就可以的
   方法:
  

  • 加锁顺序一致
我们在申请锁的时候,A线程先申请A锁,在申请B锁,而B线程先申请B锁,在申请A锁,所以两个线程天然申请锁的顺序就是环状的。我们可以尽量不让线程出现这个环路情况,我们让两个线程申请锁的顺序保持一致,就可以粉碎循环等待题目。两个线程都是先申请A锁在申请B锁。


  • 避免锁未开释的场景
接口:pthread_mutex_trylock,失败了就会返回退出,开释锁


  • 资源一次性分配
资源一次性分配,比如说你有一万行代码,有五处要申请锁,你可以最开始一次就给线程分配好,而不要把五处申请打散到代码各个区域里所导致加锁场景非常复杂

   避免死锁算法
  

  • 死锁检测算法(相识)
  • 银行家算法(相识)

2. 同步

同步!同步题目是包管数据安全的情况下,让我们的线程访问资源具有一定的顺序性
   包管线程安全同步了,为什么还要设置锁?
  注意前言和后果。排队是结果。例如忽然新来了一个线程,被锁挡在了门外,才开始到后面排队的。
分配均衡的可以使用纯互斥,同步是解决分配不均衡题目的


  • 快速提出解决方案 条件变量


  • 锁和铃铛(条件变量--布尔类型)都是一个结构体,OS 先描述再组织
  • 条件变量必须依靠于锁的使用(条件就是被锁了,所以才加入等待队列)
   条件变量
  • 当一个线程互斥地访问某个变量时,它可能发如今别的线程改变状态之前,它 什么也做不了。
• 例如一个线程访问队列时,发现队列为空,它只能等待,只到别的线程将一个 节点添加到队列中。这种情况就需要用到条件变量。

2.1 条件变量函数 cond

条件变量就相称于是铃铛,和锁的设置非常的相似
   

  • 初始化 – pthread_cond_init()
  



    • 静态分配:

  1. pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
复制代码




    • 动态分配:







      • 原型:


  1. int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
复制代码






      • 参数:










        • cond: 需要初始化的条件变量
        • attr: 初始化条件变量的属性,一般设置为 nullptr









      • 返回值: 乐成返回 0,失败返回错误码


   

  • 销毁 – pthread_cond_destroy()
  



    • 原型:

  1. int pthread_cond_destroy(pthread_cond_t *cond);
复制代码




    • 参数:







      • cond: 需要销毁的条件变量






    • 返回值: 乐成返回 0,失败返回错误码
    • 注意: 使用 PTHREAD_COND_INITIALIZER 初始化的条件变量不需要销毁

   

  • 等待条件变量 – pthread_cond_wait()
  



    • 原型:

  1. int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
复制代码




    • 参数:







      • cond: 需要等待的条件变量
      • mutex: 当火线程所处临界区对应的互斥锁


   

  • ⭕ (为什么要传这个锁变量呢?
  • 1. pthread_cond_wait让线程等待的时候,会主动开释锁,将其加入到等待队列中,不用管临界资源的状态情况
  



    • 返回值: 乐成返回 0,失败返回错误码
    • 注意: wait 一定要在加锁和解锁之间进行!

   

  • 唤醒等待
  



    • 唤醒全部线程 – pthread_cond_broadcast()







      • 原型:


  1. int pthread_cond_broadcast(pthread_cond_t *cond);
复制代码






      • 功能: 唤醒等待队列中的全部线程
      • 参数:










        • cond: 需要等待的条件变量









      • 返回值: 乐成返回 0,失败返回错误码






    • 唤醒首个线程 – pthread_cond_signal()







      • 原型:


  1. int pthread_cond_signal(pthread_cond_t *cond);
复制代码






      • 功能: 唤醒等待队列中的首个线程
      • 参数:










        • cond: 需要等待的条件变量









      • 返回值: 乐成返回 0,失败返回错误码


测试:


庞杂原因:多线程打印出现庞杂,表现器是文件,看作一个共享资源
   uint64 是什么?
  一种跨平台的方式来表现至少 64 位的无符号整数

2.2 条件变量的使用:

原理图,以我们的单人自习室为例


对于线程的管理: 先所有都锁上,再依次唤醒,就实现了每个人进去执行一次,退出,下一个执行
  1. while(true)
  2.     {
  3.         pthread_mutex_lock(&mutex);
  4.         pthread_cond_wait(&cond, &mutex);              
  5.         std::cout << "pthread: " << number << " , cnt: " << cnt++ << std::endl;
  6.         pthread_mutex_unlock(&mutex);
  7.     }
  8. }
  9. int main()
  10. {
  11.     for(uint64_t i = 0; i < 5; i++)
  12.     {
  13.         pthread_t tid;
  14.         pthread_create(&tid, nullptr, Count, (void*)i);
  15.     }
  16.     while(true)
  17.     {
  18.         sleep(1);
  19.         pthread_cond_broadcast(&cond);
  20.         std::cout << "signal one thread..." << std::endl;
  21.     }
  22.     return 0;
  23. }
复制代码
  我们怎么知道我们要让一个线程去等待了?
  一定是临界资源(自习室里面有人)不停当,没错,临界资源也是有状态的!!
直接走人叫互斥,去后面排队叫同步

   你怎么知道临界资源是停当还是不停当的?即怎么知道自习室里面有没有人
  你判断出来的!判断是访问临界资源(自习室) 吗?必须是的,也就是判断必须在加锁之后!!!
所以等待的过程,一定要在加锁和解锁之间pthread_cond_wait让线程等待的时候,会主动开释锁,将其加入到等待队列中

   sum
  

  • 等待条件满意的时候往往是在临界区内等待的




    • 当该线程进入等待的时候,互斥锁会主动开释
    • 而当该线程被唤醒时,又会主动得到对应的互斥锁



  • 条件变量需要配合互斥锁使用,其中条件变量是用来完成同步的,而互斥锁是用来完成互斥的

3.CP 题目--理论

生产者消费者模子(consumer producter)


   存在超市的原因
  

  • 效率高,中转站
  • 大号的缓存,解决了忙闲不均。生产者视角:有多少存储空间,消费者:有多少商品数
  • 让生产和消费的行为,进行一定程度的解耦

   在盘算机中,抽象出来
  

  • 生产者:线程负担
  • 超市:特定结构的内存空间->共享资源->存在并发题目
  • 消费者:线程负担
将商品理解为数据,执行流在做通讯

   如何高效的通讯
  互斥是一个包管安全的手段
研究超市的并发 三种关系:


  • 生产者 vs 生产者(竞争的互斥关系,只允许一个)
  • 消费者 vs 消费者(互斥)
  • 生产者 vs 消费者(互斥--安全,同步--一定的顺序性)

321 原则(便与记忆和给别人先容)


  • 3 种关系
  • 2 种脚色--生产和消费
  • 1 个生意业务场合--特点结构的内存空间
例如解耦 add 和 main ,实现高并发

4. 基于 BlockingQueue 的生产者消费者模子

1. 基本概念

在多线程编程中,阻塞队列(Blocking Queue)是一种常用的数据结构,用于实现生产者和消费者模子。与平凡队列相比,阻塞队列具有以下特点:


  • 当队列为空时,从队列获取元素的操纵将会被阻塞,直到队列中有新元素被放入。
  • 当队列满时,往队列里存放元素的操纵也会被阻塞,直到队列中有元素被取出。
  • 其余时间就是边生产边消费,同时进行


2.BlockQueue.hpp

基本设置:

  1. template <class T>
  2. class BlockQueue
  3. {
  4.     static const int defalutnum = 20;
  5. public:
  6.     BlockQueue(int maxcap = defalutnum):maxcap_(maxcap)
  7.     {
  8.         pthread_mutex_init(&mutex_, nullptr);
  9.         pthread_cond_init(&c_cond_, nullptr);
  10.         pthread_cond_init(&p_cond_, nullptr);
  11.         // low_water_ = maxcap_/3;
  12.         // high_water_ = (maxcap_*2)/3;
  13.     }
  14. ~BlockQueue()
  15.     {
  16.         pthread_mutex_destroy(&mutex_);
  17.         pthread_cond_destroy(&c_cond_);
  18.         pthread_cond_destroy(&p_cond_);
  19.     }
  20. private:
  21.     std::queue<T> q_;
  22.     //int mincap_;
  23.     int maxcap_;      // 极值
  24.     pthread_mutex_t mutex_;
  25.     pthread_cond_t c_cond_;
  26.     pthread_cond_t p_cond_;
  27.     // int low_water_;
  28.     // int high_water_;
  29. };
复制代码

  • 队列 q_ 共享资源, q被当做团体使用的,q只有一份,加锁
生产关系控制:

  1. void push(const T &in)
  2.     {
  3.         pthread_mutex_lock(&mutex_);
  4.         while(q_.size() == maxcap_){
  5.             pthread_cond_wait(&p_cond_, &mutex_);
  6.         }
  7.         q_.push(in);                  
  8.         // if(q_.size() > high_water_) pthread_cond_signal(&c_cond_);
  9.         pthread_cond_signal(&c_cond_);
  10.         pthread_mutex_unlock(&mutex_);
  11.     }
复制代码
细节点:

  • 你想生产,就直接能生产吗?不一定。你得先确保生产条件满意
pthread_mutex_lock(&mutex_);
while(q_.size() == maxcap_)

  • 判断也是在访问临界资源,在内部进行判断的,所以锁要放在外貌掩护
  • 货品满了,就伪唤醒后加入等待队列
消费关系的控制

  1. T pop()
  2.     {
  3.         pthread_mutex_lock(&mutex_);
  4.         while(q_.size() == 0)
  5.         {
  6.             pthread_cond_wait(&c_cond_, &mutex_);
  7.         }
  8.         
  9.         T out = q_.front();
  10.         q_.pop();
  11.         // if(q_.size()<low_water_) pthread_cond_signal(&p_cond_);
  12.         pthread_cond_signal(&p_cond_);
  13.         pthread_mutex_unlock(&mutex_);
  14.         return out;
  15.     }
复制代码
细节点:

  • 生产和消费需要分别设置两个等待队列
  • 你想消费,就直接能消费吗?不一定。你得先确保消费条件满意,while判断队列情况
⭕思考点

   谁来唤醒呢?
  例如:有生产,就可以解锁唤醒消费队列了
  1. q_.pop();
  2. pthread_mutex_unlock(&mutex_);
复制代码
  对计谋的添加:
  发现生产和消费的同步,通过水位线来进行范围管控,例如:

  • if(q_.size()>high_water_) pthread_cond_signal(&c_cond_);//大于某一水位后,唤醒尽快消费
  • if(q_.size()<low_water_) pthread_cond_signal(&p_cond_);

test 函数:

  1. #include "BlockQueue.hpp"
  2. #include "Task.hpp"
  3. #include <unistd.h>
  4. #include <ctime>
  5. void *Consumer(void *args)
  6. {
  7.     BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
  8.     while (true)
  9.     {
  10.         // 消费(存在管控的加入执行,调用等待队列的封装接口)
  11.         Task t = bq->pop();
  12.         
  13.         std::cout << "处理任务: " << t.GetTask() << " 运算结果是: " << t.GetResult() << " thread id: " << pthread_self() << std::endl;
  14.         t.run();//接收到的任务对象,调用接口跑起来了
  15.         sleep(1);//Pop前已经检验测试一大堆了
  16.     }
  17. }
  18. void *Productor(void *args)
  19. {
  20.     int len = opers.size();
  21.     BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
  22.     int x = 10;
  23.     int y = 20;
  24.     while (true)
  25.     {
  26.         // 模拟生产者生产数据
  27.         int data1 = rand() % 10 + 1; // [1,10]
  28.         usleep(10);
  29.         int data2 = rand() % 10;
  30.         char op = opers[rand() % len];
  31.         Task t(data1, data2, op);
  32.         // 生产
  33.         bq->push(t);
  34.         std::cout << "生产了一个任务: " << t.GetTask() << " thread id: " << pthread_self() << std::endl;
  35.         sleep(1);
  36.     }
  37. }
  38. int main()
  39. {
  40.     srand(time(nullptr));
  41.     // 因为 321 原则
  42.     BlockQueue<Task> *bq = new BlockQueue<Task>();
  43.     pthread_t c[3], p[5];
  44.     for (int i = 0; i < 3; i++)
  45.     {
  46.         pthread_create(c + i, nullptr, Consumer, bq);
  47.     }
  48.     for (int i = 0; i < 5; i++)
  49.     {
  50.         pthread_create(p + i, nullptr, Productor, bq);
  51.     }
  52.     for (int i = 0; i < 3; i++)
  53.     {
  54.         pthread_join(c[i], nullptr);
  55.     }
  56.     for (int i = 0; i < 5; i++)
  57.     {
  58.         pthread_join(p[i], nullptr);
  59.     }
  60.     delete bq;
  61.     return 0;
  62. }
复制代码
  BlockQueue 内部可不可以传递其他数据,比如对象?比如任务???
  可以。进化为基于任务的阻塞队列了
进化执行 Task.hpp

Task t = bq->pop();,cout 执行直接变为t.run()
例如执行如下任务
  1. #pragma once
  2. #include <iostream>
  3. #include <string>
  4. std::string opers="+-*/%";
  5. enum{
  6.     DivZero=1,
  7.     ModZero,
  8.     Unknown
  9. };
  10. class Task
  11. {
  12. public:
  13.     Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
  14.     {
  15.     }
  16.     void run()
  17.     {
  18.         switch (oper_)
  19.         {
  20.         case '+':
  21.             result_ = data1_ + data2_;
  22.             break;
  23.         case '-':
  24.             result_ = data1_ - data2_;
  25.             break;
  26.         case '*':
  27.             result_ = data1_ * data2_;
  28.             break;
  29.         case '/':
  30.             {
  31.                 if(data2_ == 0) exitcode_ = DivZero;
  32.                 else result_ = data1_ / data2_;
  33.             }
  34.             break;
  35.         case '%':
  36.            {
  37.                 if(data2_ == 0) exitcode_ = ModZero;
  38.                 else result_ = data1_ % data2_;
  39.             }            break;
  40.         default:
  41.             exitcode_ = Unknown;
  42.             break;
  43.         }
  44.     }
  45.     void operator ()()
  46.     {
  47.         run();
  48.     }
  49.     std::string GetResult()
  50.     {
  51.         std::string r = std::to_string(data1_);
  52.         r += oper_;
  53.         r += std::to_string(data2_);
  54.         r += "=";
  55.         r += std::to_string(result_);
  56.         r += "[code: ";
  57.         r += std::to_string(exitcode_);
  58.         r += "]";
  59.         return r;
  60.     }
  61.     std::string GetTask()
  62.     {
  63.         std::string r = std::to_string(data1_);
  64.         r += oper_;
  65.         r += std::to_string(data2_);
  66.         r += "=?";
  67.         return r;
  68.     }
  69.     ~Task()
  70.     {
  71.     }
  72. private:
  73.     int data1_;
  74.     int data2_;
  75.     char oper_;
  76.     int result_;
  77.     int exitcode_;
  78. };
复制代码
  生产者消费者模子高效在哪里?
  答案是生产者消费者模式并不高效在队列中拿放,而是在生产之前和消费之后,让线程并行执行!!
同样生产者消费者的意义也不再队列中,而是在放之前同时生产,拿之后同时消费。

3. 注意点


  • 判断生产消费条件




    • 这是因为线程可能被伪唤醒(即线程被唤醒但条件仍未满意),使用 while 可以确保线程在真正满意条件时才继续执行。


  • pthread_cond_wait 函数




    • pthread_cond_wait 是让当火线程进入等待状态的函数。
    • 如果调用失败,线程将继续执行,可能导致逻辑错误(如实验从空队列中取数据或向满队列中添加数据)。


  • 多消费者情况下的唤醒




    • 使用 pthread_cond_broadcast 唤醒所有等待的消费者时,若只有一个数据可供消费,则会导致其他消费者被伪唤醒。
    • 为了避免这种情况,线程在被唤醒后应再次检查条件是否满意。


  • 使用 while 判断的必要性


  • 在判断是否满意生产或消费条件时,应使用 while 循环而非 if 语句。




    • 为了防止伪唤醒导致的题目,必须使用 while ,确保线程在满意条件时才继续执行。


思路:上锁访问,检查到达某一条件,等待,未到达执行,检查完后解锁,所检查完后满意的线程,接收任务同时跑,线程再上锁访问,实验获取下一个任务以此循环。
cp 模子的思路如上,简单实现了代码,下一章将结合信号量,优化完善代码并进行测试~

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

曹旭辉

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表