OpenRTMFP/Cumulus Primer(8)CumulusServer主进程主循环分析
- 作者:柳大Poechant(钟超)
- 博客:Blog.CSDN.net/Poechant
- 邮箱:zhongchao.ustc#gmail.com (# -> @)
- 日期:April 14th, 2012
该主循环在RTMFPServer::run(const volatile bool& terminate)函数中。RTMFPServer覆盖Startable的run(const volatile bool &terminate)方法。
void RTMFPServer::run(const volatile bool& terminate) {
1 绑定地址
CumulusServer的 IP 地址和端口:
SocketAddress address("0.0.0.0",_port); _socket.bind(address,true);
绑定CumulusEdge的 IP 地址和端口:
SocketAddress edgesAddress("0.0.0.0",_edgesPort); if (_edgesPort>0) _edgesSocket.bind(edgesAddress,true);
发送者(Client)的 IP 地址和端口:
SocketAddress sender; UInt8 buff[PACKETRECV_SIZE]; int size = 0; while (!terminate) { bool stop=false; bool idle = realTime(stop); if(stop) break; _handshake.isEdges=false;
2 CumulusServer接收数据:
CumulusServer的 socket 有数据可读:
if (_socket.available() > 0) { try {
从 socket 读取:
- 把数据存到 buff,
- 把发送者地址赋给 sender,
- 把所读长度返回给 size
size = _socket.receiveFrom(buff,sizeof(buff),sender);
处理CumulusServer的 socket 产生的异常:
} catch(Exception& ex) { DEBUG("Main socket reception : %s",ex.displayText().c_str()); _socket.close(); _socket.bind(address,true); continue; }
2 如果CumulusEdge端口存在且 edge socket 可用。
CumulusEdge的 socket 有数据可读:
} else if (_edgesPort > 0 && _edgesSocket.available() > 0) { try { size = _edgesSocket.receiveFrom(buff, sizeof(buff), sender); _handshake.isEdges = true; } catch(Exception& ex) { DEBUG("Main socket reception : %s", ex.displayText().c_str()); _edgesSocket.close(); _edgesSocket.bind(edgesAddress, true); continue; } Edge* pEdge = edges(sender); if (pEdge) pEdge->update();
3 CumulusServer和CumulusEdge的 socket 都没有数据:
} else {
CumulusServer空闲:
if (idle) {
主线程休眠 1 毫秒(Poco 的 Thread::sleep 参数单位是毫秒,而不是秒),避免空转占满 CPU。
Thread::sleep(1); if (!_timeLastManage.isElapsed(_freqManage)) {
尚未到下一次管理周期时,只对 Middle 会话调用 manage();否则更新计时并执行完整的 manage():
if (_middle) { Sessions::Iterator it; for (it = _sessions.begin(); it != _sessions.end(); ++it) { Middle* pMiddle = dynamic_cast<Middle*>(it->second); if (pMiddle) pMiddle->manage(); } } } else { _timeLastManage.update(); manage(); } } continue; }
4 发送方的 IP 被禁:
if (isBanned(sender.host())) { INFO("Data rejected because client %s is banned", sender.host().toString().c_str()); continue; }
5 数据包长度小于可能的最小值(12):
if (size < RTMFP_MIN_PACKET_SIZE) { ERROR("Invalid packet"); continue; } PacketReader packet(buff,size); Session* pSession = findSession(RTMFP::Unpack(packet)); if (!pSession) continue; if (!pSession->checked) _handshake.commitCookie(*pSession);
根据数据来源(_handshake.isEdges),把会话的端点设置为CumulusEdge的 socket 或者自己(CumulusServer)的 socket,然后把数据包交给会话处理:
pSession->setEndPoint(_handshake.isEdges ? _edgesSocket : _socket,sender); pSession->receive(packet); } _handshake.clear(); _sessions.clear(); _socket.close(); if (_edgesPort>0) _edgesSocket.close(); if(_pCirrus) { delete _pCirrus; _pCirrus = NULL; }}
-
转载请注明来自柳大的CSDN博客:Blog.CSDN.net/Poechant
-
该主循环在RTMFPServer::run(const volatile bool& terminate)函数中。RTMFPServer覆盖Startable的run(const volatile bool &terminate)方法。
void RTMFPServer::run(const volatile bool& terminate) {
1 绑定地址
CumulusServer的 IP 地址和端口:
SocketAddress address("0.0.0.0",_port); _socket.bind(address,true);绑定CumulusEdge的 IP 地址和端口:
SocketAddress edgesAddress("0.0.0.0",_edgesPort); if (_edgesPort>0) _edgesSocket.bind(edgesAddress,true);发送者(Client)的 IP 地址和端口:
SocketAddress sender; UInt8 buff[PACKETRECV_SIZE]; int size = 0; while (!terminate) { bool stop=false; bool idle = realTime(stop); if(stop) break; _handshake.isEdges=false;
2 CumulusServer接收数据:
CumulusServer的 socket 有数据可读:
if (_socket.available() > 0) { try {从 socket 读取:
- 把数据存到 buff,
- 把发送者地址赋给 sender,
- 把所读长度返回给 size
size = _socket.receiveFrom(buff,sizeof(buff),sender);
处理CumulusServer的 socket 产生的异常:
} catch(Exception& ex) { DEBUG("Main socket reception : %s",ex.displayText().c_str()); _socket.close(); _socket.bind(address,true); continue; }2 如果CumulusEdge端口存在且 edge socket 可用。
CumulusEdge的 socket 有数据可读:
} else if (_edgesPort > 0 && _edgesSocket.available() > 0) { try { size = _edgesSocket.receiveFrom(buff, sizeof(buff), sender); _handshake.isEdges = true; } catch(Exception& ex) { DEBUG("Main socket reception : %s", ex.displayText().c_str()); _edgesSocket.close(); _edgesSocket.bind(edgesAddress, true); continue; } Edge* pEdge = edges(sender); if (pEdge) pEdge->update();
3 CumulusServer和CumulusEdge的 socket 都没有数据:
} else {CumulusServer空闲:
if (idle) {
主线程休眠 1 毫秒(Poco 的 Thread::sleep 参数单位是毫秒,而不是秒),避免空转占满 CPU。
Thread::sleep(1); if (!_timeLastManage.isElapsed(_freqManage)) {Just middle session
if (_middle) { Sessions::Iterator it; for (it = _sessions.begin(); it != _sessions.end(); ++it) { Middle* pMiddle = dynamic_cast<Middle*>(it->second); if (pMiddle) pMiddle->manage(); } } } else { _timeLastManage.update(); manage(); } } continue; }4 发送方的 ip 被禁: if (isBanned(sender.host())) { INFO("Data rejected because client %s is banned", sender.host().toString().c_str()); continue; }
5 数据包长度小于可能的最小值(12) if (size < RTMFP_MIN_PACKET_SIZE) { ERROR("Invalid packet"); continue; } PacketReader packet(buff,size); Session* pSession = findSession(RTMFP::Unpack(packet)); if (!pSession) continue; if (!pSession->checked) _handshake.commitCookie(*pSession);
if (isBanned(sender.host())) { INFO("Data rejected because client %s is banned", sender.host().toString().c_str()); continue; } if (size < RTMFP_MIN_PACKET_SIZE) { ERROR("Invalid packet"); continue; } PacketReader packet(buff,size); Session* pSession = findSession(RTMFP::Unpack(packet)); if (!pSession) continue; if (!pSession->checked) _handshake.commitCookie(*pSession);给CumulusEdge或者自己(CumulusServer)的 socket:
pSession->setEndPoint(_handshake.isEdges ? _edgesSocket : _socket,sender); pSession->receive(packet); } _handshake.clear(); _sessions.clear(); _socket.close(); if (_edgesPort>0) _edgesSocket.close(); if(_pCirrus) { delete _pCirrus; _pCirrus = NULL; }}-
转载请注明来自柳大的CSDN博客:Blog.CSDN.net/Poechant
-