读书人

zeroMQ初体验-32.发布/订阅模式进阶-克

发布时间: 2012-10-07 17:28:51 作者: rapoo

zeroMQ初体验-32.发布/订阅模式进阶-克隆模式-上
在发布/订阅模式中,特别是现实应用中,总会因为这样那样的问题导致订阅者丢失了所需的数据,如此,便有了重新获得的需求。通常来说,这个会由订阅者来完成,不过"千百个哈姆雷特"从工程的角度来看,实在不忍睹,完全违背了"复用"的概念。于是乎,"克隆模式"便呼之待出了。在发布端存储下这些消息,为了避免队列的堆积这样的杯具,也为了更好的订阅体验,kev-value似乎是不错的选择。

注意:这里的kev-value并非目前红火的nosql(虽然有些类似),可以理解成发布者的数据仓库(应该可以这么理解吧)。

为了简单明了,这里将会对整个机制做一个拆解。

更新数据的存储
模型图:

服务器:



服务器:

服务器:
////??Clone?client?Model?Four////??Lets?us?build?this?source?without?creating?a?library#include?"kvsimple.c"#define?SUBTREE?"/client/"int?main?(void){????//??Prepare?our?context?and?subscriber????zctx_t?*ctx?=?zctx_new?();????void?*snapshot?=?zsocket_new?(ctx,?ZMQ_DEALER);????zsocket_connect?(snapshot,?"tcp://localhost:5556");????void?*subscriber?=?zsocket_new?(ctx,?ZMQ_SUB);????zsocket_connect?(subscriber,?"tcp://localhost:5557");????zsockopt_set_subscribe?(subscriber,?SUBTREE);????void?*publisher?=?zsocket_new?(ctx,?ZMQ_PUSH);????zsocket_connect?(publisher,?"tcp://localhost:5558");????zhash_t?*kvmap?=?zhash_new?();????srandom?((unsigned)?time?(NULL));????//??Get?state?snapshot????int64_t?sequence?=?0;????zstr_sendm?(snapshot,?"ICANHAZ?");????zstr_send??(snapshot,?SUBTREE);????while?(TRUE)?{????????kvmsg_t?*kvmsg?=?kvmsg_recv?(snapshot);????????if?(!kvmsg)????????????break;??????????//??Interrupted????????if?(streq?(kvmsg_key?(kvmsg),?"KTHXBAI"))?{????????????sequence?=?kvmsg_sequence?(kvmsg);????????????printf?("I:?received?snapshot=%d\n",?(int)?sequence);????????????kvmsg_destroy?(&kvmsg);????????????break;??????????//??Done????????}????????kvmsg_store?(&kvmsg,?kvmap);????}????int64_t?alarm?=?zclock_time?()?+?1000;????while?(!zctx_interrupted)?{????????zmq_pollitem_t?items?[]?=?{?{?subscriber,?0,?ZMQ_POLLIN,?0?}?};????????int?tickless?=?(int)?((alarm?-?zclock_time?()));????????if?(tickless?<?0)????????????tickless?=?0;????????int?rc?=?zmq_poll?(items,?1,?tickless?*?ZMQ_POLL_MSEC);????????if?(rc?==?-1)????????????break;??????????????//??Context?has?been?shut?down????????if?(items?[0].revents?&?ZMQ_POLLIN)?{????????????kvmsg_t?*kvmsg?=?kvmsg_recv?(subscriber);????????????if?(!kvmsg)????????????????break;??????????//??Interrupted????????????//??Discard?out-of-sequence?kvmsgs,?incl.?heartbeats????????????if?(kvmsg_sequence?(kvmsg)?>?sequence)?{????????????????sequence?=?kvmsg_sequence?(kvmsg);????????????????kvmsg_store?(&kvmsg,?kvmap);????????????????printf?("I:?received?update=%d\n",?(int)?sequence);????????????}????????????else????????????????kvmsg_destroy?(&kvmsg);????????}????????//??If?we?timed-out,?generate?a?random?kvmsg????????if?(zclock_time?()?>=?alarm)?{????????????kvmsg_t?*kvmsg?=?kvmsg_new?(0);????????????kvmsg_fmt_key??(kvmsg,?"%s%d",?SUBTREE,?randof?(10000));????????????kvmsg_fmt_body?(kvmsg,?"%d",?randof?(1000000));????????????kvmsg_send?????(kvmsg,?publisher);????????????kvmsg_destroy?(&kvmsg);????????????alarm?=?zclock_time?()?+?1000;????????}????}????printf?("?Interrupted\n%d?messages?in\n",?(int)?sequence);????zhash_destroy?(&kvmap);????zctx_destroy?(&ctx);????return?0;}

(未完待续)

读书人网 >其他相关

热点推荐