Linux之信号量 | 消耗者生产者模子的循环队列

打印 上一主题 下一主题

主题 576|帖子 576|积分 1728

目录
一、信号量
1、概念
2、信号量操纵函数
二、基于环形队列的生产者消耗者模子
1、模子分析
2、代码实现
1、单生产单消耗的生产者消耗者模子
2、多生产多消耗的生产者消耗者模子


一、信号量

1、概念

引入:前面我们讲到了,对临界资源进行访问时,为了保证数据的一致性,我们需要对临界资源进行加锁保护。当我们用一个互斥锁对临界资源进行保护时,相当于我们将这块临界资源看作一个整体,同一时刻只答应一个实验流对这块临界资源进行访问。这样做非常合理,但是效率太低了。
但是,我们最抱负的方案,其实是:如果在同一时刻,差别的实验流要访问的是临界资源的差别地区,那么我们是答应它们同时进行临界资源的访问,这样就大大提高了效率。
比如,如果一个临界资源是一个大小为10数组,我们可以对其加锁保护。但是,如今来看,如果有10个线程同时访问这个数组,可以吗?固然可以,只要这10个线程在同一时刻访问的是数组的差别位置,即10个位置一个线程访问一个。这样我们就可以让多个实验流同时访问临界资源了。
这时,我们就利用了信号量来帮助我们实现这个方案。
   信号量:信号量的本质是一个计数器,通常用来体现临界资源中,资源数的多少。申请信号量实际上就是对临界资源的预定机制。信号量重要用于同步和互斥。
  每个实验流在进入临界区之前都应该先申请信号量,申请成功就有了访问临界资源的权限,当访问完毕后就应该开释信号量。 
信号量的PV操纵:
~  P操纵:我们将申请信号量称为P操纵。申请信号量的本质就是申请获得临界资源中某块资源的访问权限,当申请成功时临界资源中资源的数目应该减一,因此P操纵的本质就是让信号量减一。
~  V操纵:我们将开释信号量称为V操纵。开释信号量的本质就是归还临界资源中某块资源的访问权限,当开释成功时临界资源中资源的数目就应该加一,因此V操纵的本质就是让信号量加一。
2、信号量操纵函数

sem_init:初始化信号量的函数,该函数的函数原型如下:返回值,初始化信号量成功返回0,失败返回-1。
  1. NAME
  2.        sem_init - initialize an unnamed semaphore
  3. SYNOPSIS
  4.        #include <semaphore.h>
  5.        int sem_init(sem_t *sem, int pshared, unsigned int value);
  6.        Link with -pthread.
复制代码
参数说明:
sem:需要初始化的信号量。
pshared:传入0值体现线程间共享,传入非零值体现进程间共享。
value:信号量的初始值。
sem_destroy:销毁信号量的函数,该函数的函数原型如下:
  1. NAME
  2.        sem_destroy - destroy an unnamed semaphore
  3. SYNOPSIS
  4.        #include <semaphore.h>
  5.        int sem_destroy(sem_t *sem);
  6.        Link with -pthread.
复制代码
参数说明:sem:需要销毁的信号量。
返回值:销毁信号量成功返回0,失败返回-1。
sem_wait:期待信号量(申请信号量)的函数,该函数的函数原型如下:
  1. NAME
  2.        sem_wait, sem_timedwait, sem_trywait - lock a semaphore
  3. SYNOPSIS
  4.        #include <semaphore.h>
  5.        int sem_wait(sem_t *sem);
  6.        Link with -pthread.
复制代码
参数:sem:需要期待的信号量。
返回值:期待信号量成功返回0,信号量的值减一。期待信号量失败返回-1,信号量的值保持不变。
sem_post:开释信号量(发布信号量)的函数,该函数的函数原型如下:
  1. NAME
  2.        sem_post - unlock a semaphore
  3. SYNOPSIS
  4.        #include <semaphore.h>
  5.        int sem_post(sem_t *sem);
  6.        Link with -pthread.
复制代码
参数:sem:需要开释的信号量。
返回值:开释信号量成功返回0,信号量的值加一。开释信号量失败返回-1,信号量的值保持不变。
有了对信号量的各种操纵,我们下面来通过一个具体的例子来利用一下他们。
二、基于环形队列的生产者消耗者模子

1、模子分析


实际上并不是真正的环形队列,因为我们没有这种数据布局,它的实现是通过数组模仿的,当数据到场到最后的位置时直接模便是数组的大小即可,这样就可以回到数组的起始位置。
我们在对环形队列进行访问时,当队列为空或者为满,生产者和消耗者就会指向同一个位置,这时我们就需要生产者和消耗者互斥和同步了,如果为空,让生产者先访问,为满就让消耗者先访问。
而当队列不为空,也不为满时,生产者和消耗者可以访问队列差别的位置,以实现并发。这样就可以提高效率了。
为了实现消耗者和生产者的并发访问,我们需要利用信号量。我们利用信号量来体现队列中相关资源的个数。
   1、对于生产者,在意的是队列中的空间资源,只要有空间生产者就可以进行生产。空间资源界说成一个生产者需要的信号量(space_sem),在初始化时,它的初始值我们应该设置为环形队列的容量,因为刚开始时环形队列当中全是空间。
  
  2、对于消耗者,在意的是队列中的数据资源,只要有数据消耗者就可以进行消耗。数据资源界说成一个消耗者需要的信号量(data_sem),在初始化时,它的初始值我们应该设置为0,因为刚开始时环形队列当中没有数据。
  当生产者线程生产数据时,它需要先申请信号量,即对space_sem进行P操纵,然后生产数据,放入到队列中。生产完成后,这时,队列中就多出了一个数据资源,需要对data_sem进行V操纵。
当消耗者线程消耗数据时,它也需要先申请信号量,即对data_sem进行P操纵,然后消耗数据。消耗完成后,队列中就会多出一个空间资源,需要对space_sem进行V操纵。
2、代码实现

对信号量进行封装:sem.hpp
  1. #pragma once
  2. #include <iostream>
  3. #include <semaphore.h>
  4. using namespace std;
  5. class sem
  6. {
  7. public:
  8.     sem(int sem)
  9.     {
  10.         sem_init(&sem_, 0, sem);
  11.     }
  12.     void p()
  13.     {
  14.         sem_wait(&sem_);
  15.     }
  16.     void v()
  17.     {
  18.         sem_post(&sem_);
  19.     }
  20.     ~sem()
  21.     {
  22.         sem_destroy(&sem_);
  23.     }
  24. private:
  25.     sem_t sem_;
  26. };
复制代码
1、单生产单消耗的生产者消耗者模子

RingQueue.hpp:环形队列的实现:
  1. #include <iostream>
  2. #include <vector>
  3. #include <pthread.h>
  4. #include "sem.hpp"
  5. #include <sys/types.h>
  6. #include <unistd.h>
  7. using namespace std;
  8. #define GVAL 5
  9. template <class T>
  10. class RingQueue
  11. {
  12. public:
  13.     RingQueue(const int num = GVAL)
  14.         : num_(num), rq_(num), space_sem(num), data_sem(0), p_step(0), c_step(0)
  15.     {
  16.     }
  17.     void push(const T &in)
  18.     {
  19.         space_sem.p(); // 申请信号量
  20.         rq_[p_step++] = in;
  21.         p_step %= num_;
  22.         data_sem.v(); // 释放信号量
  23.     }
  24.     void pop(T *out)
  25.     {
  26.         data_sem.p(); // 申请信号量
  27.         *out = rq_[c_step++];
  28.         c_step %= num_;
  29.         space_sem.v(); // 释放信号量
  30.     }
  31.     ~RingQueue()
  32.     {
  33.     }
  34. private:
  35.     vector<T> rq_;
  36.     int num_;      // 大小
  37.     sem space_sem; // 空间资源信号量
  38.     sem data_sem;  // 数据资源信号量
  39.     int c_step;    // 消费者下标
  40.     int p_step;    // 生产者下标
  41. };
复制代码
单生产单消耗的生产者消耗者模子:
  1. #include <iostream>
  2. #include <pthread.h>
  3. #include "RingQueue.hpp"
  4. using namespace std;
  5. void *consumer(void *arg)
  6. {
  7.     RingQueue<int> *rq = (RingQueue<int> *)arg;
  8.     while(true)
  9.     {
  10.         int x;
  11.         rq->pop(&x);
  12.         cout << "消费:" << x << endl;
  13.         sleep(1);
  14.     }
  15. }
  16. void *productor(void *arg)
  17. {
  18.     RingQueue<int> *rq = (RingQueue<int> *)arg;
  19.     while(true)
  20.     {
  21.         int x = rand() % 100 + 1;
  22.         rq->push(x);
  23.         cout << "生产:" << x << endl;
  24.     }
  25. }
  26. int main()
  27. {
  28.     srand((uint64_t)time(nullptr) ^ getpid() ^ 2321);
  29.     RingQueue<int> *rq = new RingQueue<int>();
  30.     pthread_t c, p;
  31.     pthread_create(&c, nullptr, consumer, (void *)rq);
  32.     pthread_create(&p, nullptr, productor, (void *)rq);
  33.     pthread_join(c, nullptr);
  34.     pthread_join(p, nullptr);
  35.     return 0;
  36. }
复制代码
运行结果:

2、多生产多消耗的生产者消耗者模子

我们只要保证,最终进入临界区的是一个生产者,一个消耗就行,即生产者和生产者之间是互斥的,消耗者和消耗者之间是互斥的。所以我们需要提供两把锁。
RingQueue.hpp:
  1. #include <iostream>
  2. #include <vector>
  3. #include <pthread.h>
  4. #include "sem.hpp"
  5. #include <sys/types.h>
  6. #include <unistd.h>
  7. using namespace std;
  8. #define GVAL 5
  9. template <class T>
  10. class RingQueue
  11. {
  12. public:
  13.     RingQueue(const int num = GVAL)
  14.         : num_(num), rq_(num), space_sem(num), data_sem(0), p_step(0), c_step(0)
  15.     {
  16.         pthread_mutex_init(&clock, nullptr);
  17.         pthread_mutex_init(&plock, nullptr);
  18.     }
  19.     void push(const T &in)
  20.     {
  21.         space_sem.p(); // 申请信号量
  22.         pthread_mutex_lock(&plock);
  23.         rq_[p_step++] = in;
  24.         p_step %= num_;
  25.         pthread_mutex_unlock(&plock);
  26.         data_sem.v(); // 释放信号量
  27.     }
  28.     void pop(T *out)
  29.     {
  30.         data_sem.p(); // 申请信号量
  31.         pthread_mutex_lock(&clock);
  32.         *out = rq_[c_step++];
  33.         c_step %= num_;
  34.         pthread_mutex_unlock(&clock);
  35.         space_sem.v(); // 释放信号量
  36.     }
  37.     ~RingQueue()
  38.     {
  39.         pthread_mutex_destroy(&clock);
  40.         pthread_mutex_destroy(&plock);
  41.     }
  42. private:
  43.     vector<T> rq_;
  44.     int num_;      // 大小
  45.     sem space_sem; // 空间资源信号量
  46.     sem data_sem;  // 数据资源信号量
  47.     int c_step;    // 消费者下标
  48.     int p_step;    // 生产者下标
  49.     pthread_mutex_t plock;
  50.     pthread_mutex_t clock;
  51. };
复制代码
  1. #include <iostream>
  2. #include <pthread.h>
  3. #include "RingQueue.hpp"
  4. using namespace std;
  5. void *consumer(void *arg)
  6. {
  7.     RingQueue<int> *rq = (RingQueue<int> *)arg;
  8.     while (true)
  9.     {
  10.         int x;
  11.         rq->pop(&x);
  12.         cout << "消费:" << x << " " << pthread_self() << endl;
  13.         sleep(1);
  14.     }
  15. }
  16. void *productor(void *arg)
  17. {
  18.     RingQueue<int> *rq = (RingQueue<int> *)arg;
  19.     while (true)
  20.     {
  21.         int x = rand() % 100 + 1;
  22.         rq->push(x);
  23.         cout << "生产:" << x << " " << pthread_self() << endl;
  24.     }
  25. }
  26. int main()
  27. {
  28.     srand((uint64_t)time(nullptr) ^ getpid() ^ 2321);
  29.     RingQueue<int> *rq = new RingQueue<int>();
  30.     pthread_t c[2], p[3];
  31.     for (int i = 0; i < 2; i++)
  32.         pthread_create(c + i, nullptr, consumer, (void *)rq);
  33.     for (int i = 0; i < 3; i++)
  34.         pthread_create(p + i, nullptr, productor, (void *)rq);
  35.     for (int i = 0; i < 2; i++)
  36.         pthread_join(c[i], nullptr);
  37.     for (int i = 0; i < 3; i++)
  38.         pthread_join(p[i], nullptr);
  39.     return 0;
  40. }
复制代码
运行结果:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

梦应逍遥

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表