读书人

并行处理之摘引计数与状态的使用

发布时间: 2013-03-12 11:19:35 作者: rapoo

并行处理之引用计数与状态的使用

--------------------------------------------
标题: 并行处理之引用计数与状态的使用
作者: 叶飞虎
日期: 2013.03.09
--------------------------------------------

在并行处理过程中多线程对同一对象进行操作,而且操作各自独立,但又需要依赖对象
本身的状态,如已打开或已登录等等。这时需要给状态进行引用计数,以保证操作的有效和
并发性。

为了便于描述, 以<远程文件系统>中的客户端连接对象类(TRFConnObj)中部分代码做为
示例:

// 对象状态enum TObjState        {osInactive        = 0,        // 未打开         osClosing         = 1,        // 正在关闭         osOpening         = 2,        // 正在打开         osOpened          = 3};       // 已经打开// 引用计数加 1bool TRFConnObj::IncRefCount(){   // 初始化   bool result = false;   // 操作   Lock();   if (FState == osOpened)   {      FRefCount++;      result = true;   }   Unlock();   // 返回结果   return result;}// 引用计数减 1void TRFConnObj::DecRefCount(){   Lock();   if (FRefCount > 0)      FRefCount--;   Unlock();}// 打开long TRFConnObj::Open(){   // 初始化   long  result   = crStateInvalid;   bool  boolNext = false;   // 更改状态   Lock();   if (FState == osInactive)   {      FState   = osOpening;      boolNext = true;      FRefCount= 0;   }   else if (FState == osOpened)      result   = crSuccess;   Unlock();   // 判断是否继续   if (boolNext)   {      // 执行打开      boolNext = false;      result   = DoOpen();      // 更改状态      Lock();      if (FState != osOpening)         boolNext = true;      else if (result == crSuccess)         FState   = osOpened;      else         FState   = osInactive;      Unlock();      // 判断是否需要关闭      if (boolNext)      {         // 执行关闭         if (result == crSuccess)         {            DoClose(true);            result = crFailure;         }         // 更改状态(不需要加锁)         FState = osInactive;      }   }   // 返回结果   return result;}// 关闭void TRFConnObj::Close(bool ANeedWait){   // 初始化   bool boolNext  = false;   bool boolWait  = false;   // 更改状态   Lock();   if (FState == osOpened)   {      FState   = osClosing;      boolNext = true;   }   else if (FState == osOpening)   {      FState   = osClosing;      boolWait = true;   }   else if (FState == osClosing)      boolWait = ANeedWait;   Unlock();   // 判断是否等待   if (boolWait)   {      // 等待 Close 结束      while (FState == osClosing)         Sleep(15);   }   else if (boolNext)   {      // 执行关闭      DoClose(true);      // 更改状态(不需要加锁)      FState = osInactive;   }}// 并行操作的方法long TRFConnObj::<method>(...){   // 初始化   long result = crStateInvalid;   // 引用计数加 1   if (IncRefCount())   {      // 操作      // ??? ... ...      // 引用计数减 1      DecRefCount();   }   // 返回结果   return result;}


可以把 Open 和 Close 方法理解为打开和关闭并行处理之门,只有 Open 之后才有效。
当调用 Close 方法时,若还有其它线程正在操作相关方法,则会等待操作完成后才能关闭,
若已调用 Close 方法且状态为 osClosing 且其它线程开始操作相关方法时,则会直接返回
状态无效,并拒之门外。

对象状态和引用计数配合使用会提高事件处理的并发性,且不会在边界状态时导致异常
发生,同时还可以保证系统的并发性能。在服务器程序的开发中会常用到引用计数和状态,
若完全理解 Open 和 Close 中状态处理,则有助于灵活运用到并发性高的软件开发中。

<远程文件系统>提供完整源码,下载地址如下:
远程文件系统服务器源码[http://download.csdn.net/detail/kyee/5109478]
远程文件系统客户端源码[http://download.csdn.net/detail/kyee/5109484]
服务器和客户端相关接口[http://download.csdn.net/detail/kyee/5109491]

客户端连接对象类(TRFConnObj)部分代码如下:

// 返回值及错误码enum TRFCResult        {crSuccess         =  1,       // 成功         crFailure         =  0,       // 失败         crUnknown         = -1,       // 未知错误         crNotExist        = -2,       // 不存在(如: AConnObj)         crNotStart        = -3,       // 服务器未启动         crNotConnect      = -4,       // 连接未打开         crNonsupport      = -5,       // 不支持         crVersion         = -6,       // 版本太低         crPassword        = -7,       // 密码错误         crIsExisted       = -8,       // 已存在         crIsIllegal       = -9,       // 不合法         crAttrInvalid     = -10,      // 属性无效         crStateInvalid    = -11,      // 状态无效         crHandleInvalid   = -12,      // 句柄无效         crAccessIllegal   = -13};     // 存取非法// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~/* TRFConnObj - 客户端连接对象类 */class TRFConnObj{public:   TRFConnObj(TRFConnEvent* AEventObj  = NULL, const char* AHost     = NULL,                      long  APort      = 0,    const char* APassword = NULL);   virtual ~TRFConnObj();   // 属性   void*             Data() const         { return FData; }   TRCConnection*    ConnObj() const      { return FConnObj; }   TRFConnEvent*     EventObj() const     { return FEventObj; }   Longword          CallTimeout() const  { return FCallTimeout; }   TObjState         State() const        { return FState; }   KYString          RFVersion() const    { return FRFInfo.Version; }   TDateTime         RFStartTime() const  { return FRFInfo.StartTime; }   TDateTime         RFDateTime();   // 设置属性   void              SetData(void* AData) { FData = AData; }   void              SetCallTimeout(Longword ATimeout);   // 打开/关闭连接   long              Open();   void              Close(bool ANeedWait = false);   // 读取/设置文件属性   long              GetFileAttr(const char* AFileName, long* Attrib);   long              SetFileAttr(const char* AFileName, long  Attrib);   // 文件存在/删除/移动文件或目录   long              FileExisted(const char* AFileName);   long              DeleteFile(const char* AFileName);   long              MoveFile(const char* AOldName, const char* ANewName);   // 目录存在/创建/删除   long              DirExisted(const char* APathName);   long              CreateDir(const char* APathName, bool AForced = false);   long              RemoveDir(const char* APathName, bool AForced = false);   // 磁盘操作相关函数   long              DriveType(const char* ADrive, long* AType);   long              DiskSpace(const char* APath, __int64* ATotalSpace,                                                  __int64* AFreeSpace);protected:   // 状态锁   void              Lock() const         { FLock->Enter(); }   void              Unlock() const       { FLock->Leave(); }   // 对象次数增减   long              IncObjTimes()        { return InterlockedIncrement(&FObjTimes); }   long              DecObjTimes()        { return InterlockedDecrement(&FObjTimes); }   // 引用计数增减   bool              IncRefCount_Valid();   bool              IncRefCount();   void              DecRefCount();   // 读取 RF 的信息   void              GetRFInfo();   void              GetRFDateTime();   // 存取 RF 属性方法   long              GetRFInt(long Attrib, long& AValue);   long              GetRFStr(long Attrib, KYString& AValue);   long              SetRFInt(long Attrib, long AValue);   long              SetRFStr(long Attrib, const char* AValue, long ALen);private:   // 执行初始化/释放   void              DoInit();   void              DoFreeing()          { FEventObj->DoFreeing(this); }   // 执行打开/关闭   long              DoOpen();   void              DoClose(bool ANeedClose);   // 执行断开连接   void              DoDisconnect();   void              CallOnDisconnect();   // 执行 RC 事件的回调方法   void              DoRecvData(const void* AData, long ASize);private:   void*             FData;               // 自定义数据   TKYCritSect*      FLock;               // 状态锁   TRCConnection*    FConnObj;            // RC 连接对象   TRFConnEvent*     FEventObj;           // 事件对象   TObjState         FState;              // 状态   long              FObjTimes;           // 对象次数   long              FRefCount;           // 引用计数   Longword          FCallTimeout;        // 调用超时时长(毫秒)   TRFInfo           FRFInfo;             // 存储服务器信息   Longword          FLastTick;           // 读取最后一次 RFDateTime 的 tick 值   TDateTime         FDeltaTime;          // 存储服务器的当前时间差值   TRFCOnDisconnect  FOnDisconnect;       // OnDisconnect 回调事件函数private:   // RC 连接的回调函数   static void __stdcall   _DoRCCallback(long AConnID, long AEvent,                                         long ACmdID,  long AReturn);   static void __stdcall   _DoRCRecvData(long AConnID, long ASize, const void* AData);};


/* TRFConnObj - 客户端连接对象类 */// ---------------- 静态函数 ----------------// RC 连接的回调函数void __stdcall TRFConnObj::_DoRCCallback(long AConnID, long AEvent,                                         long ACmdID,  long AReturn){   if (AEvent == rcvConnClosed)      try      {         TRCConnection* objConn = ConnID2Object(AConnID);         if (objConn != NULL)            ((TRFConnObj*)objConn->Data())->DoDisconnect();      }      catch (...) {}}// RC 接收自定义数据的回调函数void __stdcall TRFConnObj::_DoRCRecvData(long AConnID, long  ASize,                                                 const void* AData){   try   {      TRCConnection* objConn = ConnID2Object(AConnID);      if (objConn != NULL)         ((TRFConnObj*)objConn->Data())->DoRecvData(AData, ASize);   }   catch (...) {}}// ---------------- 构造函数和析构函数 ----------------// 构造函数TRFConnObj::TRFConnObj(TRFConnEvent* AEventObj, const char* AHost,                               long  APort,     const char* APassword){   // 初始化   FData          = NULL;   FState         = osInactive;   FEventObj      = AEventObj;   FObjTimes      = 1;   FRefCount      = 0;   FCallTimeout   = Timeout_DefCall;   // 初始化函数指针   FOnDisconnect  = NULL;   // 创建连接对象   FConnObj       = new TRCConnection(AHost, RF_AppName, APassword, APort,                                      &TRFConnObj::_DoRCCallback);   // 设置连接对象属性   FConnObj->SetData(this);   FConnObj->SetLinger(0);   FConnObj->SetOnRecvData(&TRFConnObj::_DoRCRecvData);   // 创建对象   FLock          = new TKYCritSect;   // 执行初始化   DoInit();}// 析构函数TRFConnObj::~TRFConnObj(){   // 关闭   Close(true);   FConnObj->SetData(NULL);   // 执行释放方法   DoFreeing();   // 释放对象   FreeAndNil(FConnObj);   FreeAndNil(FLock);}// ---------------- 私有函数 ----------------// 执行初始化void TRFConnObj::DoInit(){   // 初始化   FLastTick            = 0;   FDeltaTime           = 0.0;   // 初始化信息   FRFInfo.StartTime    = 0.0;   // 清除   FRFInfo.Version.Clear();}// 执行打开long TRFConnObj::DoOpen(){   // 初始化   long result = crFailure;   // 打开连接   switch (RCConnOpen(FConnObj->ConnID()))   {   case CRet_Success:      // 加载命令      if (FConnObj->LoadCmd())      {         // 取 RF 信息         GetRFInfo();         // 返回成功         result = crSuccess;      }      else         FConnObj->Close();      break;   case CRet_Failure:            result = crNotConnect;  break;   case CRet_AppPass_Error:      result = crPassword;    break;   case CRet_AppName_NotExist:   result = crNotStart;    break;   case CRet_ConnID_NotExist:    result = crNotExist;    break;   default:                      result = crUnknown;   }   // 返回结果   return result;}// 执行关闭void TRFConnObj::DoClose(bool ANeedClose){   // 判断是否需要关闭   if (ANeedClose)      FConnObj->Close();   // 等待 RefCount == 0   while (FRefCount > 0)      Sleep(15);   // 执行初始化   DoInit();}// 执行连接的断开事件void TRFConnObj::DoDisconnect(){   // 初始化   bool boolNext  = false;   bool boolClose = false;   // 检查状态   Lock();   if (FState >= osOpened)   {      boolClose   = true;      boolNext    = true;      FState      = osClosing;   }   else if (FState == osOpening)      FState      = osClosing;   else      boolNext    = (FState == osClosing);   Unlock();   // 判断是否继续   if (boolNext)   {      // 激发 OnDisconnect 事件方法      FEventObj->DoDisconnect(this);      // 激发 OnDiconnect 函数指针      CallOnDisconnect();      // 判断是否需要关闭      if (boolClose)      {         // 执行关闭         DoClose(false);         // 更改状态(不需要加锁)         FState = osInactive;      }   }}// 激发 OnDiconnect 函数指针void TRFConnObj::CallOnDisconnect(){   if (FOnDisconnect != NULL)      __try      {         (*FOnDisconnect)(this);      }      __except (_ExceptIgnore) {}}// 执行 RC 接收自定义数据方法void TRFConnObj::DoRecvData(const void* AData, long ASize){}// ---------------- 保护函数 ----------------// 引用计数加 1bool TRFConnObj::IncRefCount_Valid(){   // 初始化   bool result = false;   // 操作   Lock();   if (FState >= osOpening)   {      FRefCount++;      result = true;   }   Unlock();   // 返回结果   return result;}// 引用计数加 1bool TRFConnObj::IncRefCount(){   // 初始化   bool result = false;   // 操作   Lock();   if (FState == osOpened)   {      FRefCount++;      result = true;   }   Unlock();   // 返回结果   return result;}// 引用计数减 1void TRFConnObj::DecRefCount(){   Lock();   if (FRefCount > 0)      FRefCount--;   Unlock();}// ---------------- 公有函数 ----------------// 打开long TRFConnObj::Open(){   // 初始化   long  result   = crStateInvalid;   bool  boolNext = false;   // 更改状态   Lock();   if (FState == osInactive)   {      FState   = osOpening;      boolNext = true;      FRefCount= 0;   }   else if (FState == osOpened)      result   = crSuccess;   Unlock();   // 判断是否继续   if (boolNext)   {      // 执行打开      boolNext = false;      result   = DoOpen();      // 更改状态      Lock();      if (FState != osOpening)         boolNext = true;      else if (result == crSuccess)         FState   = osOpened;      else         FState   = osInactive;      Unlock();      // 判断是否需要关闭      if (boolNext)      {         // 执行关闭         if (result == crSuccess)         {            DoClose(true);            result = crFailure;         }         // 更改状态(不需要加锁)         FState = osInactive;      }   }   // 返回结果   return result;}// 关闭void TRFConnObj::Close(bool ANeedWait){   // 初始化   bool boolNext  = false;   bool boolWait  = false;   // 更改状态   Lock();   if (FState == osOpened)   {      FState   = osClosing;      boolNext = true;   }   else if (FState == osOpening)   {      FState   = osClosing;      boolWait = true;   }   else if (FState == osClosing)      boolWait = ANeedWait;   Unlock();   // 判断是否等待   if (boolWait)   {      // 等待 Close 结束      while (FState == osClosing)         Sleep(15);   }   else if (boolNext)   {      // 执行关闭      DoClose(true);      // 更改状态(不需要加锁)      FState = osInactive;   }}// 文件存在long TRFConnObj::FileExisted(const char* AFileName){   // 初始化   long result = crStateInvalid;   // 检查参数   if ((AFileName == NULL) || (*AFileName == '\0'))      result = crIsIllegal;   else if (IncRefCount()) // 引用计数加 1   {      // 操作      // ??? ... ...      // 引用计数减 1      DecRefCount();   }   // 返回结果   return result;}// 文件删除long TRFConnObj::DeleteFile(const char* AFileName){   // 初始化   long result = crStateInvalid;   // 检查参数   if ((AFileName == NULL) || (*AFileName == '\0'))      result = crIsIllegal;   else if (IncRefCount()) // 引用计数加 1   {      // 操作      // ??? ... ...      // 引用计数减 1      DecRefCount();   }   // 返回结果   return result;}// 移动文件或目录long TRFConnObj::MoveFile(const char* AOldName, const char* ANewName){   // 初始化   long result = crStateInvalid;   // 检查参数   if ((AOldName == NULL) || (*AOldName == '\0')                          || (ANewName == NULL) || (*ANewName == '\0'))      result = crIsIllegal;   else if (IncRefCount()) // 引用计数加 1   {      // 操作      // ??? ... ...      // 引用计数减 1      DecRefCount();   }   // 返回结果   return result;}// 目录存在long TRFConnObj::DirExisted(const char* APathName){   // 初始化   long result = crStateInvalid;   // 检查参数   if ((APathName == NULL) || (*APathName == '\0'))      result = crIsIllegal;   else if (IncRefCount()) // 引用计数加 1   {      // 操作      // ??? ... ...      // 引用计数减 1      DecRefCount();   }   // 返回结果   return result;}// 目录创建long TRFConnObj::CreateDir(const char* APathName, bool AForced){   // 初始化   long result = crStateInvalid;   // 检查参数   if ((APathName == NULL) || (*APathName == '\0'))      result = crIsIllegal;   else if (IncRefCount()) // 引用计数加 1   {      // 操作      // ??? ... ...      // 引用计数减 1      DecRefCount();   }   // 返回结果   return result;}// 目录删除long TRFConnObj::RemoveDir(const char* APathName, bool AForced){   // 初始化   long result = crStateInvalid;   // 检查参数   if ((APathName == NULL) || (*APathName == '\0'))      result = crIsIllegal;   else if (IncRefCount()) // 引用计数加 1   {      // 操作      // ??? ... ...      // 引用计数减 1      DecRefCount();   }   // 返回结果   return result;}// 取驱动器类型long TRFConnObj::DriveType(const char* ADrive, long* AType){   // 初始化   long result = crStateInvalid;   // 检查参数   if ((ADrive == NULL) || (*ADrive == '\0') || (AType == NULL))      result = crIsIllegal;   else if (IncRefCount()) // 引用计数加 1   {      // 操作      // ??? ... ...      // 引用计数减 1      DecRefCount();   }   // 返回结果   return result;}// 取磁盘空闲空间long TRFConnObj::DiskSpace(const char* APath, __int64* ATotalSpace,                                              __int64* AFreeSpace){   // 初始化   long result = crStateInvalid;   // 检查参数   if ((APath == NULL) || (*APath == '\0')                       || ((ATotalSpace == NULL) && (AFreeSpace == NULL)))      result = crIsIllegal;   else if (IncRefCount()) // 引用计数加 1   {      // 操作      // ??? ... ...      // 引用计数减 1      DecRefCount();   }   // 返回结果   return result;}

--------------------------------------------

读书人网 >编程

热点推荐