zeroMQ初体验-7.优雅的卸载工作进程
关掉一个进程有很多种方式,而在ZeroMQ中则推崇通过使用信号通知,可控的卸载、关闭进程。在这里,要援引之前的"分而治之"例子(具体可以见这里)。
例图:
显然,信号发送是由能够掌握整个进度的"水槽"(下游)来控制,在原有基础上做少许变更即可。
Worker(数据处理):
import sysimport timeimport zmqcontext = zmq.Context()receiver = context.socket(zmq.PULL)receiver.connect("tcp://localhost:5557")sender = context.socket(zmq.PUSH)sender.connect("tcp://localhost:5558")controller = context.socket(zmq.SUB)controller.connect("tcp://localhost:5559")controller.setsockopt(zmq.SUBSCRIBE, "")poller = zmq.Poller()poller.register(receiver, zmq.POLLIN)poller.register(controller, zmq.POLLIN)while True: socks = dict(poller.poll()) if socks.get(receiver) == zmq.POLLIN: message = receiver.recv() workload = int(message) # Workload in msecs time.sleep(workload / 1000.0) sender.send(message) sys.stdout.write(".") sys.stdout.flush() if socks.get(controller) == zmq.POLLIN: break
水槽(下游):
import sysimport timeimport zmqcontext = zmq.Context()receiver = context.socket(zmq.PULL)receiver.bind("tcp://*:5558")controller = context.socket(zmq.PUB)controller.bind("tcp://*:5559")receiver.recv()tstart = time.time()for task_nbr in xrange(100): receiver.recv() if task_nbr % 10 == 0: sys.stdout.write(":") else: sys.stdout.write(".") sys.stdout.flush()tend = time.time()tdiff = tend - tstarttotal_msec = tdiff * 1000print "Total elapsed time: %d msec" % total_mseccontroller.send("KILL")time.sleep(1)
注意:
正常情况下,即使进程被关闭,可能端口并没有被清除(那是有ZeroMQ维护的),原文中调用了这么两句
zmq_close (server)
zmq_term (context)
python中对应为zmq.close(),zmq.term(),不过python的垃圾回收会替俺们解决后顾之忧的~
(未完待续)