读书人

生产者消费者有关问题中缓冲区的读写

发布时间: 2013-01-09 09:38:16 作者: rapoo

生产者消费者问题中,缓冲区的读写问题
linux下gcc;
5个生产者线程,5个消费者线程,生产者从1开始写,一直写到500。
用一个结构体数据模拟读写缓冲区,每写一次对其遍历排序一次,将未被读取的数据置前。

问题:
(消费线程中)
当一个消费者线程持续获取缓冲区锁 并占有时,出现读取数据失败的情况。
(ps:消费线程切换则不会出现这种情况;
可能描述比较模糊,运行一下就能看到现象了,生产者消费者有关问题中,缓冲区的读写有关问题!)

#include "common.h"

//common中含头文件和 模拟缓冲区的结构体s_client
/*
#include <sys/un.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include <semaphore.h>

typedef struct s_client
{
int clientid;//数值
int status;//标示是否有值,有值=1,无值=0
};
*/

//main.c
#define MAX_CONSUMERS 5 //消费线程数
#define MAX_PRODUCERS 5 //生产线程数
#define MAX_BUFFERS 20 //缓冲区可存数据
#define MAX_NUMBER 500 //生产总数

FILE *fp, *fp_out, *fp_in;//可无视,in.txt 与 123.txt 结合stdout 可看出问题点buff内数据>2

pthread_mutex_t full_mutex,empty_mutex,file_mutex;
//分别为消费、生产、缓冲区互斥锁

volatile int buff_counts = 0, exit_flag = 0, i_in = 0, exit_flag_consumer = 0;
//exit_flag、exit_flag_consumer分别为生产、消费线程退出条件,i_in 取值 1至500

volatile struct s_client buff[MAX_BUFFERS]; //读写缓冲区

//生产者写缓冲区
int write_one(volatile struct s_client *buffer, int num)
{
int i,j,sum=0;

//遍历,将未被读取的数据置前
for(i=0;i<MAX_BUFFERS;i++)
{
if(buffer[i].status) continue;

for(j=i+1;j<MAX_BUFFERS;j++)
{
if(buffer[j].status )
{
buffer[i].status = 1;
buffer[i].clientid = buffer[j].clientid;
buffer[j].status = 0;
buffer[j].clientid = 0;
}
}
}
for(i=0;i<MAX_BUFFERS;i++)
{
if(buffer[i].status == 0 )
{
buffer[i].clientid = num;
buffer[i].status = 1;
fprintf(fp_in, "buffer[%d].clientid = %d\n",i,num);
break;
}
}
return 0;
}

//消费者读缓冲区
int get_one(volatile struct s_client *buffer)
{
int i,num = -1;
for(i = 0; i < MAX_BUFFERS; i++)


{
if(buffer[i].status)
{
num = buffer[i].clientid ;
buffer[i].status = 0;
//fprintf(fp_in, "buffer[%d].clientid = %d\n",i,num);
break;
}
//return -1;
}
return num;
}

//生产者线程
int producer()
{

printf("%4d produce is begain\n", pthread_self()%10000);
fprintf(fp_out, "%4d produce is begain\n", pthread_self()%10000);
while(1)
{
pthread_mutex_lock(&empty_mutex);
while(buff_counts<MAX_BUFFERS)
{
i_in++;
if(i_in>MAX_NUMBER || exit_flag)
{
exit_flag = 1;
break;
}

pthread_mutex_lock(&file_mutex);
fprintf(fp,"%d\n", i_in);
//fflush(fp);
write_one(buff, i_in);
buff_counts++;
pthread_mutex_unlock(&file_mutex);

//printf("pid=%04d producet %d \n", pthread_self()%10000, i_in);
fprintf(fp_out, "pid=%04d producet %d \n", pthread_self()%10000, i_in);
usleep(1000);
}
pthread_mutex_unlock(&empty_mutex);
//usleep(10000);
if(i_in>MAX_NUMBER || exit_flag) break;
}
printf("producer %04d exited!\n", pthread_self()%10000);
fprintf(fp_out, "producer %04d exited!\n", pthread_self()%10000);
pthread_exit("produce over!");
}

//消费者线程
int consumer()
{
int i;
while(1)
{
pthread_mutex_lock(&full_mutex);

while(buff_counts>0)
{


if(exit_flag_consumer) break;

pthread_mutex_lock(&file_mutex);
i = get_one(buff);
if(i<0)
{
printf("get_one failed!\n");
pthread_mutex_unlock(&file_mutex);
continue;
}
fprintf(fp, "buff_counts = %d //pid=%04d consume %d\n", buff_counts, pthread_self()%10000,i);
if(i==MAX_NUMBER)
{
exit_flag_consumer = 1;
}
printf("pid=%04d consume %d\n", pthread_self()%10000,i);
buff_counts--;
pthread_mutex_unlock(&file_mutex);

if(exit_flag_consumer)
{
exit_flag = 1;
break;
}
//usleep(1000);
//this sentence will make the get_one(buff) failed in the old thread
//问题出现在这里,如果这里用usleep,这一个消费者线程继续读取的缓冲区buff会失败(当缓冲区多余2个以上数据时)
//若不使用usleep,其他消费者线程获取缓冲区锁,不会出现读取失败的问题。
}
pthread_mutex_unlock(&full_mutex);
if(i==MAX_NUMBER || exit_flag) break;
usleep(1000);
}
printf("consumer %04d exited!\n", pthread_self()%10000);
pthread_exit("consume over!");
}

int main(int argc, int argv[])
{
pthread_t consumer_queue[MAX_CONSUMERS], producer_queue[MAX_PRODUCERS];
int i,result;
char temp[10];
void *message;

memset((void *)buff,0,sizeof(buff));
fp = fopen("./123.txt", "wr+");
fp_in = fopen("./in.txt", "w+");
fp_out = fopen("./out.txt", "w+r");
if(!fp)


{
perror("fopen failed!");
exit(1);
}
result = pthread_mutex_init(&full_mutex, NULL);
if(result)
{
perror("pthread_mutex_init failed!");
exit(1);
}

result = pthread_mutex_init(&empty_mutex, NULL);
if(result)
{
perror("pthread_mutex_init failed!");
exit(1);
}

result = pthread_mutex_init(&file_mutex, NULL);
if(result)
{
perror("pthread_mutex_init failed!");
exit(1);
}

//消费线程
for(i = 0; i<MAX_CONSUMERS; i++)
{
result = pthread_create(&consumer_queue[i], NULL, (void *)consumer, NULL);
if(result)
{
perror("pthread_create failed!");
exit(1);
}
}
//生产线程
for(i = 0; i<MAX_PRODUCERS; i++)
{
result = pthread_create(&producer_queue[i], NULL, (void *)producer, NULL);
if(result)
{
perror("pthread_create failed!");
exit(1);
}
}
for(i = MAX_PRODUCERS-1; i>=0; i--)
{
pthread_join(producer_queue[i], &message);
printf("return message is: %s\n", (char*)message);
}
sleep(1);
fclose(fp);
exit(0);

}

[解决办法]
仅供参考

//循环向a函数每次发送200个字节长度(这个是固定的)的buffer,
//a函数中需要将循环传进来的buffer,组成240字节(也是固定的)的新buffer进行处理,
//在处理的时候每次从新buffer中取两个字节打印
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <windows.h>
#include <process.h>
#include <io.h>
//Log{
#define MAXLOGSIZE 10000000
#define ARRSIZE(x) (sizeof(x)/sizeof(x[0]))
#include <time.h>
#include <sys/timeb.h>
#include <stdarg.h>
char logfilename1[]="MyLog1.log";


char logfilename2[]="MyLog2.log";
char logstr[16000];
char datestr[16];
char timestr[16];
char mss[4];
CRITICAL_SECTION cs_log;
FILE *flog;
void Lock(CRITICAL_SECTION *l) {
EnterCriticalSection(l);
}
void Unlock(CRITICAL_SECTION *l) {
LeaveCriticalSection(l);
}
void LogV(const char *pszFmt,va_list argp) {
struct tm *now;
struct timeb tb;

if (NULL==pszFmt
[解决办法]
0==pszFmt[0]) return;
if (-1==_vsnprintf(logstr,ARRSIZE(logstr),pszFmt,argp)) logstr[ARRSIZE(logstr)-1]=0;
ftime(&tb);
now=localtime(&tb.time);
sprintf(datestr,"%04d-%02d-%02d",now->tm_year+1900,now->tm_mon+1,now->tm_mday);
sprintf(timestr,"%02d:%02d:%02d",now->tm_hour ,now->tm_min ,now->tm_sec );
sprintf(mss,"%03d",tb.millitm);
printf("%s %s.%s %s",datestr,timestr,mss,logstr);
flog=fopen(logfilename1,"a");
if (NULL!=flog) {
fprintf(flog,"%s %s.%s %s",datestr,timestr,mss,logstr);
if (ftell(flog)>MAXLOGSIZE) {
fclose(flog);
if (rename(logfilename1,logfilename2)) {
remove(logfilename2);
rename(logfilename1,logfilename2);
}
flog=fopen(logfilename1,"a");
if (NULL==flog) return;
}
fclose(flog);
}
}
void Log(const char *pszFmt,...) {
va_list argp;

Lock(&cs_log);
va_start(argp,pszFmt);
LogV(pszFmt,argp);
va_end(argp);
Unlock(&cs_log);
}
//Log}
#define ASIZE 200
#define BSIZE 240
#define CSIZE 2
char Abuf[ASIZE];
char Cbuf[CSIZE];
CRITICAL_SECTION cs_HEX ;
CRITICAL_SECTION cs_BBB ;
struct FIFO_BUFFER {
int head;
int tail;
int size;
char data[BSIZE];
} BBB;
int No_Loop=0;
void HexDump(int cn,char *buf,int len) {
int i,j,k;
char binstr[80];

Lock(&cs_HEX);
for (i=0;i<len;i++) {
if (0==(i%16)) {


sprintf(binstr,"%03d %04x -",cn,i);
sprintf(binstr,"%s %02x",binstr,(unsigned char)buf[i]);
} else if (15==(i%16)) {
sprintf(binstr,"%s %02x",binstr,(unsigned char)buf[i]);
sprintf(binstr,"%s ",binstr);
for (j=i-15;j<=i;j++) {
sprintf(binstr,"%s%c",binstr,('!'<buf[j]&&buf[j]<='~')?buf[j]:'.');
}
Log("%s\n",binstr);
} else {
sprintf(binstr,"%s %02x",binstr,(unsigned char)buf[i]);
}
}
if (0!=(i%16)) {
k=16-(i%16);
for (j=0;j<k;j++) {
sprintf(binstr,"%s ",binstr);
}
sprintf(binstr,"%s ",binstr);
k=16-k;
for (j=i-k;j<i;j++) {
sprintf(binstr,"%s%c",binstr,('!'<buf[j]&&buf[j]<='~')?buf[j]:'.');
}
Log("%s\n",binstr);
}
Unlock(&cs_HEX);
}
int GetFromRBuf(int cn,CRITICAL_SECTION *cs,FIFO_BUFFER *fbuf,char *buf,int len) {
int lent,len1,len2;

lent=0;
Lock(cs);
if (fbuf->size>=len) {
lent=len;
if (fbuf->head+lent>BSIZE) {
len1=BSIZE-fbuf->head;
memcpy(buf ,fbuf->data+fbuf->head,len1);
len2=lent-len1;
memcpy(buf+len1,fbuf->data ,len2);
fbuf->head=len2;
} else {
memcpy(buf ,fbuf->data+fbuf->head,lent);
fbuf->head+=lent;
}
fbuf->size-=lent;
}
Unlock(cs);
return lent;


}
void thdB(void *pcn) {
char *recv_buf;
int recv_nbytes;
int cn;
int wc;
int pb;

cn=(int)pcn;
Log("%03d thdB thread begin...\n",cn);
while (1) {
Sleep(10);
recv_buf=(char *)Cbuf;
recv_nbytes=CSIZE;
wc=0;
while (1) {
pb=GetFromRBuf(cn,&cs_BBB,&BBB,recv_buf,recv_nbytes);
if (pb) {
Log("%03d recv %d bytes\n",cn,pb);
HexDump(cn,recv_buf,pb);
Sleep(1);
} else {
Sleep(1000);
}
if (No_Loop) break;//
wc++;
if (wc>3600) Log("%03d %d==wc>3600!\n",cn,wc);
}
if (No_Loop) break;//
}
}
int PutToRBuf(int cn,CRITICAL_SECTION *cs,FIFO_BUFFER *fbuf,char *buf,int len) {
int lent,len1,len2;

Lock(cs);
lent=len;
if (fbuf->size+lent>BSIZE) {
lent=BSIZE-fbuf->size;
}
if (fbuf->tail+lent>BSIZE) {
len1=BSIZE-fbuf->tail;
memcpy(fbuf->data+fbuf->tail,buf ,len1);
len2=lent-len1;
memcpy(fbuf->data ,buf+len1,len2);
fbuf->tail=len2;
} else {
memcpy(fbuf->data+fbuf->tail,buf ,lent);
fbuf->tail+=lent;
}
fbuf->size+=lent;
Unlock(cs);
return lent;
}
void thdA(void *pcn) {


char *send_buf;
int send_nbytes;
int cn;
int wc;
int a;
int pa;

cn=(int)pcn;
Log("%03d thdA thread begin...\n",cn);
a=0;
while (1) {
Sleep(100);
memset(Abuf,a,ASIZE);
a=(a+1)%256;
if (16==a) {No_Loop=1;break;}//去掉这句可以让程序一直循环直到按Ctrl+C或Ctrl+Break或当前目录下存在文件No_Loop
send_buf=(char *)Abuf;
send_nbytes=ASIZE;
Log("%03d sending %d bytes\n",cn,send_nbytes);
HexDump(cn,send_buf,send_nbytes);
wc=0;
while (1) {
pa=PutToRBuf(cn,&cs_BBB,&BBB,send_buf,send_nbytes);
Log("%03d sent %d bytes\n",cn,pa);
HexDump(cn,send_buf,pa);
send_buf+=pa;
send_nbytes-=pa;
if (send_nbytes<=0) break;//
Sleep(1000);
if (No_Loop) break;//
wc++;
if (wc>3600) Log("%03d %d==wc>3600!\n",cn,wc);
}
if (No_Loop) break;//
}
}
int main() {
InitializeCriticalSection(&cs_log );
Log("Start===========================================================\n");
InitializeCriticalSection(&cs_HEX );
InitializeCriticalSection(&cs_BBB );

BBB.head=0;
BBB.tail=0;
BBB.size=0;

_beginthread((void(__cdecl *)(void *))thdA,0,(void *)1);
_beginthread((void(__cdecl *)(void *))thdB,0,(void *)2);

if (!access("No_Loop",0)) {
remove("No_Loop");
if (!access("No_Loop",0)) {
No_Loop=1;
}


}
while (1) {
Sleep(1000);
if (No_Loop) break;//
if (!access("No_Loop",0)) {
No_Loop=1;
}
}
Sleep(3000);
DeleteCriticalSection(&cs_BBB );
DeleteCriticalSection(&cs_HEX );
Log("End=============================================================\n");
DeleteCriticalSection(&cs_log );
return 0;
}

读书人网 >C语言

热点推荐