读书人

linux进程间的通信(C): 使用信号量进行同步的共享内存机制

发布时间: 2013-02-28 11:33:09 作者: rapoo

linux进程间的通信(C): 使用信号量进行同步的共享内存机制
一、简介
共享内存为在多个进程之间共享和传递数据提供了一种有效的方式,但它本身并未提供同步机制。在实际编程中,可以使用信号量、传递消息(使用管道或IPC消息)、生成信号、条件变量等方法来提供读写之间有效的同步机制。
本例程序使用信号量进行同步,主要是因为它方便,使用广泛,且独立于进程。
本例程序实现了,生产者进程: 每次读取YUV输入文件的一帧, 然后将其写到共享内存中。消费者进程: 每次从共享内存中读到一帧, 处理后, 将其写到输出文件。两个进程间使用信号量来保证同步处理每一帧。
本例程序很好地示范了共享内存和信号量的机制,对于实际程序的开发很有意义。
二、生产者进程
common.h用来设置一些测试用的基本参数。

  /*
   * \File
   * common.h
   * \Brief
   * Basic test parameters used by both the producer and the consumer.
   */
  #ifndef __COMMON_H__
  #define __COMMON_H__

  /* File opened for reading by the producer. */
  #define TEST_FILE "coastguard_cif.yuv"
  /* File opened for writing by the consumer. */
  #define OUTPUT_FILE "coastguard_cif_trsd.yuv"

  /* Frame width and height handed to init_frame() by both programs. */
  #define FYUV_WIDTH 352
  #define FYUV_HEIGHT 288

  #endif /* __COMMON_H__ */
    共享内存相关的头文件。shm_com.h
  /*
   * \File
   * shm_com.h
   * \Brief
   * Layout of the shared-memory segment exchanged between the processes.
   */
  #ifndef __SHM_COM_H__
  #define __SHM_COM_H__

  /* Key value cast to key_t and passed to shmget() by both programs. */
  #define SHM_SEED 1001

  /*
   * Capacity of the shared buffer in bytes.  The expansion is parenthesised
   * so the macro behaves as a single value inside larger expressions such
   * as `x % MAX_SHM_SIZE` or `n / MAX_SHM_SIZE`.
   */
  #define MAX_SHM_SIZE (2048 * 2048 * 3)

  typedef struct shared_use_st
  {
      int end_flag;              /* sharing finished? 0: not finished, 1: finished */
      char shm_sp[MAX_SHM_SIZE]; /* space of the shared memory (frame payload) */
  } shared_use_st;

  #endif /* __SHM_COM_H__ */
    信号量的头文件semaphore.h
  /*
   * \File
   * semaphore.h
   * \Brief
   * Helper functions built on semctl() and semop().
   */
  #ifndef __SEMAPHORE_H__
  #define __SEMAPHORE_H__

  /* Key value cast to key_t and passed to semget() by both programs. */
  #define SEM_SEED 1000

  /* Argument type handed to semctl() as its fourth parameter. */
  union semun
  {
      int val;
      struct semid_ds *buf;
      unsigned short *array;
  };

  /* Sets semaphore 0 of the set to `value`; returns 0 on success, 1 on failure. */
  int set_semvalue(int sem_id, int value);

  /* Removes the semaphore set; a failure is reported on stderr. */
  void del_semvalue(int sem_id);

  /* P operation (semaphore 0 minus one); returns 0 on success, 1 on failure. */
  int semaphore_p(int sem_id);

  /* V operation (semaphore 0 plus one); returns 0 on success, 1 on failure. */
  int semaphore_v(int sem_id);

  #endif /* __SEMAPHORE_H__ */
    帧处理的头文件frame.h
  • /*
  • * \File
  • * frame.h
  • * \Brief
  • *
  • */
  • #ifndef __FRAME_H__
  • #define __FRAME_H__

  • #include "shm_com.h"

  • #define PIX_VALUE 1

  • typedef enum
  • {
  • YUV420,
  • YUV422,
  • YUV444,
  • RGB
  • }frame_type_e;

  • typedef struct
  • {
  • int frm_w; // width
  • int frm_h; // height
  • frame_type_e frm_type;
  • int frm_size;

  • char *frm_comps;
  • }frame_t, *frame_p;


  • int init_frame(frame_p frame, int width, int height, frame_type_e type);
  • int free_frame(frame_p frame);

  • int read_frame_from_file(frame_p frame, FILE* fp);
  • int write_frame_into_file(FILE* fp, frame_p frame);

  • int read_frame_from_shm(frame_p frame, shared_use_st* shm);
  • int write_frame_into_shm(shared_use_st* shm, frame_p frame);

  • int crop_frame(frame_p frame, int l_offset, int t_offset, int c_w, int c_h);

  • #endif
    生产者进程的基本流程:打开输入文件并初始化帧;创建并初始化共享内存和信号量;然后每次读取一帧, 用信号量获取共享内存的权限后, 将读取的帧写入共享内存, 再释放共享内存的权限。当处理完所有的帧后,设置结束标志,并释放相关的资源。producer.c
  • /*
  • * \File
  • * producer.c
  • * \Brief
  • * Test shared-memory and semaphore
  • */
  • #include <stdio.h>
  • #include <stdlib.h>
  • #include <string.h>
  • #include <errno.h>
  • #include <unistd.h>

  • #include <sys/shm.h>
  • #include <sys/sem.h>

  • #include "common.h"
  • #include "semaphore.h"
  • #include "shm_com.h"
  • #include "frame.h"


  /*
   * \Brief
   * Producer entry point.  Reads TEST_FILE frame by frame and copies each
   * frame into the shared-memory segment between a P and a V operation on
   * the semaphore.  After the last frame it sets end_flag to 1, removes the
   * semaphore and detaches the segment.
   * \Return
   * Exits with EXIT_SUCCESS, or with EXIT_FAILURE when a resource cannot be
   * set up or the segment cannot be detached.
   */
  int main(int argc, char *argv[])
  {
      FILE *fp_in = NULL;
      frame_t frame;
      int frame_cnt = 0;

      int sem_id; /* semaphore id */

      int shm_id; /* shared-memory id */
      void *shared_memory = NULL;
      shared_use_st *shared_stuff;

      (void)argc;
      (void)argv;

      /* Open input file; fopen() reports failure with NULL */
      if ((fp_in = fopen(TEST_FILE, "rb")) == NULL)
      {
          printf("Open input file failed: %s\n", TEST_FILE);
          exit(EXIT_FAILURE);
      }

      /* Init frame; without a buffer the read loop below cannot work */
      if (init_frame(&frame, FYUV_WIDTH, FYUV_HEIGHT, YUV420) != 0)
      {
          fprintf(stderr, "init_frame failed.\n");
          fclose(fp_in);
          exit(EXIT_FAILURE);
      }
      printf("FRAME: w = %d, h = %d\n", frame.frm_w, frame.frm_h);

      /* Create and init semaphore */
      sem_id = semget((key_t)SEM_SEED, 1, 0666 | IPC_CREAT);
      if (sem_id == -1)
      {
          fprintf(stderr, "semget failed.\n");
          exit(EXIT_FAILURE);
      }

      /* Init shared-memory */
      shm_id = shmget((key_t)SHM_SEED, sizeof(struct shared_use_st), 0666 | IPC_CREAT);
      if (shm_id == -1)
      {
          fprintf(stderr, "shmget failed.\n");
          exit(EXIT_FAILURE);
      }

      shared_memory = shmat(shm_id, NULL, 0);
      if (shared_memory == (void *)-1)
      {
          fprintf(stderr, "shmat failed.\n");
          exit(EXIT_FAILURE);
      }

      shared_stuff = (struct shared_use_st *)shared_memory;
      shared_stuff->end_flag = 0;

      printf("FRAME_CNT: %d\n", frame_cnt);
      if (set_semvalue(sem_id, 1) != 0)
      {
          fprintf(stderr, "set_semvalue failed.\n");
          exit(EXIT_FAILURE);
      }

      /* The loop ends when a whole frame can no longer be read */
      while (read_frame_from_file(&frame, fp_in) == 0)
      {
          semaphore_p(sem_id);

          /* Write it into shared memory */
          write_frame_into_shm(shared_stuff, &frame);
          shared_stuff->end_flag = 0;

          semaphore_v(sem_id);

          frame_cnt++;
          printf("FRAME_CNT: %d\n", frame_cnt);
      }

      /* Tell the reader that no further frame will follow */
      semaphore_p(sem_id);
      shared_stuff->end_flag = 1;
      semaphore_v(sem_id);

      /* over */
      printf("\nProducer over!\n");
      fclose(fp_in);
      free_frame(&frame);
      /*
       * NOTE(review): the semaphore is removed here while another process may
       * still be operating on it -- confirm this is intended.
       */
      del_semvalue(sem_id);
      if (shmdt(shared_memory) == -1)
      {
          fprintf(stderr, "shmdt failed.\n");
          exit(EXIT_FAILURE);
      }

      exit(EXIT_SUCCESS);
  }
    三、消费者进程
    消费者进程的基本流程:打开输出文件并初始化帧;获取共享内存和信号量;每次得到共享内存的权限后,从共享内存中读取一帧并获得结束标志,进行帧处理,再释放共享内存的权限,直到结束标志为真。
    最后释放相关的资源。consumer.c
  • /*
  • * \File
  • * consumer.c
  • * \Brief
  • *
  • */
  • #include <stdio.h>
  • #include <stdlib.h>
  • #include <string.h>
  • #include <errno.h>
  • #include <unistd.h>

  • #include <sys/shm.h>
  • #include <sys/sem.h>

  • #include "common.h"
  • #include "semaphore.h"
  • #include "shm_com.h"
  • #include "frame.h"


  /*
   * \Brief
   * Consumer entry point.  Repeatedly takes the semaphore, copies one frame
   * and end_flag out of the shared-memory segment, overwrites a rectangle
   * of the frame via crop_frame() and appends the frame to OUTPUT_FILE.
   * The loop stops after the iteration in which end_flag was read as non-zero.
   * \Return
   * Exits with EXIT_SUCCESS, or with EXIT_FAILURE when a resource cannot be
   * set up or the segment cannot be detached.
   */
  int main(int argc, char *argv[])
  {
      FILE *fp_out = NULL;
      frame_t frame;
      int frame_cnt = 0;

      int sem_id; /* semaphore id */

      int shm_id; /* shared-memory id */
      void *shared_memory = NULL;
      shared_use_st *shared_stuff;
      int end_flag = 0;

      (void)argc;
      (void)argv;

      /* Open output file; fopen() reports failure with NULL */
      if ((fp_out = fopen(OUTPUT_FILE, "wb")) == NULL)
      {
          printf("Open output file failed: %s\n", OUTPUT_FILE);
          exit(EXIT_FAILURE);
      }

      /* Init frame; without a buffer the copy from shared memory cannot work */
      if (init_frame(&frame, FYUV_WIDTH, FYUV_HEIGHT, YUV420) != 0)
      {
          fprintf(stderr, "init_frame failed.\n");
          fclose(fp_out);
          exit(EXIT_FAILURE);
      }
      printf("FRAME: w = %d, h = %d\n", frame.frm_w, frame.frm_h);

      /* Create semaphore */
      sem_id = semget((key_t)SEM_SEED, 1, 0666 | IPC_CREAT);
      if (sem_id == -1)
      {
          fprintf(stderr, "semget failed.\n");
          exit(EXIT_FAILURE);
      }

      /* Init shared-memory */
      shm_id = shmget((key_t)SHM_SEED, sizeof(struct shared_use_st), 0666 | IPC_CREAT);
      if (shm_id == -1)
      {
          fprintf(stderr, "shmget failed.\n");
          exit(EXIT_FAILURE);
      }

      shared_memory = shmat(shm_id, NULL, 0);
      if (shared_memory == (void *)-1)
      {
          fprintf(stderr, "shmat failed.\n");
          exit(EXIT_FAILURE);
      }
      shared_stuff = (struct shared_use_st *)shared_memory;

      printf("FRAME_CNT: %d\n", frame_cnt);
      /*
       * The semaphore must be reset to 0 first; otherwise a semaphore that
       * was not released because the producer exited abnormally makes the
       * program misbehave.
       * NOTE(review): resetting the value while another process is already
       * using the semaphore looks racy -- confirm the intended start order.
       */
      if (set_semvalue(sem_id, 0) != 0)
      {
          fprintf(stderr, "set_semvalue failed.\n");
          exit(EXIT_FAILURE);
      }

      do
      {
          /*
           * The result is deliberately not treated as fatal: a failing P
           * operation is already reported by semaphore_p() itself.
           */
          semaphore_p(sem_id);

          /* Read frame from shared-memory */
          read_frame_from_shm(&frame, shared_stuff);
          end_flag = shared_stuff->end_flag;

          /*
           * NOTE(review): the iteration that observes end_flag != 0 still
           * processes and writes the buffer contents -- confirm that this
           * final frame is wanted.
           */
          crop_frame(&frame, 10, 10, 40, 40);
          write_frame_into_file(fp_out, &frame);

          semaphore_v(sem_id);

          frame_cnt++;
          printf("FRAME_CNT: %d\n", frame_cnt);
      } while (!end_flag);

      /* Over */
      printf("\nConsumer over!\n");
      fclose(fp_out);
      free_frame(&frame);
      if (shmdt(shared_memory) == -1)
      {
          fprintf(stderr, "shmdt failed.\n");
          exit(EXIT_FAILURE);
      }

      exit(EXIT_SUCCESS);
  }

    四、信号量函数和帧处理函数信号量函数semaphore.c
  • /*
  • * \File
  • * semaphore.c
  • * \Brief
  • *
  • */
  • #include <stdio.h>
  • #include <stdlib.h>
  • #include <unistd.h>
  • #include <sys/sem.h>

  • #include "semaphore.h"

  • /* init semaphore by semctl */
  • int set_semvalue(int sem_id, int value)
  • {
  • union semun sem_union;

  • sem_union.val = value;
  • if (semctl(sem_id, 0, SETVAL, sem_union) == -1)
  • return 1;

  • return 0;
  • }


  /*
   * Delete the semaphore set through semctl(IPC_RMID).
   *
   * sem_id: identifier of the semaphore set
   *
   * IPC_RMID takes no fourth argument, so none is passed (the earlier code
   * handed over an uninitialised union).  A failure is only reported on
   * stderr because the function has no return value.
   */
  void del_semvalue(int sem_id)
  {
      if (semctl(sem_id, 0, IPC_RMID) == -1)
      {
          fprintf(stderr, "Failed to delete semaphore\n");
      }
  }

  /*
   * P operation: subtract one from semaphore 0 of the set, with SEM_UNDO.
   *
   * sem_id: identifier of the semaphore set
   *
   * Returns 0 on success; on failure a message goes to stderr and 1 is
   * returned.
   */
  int semaphore_p(int sem_id)
  {
      struct sembuf op = { .sem_num = 0, .sem_op = -1, .sem_flg = SEM_UNDO };

      if (semop(sem_id, &op, 1) != -1)
      {
          return 0;
      }

      fprintf(stderr, "semaphore_p failed\n");
      return 1;
  }

  /*
   * V operation: add one to semaphore 0 of the set, with SEM_UNDO.
   *
   * sem_id: identifier of the semaphore set
   *
   * Returns 0 on success; on failure a message goes to stderr and 1 is
   * returned.
   */
  int semaphore_v(int sem_id)
  {
      struct sembuf op = { .sem_num = 0, .sem_op = 1, .sem_flg = SEM_UNDO };

      if (semop(sem_id, &op, 1) != -1)
      {
          return 0;
      }

      fprintf(stderr, "semaphore_v failed\n");
      return 1;
  }
    帧处理函数frame.c
  • /*
  • * \File
  • * frame.c
  • */
  • #include <stdio.h>
  • #include <stdlib.h>
  • #include <unistd.h>
  • #include <memory.h>
  • #include <assert.h>
  • #include <errno.h>

  • #include "frame.h"

  /*
   * \Brief
   * Initialise a frame descriptor and allocate its zero-filled buffer.
   *
   * frame:  descriptor to fill in (must not be NULL)
   * width:  frame width, must be positive
   * height: frame height, must be positive
   * type:   layout used to derive the buffer size:
   *         YUV420 -> w*h*3/2, YUV422 -> w*h*2, YUV444 and RGB -> w*h*3
   *
   * Returns 0 on success and 1 on failure.  On failure frm_comps is NULL
   * and frm_size is 0, so free_frame() stays safe to call.
   */
  int init_frame(frame_p frame, int width, int height, frame_type_e type)
  {
      assert(frame != NULL);

      frame->frm_w = width;
      frame->frm_h = height;
      frame->frm_type = type;
      /* Defined state for every failure path below */
      frame->frm_size = 0;
      frame->frm_comps = NULL;

      if (width <= 0 || height <= 0)
      {
          fprintf(stderr, "frame size is invalid.\n");
          return 1;
      }

      switch (frame->frm_type)
      {
      case YUV420:
          frame->frm_size = (frame->frm_w * frame->frm_h * 3) / 2;
          break;
      case YUV422:
          frame->frm_size = frame->frm_w * frame->frm_h * 2;
          break;
      case YUV444:
          frame->frm_size = frame->frm_w * frame->frm_h * 3;
          break;
      case RGB:
          frame->frm_size = frame->frm_w * frame->frm_h * 3;
          break;
      default:
          fprintf(stderr, "frame type is invalid.");
          return 1;
      }

      frame->frm_comps = calloc((size_t)frame->frm_size, sizeof(char));
      if (frame->frm_comps == NULL)
      {
          frame->frm_size = 0;
          fprintf(stderr, "calloc failed.");
          return 1;
      }

      return 0;
  }

  /*
   * \Brief
   * Release the buffer owned by a frame descriptor.
   *
   * frame: descriptor whose frm_comps buffer is freed (must not be NULL)
   *
   * The pointer is reset to NULL afterwards so that a second call, or a
   * later use of the descriptor, does not touch freed memory.
   * Always returns 0.
   */
  int free_frame(frame_p frame)
  {
      assert(frame != NULL);

      free(frame->frm_comps);
      frame->frm_comps = NULL;

      return 0;
  }


  /*
   * \Brief
   * Read one frame (frm_size bytes) from a file into frame->frm_comps.
   *
   * frame: destination descriptor (must not be NULL)
   * fp:    input stream (must not be NULL)
   *
   * Returns 0 when a whole frame was read and 1 otherwise.  A clean end of
   * file (nothing read, no stream error) returns 1 silently; a partial
   * frame or an I/O error is also reported on stderr together with the
   * number of bytes actually read.
   */
  int read_frame_from_file(frame_p frame, FILE* fp)
  {
      size_t n_read;

      assert(frame != NULL && fp != NULL);

      n_read = fread(frame->frm_comps, sizeof(char), (size_t)frame->frm_size, fp);
      if (n_read != (size_t)frame->frm_size)
      {
          if (n_read != 0 || ferror(fp))
          {
              fprintf(stderr, "read_frame_from_file failed. %zu\n", n_read);
          }
          return 1;
      }

      return 0;
  }


  /*
   * \Brief
   * Write one frame (frm_size bytes of frame->frm_comps) to a file.
   *
   * fp:    output stream (must not be NULL)
   * frame: source descriptor (must not be NULL)
   *
   * Returns 0 when every byte was written; otherwise a message goes to
   * stderr and 1 is returned.
   */
  int write_frame_into_file(FILE* fp, frame_p frame)
  {
      size_t written;

      assert(frame != NULL && fp != NULL);

      written = fwrite(frame->frm_comps, sizeof(char), frame->frm_size, fp);
      if (written == frame->frm_size)
      {
          return 0;
      }

      fprintf(stderr, "write_frame_into_file failed.\n");
      return 1;
  }

  /*
   * \Brief
   * Copy one frame (frm_size bytes) from the shared-memory buffer into
   * frame->frm_comps.
   *
   * frame: destination descriptor (must not be NULL)
   * shm:   attached shared-memory segment (must not be NULL)
   *
   * Returns 0 on success.  Returns 1, without copying, when the frame has
   * no buffer or its size does not fit into shm->shm_sp.
   */
  int read_frame_from_shm(frame_p frame, shared_use_st* shm)
  {
      assert(frame != NULL && shm != NULL);

      /* Never read past the end of the shared buffer */
      if (frame->frm_comps == NULL || frame->frm_size < 0
          || (size_t)frame->frm_size > sizeof(shm->shm_sp))
      {
          fprintf(stderr, "read_frame_from_shm failed.\n");
          return 1;
      }

      memcpy(frame->frm_comps, shm->shm_sp, (size_t)frame->frm_size);

      return 0;
  }

  /*
   * \Brief
   * Copy one frame (frm_size bytes of frame->frm_comps) into the
   * shared-memory buffer.
   *
   * shm:   attached shared-memory segment (must not be NULL)
   * frame: source descriptor (must not be NULL)
   *
   * Returns 0 on success.  Returns 1, without copying, when the frame has
   * no buffer or its size does not fit into shm->shm_sp.
   */
  int write_frame_into_shm(shared_use_st* shm, frame_p frame)
  {
      assert(frame != NULL && shm != NULL);

      /* Never write past the end of the shared buffer */
      if (frame->frm_comps == NULL || frame->frm_size < 0
          || (size_t)frame->frm_size > sizeof(shm->shm_sp))
      {
          fprintf(stderr, "write_frame_into_shm failed.\n");
          return 1;
      }

      memcpy(shm->shm_sp, frame->frm_comps, (size_t)frame->frm_size);

      return 0;
  }



  /*
   * \Brief
   * Overwrite a rectangle of the frame with PIX_VALUE.  Despite the name
   * the frame size is not changed; the rectangle is addressed with a row
   * stride of frm_w starting at frm_comps.
   *
   * frame:    frame to modify (must not be NULL)
   * l_offset: left offset of the rectangle, must be >= 0
   * t_offset: top offset of the rectangle, must be >= 0
   * c_w:      rectangle width, must be >= 0
   * c_h:      rectangle height, must be >= 0
   *
   * Returns 0 on success and 1 when the rectangle does not lie inside
   * frm_w x frm_h or the frame has no buffer.
   */
  int crop_frame(frame_p frame, int l_offset, int t_offset, int c_w, int c_h)
  {
      char *row;
      int line;

      assert(frame != NULL);

      /* Negative values would address memory outside the buffer */
      if (l_offset < 0 || c_w < 0 || l_offset > frame->frm_w
          || c_w > frame->frm_w - l_offset)
      {
          printf("Crop width is out of range.\n");
          return 1;
      }
      if (t_offset < 0 || c_h < 0 || t_offset > frame->frm_h
          || c_h > frame->frm_h - t_offset)
      {
          printf("Crop height is out of range.\n");
          return 1;
      }
      if (frame->frm_comps == NULL)
      {
          printf("Frame buffer is missing.\n");
          return 1;
      }

      row = frame->frm_comps + ((size_t)t_offset * (size_t)frame->frm_w) + (size_t)l_offset;
      for (line = 0; line < c_h; line++)
      {
          memset(row, PIX_VALUE, (size_t)c_w);

          row += frame->frm_w;
      }

      return 0;
  }

    五、编译与运行
    makefile.producer
  # Builds the producer program from its three object files.
  OBJECTS = producer.o semaphore.o frame.o
  CC = gcc
  # CFLAGS (not CFLAG) is the variable make's implicit .c -> .o rule uses.
  CFLAGS = -g -Wall

  producer: $(OBJECTS)
	$(CC) $(CFLAGS) -o producer $(OBJECTS)

  # Header dependencies; the objects themselves come from the implicit rule.
  producer.o: common.h semaphore.h frame.h shm_com.h
  semaphore.o: semaphore.h
  frame.o: frame.h shm_com.h

  .PHONY: clean
  clean:
	rm -f producer $(OBJECTS)
    makefile.consumer
  # Builds the consumer program from its three object files.
  OBJECTS = consumer.o semaphore.o frame.o
  CC = gcc
  # CFLAGS (not CFLAG) is the variable make's implicit .c -> .o rule uses.
  CFLAGS = -g -Wall

  consumer: $(OBJECTS)
	$(CC) $(CFLAGS) -o consumer $(OBJECTS)

  # Header dependencies; the objects themselves come from the implicit rule.
  consumer.o: common.h semaphore.h frame.h shm_com.h
  semaphore.o: semaphore.h
  frame.o: frame.h shm_com.h

  .PHONY: clean
  clean:
	rm -f consumer $(OBJECTS)
    编译与运行:
  • $ make -f makefile.producer
  • $ make -f makefile.consumer
  • $ ./producer &
  • $ ./consumer

  • 读书人网 >UNIXLINUX

    热点推荐