Linux多线程编程-生产者与消费者模型详解与实现(C语言) ...

打印 上一主题 下一主题

主题 683|帖子 683|积分 2049

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
1.什么是生成者与消费者模型

生产者-消费者模型是并发编程中的经典问题,描述了多个线程(或历程)怎样安全、有效地共享有限的缓冲区资源。在这个模型中,有两种角色:

  • 生产者(Producer):负责生成数据或者将数据放置到共享缓冲区中。
  • 消费者(Consumer):负责从共享缓冲区中取出数据并进行处理或消费。

核心概念:

生产者和消费者模型主要涉及以下几个关键概念:


  • 共享缓冲区(Shared Buffer):生产者和消费者之间共享的有限大小的缓冲区。这个缓冲区可以是一个队列或者一个固定大小的数组。
  • 同步(Synchronization):确保生产者和消费者之间的正确协作,制止数据竞争和资源争用。例如,当缓冲区已满时,生产者应该等候;当缓冲区为空时,消费者应该等候。
  • 互斥(Mutual Exclusion):确保同一时候只有一个线程(生产者或消费者)可以访问或操纵共享缓冲区,以制止数据不一致或丢失。
详细案例:

假设有一个生产者和多个消费者的情况,以一个有界缓冲区(Bounded Buffer)为例:

  • 共享缓冲区:假设有一个大小为10的数组作为共享缓冲区,用于存放生产者生产的产物。
  • 生产者:负责生成产物,并将产物放入共享缓冲区中。如果缓冲区已满,生产者需要等候,直到有空间可以放置产物。
  • 消费者:负责从共享缓冲区中取出产物并进行消费。如果缓冲区为空,消费者需要等候,直到有产物可以消费。
解决方案:

为相识决生产者-消费者模型中的同步和互斥问题,可以采用以下方法:


  • 互斥锁(Mutex):确保在同一时候只有一个线程可以访问或修改共享缓冲区。例如,生产者和消费者在访问缓冲区前,首先要获取互斥锁,操纵完成后开释锁。
  • 条件变量(Condition Variables):用于线程间的通讯,如关照生产者缓冲区有空间可以放置新产物,或关照消费者缓冲区中有产物可以消费。
  • 信号量(Semaphores):用于控制对共享资源的访问,如控制缓冲区的空闲空间数量或产物数量。
示例代码:

使用C语言来实现,包括两个生产者线程、三个消费者线程,一个大小为10的共享缓冲区(使用链表实现),每个线程生成或消费一个数据后休眠1-2秒,并打印过程。我们将使用条件变量来实现线程的等候和唤醒机制。
  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <pthread.h>
  4. #include <unistd.h>
  5. #define BUFFER_SIZE 10
  6. // 链表节点
  7. typedef struct Node {
  8.     int data;
  9.     struct Node *next;
  10. } Node;
  11. // 全局变量
  12. Node *head = NULL;                  // 指向链表头部的指针
  13. pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;  // 互斥锁
  14. pthread_cond_t empty = PTHREAD_COND_INITIALIZER;    // 缓冲区为空的条件变量
  15. pthread_cond_t full = PTHREAD_COND_INITIALIZER;     // 缓冲区已满的条件变量
  16. int count = 0;                      // 缓冲区当前数据项的数量
  17. // 生产者线程函数
  18. void *producer_func(void *arg) {
  19.     int id = *((int *)arg);  // 生产者线程的编号
  20.     int data = 0;
  21.    
  22.     while (1) {
  23.         // 生成数据项
  24.         data++;
  25.         
  26.         // 获取互斥锁
  27.         pthread_mutex_lock(&mutex);
  28.         
  29.         // 等待直到缓冲区有空间
  30.         while (count == BUFFER_SIZE) {
  31.             printf("Producer %d: Buffer is full. Waiting...\n", id);
  32.             //条件变量 (empty):在缓冲区已满时,生产者线程会等待在 empty 条件变量上,直到有消费者取走数据并通知它。
  33.             pthread_cond_wait(&empty, &mutex);
  34.         }
  35.         
  36.         // 将数据项放入缓冲区,头插法,取节点的时候不用遍历链表
  37.         Node *new_node = (Node *)malloc(sizeof(Node));
  38.         new_node->data = data;
  39.         new_node->next = head;
  40.         head = new_node;
  41.         count++;
  42.         
  43.         printf("Producer %d: Produced data: %d\n", id, data);
  44.         
  45.         // 在生产者放入数据后,会通过 full 条件变量唤醒等待的消费者线程,告知它们可以消费数据了。
  46.         pthread_cond_signal(&full);
  47.         
  48.         // 释放互斥锁
  49.         pthread_mutex_unlock(&mutex);
  50.         
  51.         // 休眠1-2秒
  52.         sleep(rand() % 2);
  53.     }
  54.    
  55.     pthread_exit(NULL);
  56. }
  57. // 消费者线程函数
  58. void *consumer_func(void *arg) {
  59.     int id = *((int *)arg);  // 消费者线程的编号
  60.    
  61.     while (1) {
  62.         // 获取互斥锁
  63.         pthread_mutex_lock(&mutex);
  64.         
  65.         // 等待直到缓冲区有数据
  66.         while (head == NULL) {
  67.             printf("Consumer %d: Buffer is empty. Waiting...\n", id);
  68.             //条件变量 (full):在缓冲区为空时,消费者线程会等待在 full 条件变量上,直到有生产者放入数据并通知它。
  69.             pthread_cond_wait(&full, &mutex);
  70.         }
  71.         
  72.         // 从缓冲区取出数据项
  73.         Node *temp = head;
  74.         head = head->next;
  75.         int data = temp->data;
  76.         free(temp);
  77.         count--;
  78.         
  79.         printf("Consumer %d: Consumed data: %d\n", id, data);
  80.         
  81.         // 在消费者取走数据后,会通过 empty 条件变量唤醒等待的生产者线程,告知它们可以继续生产数据。
  82.         pthread_cond_signal(&empty);
  83.         
  84.         // 释放互斥锁
  85.         pthread_mutex_unlock(&mutex);
  86.         
  87.         // 休眠1-3秒
  88.         sleep(rand() % 3 + 1);
  89.     }
  90.    
  91.     pthread_exit(NULL);
  92. }
  93. int main() {
  94.     pthread_t producers[2], consumers[3];
  95.     int producer_ids[2] = {1, 2};  // 两个生产者线程的编号
  96.     int consumer_ids[3] = {1, 2, 3};  // 三个消费者线程的编号
  97.    
  98.     // 初始化互斥锁和条件变量
  99.     pthread_mutex_init(&mutex, NULL);
  100.     pthread_cond_init(&empty, NULL);
  101.     pthread_cond_init(&full, NULL);
  102.    
  103.     // 创建生产者线程
  104.     for (int i = 0; i < 2; ++i) {
  105.         pthread_create(&producers[i], NULL, producer_func, (void *)&producer_ids[i]);
  106.     }
  107.    
  108.     // 创建消费者线程
  109.     for (int i = 0; i < 3; ++i) {
  110.         pthread_create(&consumers[i], NULL, consumer_func, (void *)&consumer_ids[i]);
  111.     }
  112.    
  113.     // 主线程等待所有子线程结束
  114.     for (int i = 0; i < 2; ++i) {
  115.         pthread_join(producers[i], NULL);
  116.     }
  117.     for (int i = 0; i < 3; ++i) {
  118.         pthread_join(consumers[i], NULL);
  119.     }
  120.    
  121.     // 销毁互斥锁和条件变量
  122.     pthread_mutex_destroy(&mutex);
  123.     pthread_cond_destroy(&empty);
  124.     pthread_cond_destroy(&full);
  125.    
  126.     return 0;
  127. }
复制代码







免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

星球的眼睛

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表