【Linux】线程同步和互斥

打印 上一主题 下一主题

主题 686|帖子 686|积分 2060

一、线程互斥

1.相关概念

   1.临界资源:多线程实行流共享的资源,且一次只能允许一个实行流访问的资源就叫做临界资源。(多线程、多历程打印数据)
  2.临界区:每个线程内部,访问临界资源的代码,就叫做临界区。
  3.互斥:任何时间,互斥保证有且只有一个实行流进入临界区,访问临界资源,通常对临界资源起保护作用
  4.原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么不实行 。
  实现一个小实例
  
  1. #include<iostream>
  2. #include<unistd.h>
  3. #include<stdio.h>
  4. using namespace std; int ticket=10000;
  5. void* threadRoutinue(void* args) {
  6.     const char* name=static_cast<const char*>(args);
  7.     while(true)
  8.     {
  9.         if(ticket>0)
  10.         {
  11.             usleep(1000);//模拟抢票花费时间
  12.             cout<<name<<"get a ticket: "<<ticket--<<endl;
  13.         }
  14.         else{
  15.             break;
  16.         }
  17.     }
  18.     return nullptr; }
  19. int main() {
  20.     //创建线程模拟抢票
  21.     pthread_t tid[4];
  22.     int n=sizeof(tid)/sizeof(tid[0]);
  23.     for(int i=0;i<n;i++)
  24.     {
  25.         char buffer[64];
  26.         snprintf(buffer,sizeof(buffer),"thread_%d",i);
  27.         pthread_create(tid+i,nullptr,threadRoutinue,buffer);
  28.     }
  29.     for(int i=0;i<n;i++)
  30.     {
  31.         pthread_join(tid[i],nullptr);
  32.     }
  33.     return 0; }
复制代码

  从程序中可以看到,票数到0的时间就没有票了,线程就应该退出了。
但是效果中,票数甚至被抢到了负数,这是怎么回事。
这里提一个问题,这里对票(临界资源)的访问是原子的吗?(是安全的吗?) 答案肯定不是!!

大概在一个线程A中,刚刚将tickets加载到内存上,线程A就被切走了,这时线程A的数据和上下文被保存,线程A从CPU上被剥离。
线程B开始抢票,假如他的竞争力非常强,一次运行后抢到了1000张票。
线程B实行完后线程A又来了,他会从前次实行的地方继承实行,但是他前次保存的tickets的数据是10000,以是抢到了一张票后,将剩余的9999张票写回内存,原来线程B实行完后还剩9000张票,但是线程A实行完后剩余的票数反而增多了。
2.互斥锁(mutex)

对于上面的抢票程序,要想使每个线程精确的抢票就要保证:当一个线程在进入到抢票环节时,其他线程不能举行抢票。
以是就可以对抢票环节加互斥锁。
   pthread_mutex_init、pthread_mutex_destroy:对线程锁举行初始化和销毁
  1. #include <pthread.h>
  2. // pthread_mutex_t mutex: 锁变量,所有线程都可看到
  3. int pthread_mutex_destroy(pthread_mutex_t *mutex);// 销毁锁
  4. int pthread_mutex_init(pthread_mutex_t *restrict mutex,constpthread_mutexattr_t *restrict attr);// 初始化锁
  5. // attr: 锁属性,我们传入空指针就可  
  6. // 如果将锁定义为静态或者全局的,可以使用宏直接初始化,且不用销毁
  7. pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
复制代码
pthread_mutex_lock、int pthread_mutex_unlock:对线程举行加锁息争锁
  1. #include <pthread.h>
  2. int pthread_mutex_lock(pthread_mutex_t *mutex);   
  3. int pthread_mutex_unlock(pthread_mutex_t *mutex);
复制代码
对抢票小demo举行加锁
  
  1. #include<iostream>
  2. #include<unistd.h>
  3. #include<stdio.h>
  4. using namespace std; int ticket=10000;  //临界资源
  5. pthread_mutex_t mutex;
  6. void* threadRoutinue(void* args) {
  7.     const char* name=static_cast<const char*>(args);
  8.     while(true)
  9.     {
  10.         pthread_mutex_lock(&mutex);
  11.         if(ticket>0)
  12.         {
  13.             usleep(1000);//模拟抢票花费时间
  14.             cout<<name<<" get a ticket: "<<ticket--<<endl;
  15.             pthread_mutex_unlock(&mutex);
  16.         }
  17.         else{
  18.             cout<<name<<"票抢完了"<<endl;
  19.             pthread_mutex_unlock(&mutex);
  20.             break;
  21.         }
  22.         usleep(1000);
  23.     }
  24.     return nullptr; }
  25. int main() {
  26.     pthread_mutex_init(&mutex,nullptr);
  27.     //创建线程模拟抢票
  28.     pthread_t tid[4];
  29.     int n=sizeof(tid)/sizeof(tid[0]);
  30.     for(int i=0;i<n;i++)
  31.     {
  32.         char buffer[64];
  33.         snprintf(buffer,sizeof(buffer),"thread_%d",i);
  34.         pthread_create(tid+i,nullptr,threadRoutinue,buffer);
  35.     }
  36.     for(int i=0;i<n;i++)
  37.     {
  38.         pthread_join(tid[i],nullptr);
  39.     }
  40.     pthread_mutex_destroy(&mutex);   
  41.     return 0; }
复制代码
多线程临界资源原子

    细节:
1.凡是访问同一个临界资源的线程,都要举行加锁保护,而且必须加同一把锁,这是一个规则,不能有破例
2.每一个线程访问临界资源之前,得加锁,加锁本质是给 临界区加锁,加锁的粒度只管细一些。
3.线程访问临界区的时间,必要先加锁 -> 以是线程都必须看到同一把锁 -> 锁本身就是公共资源 -> 锁如何保证自己的安全? -> 加锁息争锁本身就是原子的。
4.临界区可以是一行代码,可以是一批代码,a.线程大概被切换? 固然大概 b.切换会有影响嘛? 没有,由于一个线程申请一个锁以后,该线程被临时切换,其他任何线程没有办法进入临界区,无法申请到锁,以是无法访问到临界资源。
5.这也正是表现互斥带来的串行化的表现,站在其他线程的角度,对其他线程有意义的状态是:锁被申请(持有锁),锁被开释(不持有锁),原子性。


  3.互斥锁的原理

以抢票程序为例,当线程必要访问临界资源时,必要先访问mtx,为了全部的线程都能看到它,以是锁肯定是全局的。
且锁本身也是临界资源。那么如何保证锁本身是安全的,即获取锁的过程是安全的。
其原理是:加锁(lock)、解锁(unlock)的过程是原子的!

那怎样才算是原子的呢:一行代码被翻译成汇编后只有一条汇编,就是原子的。
为了实现互斥锁操作,大多数体系结构都提供了swapexchange指令。
该指令的作用是把寄存器和内存单元的数据相互换,由于只有一条指令,保证了原子性。
纵然是多处理惩罚器平台,访问内存的 总线周期也有先后,一个处理惩罚器上的互换指令实行时另一个处理惩罚器的互换指令只能等待总线周期。

当线程申请到锁之后,进入到临界区访问临界资源,这时线程也大概被切走,被切走后会保护上下文,而锁数据也在上下文中。
以是锁也被带走了,以是即便是该线程被挂起了,其他线程也不能申请到锁,也不能进入临界区。
必须等待拥有锁的线程开释锁之后才能申请到锁。
4.自定义封装一个锁

  1. #pragma once
  2. #include<iostream>
  3. #include<pthread.h>
  4. //封装锁
  5. class _Mutex
  6. {
  7. public:
  8.     _Mutex(pthread_mutex_t* mutex):_mutex(mutex)
  9.     {}
  10.     void lock()
  11.     {
  12.         pthread_mutex_lock(_mutex);
  13.     }
  14.     void unlock()
  15.     {
  16.         pthread_mutex_unlock(_mutex);
  17.     }
  18. private:
  19.     pthread_mutex_t* _mutex;
  20. };
  21. class lockGuard
  22. {
  23. public:
  24.     lockGuard(pthread_mutex_t* mutex):_mutex(mutex)
  25.     {
  26.         _mutex.lock();
  27.     }
  28.     ~lockGuard()
  29.     {
  30.         _mutex.unlock();
  31.     }
  32. private:
  33.     _Mutex _mutex;
  34. };
复制代码
  我们可以使用我们自己封装的锁解决抢票问题

  二、可重入和线程安全

   线程安全: 线程安全指的是在多线程编程中,多个线程对临界资源举行争抢访问而不会造成数据二义或程序逻辑混乱的情况。常见对全局变量大概静态变量举行操作,而且没有锁保护的情况下,会出现该问题。

重入: 同一个函数被差别的实行流调用,当前一个流程还没有实行完,就有其他的实行流再次进入,我们称之为重入。一个函数在重入的情况下,运行效果不会出现任何差别大概任何问题,则该函数被称为可重入函数,否则,是不可重入函数
  线程安全的实现,通过同步与互斥实现
具体互斥的实现可以通过互斥锁和信号量实现、而同步可以通过条件变量与信号量实现。
常见的线程不安全的情况:
   

  • 不保护共享变量的函数
  • 函数状态随着被调用,状态发生变化的函数
  • 返回指向静态变量指针的函数
  • 调用线程不安全函数的函数
  常见不可重入的情况:
   

  • 调用了malloc/free函数,由于malloc函数是用全局链表来管理堆的
  • 调用了尺度I/O库函数,尺度I/O库的很多实现都以不可重入的方式使用全局数据结构
  • 可重入函数体内使用了静态的数据结构
  可重入与线程安全联系:
   

  • 函数是可重入的,那就是线程安全的
  • 函数是不可重入的,那就不能由多个线程使用,有大概引发线程安全问题
  • 假如一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
  可重入与线程安全区别:
   

  • 可重入函数是线程安全函数的一种
  • 线程安全不肯定是可重入的,而可重入函数则肯定是线程安全的。
  • 假如将对临界资源的访问加上锁,则这个函数是线程安全的,但假如这个重入函数锁还未开释则会产生死锁
  三、死锁

死锁概念

   死锁是指在一组历程中的各个历程均占有不会开释的资源,但因互相申请被其他历程所占用不会开释的资 源而处于的一种永久等待状态。
  死锁四个必要条件

   

  • 互斥条件:一个资源每次只能被一个实行流使用
  • 请求与保持条件:一个实行流因请求资源而壅闭时,对已获得的资源保持不放
  • 不剥夺条件:一个实行流已获得的资源,在末使用完之前,不能强行剥夺
  • 循环等待条件:多少实行流之间形成一种头尾相接的循环等待资源的关系
  如何避免死锁

核心头脑:破坏死锁的4个必要条件中任意一个!
   

  • 不加锁
  • 自动开释锁
  • 按顺序申请锁
  • 资源一次性分配
  破坏死锁的一个小demo(自动开释锁)
  
  1. #include <iostream>
  2. #include<unistd.h>
  3. #include <pthread.h>
  4. using namespace std;
  5. pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  6. void *pthreadRoutinue(void *args) {  
  7.     pthread_mutex_lock(&mutex); //加锁
  8.     cout<<"I get a mutex"<<endl;
  9.     pthread_mutex_lock(&mutex); //产生死锁
  10.     cout<<"i alive again"<<endl;
  11.     return nullptr; }
  12. int main() {
  13.     pthread_t pid;
  14.     pthread_create(&pid, nullptr, pthreadRoutinue, nullptr);
  15.     sleep(3);
  16.     cout<<"main thread run"<<endl;
  17.     pthread_mutex_unlock(&mutex);//主线程区解锁
  18.     cout<<"main thread unlock"<<endl;
  19.     sleep(3);
  20.     return 0; }
复制代码

  四、线程同步

   同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问 题,叫做同步 。
  1.条件变量

概念

   与互斥锁差别,条件变量是用来等待而不是用来上锁的。条件变量用来自动壅闭一个线程,直到某特殊情况发生为止。通常条件变量和互斥锁同时使用。

条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量举行同步的一种机制,重要包括两个动作:一个线程等待"条件变量的条件成立"而挂起;另一个线程使"条件成立"(给出条件成立信号)。
  条件变量接口

   pthread_cond_init、pthread_cond_destroy:初始化、销毁条件变量
  1. #include <pthread.h>
  2. int pthread_cond_destroy(pthread_cond_t *cond);
  3. // pthread_cond_t:条件变量类型,类似pthread_mutex_t int
  4. pthread_cond_init(pthread_cond_t *restrict cond,constpthread_condattr_t *restrict attr);   
  5. // 如果是静态或全局的条件变量可使用宏初始化:
  6. pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
复制代码
   pthread_cond_wait、pthread_cond_signal:等待条件、叫醒线程
  1. #include <pthread.h>   
  2. // 等待条件满足
  3. int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);   
  4. // 唤醒一个线程,在cond等待队列里的第一个线程
  5. int pthread_cond_signal(pthread_cond_t *cond);
  6. // 一次唤醒所有线程
  7. int pthread_cond_broadcast(pthread_cond_t *cond); ```
复制代码
demo
  
  1. #include <iostream>
  2. #include<unistd.h>
  3. #include <pthread.h>
  4. using namespace std;
  5. #define num 5
  6. int ticket =1000; pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
  7. pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
  8. void *active(void *args) {  
  9.     string name=static_cast<const char*>(args);
  10.     while(true)
  11.     {
  12.         pthread_mutex_lock(&mutex);
  13.         pthread_cond_wait(&cond,&mutex); //调用该函数,会自己释放锁
  14.         cout<<name<<" 活动"<<endl;
  15.         pthread_mutex_unlock(&mutex);
  16.     }
  17.     return nullptr; }
  18. int main() {
  19.     pthread_t tids[num];
  20.     for(int i=0;i<num;i++)
  21.     {
  22.         char * name=new char[64];
  23.         snprintf(name,64,"thread-%d",i); //线程创
  24.         pthread_create(tids+i,nullptr,active,name);
  25.     }
  26.     sleep(3);
  27.     while(true)
  28.     {
  29.         cout<<"main thread wakeup thread..."<<endl;
  30.         //pthread_cond_signal(&cond); //唤醒cond队列中的一个线程
  31.         pthread_cond_broadcast(&cond); //将cond队列中所以线程唤醒
  32.         sleep(1);
  33.     }
  34.     for(int i=0;i<num;i++)
  35.     {
  36.         pthread_join(tids[i],nullptr); //线程等待
  37.     }
  38.     sleep(3);
  39.     return 0; }
复制代码

  基于壅闭队列实现生产者消费者模子


   生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题

生产者和消费者彼此之间不直接通讯,而通过壅闭队列来举行通讯,以是生产者生产完数据之后不用等待消费者处理惩罚,直接扔给壅闭队列,消费者不找生产者要数据,而是直接从壅闭队列里取,壅闭队列就相当于一个缓冲区,均衡了生产者和消费者的处理惩罚能力。这个壅闭队列就是用来给生产者和消费者解耦的。

生产者消费者模子的长处:解耦 支持并发 支持忙闲不均

实则之前所讲的历程间通讯中的管道通讯就是一种生产者消费者模子,管道就是让差别的历程能够看到同一份资源,且管道自带同步和互斥的机制。历程间通讯的本质实在就是生产者消费者模子。
  代码:
   blockQueue.hpp
  1. #pragma once
  2. #include <iostream>
  3. #include <queue>
  4. #include <pthread.h>
  5. const int gcap = 5;
  6. template <class T> class BlockQueue { public:
  7.     BlockQueue(const int cap = gcap) : _cap(cap)
  8.     {
  9.         pthread_mutex_init(&_mutex, nullptr);
  10.         pthread_cond_init(&_consumerCond, nullptr);
  11.         pthread_cond_init(&_productorCond, nullptr);
  12.     }
  13.     ~BlockQueue()
  14.     {
  15.         pthread_mutex_destroy(&_mutex);
  16.         pthread_cond_destroy(&_consumerCond);
  17.         pthread_cond_destroy(&_productorCond);
  18.     }
  19.     bool isFull()
  20.     {
  21.         return _cap == _q.size();
  22.     }
  23.     void push(const T &in)
  24.     { // 生产
  25.         pthread_mutex_lock(&_mutex);
  26.         while (isFull())  //细节1:使用while ,防止多线程被唤醒生产过多
  27.         {   // 我们只能在临界区内部,判断临界资源是否就绪 注定了我们在当前一定是持有锁的
  28.             pthread_cond_wait(&_productorCond, &_mutex); // 如果队列为满,生产者线程休眠 ,此时持有锁,wait会将锁unlock
  29.             // 当线程醒来的时候,注定了继续从临界区内部继续运行,因为是在临界区被切走的
  30.             // 注定了当线程被唤醒的时候,继续在pthread_cond_wait()函数继续向后运行,又要重新申请锁,申请成功才会彻底返回
  31.         }
  32.         // 没有满,让他继续生产
  33.         _q.push(in);
  34.         //策略,唤醒消费者线程
  35.         pthread_cond_signal(&_consumerCond);
  36.         pthread_mutex_unlock(&_mutex);
  37.     }
  38.     void pop(T *out)
  39.     {
  40.         pthread_mutex_lock(&_mutex);
  41.         while (_q.empty())  //队列为空
  42.         {
  43.             pthread_cond_wait(&_consumerCond, &_mutex);
  44.         }
  45.         *out = _q.front();
  46.         _q.pop();
  47.         //策略,唤醒生产者
  48.         pthread_cond_signal(&_productorCond);
  49.         pthread_mutex_unlock(&_mutex);
  50.     }
  51. private:
  52.     std::queue<T> _q;
  53.     int _cap;
  54.     pthread_mutex_t _mutex;
  55.     pthread_cond_t _consumerCond;  // 消费者对应的条件变量 空 wait
  56.     pthread_cond_t _productorCond; // 生产者对应的条件变量 满 wait }; ```
复制代码
   task.hpp
  1. #pragma once
  2. #include <iostream>
  3. #include <string>
  4. class Task { public:
  5.     Task() {}
  6.     Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
  7.     {
  8.     }
  9.     void operator()()
  10.     {
  11.         switch (_op)
  12.         {
  13.         case '+':
  14.             _result = _x + _y;
  15.             break;
  16.         case '-':
  17.             _result = _x - _y;
  18.             break;
  19.         case '*':
  20.             _result = _x * _y;
  21.             break;
  22.         case '/':
  23.             if(_y==0) _exitCode=-1;
  24.             else _result = _x / _y;
  25.             break;
  26.         case '%':
  27.             if(_y==0) _exitCode=-1;
  28.             else _result = _x % _y;
  29.             break;
  30.         default:
  31.             break;
  32.         }
  33.     }
  34.     std::string formatArg()
  35.     {
  36.         return std::to_string(_x)+' '+ _op+ ' '+std::to_string(_y)+" = ";
  37.     }
  38.     std::string formatRes()
  39.     {
  40.         return std::to_string(_result) + "(" +std::to_string(_exitCode)+")";
  41.     }
  42.     ~Task(){}
  43. private:
  44.     int _x;
  45.     int _y;
  46.     char _op;
  47.     int _result;
  48.     int _exitCode; }; ```
复制代码
   main.cc
  1. #include "blockQueue.hpp"
  2. #include"task.hpp"
  3. #include<ctime>
  4. #include<unistd.h>
  5. void *consumer(void *args) {
  6.     BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
  7.     while(true)
  8.     {
  9.         sleep(1);
  10.         Task t;
  11.         //1.将数据从blockqueue中获取  -- 获取到数据
  12.         bq->pop(&t);
  13.         t();
  14.         //2.结合某种业务逻辑,处理数据!
  15.         std::cout<<"consumer data: "<<t.formatArg()<<t.formatRes()<<std::endl;
  16.     } }
  17. void *productor(void *args) {
  18.     srand((uint64_t)time(nullptr)^getpid());
  19.     BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
  20.     std::string opers="+-*/%";
  21.     while(true)
  22.     {
  23.         //1.先通过某种渠道获取数据
  24.         int x=rand()%20+1;
  25.         int y=rand()%10+1;
  26.         //2.将数据推送到blockqueue  -- 完成生产过程
  27.         char op=opers[rand()%opers.size()];
  28.         Task t(x,y,op);
  29.         bq->push(t);
  30.         std::cout<<"productor Task: "<<t.formatArg()<<"?"<<std::endl;
  31.     } }
  32. int main() {
  33.     //BlockQueue<int> *bq = new BlockQueue<int>();
  34.     BlockQueue<Task> *bq = new BlockQueue<Task>();
  35.     // 单生产,单消费  支持多生产,多消费,因为看到同一快资源,使用同一把锁
  36.     pthread_t c, p;
  37.     pthread_create(&c, nullptr, consumer, bq);
  38.     pthread_create(&p, nullptr, productor, bq);
  39.     pthread_join(c, nullptr);
  40.     pthread_join(p, nullptr);
  41.     delete bq;
  42.     return 0; } ```
复制代码
   运行效果:
  

  2.信号量

概念

信号量本质就是一个计数器,用来形貌临界区中临界资源的数目巨细。
临界资源假如可以被划分为更小的资源,假如处理惩罚恰当,我们也有大概让多个线程同时访问临界资源,从而实现并发。
但是每个线程想访问临界资源,都得先申请信号量资源。
信号量操作接口

申请信号量乐成时,临界资源的数目会减一;开释信号量时,临界资源的数目会加一。
由于信号量是用来维护临界资源的,首先必须得保证自身是安全的,以是常规的对全局变量的++或–操作肯定是不可的。
P操作(申请信号量)
V操作(开释信号量)
   sem_init、sem_destroy:初始化销毁信号量(具体用法与mutex和cond非常类似)
  1. #include <semaphore.h>
  2. int sem_init(sem_t *sem, int pshared, unsigned int value);
  3. // pshared: 默认为0, value:信号量的初始值(count)
  4. int sem_destroy(sem_t *sem);   
  5. // sem_t :信号量类型 // Link with -pthread. ```
复制代码
   sem_wait、sem_signal: 申请、开释信号量
  1. int sem_wait(sem_t *sem); // P操作  
  2. int
  3. sem_post(sem_t *sem); // V操作   
  4. // Link with -pthread. ```
复制代码
基于环形队列的生产者消费者模子


但是如今的环形队列的判空判满不再使用中的两种方式判定,由于有了信号量可以判定。
队列为空的时间,消费者和生产者指向同一个位置。(生产和消费线程不能同时举行)(生产者实行)
队列为满的时间,消费者和生产者也指向同一个位置。(生产和消费线程不能同时举行)(消费者实行)
当队列不为空不为满的时间,消费者和生产者不指向同一个位置。(生产和消费线程可以并发实行)
根据上面三种情况,基于环形队列的生产者消费者模子应该遵守以下规则:
   

  • 生产者不能把消费者套一个圈
  • 消费者不能凌驾生产者
  • 当指向同一个位置的时间,要根据空、满状态,判定让谁先实行
  • 其他情况,消费者和生产者可以并发实行
  实现:
   ringQueue.hpp
  1. #pragma once
  2. #include <iostream>
  3. #include <vector>
  4. #include <pthread.h>
  5. #include <semaphore.h>
  6. static const int N = 5;
  7. template <class T> class RingQueue { private:
  8.     void P(sem_t &s)  
  9.     {
  10.         sem_wait(&s);
  11.     }
  12.     void V(sem_t &s)
  13.     {
  14.         sem_post(&s);
  15.     }
  16.     void Lock(pthread_mutex_t &m)
  17.     {
  18.         pthread_mutex_lock(&m);
  19.     }
  20.     void Unlock(pthread_mutex_t &m)
  21.     {
  22.         pthread_mutex_unlock(&m);
  23.     }
  24. public:
  25.     RingQueue(int num = N) : _ring(num), _cap(num)
  26.     {
  27.         sem_init(&_data_sem, 0, 0);
  28.         sem_init(&_space_sem, 0, num);
  29.         _c_step = _p_step = 0;
  30.         pthread_mutex_init(&_c_mutex, nullptr);
  31.         pthread_mutex_init(&_p_mutex, nullptr);
  32.     }
  33.     // 生产
  34.     void push(const T &in)
  35.     {
  36.         // 1. 可以不用在临界区内部做判断,就可以知道临界资源的使用情况
  37.         // 2. 什么时候用锁,对应的临界资源,是否被整体使用
  38.         P(_space_sem);  // P()
  39.         Lock(_p_mutex);
  40.         _ring[_p_step++] = in;
  41.         _p_step %= _cap;
  42.         Unlock(_p_mutex);
  43.         V(_data_sem);
  44.     }
  45.     // 消费
  46.     void pop(T *out)
  47.     {
  48.         P(_data_sem);
  49.         Lock(_c_mutex);
  50.         *out = _ring[_c_step++];
  51.         _c_step %= _cap;
  52.         Unlock(_c_mutex);
  53.         V(_space_sem);
  54.     }
  55.     ~RingQueue()
  56.     {
  57.         sem_destroy(&_data_sem);
  58.         sem_destroy(&_space_sem);
  59.         pthread_mutex_destroy(&_c_mutex);
  60.         pthread_mutex_destroy(&_p_mutex);
  61.     }
  62. private:
  63.     std::vector<T> _ring;
  64.     int _cap;         // 环形队列容器大小
  65.     sem_t _data_sem;  // 只有消费者关心
  66.     sem_t _space_sem; // 只有生产者关心
  67.     int _c_step;      // 消费位置
  68.     int _p_step;      // 生产位置
  69.     pthread_mutex_t _c_mutex;
  70.     pthread_mutex_t _p_mutex; };
复制代码
   task.hpp
  1. #pragma once
  2. #include <iostream>
  3. #include <string>
  4. class Task { public:
  5.     Task() {}
  6.     Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
  7.     {
  8.     }
  9.     void operator()()
  10.     {
  11.         switch (_op)
  12.         {
  13.         case '+':
  14.             _result = _x + _y;
  15.             break;
  16.         case '-':
  17.             _result = _x - _y;
  18.             break;
  19.         case '*':
  20.             _result = _x * _y;
  21.             break;
  22.         case '/':
  23.             if(_y==0) _exitCode=-1;
  24.             else _result = _x / _y;
  25.             break;
  26.         case '%':
  27.             if(_y==0) _exitCode=-1;
  28.             else _result = _x % _y;
  29.             break;
  30.         default:
  31.             break;
  32.         }
  33.     }
  34.     std::string formatArg()
  35.     {
  36.         return std::to_string(_x)+' '+ _op+ ' '+std::to_string(_y)+" = ";
  37.     }
  38.     std::string formatRes()
  39.     {
  40.         return std::to_string(_result) + "(" +std::to_string(_exitCode)+")";
  41.     }
  42.     ~Task(){}
  43. private:
  44.     int _x;
  45.     int _y;
  46.     char _op;
  47.     int _result;
  48.     int _exitCode; }; ```
复制代码
   main.cc
  1. #include "ringQueue.hpp"
  2. #include "task.hpp"
  3. #include <ctime>
  4. #include <pthread.h>
  5. #include <memory>
  6. #include <sys/types.h>
  7. #include <unistd.h>
  8. #include <cstring>
  9. using namespace std;
  10. const char *ops = "+-*/%";
  11. void *consumerRoutine(void *args) {
  12.     RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
  13.     while (true)
  14.     {
  15.         Task t;
  16.         rq->pop(&t);
  17.         t();
  18.         cout << "consumer done, 处理完成的任务是: " << t.formatRes() << endl;
  19.     } }
  20. void *productorRoutine(void *args) {
  21.     RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
  22.     while (true)
  23.     {
  24.         // sleep(1);
  25.         int x = rand() % 100;
  26.         int y = rand() % 100;
  27.         char op = ops[(x + y) % strlen(ops)];
  28.         Task t(x, y, op);
  29.         rq->push(t);
  30.         cout << "productor done, 生产的任务是: " << t.formatArg() << endl;
  31.     } }
  32. int main() {
  33.     srand(time(nullptr) ^ getpid());
  34.     RingQueue<Task> *rq = new RingQueue<Task>();
  35.     // 单生产单消费
  36.     // pthread_t c, p;
  37.     // pthread_create(&c, nullptr, consumerRoutine, rq);
  38.     // pthread_create(&p, nullptr, productorRoutine, rq);
  39.     // pthread_join(c, nullptr);
  40.     // pthread_join(p, nullptr);
  41.     //多生产,多消费
  42.     pthread_t c[3], p[2];
  43.     for (int i = 0; i < 3; i++)
  44.         pthread_create(c + i, nullptr, consumerRoutine, rq);
  45.     for (int i = 0; i < 2; i++)
  46.         pthread_create(p + i, nullptr, productorRoutine, rq);
  47.     for (int i = 0; i < 3; i++)
  48.         pthread_join(c[i], nullptr);
  49.     for (int i = 0; i < 2; i++)
  50.         pthread_join(p[i], nullptr);
  51.     delete rq;
  52.     return 0; } ```
复制代码
   运行效果
  

  五、总结

互斥锁与信号量的异同
   

  • 互斥锁由同一线程加放锁,信号量可以由差别线程举行PV操作。
  • 计数信号量允很多个线程,且值为剩余可用资源数量。互斥锁保证多个线程对一个共享资源的互斥访问,信号量用于协调多个线程对一系列资源的访问条。
  条件变量与信号量的异同
   

  • 使用条件变量可以一次叫醒全部等待者,而这个信号量没有的功能。
  • 信号量是有一个值,而条件变量是没有的。从实现上来说一个信号量可以是用mutex + count + cond实现的。由于信号量有一个状态,可以精准的同步,信号量可以解决条件变量中存在的叫醒丢失问题。
  • 条件变量一样平常必要配合互斥锁使用,而信号量可根据情况而定。
  • 有了互斥锁和条件变量还提供信号量的原因是:只管信号量的意图在于历程间同步,互斥锁和条件变量的意图在于线程间同步,但是信号量也可用于线程间,互斥锁和条件变量也可用于历程间。信号量最有效的场景是用以指明可用资源的数量。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

刘俊凯

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表