(第四章 2)生产者-消费者模型
输出大致像这样:
[hadoop@sam1 test]$ ./producer_consumer [P0] Producing 0 ...[P1] Producing 0 ...[P1] Producing 1 ...[P0] Producing 1 ...------> [C1] Comsuming 0 ...------> [C1] Comsuming 0 ...[P2] Producing 0 ...[P2] Producing 1 ...------> [C2] Comsuming 1 ...------> [C0] Comsuming 1 ...------> [C0] Comsuming 0 ...------> [C1] Comsuming 1 ...[P1] Producing 2 ...[P1] Producing 3 ...------> [C0] Comsuming 2 ...------> [C0] Comsuming 3 ...[P0] Producing 2 ...[P2] Producing 2 ...------> [C2] Comsuming 2 ...[P2] Producing 3 ...[P0] Producing 3 ...------> [C1] Comsuming 2 ...------> [C2] Comsuming 3 ...------> [C2] Comsuming 3 ...
?
代码:
#include <pthread.h>#include <stdio.h>#include <stdlib.h>#include </usr/include/semaphore.h>#define BUFF_SIZE 5/* total number of slots */#define NP 3/* total number of producers */#define NC 3/* total number of consumers */#define NITERS 4/* number of items produced/consumed */typedef struct { int buf[BUFF_SIZE]; /* shared var */ int in; /* buf[in%BUFF_SIZE] is the first empty slot */ int out; /* buf[out%BUFF_SIZE] is the first full slot */ sem_t full; /* keep track of the number of full spots */ sem_t empty; /* keep track of the number of empty spots */ sem_t mutex; /* enforce mutual exclusion to shared data */} sbuf_t;sbuf_t shared;void *Producer(void *arg){ int i, item, index; //index是线程编号 index = (int)arg; for (i=0; i < NITERS; i++) { /* Produce item */ item = i; /* Prepare to write item to buf ??*/ /* If there are no empty slots, wait */ sem_wait(&shared.empty); /* If another thread uses the buffer, wait */ sem_wait(&shared.mutex); shared.buf[shared.in] = item; shared.in = (shared.in+1)%BUFF_SIZE; printf("[P%d] Producing %d ...\n", index, item);fflush(stdout); /* Release the buffer */ sem_post(&shared.mutex); /* Increment the number of full slots */ sem_post(&shared.full); /* Interleave producer and consumer execution */ if (i % 2 == 1) sleep(1); } return NULL;}void *Consumer(void *arg){ /* Fill in the code here */int i,item,index;index=(int)arg;for(i=0;i<NITERS;i++){item=i;sem_wait(&shared.full);sem_wait(&shared.mutex);printf("------> [C%d] Comsuming %d ...\n",index,shared.buf[shared.out]);//fflush(stdout);shared.out=(shared.out+1)%BUFF_SIZE;sem_post(&shared.mutex);sem_post(&shared.empty);}return NULL;}int main(){ pthread_t idP, idC; int index; //注意: //1. 信号量一定要初始化,否则程序卡住,而且不报错 //2. value=1的信号量等同于互斥锁?? sem_init(&shared.mutex,0,1); sem_init(&shared.full, 0, 0); sem_init(&shared.empty, 0, BUFF_SIZE); /* Insert code here to initialize mutex*/ for (index = 0; index < NP; index++) { /* Create a new producer */ pthread_create(&idP, NULL, Producer, (void*)index); } /* Insert code here to create NC consumers */for(index=0;index<NC;index++){pthread_create(&idC,NULL,Consumer,(void*)index);} pthread_exit(NULL);//仅仅结束主线程,进程不结束(对其他子线程不影响)}?