nginx-push-stream模块源码学习(三)——发布
一、概述
发布:发布者将MSG post到某一特定通道上,channel将信息缓存
在说明发布流程之前有必要说明下channel和msg的数据结构。
二、数据结构
2.1 MSG
发布时,模块先将消息转化为ngx_http_push_stream_msg_t的数据结构进行存储。
对于删除channel和获取channel info的流程比较简单,不做阐述,具体说明下发布消息流程,流程图如图所示:

需要说明的是“向所有订阅者发送MSG”的过程:
向每个有该channel订阅者的worker(workers_with_subscriber)的消息链表中插入一条消息向上述worker发送CHECK_MESSAGES指令,触发msg发送流程(ngx_http_push_stream_process_worker_message)
MSG发送(ngx_http_push_stream_process_worker_message):
// now let's respond to some requests! //对于该channel上的所有订阅者 while ((cur = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&cur->queue)) != subscribers_sentinel) { ngx_http_push_stream_subscriber_t *subscriber = (ngx_http_push_stream_subscriber_t *) cur->value; //如果订阅者为longpolling模式 if (subscriber->longpolling) { ngx_http_push_stream_queue_elem_t *prev = (ngx_http_push_stream_queue_elem_t *) ngx_queue_prev(&cur->queue); //发送longpolling头(last Modified/Etag) ngx_http_push_stream_add_polling_headers(subscriber->request, msg->time, msg->tag, subscriber->request->pool); ngx_http_send_header(subscriber->request); //发送模块配置header模板 ngx_http_push_stream_send_response_content_header(subscriber->request, ngx_http_get_module_loc_conf(subscriber->request, ngx_http_push_stream_module)); //发送响应MSG ngx_http_push_stream_send_response_message(subscriber->request, channel, msg); //发送footer模板,last chunck("\0"CRLF CRLF) ngx_http_push_stream_send_response_finalize(subscriber->request); cur = prev; } else {//stream或polling模式 if (ngx_http_push_stream_send_response_message(subscriber->request, channel, msg) == NGX_ERROR) { ngx_http_push_stream_queue_elem_t *prev = (ngx_http_push_stream_queue_elem_t *) ngx_queue_prev(&cur->queue); ngx_http_push_stream_send_response_finalize(subscriber->request); cur = prev; } }说明:
可以看出push stream模块在发布过程中针对longpolling和stream两种模式的不同:
Longpolling模式下,每次发布消息时会发送longpolling头:last modified和etag,使得客户端下次请求时可据此判断服务端是否有更新的消息待发布。longpolling模式下,订阅者每次请求都会在获得数据后断开重连,所以每次发布时都会发送header模板ngx_http_push_stream_send_response_finalize同时会清理订阅者