zeroMQ初体验-30.发布/订阅模式进阶-自裁的蜗牛订阅者
在初次介绍发布/订阅模式的时候,就已经抖出了这个包袱:如果订阅者的消费速度慢,则会造成发布者端队列堆积,怎么办?本篇即是针对可能出现的"蜗牛"般的订阅者而生。
通常的做法:
在发布端用靠谱的队列承接来不及被消费的信息,这样会增大发布端的压力。
在订阅端用靠谱的队列承接来不及消费的信息,压力转嫁给各订阅端。
与前面相似,在队列中设置阈值,溢出则不收录。
压迫订阅端,当发布端发现订阅端过慢,给予惩罚性质的断开连接。
虽然上述4种方案都还算经典,不过总有欠妥之处。最好的方法莫过于让订阅者明白自个儿能力不足,作出主动性措施,比如:暂时性断开,优化了再来~
至于如何让订阅端检测是否消费能力不足,只需在订阅端设置一个有阈值的队列缓存数据,发布端给所有数据打上标号,如果订阅端读到“断号”,即可认定有数据超出阈值被舍弃了,那么,嘿嘿,主动整改吧。
虽然说中断订阅连接不是太理想,不过,总比不可知,不可控的一味流转数据要可靠的多,只少,都还掌控在自己手中~
////??Suicidal?Snail//#include?"czmq.h"//??---------------------------------//??This?is?our?subscriber//??It?connects?to?the?publisher?and?subscribes?to?everything.?It//??sleeps?for?a?short?time?between?messages?to?simulate?doing?too//??much?work.?If?a?message?is?more?than?1?second?late,?it?croaks.#define?MAX_ALLOWED_DELAY???1000????//??msecsstatic?voidsubscriber?(void?*args,?zctx_t?*ctx,?void?*pipe){????//??Subscribe?to?everything????void?*subscriber?=?zsocket_new?(ctx,?ZMQ_SUB);????zsocket_connect?(subscriber,?"tcp://localhost:5556");????//??Get?and?process?messages????while?(1)?{????????char?*string?=?zstr_recv?(subscriber);????????int64_t?clock;????????int?terms?=?sscanf?(string,?"%"?PRId64,?&clock);????????assert?(terms?==?1);????????free?(string);????????//??Suicide?snail?logic????????if?(zclock_time?()?-?clock?>?MAX_ALLOWED_DELAY)?{????????????fprintf?(stderr,?"E:?subscriber?cannot?keep?up,?aborting\n");????????????break;????????}????????//??Work?for?1?msec?plus?some?random?additional?time????????zclock_sleep?(1?+?randof?(2));????}????zstr_send?(pipe,?"gone?and?died");}//??---------------------------------//??This?is?our?server?task//??It?publishes?a?time-stamped?message?to?its?pub?socket?every?1ms.static?voidpublisher?(void?*args,?zctx_t?*ctx,?void?*pipe){????//??Prepare?publisher????void?*publisher?=?zsocket_new?(ctx,?ZMQ_PUB);????zsocket_bind?(publisher,?"tcp://*:5556");????while?(1)?{????????//??Send?current?clock?(msecs)?to?subscribers????????char?string?[20];????????sprintf?(string,?"%"?PRId64,?zclock_time?());????????zstr_send?(publisher,?string);????????char?*signal?=?zstr_recv_nowait?(pipe);????????if?(signal)?{????????????free?(signal);????????????break;????????}????????zclock_sleep?(1);????????????//??1msec?wait????}}//??This?main?thread?simply?starts?a?client,?and?a?server,?and?then//??waits?for?the?client?to?signal?it's?died.//int?main?(void){????zctx_t?*ctx?=?zctx_new?();????void?*pubpipe?=?zthread_fork?(ctx,?publisher,?NULL);????void?*subpipe?=?zthread_fork?(ctx,?subscriber,?NULL);????free?(zstr_recv?(subpipe));????zstr_send?(pubpipe,?"break");????zclock_sleep?(100);????zctx_destroy?(&ctx);????return?0;}注意:
这里并未用到数据编号,而是打上了时间戳来判定数据是否延迟到不可接受的地步了,如何自我判定,方法万千,择优即可。
(未完待续)