linux:生产者消耗者模子

李优秀  金牌会员 | 2024-6-21 23:42:44 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 683|帖子 683|积分 2049


   个人主页 : 个人主页
个人专栏 : 《数据结构》 《C语言》《C++》《Linux》
  
  

前言

本文是对于生产者消耗者模子的知识总结

一、生产者消耗者模子

生产者消耗者模子就是通过一个容器来解决生产者消耗者的强耦合问题。生产者和消耗者相互之间不直接通讯,而是通过之间的容器来进行通讯,所以生产者生产完数据之后不消等待消耗者处理,直接交给容器,消耗者不找生产者要数据,而是直接从容器中取数据,壅闭队列就相称于一个缓冲区,均衡了生产者和消耗者的处理能力,其中这个容器就是用于生产者和消耗者之间解耦的
(强耦合是指两个或多个系统,组件或模块之间存在紧密依赖关系。)


特点:

  • 三种关系:生产者与生产者之间(互斥),消耗者与消耗者之间(互斥),生产者与消耗者之间(互斥 && 同步)
  • 两种脚色:生产者和消耗者
  • 一个交易(通讯)场所:一个容器(一段内存空间)
由于我们是多个线程访问同一个容器,那必然会导致数据不一致的问题,所以我们必要对该临界资源加锁,所以生产者与生产者之间,消耗者与消耗者之间,生产者与消耗者之间都是互斥的。
又由于容器大概为空(满),此时消耗者(生产者)还不停在临界区申请锁,又因没有数据(空间)而释放锁,从而不断申请锁释放锁,导致生产者(消耗者)的饥饿问题。此时我们就必要生产者与消耗者之间的同步。
对于2,3两点,这很好明白不解释。
我们编写生产者消耗者模子的本质就是对以上三点的维护。(互斥包管数据安全,同步包管服从)

长处:

  • 解耦
  • 支持并发
  • 支持忙闲不均
对于第一点,不就是生产者与消耗者通过容器来解耦提拔服从(如果没有这个容器,则生产者生产完数据,就必须等待消耗者来接受处理数据,不能立即继续生产数据)。
对于第二点,当生产者在生产数据时,消耗者也同时在处理数据
对于第三点,当生产者生产数据的速度凌驾消耗者的处理能力时,容器可以起到缓存的作用,将多余的数据暂时存储,等待消耗者有空闲时再进行处理。如果消耗者处理数据的能力凌驾生产者时,同理。
二、基于壅闭队列的生产者消耗者模子

在多线程编程中壅闭队列(Blocking Queue)是一种常用于实现生产者和消耗者模子的数据结构。


  • 队列为空时,从队列中取数据将被壅闭,直到队列中有数据时被唤醒。
  • 队列为满时,向队列中放入数据将被壅闭,直到队列中有数据取出被唤醒。
代码实现

下面是一个单生成单消耗模子

LockGuard.hpp 文件 将加锁释放锁,交给一个对象处理,当对象创建加锁,对象烧毁释放锁
  1. #pragma once
  2. #include <pthread.h>
  3. class Mutex
  4. {
  5. public:
  6.     Mutex(pthread_mutex_t *mutex):_mutex(mutex)
  7.     {}
  8.     void Lock()
  9.     {
  10.         pthread_mutex_lock(_mutex);
  11.     }
  12.     void UnLock()
  13.     {
  14.         pthread_mutex_unlock(_mutex);
  15.     }
  16.     ~Mutex()
  17.     {}
  18. private:
  19.     pthread_mutex_t *_mutex;
  20. };
  21. class LockGuard
  22. {
  23. public:
  24.     LockGuard(pthread_mutex_t *mutex): _lock(mutex)
  25.     {
  26.         _lock.Lock();
  27.     }
  28.     ~LockGuard()
  29.     {
  30.         _lock.UnLock();
  31.     }
  32. private:
  33.     Mutex _lock;
  34. };
复制代码
Blockqueue.hpp 文件
  1. #pragma once
  2. #include <iostream>
  3. #include <queue>
  4. #include <pthread.h>
  5. #include "LockGuard.hpp"
  6. using namespace std;
  7. const int CAPACITY = 5;
  8. template<class T>
  9. class BlockQueue
  10. {
  11. public:
  12.     BlockQueue(int cap = CAPACITY):_capacity(cap)
  13.     {
  14.         pthread_mutex_init(&_mutex, nullptr);
  15.         pthread_cond_init(&_p, nullptr);
  16.         pthread_cond_init(&_c, nullptr);
  17.     }
  18.     bool isFull()
  19.     {
  20.         return _bq.size() == _capacity;
  21.     }
  22.     void Push(const T &in)
  23.     {
  24.         LockGuard mutex(&_mutex);
  25.         //pthread_mutex_lock(&_mutex);
  26.         while(isFull())
  27.         {
  28.             pthread_cond_wait(&_p, &_mutex);
  29.         }
  30.         _bq.push(in);
  31.         // 唤醒策略为 生产一个,消费一个
  32.         pthread_cond_signal(&_c);
  33.         //pthread_mutex_unlock(&_mutex);
  34.     }
  35.     bool isEmpty()
  36.     {
  37.         return _bq.size() == 0;
  38.     }
  39.     void Pop(T *out)
  40.     {
  41.         LockGuard mutex(&_mutex);
  42.         //pthread_mutex_lock(&_mutex);
  43.         while(isEmpty())
  44.         {
  45.             pthread_cond_wait(&_c, &_mutex);
  46.         }
  47.         *out = _bq.front();
  48.         _bq.pop();
  49.         // 唤醒策略为 消费一个,生产一个
  50.         pthread_cond_signal(&_p);
  51.         //pthread_mutex_unlock(&_mutex);
  52.     }
  53.     ~BlockQueue()
  54.     {
  55.         pthread_mutex_destroy(&_mutex);
  56.         pthread_cond_destroy(&_p);
  57.         pthread_cond_destroy(&_c);
  58.     }
  59. private:
  60.     queue<T> _bq;
  61.     int _capacity;
  62.     pthread_mutex_t _mutex;
  63.     pthread_cond_t _p;
  64.     pthread_cond_t _c;
  65. };
复制代码
Task.hpp 文件
  1. #pragma once
  2. #include <string>
  3. const char *opers = "+-*/%";
  4. enum
  5. {
  6.     ok = 0,
  7.     div_zero,
  8.     mod_zero
  9. };
  10. class Task
  11. {
  12. public:
  13.     Task()
  14.     {}
  15.     Task(int x, int y, char op) : _data_x(x), _data_y(y), _oper(op)
  16.     {
  17.         _code = ok;
  18.     }
  19.     void Run()
  20.     {
  21.         switch (_oper)
  22.         {
  23.         case '+':
  24.             _result = _data_x + _data_y;
  25.             break;
  26.         case '-':
  27.             _result = _data_x - _data_y;
  28.             break;
  29.         case '*':
  30.             _result = _data_x * _data_y;
  31.             break;
  32.         case '/':
  33.             {
  34.                 if(_data_y == 0)
  35.                 {
  36.                     _code = div_zero;
  37.                 }
  38.                 else
  39.                 {
  40.                     _result = _data_x / _data_y;
  41.                 }
  42.             }
  43.             break;
  44.         case '%':
  45.             {
  46.                 if(_data_y == 0)
  47.                 {
  48.                     _code = mod_zero;
  49.                 }
  50.                 else
  51.                 {
  52.                     _result = _data_x % _data_y;
  53.                 }
  54.             }
  55.             break;
  56.         default:
  57.             break;
  58.         }
  59.     }
  60.     void operator()()
  61.     {
  62.         Run();
  63.     }
  64.     std::string PrintTask()
  65.     {
  66.         std::string ret = std::to_string(_data_x);
  67.         ret += _oper;
  68.         ret += std::to_string(_data_y);
  69.         ret += "=?";
  70.         return ret;
  71.     }
  72.     std::string PrintResult()
  73.     {
  74.         std::string ret = std::to_string(_data_x);
  75.         ret += _oper;
  76.         ret += std::to_string(_data_y);
  77.         ret += "=";
  78.         if(_code == ok)
  79.         {
  80.             ret += std::to_string(_result);
  81.         }
  82.         else
  83.         {
  84.             ret += "?";
  85.         }
  86.         ret += "[";
  87.         ret += std::to_string(_code);
  88.         ret += "]";
  89.         return ret;
  90.     }
  91.     ~Task()
  92.     {}
  93. private:
  94.     int _data_x;
  95.     int _data_y;
  96.     char _oper;
  97.     int _result;
  98.     int _code; // 错误码
  99. };
复制代码
Main.cc 文件
  1. #include <iostream>
  2. #include <pthread.h>
  3. #include <unistd.h>
  4. #include <ctime>
  5. #include <string.h>
  6. #include "BlockQueue.hpp"
  7. #include "Task.hpp"
  8. using namespace std;
  9. void *producer(void *args)
  10. {
  11.     BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
  12.     // 产生任务
  13.     while (true)
  14.     {
  15.         int x = rand() % 10 + 1;
  16.         int y = rand() % 10 + 1;
  17.         char oper = opers[rand() % strlen(opers)];
  18.         Task task(x, y, oper);
  19.         cout << "producer: " << task.PrintTask() << endl;
  20.         bq->Push(task);
  21.         sleep(1);
  22.     }
  23.     return nullptr;
  24. }
  25. void *consumer(void *args)
  26. {
  27.     // usleep(1000);
  28.     BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
  29.     // 获取任务,处理任务
  30.     while (true)
  31.     {
  32.         if (bq->isFull())
  33.         {
  34.             Task task;
  35.             bq->Pop(&task);
  36.             task();
  37.             cout << "consumer: " << task.PrintResult() << endl;
  38.             //sleep(1);
  39.         }
  40.     }
  41. }
  42. int main()
  43. {
  44.     srand(time(nullptr) ^ getpid());
  45.     BlockQueue<Task> bq;
  46.     pthread_t p;
  47.     pthread_create(&p, nullptr, producer, (void *)&bq);
  48.     pthread_t c;
  49.     pthread_create(&c, nullptr, consumer, (void *)&bq);
  50.     pthread_join(p, nullptr);
  51.     pthread_join(c, nullptr);
  52.     return 0;
  53. }
复制代码


那怎样将这个单生产单消耗该为多生产多消耗呢?由于多生产多消耗本质也是多个线程访问临界资源,那我们单生产和单消耗不也是多个线程访问临界资源吗,所以我们不必要对BlockQueue.hpp文件进行修改,只必要在main函数中,创建多个生产者和消耗者即可。
  1. #include <iostream>
  2. #include <pthread.h>
  3. #include <unistd.h>
  4. #include <ctime>
  5. #include <string.h>
  6. #include "BlockQueue.hpp"
  7. #include "Task.hpp"
  8. using namespace std;
  9. template <class T>
  10. class ThreadData
  11. {
  12. public:
  13.     ThreadData(pthread_t tid, const string threadname, BlockQueue<T> *bq)
  14.         : _tid(tid), _threadname(threadname), _bq(bq)
  15.     {}
  16. public:
  17.     pthread_t _tid;
  18.     string _threadname;
  19.     BlockQueue<T>* _bq;
  20. };
  21. void *producer(void *args)
  22. {
  23.     ThreadData<Task> *data = static_cast<ThreadData<Task> *>(args);
  24.     // 产生任务
  25.     while (true)
  26.     {
  27.         int x = rand() % 10 + 1;
  28.         int y = rand() % 10 + 1;
  29.         char oper = opers[rand() % strlen(opers)];
  30.         Task task(x, y, oper);
  31.         cout << data->_tid << ", " << data->_threadname <<": " << task.PrintTask() << endl;
  32.         data->_bq->Push(task);
  33.         sleep(1);
  34.     }
  35.     return nullptr;
  36. }
  37. void *consumer(void *args)
  38. {
  39.     // usleep(1000);
  40.     ThreadData<Task> *data = static_cast<ThreadData<Task> *>(args);
  41.     // 获取任务,处理任务
  42.     while (true)
  43.     {
  44.         if (data->_bq->isFull())
  45.         {
  46.             Task task;
  47.             data->_bq->Pop(&task);
  48.             task();
  49.             cout << data->_tid << ", " << data->_threadname << ": " << task.PrintResult() << endl;
  50.             // sleep(1);
  51.         }
  52.     }
  53. }
  54. int main()
  55. {
  56.     srand(time(nullptr) ^ getpid());
  57.     BlockQueue<Task> bq;
  58.     pthread_t p1;
  59.     ThreadData<Task> data1(p1, "product-1", &bq);
  60.     pthread_create(&p1, nullptr, producer, (void *)&data1);
  61.     pthread_t p2;
  62.     ThreadData<Task> data2(p2, "product-2", &bq);
  63.     pthread_create(&p2, nullptr, producer, (void *)&data2);
  64.     pthread_t c1;
  65.     ThreadData<Task> data3(c1, "consumer-1", &bq);
  66.     pthread_create(&c1, nullptr, consumer, (void *)&data3);
  67.     pthread_t c2;
  68.     ThreadData<Task> data4(c2, "consumer-2", &bq);
  69.     pthread_create(&c2, nullptr, consumer, (void *)&data4);
  70.     pthread_join(p1, nullptr);
  71.     pthread_join(p2, nullptr);
  72.     pthread_join(c1, nullptr);
  73.     pthread_join(c2, nullptr);
  74.     return 0;
  75. }
复制代码



总结

以上就是我对于线程同步的总结。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

李优秀

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表