用PV操作实现多线程间同步互斥(PV)
在计算机操作系统中,PV操作是进程管理中的难点。
首先应弄清PV操作的含义:PV操作由P操作原语和V操作原语组成(原语是不可中断的过程),对信号量进行操作,具体定义如下:
????P(S):①将信号量S的值减1,即S=S-1;
???????????②如果S30,则该进程继续执行;否则该进程置为等待状态,排入等待队列。
????V(S):①将信号量S的值加1,即S=S+1;
???????????②如果S>0,则该进程继续执行;否则释放队列中第一个等待信号量的进程。
PV操作的意义:我们用信号量及PV操作来实现进程的同步和互斥。PV操作属于进程的低级通信。
什么是信号量?信号量(semaphore)的数据结构为一个值和一个指针,指针指向等待该信号量的下一个进程。信号量的值与相应资源的使用情况有关。当它的值大于0时,表示当前可用资源的数量;当它的值小于0时,其绝对值表示等待使用该资源的进程个数。注意,信号量的值仅能由PV操作来改变。
????一般来说,信号量S30时,S表示可用资源的数量。执行一次P操作意味着请求分配一个单位资源,因此S的值减1;当S<0时,表示已经没有可用资源,请求者必须等待别的进程释放该类资源,它才能运行下去。而执行一个V操作意味着释放一个单位资源,因此S的值加1;若S£0,表示有某些进程正在等待该资源,因此要唤醒一个等待状态的进程,使之运行下去。
????利用信号量和PV操作实现进程互斥的一般模型是:
?
?
进程P1??????????????进程P2???????????……??????????进程Pn
……??????????????????……???????????????????????????……
P(S);??????????????P(S);?????????????????????????P(S);
临界区;?????????????临界区;????????????????????????临界区;
V(S);??????????????V(S);????????????????????????V(S);
……??????????????????……????????????……???????????……
????其中信号量S用于互斥,初值为1。
????使用PV操作实现进程互斥时应该注意的是:
????(1)每个程序中用户实现互斥的P、V操作必须成对出现,先做P操作,进临界区,后做V操作,出临界区。若有多个分支,要认真检查其成对性。
????(2)P、V操作应分别紧靠临界区的头尾部,临界区的代码应尽可能短,不能有死循环。
(3)互斥信号量的初值一般为1。
????利用信号量和PV操作实现进程同步
PV操作是典型的同步机制之一。用一个信号量与一个消息联系起来,当信号量的值为0时,表示期望的消息尚未产生;当信号量的值非0时,表示期望的消息已经存在。用PV操作实现进程同步时,调用P操作测试消息是否到达,调用V操作发送消息。
????使用PV操作实现进程同步时应该注意的是:
????(1)分析进程间的制约关系,确定信号量种类。在保持进程间有正确的同步关系情况下,哪个进程先执行,哪些进程后执行,彼此间通过什么资源(信号量)进行协调,从而明确要设置哪些信号量。
????(2)信号量的初值与相应资源的数量有关,也与P、V操作在程序代码中出现的位置有关。
????(3)同一信号量的P、V操作要成对出现,但它们分别在不同的进程代码中。
【例1】生产者-消费者问题
在多道程序环境下,进程同步是一个十分重要又令人感兴趣的问题,而生产者-消费者问题是其中一个有代表性的进程同步问题。下面我们给出了各种情况下的生产者-消费者问题,深入地分析和透彻地理解这个例子,对于全面解决操作系统内的同步、互斥问题将有很大帮助。
?
?
?
(1)一个生产者,一个消费者,公用一个缓冲区。
定义两个同步信号量:
empty——表示缓冲区是否为空,初值为1。
?
消费者进程
while(True){
P(full);
???从Buffer取出一个产品;
???V(empty);
???消费该产品;
???}
full——表示缓冲区中是否为满,初值为0。
?
生产者进程
while(TRUE){
生产一个产品;
?????P(empty);
?????产品送往Buffer;
?????V(full);
?????}
?
?
?
(2)一个生产者,一个消费者,公用n个环形缓冲区。
定义两个同步信号量:
empty——表示缓冲区是否为空,初值为n。
full——表示缓冲区中是否为满,初值为0。
?
消费者进程
while(TRUE){
?P(full);
???从buffer(out)中取出产品;
???out=(out+1)mod n;
???V(empty);
???消费该产品;
???}
?
?
?
????设缓冲区的编号为1~n-1,定义两个指针in和out,分别是生产者进程和消费者进程使用的指针,指向下一个可用的缓冲区。
?
生产者进程
while(TRUE){
?????生产一个产品;
?????P(empty);
?????产品送往buffer(in);
?????in=(in+1)mod n;
?????V(full);
?????}
?
?
?
(3)一组生产者,一组消费者,公用n个环形缓冲区
????在这个问题中,不仅生产者与消费者之间要同步,而且各个生产者之间、各个消费者之间还必须互斥地访问缓冲区。
定义四个信号量:
empty——表示缓冲区是否为空,初值为n。
full——表示缓冲区中是否为满,初值为0。
mutex1——生产者之间的互斥信号量,初值为1。
mutex2——消费者之间的互斥信号量,初值为1。
????设缓冲区的编号为1~n-1,定义两个指针in和out,分别是生产者进程和消费者进程使用的指针,指向下一个可用的缓冲区。
---------------------------------------------------------
//*******************************************************************************************/
---------------------------------------------------------
?
?
?
?
?
1任务
(1)用PV操作实现多线程间同步互斥;
?
?
(2)显示一个服务线程(i)为某个客户线程(j)提供服务的服务线程号与客户线程号,当服务结束时,打印其返回值,并释放被服务完的客户线程,让等待客户线程使用,直到所有的客户线程都被得到服务;
?
?
(3)程序中可设置不同的服务线程(N)和客户线程(M)的个数。
?
?
2?程序设计
main()?中启动3个服务线程server ( 1-3 )和50个客户线程client( 1-50 ),client()中先用信号量Vacancy确保有某服务线程空闲,然后在该服务线程相应的a[i]中登记客户线程号,并置Ready[i]=1,等待服务结束后取出返回结果a[i].rv。Server(i)中先等待Ready[i] = 1,然后提供服务,最后将结果放入a[i].rv中。程序中的CCriticalSection::Lock()通常被称为P操作,而CCriticalSection::Unlock()被称为V操作。
?
?
2.1?功能设计
(1)用PV操作实现多线程间同步互斥;
?
?
(2)显示一个服务线程(i)为某个客户线程(j)提供服务的服务线程号与客户线程号,当服务结束时,打印其返回值,并释放被服务完的客户线程,让等待客户线程使用,直到所有的客户线程都被得到服务;
?
?
(3)通过命令行参数设置不同的服务线程(N)和客户线程(M)的个数。
?
?
为了看清某服务线程正在为哪个客户线程服务,特定义结构数组如下:
?
?
struct{
?
?
int flag;??????// flag=1: available, flag=o:busy
?
?
int clientno;??//?客户线程号
?
?
int rv;????????//?给客户线程的返回值
?
?
} a[N];
?
?
并为其定义互斥信号量Mutex(?初始值1,最大值1?)。
?
?
为弄清是否还有空闲服务线程存在,特定义信号量Vacancy(?初始值N,最大值N)。
?
?
此外,为了同步客户线程和服务线程,定义信号量组:
?
?
Ready[N](?初始值0,最大值1?)和Finish[N](?初始值0,最大值1)。
?
?
2.2?程序
#i nclude“stdafx. h”
#i nclude <stdlib. h>
#i nclude <stdio. h>
#i nclude <process. h>
#i nclude“afxmt. h”
?
?
?
#define N 3????????// N个服务线程
#define M 50???????// M个客户线程
?
?
?
void client ( int );
void server( int serverno );
void tmain( int ,TCHAR **,TCHAR **);
?
?
?
typedef struct ClientControl {
int flag;?????// flag=1 for vacanay
int clientno;??//?客户线程号
int rv;??????//?客户线程的返回值
}ClientControl
?
?
?
ClientControl????a [N];
CCriticalSection??slMutex;
CSemaphore?????*vacanay = new CSemaphore(N,N);
CEvent??????????apslReady[N];
CEvent??????????apslFinish[N];
?
?
?
/*******************************
*client——client thread
*******************************/
void client ( int clientno )
{
CsingleLock slVacancy( vacancy );// vacancy为全局变量
int i;
?
?
?
slVacancy. Lock ();?????????????//?获取服务线程
slMutex. Lock ();??????????????//?进入互斥空间
for ( i = 0;i < N; i ++ )
if ( a[i].flag == 1 )??{??????//?服务线程i空间
?
?
a[i].flag = 0;????????//?控制其他客户线程
?
?
a[i].clientno = clientno;
apslReady[ i ].SetEvent();??//?服务线程就绪
break;
}
slMutex.Unlock ();??????????????????//?释放互斥空间
?
?
::WaitForSingleOject( apslfinish[i],INFINITE );
/**********************************************
?
?
等待此服务线程完成对该客户线程的服务,需要server??apsFinish[ serverno ].SetEvent()作决定
?
?
*********************************************/
?
?
apslFinish[i].ResetEvent();??????????????//?重置服务线程
?
?
slMutex.Lock ();??????????????????????//?再次进入互斥空间
a[i].flag = 1;?????????????????????????//?释放服务线程i
?
?
printf (“/t/t server %d free %d:rv = %d/n”, i, a[i].rv );???//?打印结果
slMutex.Unlock ();
slVacansy.Unlock ();
}
/*******************************
*server——server thread
*******************************/
void server( int serverno )
{
whlie ( 1 )???{
::WaitForSingleOject( apslReady[ serverno ],INFINITE );
//若apslReady信号为1,说明有客户线程等待,继续;不为1,则等待
?
?
apslReady[serverno]。ResetEvent();
slMutex.Lock ();
printf ( “/t/t server %d—> cliend %d/n”,serverno,a[serverno].clientno,);
slMutex.Unlock();
a[ servero ].rv = a[ servero ].clientno;
for( int i = 0;i < 10000;i ++ )??{
//?避免只有一个服务线程为所有的客户线程服务
?
?
apslFinish[ serverno ].SetEvent();
slMuter.Unlock( 1 );
}
}
?
?
?
void tmain( int argv,TCHAR *argv[],TCHAR *envp[] )
{
int nReCode = 0;
int i;
?
?
?
if( !AfxWinInit(::GetModuleHandle( NULL ),NULL,::getCommandLine(),0) ) {
//?出错处理
cerr << _T( “Fatal Error{ MFC initialization Failed” } << end1;
nReCode = 1;
}
else??{
cout<< /t”BIGIN !”<< end1;
}
for ( i = 0;i < N; i ++) {??//?初始化所有的服务线程,置为空闲状态
a[i].flag = 1;
}
for ( i = 0;i < N; i ++)??//?启动服务线程
-beginthread (( void (*) ( void *) )server,0, ( void *) i );
?
?
?
for ( i = 0;i < N; i ++)??//启动客户线程
-beginthread (( void (*) ( void ) )client,0, ( void *) i );
?
?
?
// wait for all threads’ completion
Sleep (10000);
Printf ( “All done/n” );
}
?
?
?
3.运行结果
server2 free,rv=27
?
?
server3->client18
?
?
server0->client28
?
?
server0 free,rv=28
?
?
server1->client19
?
?
server1 free,rv=19
?
?
server2->client20
?
?
server3 free,rv=18
?
?
server0->client21
?
?
server1->client22
?
?
server4->client29
?
?
server2 free,rv=20
?
?
server0 free,rv=21
?
?
server1 free,rv=22
?
?
server2->client23
?
?
server4 free,rv=29
?
?
server0->client24
?
?
server1->client26
?
?
server0 free,rv=24
?
?
server3->client25
?
?
server2 free,rv=23
?
?
server1 free,rv=26
?
?
server3 free,rv=25