Projects
Kolab:3.4
bonnie
0002-Upgrade-the-ZMQ-output-to-run-under-a-torn...
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 0002-Upgrade-the-ZMQ-output-to-run-under-a-tornado-ioloop.patch of Package bonnie (Revision 33)
Currently displaying revision
33
,
Show latest
From 15b69a5c85dd5d96bbc664e4050379b847eeb13a Mon Sep 17 00:00:00 2001 From: "Jeroen van Meeuwen (Kolab Systems)" <vanmeeuwen@kolabsys.com> Date: Fri, 21 Nov 2014 22:39:56 +0100 Subject: [PATCH 2/2] Upgrade the ZMQ output to run under a tornado ioloop rather than polling --- bonnie/dealer/outputs/zmq_output.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/bonnie/dealer/outputs/zmq_output.py b/bonnie/dealer/outputs/zmq_output.py index d5783ee..cd5b117 100644 --- a/bonnie/dealer/outputs/zmq_output.py +++ b/bonnie/dealer/outputs/zmq_output.py @@ -23,6 +23,7 @@ import os import socket import zmq +from zmq.eventloop import ioloop, zmqstream import bonnie conf = bonnie.getConf() @@ -32,6 +33,8 @@ class ZMQOutput(object): def __init__(self, *args, **kw): self.context = zmq.Context() + ioloop.install() + zmq_broker_address = conf.get('dealer', 'zmq_broker_address') if zmq_broker_address == None: @@ -41,8 +44,8 @@ class ZMQOutput(object): self.dealer.identity = (u"Dealer-%s-%s" % (socket.getfqdn(), os.getpid())).encode('ascii') self.dealer.connect(zmq_broker_address) - self.poller = zmq.Poller() - self.poller.register(self.dealer, zmq.POLLIN) + self.dealer_stream = zmqstream.ZMQStream(self.dealer) + self.dealer_stream.on_recv(self.stop) def name(self): return 'zmq_output' @@ -54,15 +57,13 @@ class ZMQOutput(object): log.debug("[%s] Notification received: %r" % (self.dealer.identity, notification), level=9) self.dealer.send(notification) - received_reply = False - while not received_reply: - sockets = dict(self.poller.poll(1000)) - if self.dealer in sockets: - if sockets[self.dealer] == zmq.POLLIN: - _reply = self.dealer.recv_multipart() - log.debug("[%s] Reply: %r" % (self.dealer.identity, _reply), level=9) - if _reply[0] == b"ACK": - received_reply = True + ioloop.IOLoop.instance().start() + + def stop(self, message, *args, **kw): + cmd = message[0] + + if not cmd == b'ACL': + log.error("Unknown cmd %s" % (cmd)) self.dealer.close() self.context.term() -- 1.9.3
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.