李优秀 发表于 2024-6-21 23:42:44

linux:生产者消耗者模子

https://img-blog.csdnimg.cn/accfb6cd814b42039663b29963a42579.jpeg#pic_center
   个人主页 : 个人主页
个人专栏 : 《数据结构》 《C语言》《C++》《Linux》


前言

本文是对于生产者消耗者模子的知识总结
一、生产者消耗者模子

生产者消耗者模子就是通过一个容器来解决生产者消耗者的强耦合问题。生产者和消耗者相互之间不直接通讯,而是通过之间的容器来进行通讯,所以生产者生产完数据之后不消等待消耗者处理,直接交给容器,消耗者不找生产者要数据,而是直接从容器中取数据,壅闭队列就相称于一个缓冲区,均衡了生产者和消耗者的处理能力,其中这个容器就是用于生产者和消耗者之间解耦的
(强耦合是指两个或多个系统,组件或模块之间存在紧密依赖关系。)
https://img-blog.csdnimg.cn/direct/23b39eda2e7642cb9fc0d5c213fe97ad.png
特点:

[*]三种关系:生产者与生产者之间(互斥),消耗者与消耗者之间(互斥),生产者与消耗者之间(互斥 && 同步)
[*]两种脚色:生产者和消耗者
[*]一个交易(通讯)场所:一个容器(一段内存空间)
由于我们是多个线程访问同一个容器,那必然会导致数据不一致的问题,所以我们必要对该临界资源加锁,所以生产者与生产者之间,消耗者与消耗者之间,生产者与消耗者之间都是互斥的。
又由于容器大概为空(满),此时消耗者(生产者)还不停在临界区申请锁,又因没有数据(空间)而释放锁,从而不断申请锁释放锁,导致生产者(消耗者)的饥饿问题。此时我们就必要生产者与消耗者之间的同步。
对于2,3两点,这很好明白不解释。
我们编写生产者消耗者模子的本质就是对以上三点的维护。(互斥包管数据安全,同步包管服从)
长处:

[*]解耦
[*]支持并发
[*]支持忙闲不均
对于第一点,不就是生产者与消耗者通过容器来解耦提拔服从(如果没有这个容器,则生产者生产完数据,就必须等待消耗者来接受处理数据,不能立即继续生产数据)。
对于第二点,当生产者在生产数据时,消耗者也同时在处理数据
对于第三点,当生产者生产数据的速度凌驾消耗者的处理能力时,容器可以起到缓存的作用,将多余的数据暂时存储,等待消耗者有空闲时再进行处理。如果消耗者处理数据的能力凌驾生产者时,同理。
二、基于壅闭队列的生产者消耗者模子

在多线程编程中壅闭队列(Blocking Queue)是一种常用于实现生产者和消耗者模子的数据结构。


[*]队列为空时,从队列中取数据将被壅闭,直到队列中有数据时被唤醒。
[*]队列为满时,向队列中放入数据将被壅闭,直到队列中有数据取出被唤醒。
代码实现

下面是一个单生成单消耗模子
https://img-blog.csdnimg.cn/direct/2661bed3ac41449f8c28bcfbf525b104.png
LockGuard.hpp 文件 将加锁释放锁,交给一个对象处理,当对象创建加锁,对象烧毁释放锁
#pragma once
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t *mutex):_mutex(mutex)
    {}

    void Lock()
    {
      pthread_mutex_lock(_mutex);
    }

    void UnLock()
    {
      pthread_mutex_unlock(_mutex);
    }

    ~Mutex()
    {}
private:
    pthread_mutex_t *_mutex;
};

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mutex): _lock(mutex)
    {
      _lock.Lock();
    }

    ~LockGuard()
    {
      _lock.UnLock();
    }
private:
    Mutex _lock;
};
Blockqueue.hpp 文件
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include "LockGuard.hpp"

using namespace std;
const int CAPACITY = 5;

template<class T>
class BlockQueue
{
public:
    BlockQueue(int cap = CAPACITY):_capacity(cap)
    {
      pthread_mutex_init(&_mutex, nullptr);
      pthread_cond_init(&_p, nullptr);
      pthread_cond_init(&_c, nullptr);
    }

    bool isFull()
    {
      return _bq.size() == _capacity;
    }

    void Push(const T &in)
    {
      LockGuard mutex(&_mutex);
      //pthread_mutex_lock(&_mutex);
      while(isFull())
      {
            pthread_cond_wait(&_p, &_mutex);
      }

      _bq.push(in);
      // 唤醒策略为 生产一个,消费一个
      pthread_cond_signal(&_c);

      //pthread_mutex_unlock(&_mutex);
    }

    bool isEmpty()
    {
      return _bq.size() == 0;
    }

    void Pop(T *out)
    {
      LockGuard mutex(&_mutex);
      //pthread_mutex_lock(&_mutex);
      while(isEmpty())
      {
            pthread_cond_wait(&_c, &_mutex);
      }

      *out = _bq.front();
      _bq.pop();
      // 唤醒策略为 消费一个,生产一个
      pthread_cond_signal(&_p);

      //pthread_mutex_unlock(&_mutex);
    }

    ~BlockQueue()
    {
      pthread_mutex_destroy(&_mutex);
      pthread_cond_destroy(&_p);
      pthread_cond_destroy(&_c);
    }
private:
    queue<T> _bq;
    int _capacity;

    pthread_mutex_t _mutex;
    pthread_cond_t _p;
    pthread_cond_t _c;
};
Task.hpp 文件
#pragma once
#include <string>

const char *opers = "+-*/%";

enum
{
    ok = 0,
    div_zero,
    mod_zero
};

class Task
{
public:
    Task()
    {}

    Task(int x, int y, char op) : _data_x(x), _data_y(y), _oper(op)
    {
      _code = ok;
    }

    void Run()
    {
      switch (_oper)
      {
      case '+':
            _result = _data_x + _data_y;
            break;
      case '-':
            _result = _data_x - _data_y;
            break;
      case '*':
            _result = _data_x * _data_y;
            break;
      case '/':
            {
                if(_data_y == 0)
                {
                  _code = div_zero;
                }
                else
                {
                  _result = _data_x / _data_y;
                }
            }
            break;
      case '%':
            {
                if(_data_y == 0)
                {
                  _code = mod_zero;
                }
                else
                {
                  _result = _data_x % _data_y;
                }
            }
            break;
      default:
            break;
      }
    }

    void operator()()
    {
      Run();
    }

    std::string PrintTask()
    {
      std::string ret = std::to_string(_data_x);
      ret += _oper;
      ret += std::to_string(_data_y);

      ret += "=?";
      return ret;
    }

    std::string PrintResult()
    {
      std::string ret = std::to_string(_data_x);
      ret += _oper;
      ret += std::to_string(_data_y);

      ret += "=";
      if(_code == ok)
      {
            ret += std::to_string(_result);
      }
      else
      {
            ret += "?";
      }

      ret += "[";
      ret += std::to_string(_code);
      ret += "]";

      return ret;
    }

    ~Task()
    {}

private:
    int _data_x;
    int _data_y;
    char _oper;

    int _result;
    int _code; // 错误码
};
Main.cc 文件
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include <string.h>
#include "BlockQueue.hpp"
#include "Task.hpp"
using namespace std;


void *producer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    // 产生任务
    while (true)
    {
      int x = rand() % 10 + 1;
      int y = rand() % 10 + 1;
      char oper = opers;

      Task task(x, y, oper);
      cout << "producer: " << task.PrintTask() << endl;
      bq->Push(task);
      sleep(1);
    }

    return nullptr;
}

void *consumer(void *args)
{
    // usleep(1000);
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    // 获取任务,处理任务
    while (true)
    {
      if (bq->isFull())
      {
            Task task;
            bq->Pop(&task);

            task();
            cout << "consumer: " << task.PrintResult() << endl;
            //sleep(1);
      }
    }
}

int main()
{
    srand(time(nullptr) ^ getpid());
    BlockQueue<Task> bq;

    pthread_t p;
    pthread_create(&p, nullptr, producer, (void *)&bq);

    pthread_t c;
    pthread_create(&c, nullptr, consumer, (void *)&bq);

    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
    return 0;
}
https://img-blog.csdnimg.cn/direct/daa242206d9f408da1d0c1d0ef624e19.png
https://img-blog.csdnimg.cn/direct/8c7a60497c6949a19e656d2c30542cf2.png
那怎样将这个单生产单消耗该为多生产多消耗呢?由于多生产多消耗本质也是多个线程访问临界资源,那我们单生产和单消耗不也是多个线程访问临界资源吗,所以我们不必要对BlockQueue.hpp文件进行修改,只必要在main函数中,创建多个生产者和消耗者即可。
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include <string.h>
#include "BlockQueue.hpp"
#include "Task.hpp"
using namespace std;

template <class T>
class ThreadData
{
public:
    ThreadData(pthread_t tid, const string threadname, BlockQueue<T> *bq)
      : _tid(tid), _threadname(threadname), _bq(bq)
    {}

public:
    pthread_t _tid;
    string _threadname;
    BlockQueue<T>* _bq;
};

void *producer(void *args)
{
    ThreadData<Task> *data = static_cast<ThreadData<Task> *>(args);
    // 产生任务
    while (true)
    {
      int x = rand() % 10 + 1;
      int y = rand() % 10 + 1;
      char oper = opers;

      Task task(x, y, oper);
      cout << data->_tid << ", " << data->_threadname <<": " << task.PrintTask() << endl;
      data->_bq->Push(task);
      sleep(1);
    }

    return nullptr;
}

void *consumer(void *args)
{
    // usleep(1000);
    ThreadData<Task> *data = static_cast<ThreadData<Task> *>(args);
    // 获取任务,处理任务
    while (true)
    {
      if (data->_bq->isFull())
      {
            Task task;
            data->_bq->Pop(&task);

            task();
            cout << data->_tid << ", " << data->_threadname << ": " << task.PrintResult() << endl;
            // sleep(1);
      }
    }
}



int main()
{
    srand(time(nullptr) ^ getpid());
    BlockQueue<Task> bq;

    pthread_t p1;
    ThreadData<Task> data1(p1, "product-1", &bq);
    pthread_create(&p1, nullptr, producer, (void *)&data1);

    pthread_t p2;
    ThreadData<Task> data2(p2, "product-2", &bq);
    pthread_create(&p2, nullptr, producer, (void *)&data2);

    pthread_t c1;
    ThreadData<Task> data3(c1, "consumer-1", &bq);
    pthread_create(&c1, nullptr, consumer, (void *)&data3);
    pthread_t c2;
    ThreadData<Task> data4(c2, "consumer-2", &bq);
    pthread_create(&c2, nullptr, consumer, (void *)&data4);

    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);

    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);

    return 0;
}
https://img-blog.csdnimg.cn/direct/952e4ce9d73d4a58bc4c1ae23c769556.png
https://img-blog.csdnimg.cn/direct/eff7ac95e22b45808d1ce7489fae9d23.png
总结

以上就是我对于线程同步的总结。
https://img-blog.csdnimg.cn/2ccbd88afaab4ff398bbe625572360d2.gif#pic_center

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: linux:生产者消耗者模子