读书人

zeroMQ初体验-2.公布订阅模式(pub/sub)

发布时间: 2012-09-08 10:48:07 作者: rapoo

zeroMQ初体验-2.发布订阅模式(pub/sub)
pub/sub模式:



发布端(pub)

import itertoolsimport sys  import time             import zmq              def main():     if len (sys.argv) != 2:        print 'usage: publisher <bind-to>'        sys.exit (1)        bind_to = sys.argv[1]        all_topics = ['sports.general','sports.football','sports.basketball',                  'stocks.general','stocks.GOOG','stocks.AAPL',                  'weather']        ctx = zmq.Context()    s = ctx.socket(zmq.PUB)    s.bind(bind_to)    print "Starting broadcast on topics:"    print "   %s" % all_topics    print "Hit Ctrl-C to stop broadcasting."    print "Waiting so subscriber sockets can connect..."    print    time.sleep(1.0)        msg_counter = itertools.count()    try:        for topic in itertools.cycle(all_topics):            msg_body = str(msg_counter.next())            print '   Topic: %s, msg:%s' % (topic, msg_body)            #s.send_multipart([topic, msg_body])            s.send_pyobj([topic, msg_body])            # short wait so we don't hog the cpu            time.sleep(0.1)    except KeyboardInterrupt:        pass    print "Waiting for message queues to flush..."    time.sleep(0.5)    s.close()    print "Done."if __name__ == "__main__":    main()


订阅端(sub):
import sysimport timeimport zmqdef main():    if len (sys.argv) < 2:        print 'usage: subscriber <connect_to> [topic topic ...]'        sys.exit (1)    connect_to = sys.argv[1]    topics = sys.argv[2:]    ctx = zmq.Context()    s = ctx.socket(zmq.SUB)    s.connect(connect_to)    # manage subscriptions    if not topics:        print "Receiving messages on ALL topics..."        s.setsockopt(zmq.SUBSCRIBE,'')    else:        print "Receiving messages on topics: %s ..." % topics        for t in topics:            s.setsockopt(zmq.SUBSCRIBE,t)    print    try:        while True:            #topic, msg = s.recv_multipart()            topic, msg = s.recv_pyobj()            print '   Topic: %s, msg:%s' % (topic, msg)    except KeyboardInterrupt:        pass    print "Done."if __name__ == "__main__":    main()


注意:
这里的发布与订阅角色是绝对的,即发布者无法使用recv,订阅者不能使用send,并且订阅者需要设置订阅条件"setsockopt"。
按照官网的说法,在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,(还没有看到那,在后续中发现的话会更新这里)。
官网还提供了一种可能出现的问题:当订阅者消费慢于发布,此时就会出现数据的堆积,而且还是在发布端的堆积(有朋友指出是堆积在消费端,或许是新版本改进,需要读者的尝试和反馈,thx!),显然,这是不可以被接受的。至于解决方案,或许后面的"分而治之"就是吧。

(未完待续)
1 楼 guozhiwei 2011-12-06 "当订阅者消费慢于发布,此时就会出现数据的堆积,而且还是在发布端的堆积,"

是在发布端堆积吗?

我觉得应该是在订阅端堆积的. 2 楼 guozhiwei 2011-12-06 guozhiwei 写道"当订阅者消费慢于发布,此时就会出现数据的堆积,而且还是在发布端的堆积,"

是在发布端堆积吗?

我觉得应该是在订阅端堆积的.



我刚才试验了 是在订阅端堆积的.. 3 楼 iyuan 2011-12-07 guozhiwei 写道guozhiwei 写道"当订阅者消费慢于发布,此时就会出现数据的堆积,而且还是在发布端的堆积,"

是在发布端堆积吗?

我觉得应该是在订阅端堆积的.



我刚才试验了 是在订阅端堆积的..

现在zmq好像版本很高了,文档已经陈旧了。谢谢你的反馈,相关内容已经做出修正~

读书人网 >互联网

热点推荐