读书人

从crtmpserver中看具体处理rtmp协议的流程2

发布时间: 2012-12-24 10:43:14 作者: rapoo

从crtmpserver中看具体处理rtmp协议的流程2

2.client-----发送1776个bytes数据---->server

握手第二步

ignore the client's last handshake part buffer.Ignore(1536)

在处理

_handshakeCompleted = true;

_rtmpState = RTMP_STATE_DONE;

剩下240bytes数据在bool BaseRTMPProtocol::SignalInputData(IOBuffer &buffer)中处理(其实也还是原来的地方)

result = ProcessBytes(buffer);

bool BaseRTMPProtocol::ProcessBytes(IOBuffer &buffer)中处理

在这里每次只能处理128bytes的数据

a.先判断0x03的头switch (GETIBPOINTER(buffer)[0]&0x3f)

_selectedChannel = GETIBPOINTER(buffer)[0]&0x3f;

_channels[_selectedChannel].lastInHeaderType = GETIBPOINTER(buffer)[0] >> 6;

buffer.Ignore(1);

availableBytesCount -= 1;

读RTMP的head

跳过前面大于128bytes的字节,等到和下次的100bytes合在一起(100bytes要独自去掉0x03的包头),然后再一起进入下面的函数中

bool BaseRTMPAppProtocolHandler::InboundMessageAvailable(BaseRTMPProtocol *pFrom, Header &header, IOBuffer &inputBuffer)

{

1).调用_rtmpProtocolSerializer.Deserialize(header, inputBuffer, request)

函数为

bool RTMPProtocolSerializer::Deserialize(Header &header, IOBuffer &buffer, Variant &message)

{

根据H_MT(header)来判断消息的类型,这里为

RM_HEADER_MESSAGETYPE_FLEX

最终还需要在bool RTMPProtocolSerializer::DeserializeInvoke(IOBuffer &buffer, Variant &message)中利用_amf0来解码数据(分别是得到string类型的方法名和double类型的参数)

}

2).得到解码后的数据,存放到request中

3).进入return InboundMessageAvailable(pFrom, request)对消息进行响应

switch ((uint8_t) VH_MT(request)) {

case RM_HEADER_MESSAGETYPE_INVOKE:

{

return ProcessInvoke(pFrom, request);

}

}

4).具体需要处理的方法调用在

bool BaseRTMPAppProtocolHandler::ProcessInvoke(BaseRTMPProtocol *pFrom,

Variant &request) {

string functionName = request[RM_INVOKE][RM_INVOKE_FUNCTION];

uint32_t currentInvokeId = M_INVOKE_ID(request);

if (currentInvokeId != 0) {

if (_nextInvokeId[pFrom->GetId()] <= currentInvokeId) {

_nextInvokeId[pFrom->GetId()] = currentInvokeId + 1;

}

}

if (functionName == RM_INVOKE_FUNCTION_CONNECT) {

return ProcessInvokeConnect(pFrom, request);

}...

}中

这样就得出了291bytes的数据进行发送了...

5)对connect方法调用进行处理

bool RTMPAppProtocolHandler::ProcessInvokeConnect(BaseRTMPProtocol *pFrom,

Variant &request) {

找到对应处理的app

获取flvplayback

再返回到下面的函数中处理:

bool BaseRTMPAppProtocolHandler::InboundMessageAvailable(BaseRTMPProtocol *pFrom,Variant &request) {

case RM_HEADER_MESSAGETYPE_INVOKE:

{

return ProcessInvoke(pFrom, request);

}

}

}

6)再去ProcessInvoke中处理消息的调用

bool BaseRTMPAppProtocolHandler::ProcessInvoke(BaseRTMPProtocol *pFrom,

Variant &request) {

if (functionName == RM_INVOKE_FUNCTION_CONNECT) {

return ProcessInvokeConnect(pFrom, request);

} ...

}

7)真正的发送消息在这里完成

bool BaseRTMPAppProtocolHandler::ProcessInvokeConnect(BaseRTMPProtocol *pFrom,Variant & request) {//1. Send the channel specific messagesVariant response = GenericMessageFactory::GetWinAckSize(2500000);if (!SendRTMPMessage(pFrom, response)) {FATAL("Unable to send message to client");return false;}response = GenericMessageFactory::GetPeerBW(2500000, RM_PEERBW_TYPE_DYNAMIC);if (!SendRTMPMessage(pFrom, response)) {FATAL("Unable to send message to client");return false;}//2. Initialize stream 0response = StreamMessageFactory::GetUserControlStreamBegin(0);if (!SendRTMPMessage(pFrom, response)) {FATAL("Unable to send message to client");return false;}//3. Send the connect resultresponse = ConnectionMessageFactory::GetInvokeConnectResult(request);if (!SendRTMPMessage(pFrom, response)) {FATAL("Unable to send message to client");return false;}//4. Send onBWDoneresponse = GenericMessageFactory::GetInvokeOnBWDone(1024 * 8);if (!SendRTMPMessage(pFrom, response)) {FATAL("Unable to send message to client");return false;}//5. Donereturn true;}


8)看看具体是如何发送的

bool BaseRTMPAppProtocolHandler::SendRTMPMessage(BaseRTMPProtocol *pTo,Variant message, bool trackResponse) {case RM_HEADER_MESSAGETYPE_ABORTMESSAGE:{return pTo->SendMessage(message);}}

又回到了

bool BaseRTMPProtocol::SendMessage(Variant & message) {//2. Send the messageif (!_rtmpProtocolSerializer.Serialize(_channels[(uint32_t) VH_CI(message)],message, _outputBuffer, _outboundChunkSize)) {FATAL("Unable to serialize RTMP message");return false;}#endif  /* ENFORCE_RTMP_OUTPUT_CHECKS */_txInvokes++;//3. Mark the connection as ready for outbound transferreturn EnqueueForOutbound();}


9)可以看到消息是需要序列化的

bool RTMPProtocolSerializer::Serialize(Channel &channel,Variant &message, IOBuffer &buffer, uint32_t chunkSize) {case RM_HEADER_MESSAGETYPE_WINACKSIZE:{result = SerializeWinAckSize(_internalBuffer, message[RM_WINACKSIZE]);break;}}//2. 检查调用结果if (!result) {FATAL("Unable to serialize message body");return false;}//3. 更新的消息长度VH_ML(message) = GETAVAILABLEBYTESCOUNT(_internalBuffer);//4. 提取头Header header;if (!Header::GetFromVariant(header, message[RM_HEADER])) {FATAL("Unable to read RTMP header: %s", STR(message.ToString()));return false;}//5. 检查和发送数据uint32_t available = 0;while ((available = GETAVAILABLEBYTESCOUNT(_internalBuffer)) != 0) {if (!header.Write(channel, buffer)) {FATAL("Unable to serialize message header");result = false;}如果超过了128,则发送两次,但是当前只有4bytes,所以进了elseif (available >= chunkSize) {buffer.ReadFromInputBuffer(&_internalBuffer, 0, chunkSize);channel.lastOutProcBytes += chunkSize;_internalBuffer.Ignore(chunkSize);} else {buffer.ReadFromInputBuffer(&_internalBuffer, 0, available);channel.lastOutProcBytes += available;_internalBuffer.Ignore(available);}}channel.lastOutProcBytes = 0;//6. Donereturn result;}


10)看看SerializeWinAckSize是如何处理的

bool RTMPProtocolSerializer::SerializeWinAckSize(IOBuffer &buffer, uint32_t value) {if (!_amf0.WriteUInt32(buffer, value, false)) {FATAL("Unable to write uint32_t value: %u", value);return false;}return true;}

value这个时候的值为2500000,在这里它被amf序列化了

11)ProcessInvokeConnect在完成SerializeWinAckSize之后,第二步是GetPeerBW

bool BaseRTMPAppProtocolHandler::SendRTMPMessage(BaseRTMPProtocol *pTo, Variant message, bool trackResponse) {case RM_HEADER_MESSAGETYPE_ABORTMESSAGE:{return pTo->SendMessage(message);}}}


写了4bytes数据

还是直接发送,也是走bool BaseRTMPProtocol::SendMessage(Variant & message) {}-----bool RTMPProtocolSerializer::Serialize(Channel &channel,Variant &message, IOBuffer &buffer, uint32_t chunkSize) {

case RM_HEADER_MESSAGETYPE_PEERBW:

{

result = SerializeClientBW(_internalBuffer, message[RM_PEERBW]);

}

}------------------

bool RTMPProtocolSerializer::SerializeClientBW(IOBuffer &buffer, Variant value) {if (!_amf0.WriteUInt32(buffer, value[RM_PEERBW_VALUE], false)) {FATAL("Unable to write uint32_t value: %u",(uint32_t) value[RM_PEERBW_VALUE]);return false;}if (!_amf0.WriteUInt8(buffer, value[RM_PEERBW_TYPE], false)) {FATAL("Unable to write uint8_t value: %hhu",(uint8_t) value[RM_PEERBW_TYPE]);return false;}return true;}

这回写了5bytes数据

12)ProcessInvokeConnect的第三步

//2. Initialize stream 0response = StreamMessageFactory::GetUserControlStreamBegin(0);bool BaseRTMPProtocol::SendMessage(Variant & message) {}-----bool RTMPProtocolSerializer::Serialize(Channel &channel,Variant &message, IOBuffer &buffer, uint32_t chunkSize) {case RM_HEADER_MESSAGETYPE_USRCTRL:{result = SerializeUsrCtrl(_internalBuffer, message[RM_USRCTRL]);break;}}


bool RTMPProtocolSerializer::SerializeUsrCtrl(IOBuffer &buffer, Variant message) {if (!_amf0.WriteInt16(buffer, message[RM_USRCTRL_TYPE], false)) {FATAL("Unable to write user control message type value");return false;}switch ((uint16_t) message[RM_USRCTRL_TYPE]) {...case RM_USRCTRL_TYPE_STREAM_IS_RECORDED:{if (!_amf0.WriteInt32(buffer, message[RM_USRCTRL_STREAMID], false)) {FATAL("Unable to write stream id from user control message");return false;}return true;}...}}

这回写了6bytes数据

13)ProcessInvokeConnect的第四步

//3. Send the connect resultresponse = ConnectionMessageFactory::GetInvokeConnectResult(request);bool BaseRTMPProtocol::SendMessage(Variant & message) {}-----bool RTMPProtocolSerializer::Serialize(Channel &channel,Variant &message, IOBuffer &buffer, uint32_t chunkSize) {case RM_HEADER_MESSAGETYPE_INVOKE:{result = SerializeInvoke(_internalBuffer, message[RM_INVOKE]);break;}}


bool RTMPProtocolSerializer::SerializeInvoke(IOBuffer &buffer,Variant &message) {functionName 这里为_resultstring functionName = message[RM_INVOKE_FUNCTION];if (!_amf0.WriteShortString(buffer, functionName)) {FATAL("Unable to write %s", STR(RM_INVOKE_FUNCTION));return false;}if (!_amf0.WriteDouble(buffer, message[RM_INVOKE_ID])) {FATAL("Unable to write %s", STR(RM_INVOKE_ID));return false;}FOR_MAP(message[RM_INVOKE_PARAMS], string, Variant, i) {if (!_amf0.Write(buffer, MAP_VAL(i))) {FATAL("Unable to serialize invoke parameter %s: %s",STR(MAP_KEY(i)),STR(message.ToString()));return false;}}return true;}

这回写了189bytes数据,这里超过了128bytes,Serialize中的//5. 检查和发送数据(Chunk and send the data)要走if分支

14)ProcessInvokeConnect的第五步

response = GenericMessageFactory::GetInvokeOnBWDone(1024 * 8);bool BaseRTMPAppProtocolHandler::SendRTMPMessage(BaseRTMPProtocol *pTo,Variant message, bool trackResponse) {switch ((uint8_t) VH_MT(message)) {case RM_HEADER_MESSAGETYPE_INVOKE:{if (M_INVOKE_FUNCTION(message) != RM_INVOKE_FUNCTION_RESULT) {uint32_t invokeId = 0;if (!MAP_HAS1(_nextInvokeId, pTo->GetId())) {FATAL("Unable to get next invoke ID");return false;}if (trackResponse) {invokeId = _nextInvokeId[pTo->GetId()];_nextInvokeId[pTo->GetId()] = invokeId + 1;M_INVOKE_ID(message) = invokeId;//do not store stupid useless amount of data needed by onbwcheckif (M_INVOKE_FUNCTION(message) == RM_INVOKE_FUNCTION_ONBWCHECK)_resultMessageTracking[pTo->GetId()][invokeId] = _onBWCheckStrippedMessage;else_resultMessageTracking[pTo->GetId()][invokeId] = message;} else {M_INVOKE_ID(message) = (uint32_t) 0;}return pTo->SendMessage(message);}}}}}


bool RTMPProtocolSerializer::Serialize(Channel &channel,Variant &message, IOBuffer &buffer, uint32_t chunkSize) {bool result = false;_internalBuffer.Ignore(GETAVAILABLEBYTESCOUNT(_internalBuffer));switch ((uint32_t) VH_MT(message)) {case RM_HEADER_MESSAGETYPE_INVOKE:{result = SerializeInvoke(_internalBuffer, message[RM_INVOKE]);break;}....}


bool RTMPProtocolSerializer::SerializeInvoke(IOBuffer &buffer,Variant &message) {functionName 这里为onBWDonestring functionName = message[RM_INVOKE_FUNCTION];if (!_amf0.WriteShortString(buffer, functionName)) {FATAL("Unable to write %s", STR(RM_INVOKE_FUNCTION));return false;}if (!_amf0.WriteDouble(buffer, message[RM_INVOKE_ID])) {FATAL("Unable to write %s", STR(RM_INVOKE_ID));return false;}FOR_MAP(message[RM_INVOKE_PARAMS], string, Variant, i) {if (!_amf0.Write(buffer, MAP_VAL(i))) {FATAL("Unable to serialize invoke parameter %s: %s",STR(MAP_KEY(i)),STR(message.ToString()));return false;}}return true;}

最后一个Invoke写了30bytes数据

15)ProcessInvokeConnect终于执行完毕了

一共得到291bytes的数据

具体是这样的读取io----更加具体的细节

原来是4bytes+head的12bytes=16bytes

原来是5bytes+head的12bytes=17bytes

原来是6bytes+head的12bytes=18bytes

这里还是写12bytes,但是这里是要写functionName的_result

原来是189bytes+head的12bytes=201bytes

但是超过了chunkSize,只能先写128bytes,现在一共有191bytes,那么临时存储区还有61bytes数据,需要再写一个1bytes的头,再写后面的61bytes,处理完之后一共有253bytes

下来还是要写functionName的onBWDone

因为之前写的也是functionName,所以这里的head长度是8bytes

原来是30bytes+head的8bytes=38bytes

最终得到的buffer为4+12+5+12+6+12+128+12+1+61+30+8bytes=291bytes

把这291bytes数据直接send给client

1楼zengraoli2012-12-13 17:19
改变了两次的字体属性,发出来的cpp就会含有span标记,这让我情何以堪。算了 不改了,缺陷美吧

读书人网 >互联网

热点推荐