使用POSIX线程解决“生产者/消费者”问题


使用POSIX线程解决“生产者/消费者”问题:

  1. /* 
  2. *   File    : pc.cpp 
  3. * 
  4. *   Title   : Demo Producer/Consumer. 
  5. * 
  6. *   Short   : A solution to the producer consumer problem using pthreads. 
  7. *           This is a simple FIFO pipe between two tasks. The primary problem is 
  8. *           ensuring that the producer blocks if the FIFO is full, and the consumer 
  9. *           blocks if it is empty, and avoiding data-races along the way. A secondary 
  10. *           concern is that there is as little interference between the two tasks as 
  11. *           possible.  
  12. * 
  13. *   Author  : Andrae Muys 
  14. * 
  15. *   Date    : 18 September 1997 
  16. */  
  17.   
  18. #include <stdlib.h>   
  19. #include <pthread.h>   
  20. #include <assert.h>   
  21.   
  22. #ifdef _WIN32   
  23. #   include <windows.h>   
  24. #   define SLEEP(ms) Sleep(ms)   
  25. #elif defined(LINUX)   
  26. #   include <unistd.h>   
  27. #   define SLEEP(ms) sleep(ms)   
  28. #endif   
  29.   
  30. #define QUEUE_SIZE  10   
  31. #define LOOP        20   
  32.   
  33. void* producer (void *args);  
  34. void* consumer (void *args);  
  35.   
  36. typedef struct  
  37. {  
  38.     int buf[QUEUE_SIZE];  
  39.     long head, tail;  
  40.     bool full, empty;  
  41.     pthread_mutex_t *mutex;  
  42.     pthread_cond_t *notFull, *notEmpty;  
  43. } Queue;  
  44.   
  45. Queue*  queueInit (void);  
  46. void    queueDelete (Queue *q);  
  47. void    queueAdd (Queue *q, int in);  
  48. void    queueDel (Queue *q, int *out);  
  49.   
  50. int main(int argc, char* argv[])  
  51. {  
  52.     Queue* fifo = queueInit ();  
  53.     assert(fifo !=  NULL);  
  54.   
  55.     pthread_t pro, con;  
  56.     pthread_create (&pro, NULL, &producer, fifo);  
  57.     pthread_create (&con, NULL, &consumer, fifo);  
  58.     pthread_join (pro, NULL);  
  59.     pthread_join (con, NULL);  
  60.     queueDelete (fifo);  
  61.     return 0;  
  62. }  
  63.   
  64. void* producer (void *q)  
  65. {  
  66.     Queue* fifo = (Queue *)q;  
  67.   
  68.     for (int i = 0; i < LOOP; i++)  
  69.     {  
  70.         // 临界区操作:若队列未满,添加新数据   
  71.         pthread_mutex_lock (fifo->mutex);  
  72.         while (fifo->full)  
  73.         {  
  74.             printf ("producer: Queue FULL.\n");  
  75.             pthread_cond_wait (fifo->notFull, fifo->mutex);  
  76.         }  
  77.         queueAdd (fifo, i);  
  78.         pthread_mutex_unlock (fifo->mutex);  
  79.   
  80.         // 数据添加结束,发“队列有数据”信号   
  81.         pthread_cond_signal (fifo->notEmpty);  
  82.         SLEEP (100);  
  83.     }  
  84.   
  85.     // 与上面类似   
  86.     for (int i = 0; i < LOOP; i++)  
  87.     {  
  88.         pthread_mutex_lock (fifo->mutex);  
  89.         while (fifo->full)  
  90.         {  
  91.             printf ("producer: Queue FULL.\n");  
  92.             pthread_cond_wait (fifo->notFull, fifo->mutex);  
  93.         }  
  94.         queueAdd (fifo, i);  
  95.         pthread_mutex_unlock (fifo->mutex);  
  96.         pthread_cond_signal (fifo->notEmpty);  
  97.         SLEEP (200);  
  98.     }  
  99.     return (NULL);  
  100. }  
  101.   
  102. void* consumer (void *q)  
  103. {  
  104.     int d;  
  105.   
  106.     Queue *fifo = (Queue *)q;  
  107.   
  108.     for (int i = 0; i < LOOP; i++)  
  109.     {  
  110.         // 临界区操作:若队列不空,则取出数据   
  111.         pthread_mutex_lock (fifo->mutex);  
  112.         while (fifo->empty)  
  113.         {  
  114.             printf ("consumer: Queue EMPTY.\n");  
  115.             pthread_cond_wait (fifo->notEmpty, fifo->mutex);  
  116.         }  
  117.         queueDel (fifo, &d);  
  118.         pthread_mutex_unlock (fifo->mutex);  
  119.   
  120.         // 取完数据,发“队列不满”信号   
  121.         pthread_cond_signal (fifo->notFull);  
  122.         printf ("consumer: recieved %d.\n", d);  
  123.         SLEEP(200);  
  124.     }  
  125.   
  126.     // 与上面类似   
  127.     for (int i = 0; i < LOOP; i++)  
  128.     {  
  129.         pthread_mutex_lock (fifo->mutex);  
  130.         while (fifo->empty)  
  131.         {  
  132.             printf ("consumer: Queue EMPTY.\n");  
  133.             pthread_cond_wait (fifo->notEmpty, fifo->mutex);  
  134.         }  
  135.         queueDel (fifo, &d);  
  136.         pthread_mutex_unlock (fifo->mutex);  
  137.         pthread_cond_signal (fifo->notFull);  
  138.         printf ("consumer: recieved %d.\n", d);  
  139.         SLEEP (50);  
  140.     }  
  141.     return (NULL);  
  142. }  
  143.   
  144. Queue *queueInit (void)  
  145. {  
  146.     Queue *q = (Queue *)malloc (sizeof (Queue));  
  147.     if (q == NULL) return (NULL);  
  148.   
  149.     q->empty = true;  
  150.     q->full = false;  
  151.     q->head = 0;  
  152.     q->tail = 0;  
  153.   
  154.     q->mutex = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t));  
  155.     pthread_mutex_init (q->mutex, NULL);  
  156.     q->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));  
  157.     pthread_cond_init (q->notFull, NULL);  
  158.     q->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));  
  159.     pthread_cond_init (q->notEmpty, NULL);  
  160.       
  161.     return (q);  
  162. }  
  163.   
  164. void queueDelete (Queue *q)  
  165. {  
  166.     pthread_mutex_destroy (q->mutex);  
  167.     free (q->mutex);   
  168.   
  169.     pthread_cond_destroy (q->notFull);  
  170.     free (q->notFull);  
  171.   
  172.     pthread_cond_destroy (q->notEmpty);  
  173.     free (q->notEmpty);  
  174.   
  175.     free (q);  
  176. }  
  177.   
  178. void queueAdd (Queue *q, int in)  
  179. {  
  180.     q->buf[q->tail] = in;  
  181.     q->tail++;  
  182.     if (q->tail == QUEUE_SIZE)   // 循环队列   
  183.         q->tail = 0;  
  184.   
  185.     if (q->tail == q->head)       // 添加数据时“触顶”   
  186.         q->full = true;  
  187.     q->empty = false;  
  188.   
  189.     return;  
  190. }  
  191.   
  192. void queueDel (Queue *q, int *out)  
  193. {  
  194.     *out = q->buf[q->head];  
  195.     q->head++;  
  196.     if (q->head == QUEUE_SIZE)   // 循环队列   
  197.         q->head = 0;  
  198.   
  199.     if (q->head == q->tail)       // 取出数据时“触底”   
  200.         q->empty = true;  
  201.     q->full = false;  
  202.   
  203.     return;  
  204. }  

相关内容