你送出去的每颗糖都去了该去的地方, 其实地球是圆的, 你做的功德终会回到你身上。 --- 何炅 ---
1 信号量
信号量本质是一个计数器,可以在初始化时对设置资源数目,进程 / 线程 可以获取信号量来对资源举行操纵和结束操纵可以开释信号量!
用于多进程 / 多线程 对共享数据对象的读取,它和管道有所差别,它不以传送数据为重要目的,它重要是用来保护共享资源(信号量也属于临界资源),使得资源在一个时刻只有一个进程独享。 在资源只有一个时就一把互斥锁!
信号量只能举行两种操纵获取等待和开释信号,即PV操纵:
- P(sv):我们将申请获取信号量称为P操纵,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请乐成时临界资源中资源的数目应该减去一。所以P操纵的本质就是让计数器减一,假如sv的值大于零,就给它减1;假如它的值为零,就挂起该进程的执行。对应的接口为,使用很简单:
- #include <semaphore.h>
- //阻塞等待获取
- int sem_wait(sem_t *sem);
- //只进行一次获取,非阻塞等待
- int sem_trywait(sem_t *sem);
- //时间片内进行等待,超出就退出阻塞!
- int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
复制代码 - V(sv):我们将开释信号量称为V操纵,开释信号量的本质就是归还临界资源中某块资源的使用权限,当开释乐成时临界资源中资源的数目就应该加一。所以V操纵本质就是让计数器加一,假如有其他进程 / 线程因等待sv而被挂起,就发送信号让它恢复运行,假如没有进程 / 线程因等待信号量而挂起,就给他加1。对应接口为:
- #include <semaphore.h>
- //释放获取的信号量
- int sem_post(sem_t *sem);
复制代码 PV操纵都是原子的,不消担心线程安全!此外信号量初始化和烧毁的接口是:
- 信号量初始化:
- #include <semaphore.h>
- int sem_init(sem_t *sem, int pshared, unsigned int value);
复制代码 参数分别为:
- sem_t *sem:传入信号量的地址
- pshared:传入0值表现线程间共享,传入非零值表现进程间共享。
- value:信号量的初始值(计数器的初始值)。
- 信号量烧毁:
- #include <semaphore.h>
- int sem_destroy(sem_t *sem);
复制代码 2 框架构建
- 环形队列的成员变量
- 线性容器vector模仿环形队列
- 最大容量 int _max_step
- 消耗者位置 _c_step 与 生产者位置 _p_step
- 两个信号量来表现生产与消耗的剩余容量
sem_t _data_sem : 当前有多少数据
sem_t _space_sem: 当前剩余空间还有多少
- 构造函数初始化
- 最大容量必要给值初始化
- 两个初始位置都为 0
- 信号量初始化 sem_init() 数据为 0 ,空间为 最大容量
- Push接口用来参加数据
- 起首必要申请信号量 P 来对空间信号量举行获取 sem_wait (&sem_t _space_sem)(申请信号量是原子的)
获取信号量的本质是对资源 –
- 生产举行插入 , 对应下标向后移动 , 注意不能越界
- 末了举行开释信号量 V 来对资源信号量举行开释 sem_post()
开释信号量的本质是对资源 ++
- Pop接口用来获取数据
- 起首必要申请信号量 P 来对资源信号量举行获取 sem_wait (&sem_t _space_sem)(申请信号量是原子的)
获取信号量的本质是对资源 –
- 获取队列资源,并举行开释, 对应下标向后移动 , 注意不能越界
- 末了举行开释信号量 V 来对空间信号量举行开释 sem_post()
开释信号量的本质是对资源 ++
- 多生产多消耗改造:多个生产 / 消耗线程存在 消耗对消耗 生产对生产的问题!
- 信号量保证了单生产单消耗中,两个线程可以通过信号量来保证不会出现访问越界 / 访问重叠的问题!
- 多线程的情况下可能会发生访问同一位置的可能,获取到信号量之后由于中央的处置惩罚是临界区,可能会发生线程的切换,就会导致对同一位置举行处置惩罚,进而发生问题!
- 为了保证线程安全,必要两把锁,分别管理生产者和消耗者!
- 锁的处置惩罚:
- 获取信号量之后再举行加锁,获取信号量是原子的,先申请信号量可以保证多个线程在获取中举行排队等待。
- 假如先加锁,就只能使一个线程进入到获取信号量的队列中,服从低(影戏院先买票在排队 ,先排队再买票)
6.为什么信号量不加条件判断?:
在环形队列的实现中,没有使用条件变量,像阻塞队列一样举行条件的判断 而是直接来不管三七二十一举行获取信号量,因为信号量本身就是判断条件,信号量是用来描述内部资源的多少的,是原子的!本质是一个计数器 通过预订机制来保证内部资源的公道使用,当信号量的资源数目为1时和锁时等价的!
3 代码实现
- #pragma once
- #include <vector>
- #include <semaphore.h>
- const int default_cap = 5;
- template <class T>
- class RingQueue
- {
- public:
- RingQueue(int max_cap = default_cap) : _rq(max_cap), _max_cap(max_cap), _p_step(0), _c_step(0)
- {
- // 信号量初始化
- sem_init(&_space_sem, 0, _max_cap);
- sem_init(&_data_sem, 0, 0);
- //锁进行初始化
- pthread_mutex_init(&_c_mtx , nullptr);
- pthread_mutex_init(&_p_mtx , nullptr);
- }
- // 获取信号量
- void P(sem_t &sp)
- {
- sem_wait(&sp);
- }
- // 释放信号量
- void V(sem_t &sp)
- {
- sem_post(&sp);
- }
- // 插入操作
- void Push(const T &t)
- {
- // 获取空间信号量 --
- P(_space_sem);
- //临界区上锁
- pthread_mutex_lock(&_p_mtx );
- _rq[_p_step] = t;
- _p_step++;
- _p_step %= _max_cap;
- //解锁
- pthread_mutex_unlock(&_p_mtx);
- // 释放信号量 ++
- V(_data_sem);
- }
- // 获取操作
- void Pop(T *t)
- {
- // 获取资源信号量
- P(_data_sem);
- pthread_mutex_lock(&_c_mtx);
- *t = _rq[_c_step];
- _c_step++;
- _c_step %= _max_cap;
- pthread_mutex_unlock(&_c_mtx);
- // 释放信号量
- V(_space_sem);
- }
- ~RingQueue()
- {
- // 销毁对应信号量!
- sem_destroy(&_space_sem);
- sem_destroy(&_data_sem);
- //锁进行释放
- pthread_mutex_destroy(&_c_mtx);
- pthread_mutex_destroy(&_p_mtx);
- }
- private:
- // 底层线性结构,模拟环形队列
- std::vector<T> _rq;
- // 最大容量
- int _max_cap;
- // 生产者/消费者 下标
- int _p_step;
- int _c_step;
- // 空间/资源 信号量
- sem_t _space_sem;
- sem_t _data_sem;
- // 生产 / 消费 锁
- pthread_mutex_t _p_mtx;
- pthread_mutex_t _c_mtx;
- };
复制代码 4 测试运行
我们来做一些简单测试,我们计划了Task类,用于执行加法操纵。它包含两个整型参数_x和_y,并提供方法来执行加法并获取效果。通过重载括号运算符,Task对象可以被直接调用以执行计算。此外,类还提供了调试信息和效果输出的功能。
我写了一段代码段用于测试。在该测试中:界说了两个线程函数Consumer和Productor,分别模仿消耗者和生产者行为:
- Consumer线程不断从环形队列中取出Task对象,执行其操纵,并打印消耗效果。
- Productor线程则持续生成新的Task对象并将其放入队列中,同时打印出生产信息。
主函数main中创建了一个容量为5的RingQueue<Task>实例,并启动了两个线程。pthread_create用于创建线程,pthread_join确保主线程等待子线程执行完毕。通过这种方式,我们验证了环形队列在多线程环境下的线程安全性和功能精确性。
- #include <iostream>
- #include "RingQueue.hpp"
- #include <pthread.h>
- #include <stdlib.h>
- #include <sys/types.h>
- #include <unistd.h>
- #include "Task.hpp"
- void *Consumer(void *args)
- {
- RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
- srand(time(nullptr) ^ getpid());
- while (true)
- {
- // 不断的进行获取
- Task data ;
- rq->Pop(&data);
- data();
- std::cout << "Consumer 消费者消费 -> " << data.result() << std::endl;
- sleep(1);
- }
- }
- void *Productor(void *args)
- {
- RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
- srand(time(nullptr) ^ getpid());
- while (true)
- {
- // 不断的进行写入
- int num1 = rand() % 10;
- usleep(1000);
- int num2 = rand() % 10;
- Task t(num1 , num2);
- rq->Push(t);
- std::cout << "Productor 生产者生产 -> " << t.debug() << std::endl;
- usleep(10000);
- }
- }
- int main()
- {
- // 环形队列
- RingQueue<Task> rq(5);
- // 使用两个线程来测试
- pthread_t t1, t2;
- pthread_create(&t1, nullptr, Consumer, &rq);
- pthread_create(&t2, nullptr, Productor, &rq);
- pthread_join(t1, nullptr);
- pthread_join(t2, nullptr);
- }
复制代码 运行效果:
很好的完成了任务!!!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |