Kolab:3.4:Updates / bonnie
Note: the diff of some files below is truncated because they were too big.
Changes of Revision 37
bonnie.spec
Changed
@@ -17,7 +17,7 @@
 %global bonnie_group_id 415
 
 Name: bonnie
-Version: 0.2.1
+Version: 0.2.2
 Release: 1%{?dist}
 Summary: Bonnie for Kolab Groupware
 
bonnie-0.2.1.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py -> bonnie-0.2.2.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py
Changed
@@ -24,8 +24,10 @@
 """
 
 from multiprocessing import Process
+import os
 import random
 import re
+import signal
 import sys
 import threading
 import time
@@ -41,6 +43,8 @@
 import worker
 
 class ZMQBroker(object):
+    running = False
+
     def __init__(self):
         """
             A ZMQ Broker for Bonnie.
@@ -49,6 +53,8 @@
         self.poller = zmq.Poller()
 
         self.routers = {}
+        self.router_processes = {}
+        self.collector_interests = []
 
     def register(self, callback):
         callback({ '_all': self.run })
@@ -92,16 +98,12 @@
         if bind_address == None:
             bind_address = default
 
-        setattr(
-            self,
-            '%s_router' % (name),
-            Process(
-                target=self._run_router_process,
-                args=(callback, bind_address)
-            )
+        self.router_processes[name] = Process(
+            target=self._run_router_process,
+            args=(callback, bind_address)
         )
 
-        getattr(self, '%s_router' % (name)).start()
+        self.router_processes[name].start()
 
     def find_router(self, socket):
         for name, attrs in self.routers.iteritems():
@@ -133,12 +135,21 @@
             recv_multipart = self._cb_wcr_recv_multipart
         )
 
+        self.running = True
         last_expire = time.time()
         last_state = time.time()
-
-        while True:
-            # TODO: adjust polling timout according to the number of pending jobs
-            sockets = dict(self.poller.poll(100))
+        poller_timeout = int(conf.get('broker', 'zmq_poller_timeout', 100))
+
+        while self.running:
+            try:
+                # TODO: adjust polling timout according to the number of pending jobs
+                sockets = dict(self.poller.poll(poller_timeout))
+            except KeyboardInterrupt, e:
+                log.info("zmq.Poller KeyboardInterrupt")
+                break
+            except Exception, e:
+                log.error("zmq.Poller error: %r", e)
+                sockets = dict()
 
             for socket, event in sockets.iteritems():
                 if event == zmq.POLLIN:
@@ -155,6 +166,9 @@
                     self._send_worker_job(_worker.identity)
 
             if last_expire < (time.time() - 30):
+                for _collector in collector.select_by_state(b'READY'):
+                    self._request_collector_state(_collector.identity)
+
                 collector.expire()
                 worker.expire()
                 job.unlock()
@@ -179,7 +193,8 @@
                     'jp': sum([jcp, jwp]),
                     'jwa': jwa,
                     'jwp': jwp,
-                    'jo': job.count_by_state(b'ORPHANED'),
+                    'jf': job.count_by_state(b'FAILED'),
+                    'jo': job.count_by_state(b'POSTPONED'),
                     'wb': worker.count_by_state(b'BUSY'),
                     'wr': worker.count_by_state(b'READY'),
                     'ws': worker.count_by_state(b'STALE'),
@@ -190,15 +205,28 @@
 
                 log.info("""
                     Jobs:       done=%(jd)d, pending=%(jp)d, alloc=%(ja)d,
-                                orphaned=%(jo)d.
+                                postponed=%(jo)d, failed=%(jf)d.
                     Workers:    ready=%(wr)d, busy=%(wb)d, stale=%(ws)d,
                                 pending=%(jwp)d, alloc=%(jwa)d.
                     Collectors: ready=%(cr)d, busy=%(cb)d, stale=%(cs)d,
                                 pending=%(jcp)d, alloc=%(jca)d.
                     Took:       seconds=%(duration)s.""" % stats)
 
+                self._write_stats(stats)
+
                 last_state = time.time()
 
+
+        log.info("Shutting down")
+
+        for attrs in self.routers.values():
+            attrs['router'].close()
+
+        for proc in self.router_processes.values():
+            proc.terminate()
+
+        self.context.term()
+
     def _cb_cr_recv_multipart(self, router, message):
         """
             Receive a message on the Collector Router.
@@ -241,15 +269,23 @@
     def _cb_wcr_recv_multipart(self, router, message):
         log.debug("Worker Controller Router Message: %r" % (message), level=8)
         worker_identity = message[0]
-        cmd = message[1]
+        commands = message[1].split(" ")
 
-        if not hasattr(self, '_handle_wcr_%s' % (cmd)):
-            log.error("Unhandled WCR cmd %s" % (cmd))
-            self._handle_wcr_unknown(router, identity, message[2:])
-            return
+        # check for aggregated collector commands
+        collector_commands = [c for c in commands if c in self.collector_interests]
 
-        handler = getattr(self, '_handle_wcr_%s' % (cmd))
-        handler(router, worker_identity, message[2:])
+        if len(collector_commands) == len(commands):
+            self._handle_wcr_COLLECT(router, worker_identity, message[1], message[2:])
+        else:
+            cmd = message[1]
+
+            if not hasattr(self, '_handle_wcr_%s' % (cmd)):
+                log.error("Unknown WCR cmd %s for job %s" % (cmd, message[2]))
+                self._handle_wcr_UNKNOWN(router, worker_identity, message[2:])
+                return
+
+            handler = getattr(self, '_handle_wcr_%s' % (cmd))
+            handler(router, worker_identity, message[2:])
 
     ##
     ## Collector Router command functions
@@ -293,7 +329,13 @@
         log.debug("Handling STATE for identity %s (message: %r)" % (identity, message), level=7)
 
         state = message[0]
-        interests = message[1].split(",")
+        interests = message[1].split(" ")
+
+        # register the reported interests
+        if len(interests) > 0:
+            _interests = list(interests)
+            _interests.extend(self.collector_interests)
+            self.collector_interests = list(set(_interests))
 
         collector.set_state(identity, state, interests)
 
@@ -321,19 +363,19 @@
         self._send_worker_job(identity)
 
-    def _handle_wcr_COLLECT(self, router, identity, message):
-        log.debug("Handing COLLECT for identity %s (message: %r)" % (identity, message), level=7)
+    def _handle_wcr_COLLECT(self, router, identity, commands, message):
+        log.debug("Handing COLLECT for identity %s (commands: %r, message: %r)" % (identity, commands, message), level=7)
         job_uuid = message[0]
 
-        log.info("Job %s COLLECT by %s" % (job_uuid, identity))
+        log.info("Job %s COLLECT (%r) by %s" % (job_uuid, commands, identity))
 
         updates = dict(
-            cmd = message[1],
+            cmd = commands,
             state = b'PENDING',
             job_type = 'collector'
         )
 
-        if len(message) > 2:
-            updates['notification'] = message[2]
+        if len(message) > 1:
+            updates['notification'] = message[1]
 
         job.update(
             job_uuid,
@@ -348,23 +390,15 @@
[remainder of this file's diff truncated]
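The broker's main loop now reads its poll timeout from broker/zmq_poller_timeout (default 100 ms) and keeps running only while a running flag is set, so an interrupt leads to an orderly shutdown (routers closed, router processes terminated, context terminated) rather than an abrupt exit. Below is a minimal, self-contained sketch of that loop shape; it is not the bonnie code itself, and the bind address and print statements are placeholders.

    import zmq

    def run_loop(poller_timeout=100):
        # stand-in for ZMQBroker.run(): poll with a configurable timeout and
        # leave the loop cleanly when interrupted
        context = zmq.Context()
        socket = context.socket(zmq.ROUTER)
        socket.bind("tcp://*:5571")  # placeholder address

        poller = zmq.Poller()
        poller.register(socket, zmq.POLLIN)

        running = True
        while running:
            try:
                sockets = dict(poller.poll(poller_timeout))
            except KeyboardInterrupt:
                break  # fall through to the shutdown code below
            except Exception as e:
                print("poller error: %r" % (e,))
                sockets = dict()

            for sock, event in sockets.items():
                if event == zmq.POLLIN:
                    print(sock.recv_multipart())

        # orderly shutdown, mirroring what the broker now does on exit
        socket.close()
        context.term()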
bonnie-0.2.1.tar.gz/bonnie/broker/brokers/zmq_broker/job.py -> bonnie-0.2.2.tar.gz/bonnie/broker/brokers/zmq_broker/job.py
Changed
@@ -30,6 +30,8 @@
 
 from bonnie.broker.state import init_db, Collector, Job, Worker
 
+MAX_RETRIES = 5
+
 def add(dealer, notification, job_type='worker'):
     """
         Add a new job.
@@ -203,4 +205,18 @@
         for worker in db.query(Worker).filter_by(job=job.id).all():
             worker.job = None
 
+    # process postponed jobs
+    for job in db.query(Job).filter(
+            Job.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 30)),
+            Job.state == b'POSTPONED'
+        ).all():
+
+        if job.pushbacks >= MAX_RETRIES:
+            log.error("Too many pushbacks for job %s" % (job.uuid))
+            job.state = b'FAILED'
+        else:
+            log.debug("Re-activating postponed job %s" % (job.uuid), level=7)
+            job.state = b'PENDING'
+            job.pushbacks += 1
+
     db.commit()
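In short: a job reported as POSTPONED is re-queued after roughly 30 seconds, up to MAX_RETRIES pushbacks, after which it is marked FAILED. A tiny sketch of that policy, detached from the SQLAlchemy model used above (the function name is illustrative):

    MAX_RETRIES = 5

    def next_state(pushbacks, max_retries=MAX_RETRIES):
        # state a postponed job moves to on the next expire() pass
        return b'FAILED' if pushbacks >= max_retries else b'PENDING'

    assert next_state(0) == b'PENDING'   # re-queued, pushbacks incremented
    assert next_state(5) == b'FAILED'    # too many pushbacks, give up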
bonnie-0.2.1.tar.gz/bonnie/broker/state.py -> bonnie-0.2.2.tar.gz/bonnie/broker/state.py
Changed
@@ -134,21 +134,20 @@
         self.identity = identity
         self.state = state
 
-def init_db(name, reinit=False):
+def init_db(name):
     """
         Returns a SQLAlchemy Session() instance.
    """
     global db
 
-    if not db == None and not reinit:
+    if not db == None:
        return db
 
-    if reinit:
-        import os
-        if os.path.isfile('sqlite:////var/lib/bonnie/state.db'):
-            os.unlink('sqlite:////var/lib/bonnie/state.db')
+    db_uri = conf.get('broker', 'state_sql_uri')
+
+    if not db_uri:
+        db_uri = 'sqlite:////var/lib/bonnie/state.db'
 
-    db_uri = 'sqlite:////var/lib/bonnie/state.db'
     echo = conf.debuglevel > 8
 
     try:
bonnie-0.2.1.tar.gz/bonnie/collector/__init__.py -> bonnie-0.2.2.tar.gz/bonnie/collector/__init__.py
Changed
@@ -21,8 +21,11 @@
 #
 
 import os
+import sys
 import inputs
 import handlers
+import multiprocessing
+from distutils import version
 
 from bonnie.utils import parse_imap_uri
 from bonnie.daemon import BonnieDaemon
@@ -41,20 +44,42 @@
 
         self.input_modules = {}
         self.handler_interests = {}
-        self.handler_modules = {}
 
-    def execute(self, commands, notification):
+        num_threads = int(conf.get('collector', 'num_threads', 5))
+
+        if version.StrictVersion(sys.version[:3]) >= version.StrictVersion("2.7"):
+            self.pool = multiprocessing.Pool(num_threads, self._worker_process_start, (), 1)
+        else:
+            self.pool = multiprocessing.Pool(num_threads, self._worker_process_start, ())
+
+    def _worker_process_start(self, *args, **kw):
+        log.debug("Worker process %s initializing" % (multiprocessing.current_process().name), level=1)
+
+    def execute(self, commands, job_uuid, notification):
         """
             Dispatch collector job to the according handler(s)
         """
         log.debug("Executing collection for %s" % (commands), level=8)
 
-        for command in commands.split():
-            if self.handler_interests.has_key(command):
-                for interest in self.handler_interests[command]:
-                    notification = interest['callback'](notification=notification)
-
-        return notification
+        # execute this asynchronously in a child process
+        self.pool.apply_async(
+            async_execute_handlers,
+            (
+                commands.split(),
+                notification,
+                job_uuid
+            ),
+            callback = self._execute_callback
+        )
+
+    def _execute_callback(self, result):
+        (notification, job_uuid) = result
+
+        # pass result back to input module(s)
+        input_modules = conf.get('collector', 'input_modules').split(',')
+        for _input in self.input_modules.values():
+            if _input.name() in input_modules:
+                _input.callback_done(job_uuid, notification)
 
     def register_input(self, interests):
         self.input_interests = interests
@@ -67,17 +92,14 @@
             self.handler_interests[interest].append(how)
 
     def run(self):
-        # TODO: read active input module from config collector.input_modules
         for _class in inputs.list_classes():
             module = _class()
             module.register(callback=self.register_input)
             self.input_modules[_class] = module
 
-        # TODO: read active handler module from config collector.handler_modules
         for _class in handlers.list_classes():
             handler = _class()
             handler.register(callback=self.register_handler)
-            self.handler_modules[_class] = handler
 
         input_modules = conf.get('collector', 'input_modules').split(',')
         for _input in self.input_modules.values():
@@ -91,3 +113,35 @@
             else:
                 _input.running = False
 
+        self.pool.close()
+
+
+def async_execute_handlers(commands, notification, job_uuid):
+    """
+        Routine to execute handlers for the given commands and notification
+
+        To be run an an asynchronous child process.
+    """
+    # register handlers with the interrests again in this subprocess
+    handler_interests = {}
+
+    def register_handler(interests={}):
+        for interest,how in interests.iteritems():
+            if not handler_interests.has_key(interest):
+                handler_interests[interest] = []
+
+            handler_interests[interest].append(how)
+
+    for _class in handlers.list_classes():
+        handler = _class()
+        handler.register(callback=register_handler)
+
+    log.debug("async_execute_handlers %r for job %r" % (commands, job_uuid), level=8)
+
+    for command in commands:
+        if handler_interests.has_key(command):
+            for interest in handler_interests[command]:
+                notification = interest['callback'](notification=notification)
+
+    return (notification, job_uuid)
+
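The collector now runs its handlers in a multiprocessing.Pool (collector/num_threads, default 5) and hands the result back to the input module through an apply_async callback; on Python 2.7+ the pool is created with a maxtasksperchild of 1. A minimal sketch of that pattern, with illustrative names rather than bonnie's actual handlers:

    import multiprocessing

    def init_worker():
        # runs once in each pool process (cf. _worker_process_start above)
        print("worker %s initializing" % multiprocessing.current_process().name)

    def run_handlers(commands, notification, job_uuid):
        # stand-in for async_execute_handlers(): do the work in a child process
        return (notification.upper(), job_uuid)

    def on_done(result):
        # called in the parent process with whatever the child returned
        notification, job_uuid = result
        print("job %s done: %s" % (job_uuid, notification))

    if __name__ == '__main__':
        pool = multiprocessing.Pool(5, init_worker)
        pool.apply_async(run_handlers, (['MessageAppend'], 'payload', 'job-1'),
                         callback=on_done)
        pool.close()
        pool.join()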
bonnie-0.2.1.tar.gz/bonnie/collector/inputs/zmq_input.py -> bonnie-0.2.2.tar.gz/bonnie/collector/inputs/zmq_input.py
Changed
@@ -31,6 +31,7 @@
 import socket
 import time
 import zmq
+from zmq.eventloop import ioloop, zmqstream
 
 import bonnie
 conf = bonnie.getConf()
@@ -38,7 +39,6 @@
 
 class ZMQInput(object):
     state = b"READY"
-    running = False
 
     def __init__(self, *args, **kw):
         self.interests = []
@@ -54,8 +54,7 @@
         self.collector.identity = (self.identity).encode('ascii')
         self.collector.connect(zmq_broker_address)
 
-        self.poller = zmq.Poller()
-        self.poller.register(self.collector, zmq.POLLIN)
+        self.stream = zmqstream.ZMQStream(self.collector)
 
     def name(self):
         return 'zmq_input'
@@ -71,45 +70,37 @@
     def run(self, callback=None, interests=[]):
         log.info("[%s] starting", self.identity)
 
-        self.running = True
-
         # report READY state with interests
         self.interests = interests
+        self.notify_callback = callback
         self.report_state()
 
-        while self.running:
-            try:
-                sockets = dict(self.poller.poll(1))
-            except KeyboardInterrupt, e:
-                log.info("zmq.Poller KeyboardInterrupt")
-                break
-            except Exception, e:
-                log.error("zmq.Poller error: %r", e)
-                sockets = dict()
-
-            if self.report_timestamp < (time.time() - 60):
-                self.report_state()
-
-            if self.collector in sockets:
-                if sockets[self.collector] == zmq.POLLIN:
-                    _message = self.collector.recv_multipart()
+        self.stream.on_recv(self._cb_on_recv_multipart)
+        ioloop.IOLoop.instance().start()
 
-                    if _message[0] == b"STATE":
-                        self.report_state()
+    def _cb_on_recv_multipart(self, message):
+        """
+            Receive a message on the Collector Router.
+        """
+        log.debug("Collector Router Message: %r" % (message), level=8)
+        collector_identity = message[0]
 
-                    else:
-                        if not self.state == b"READY":
-                            self.report_state()
+        if message[0] == b"STATE" or not self.state == b"READY":
+            self.report_state()
+        else:
+            job_uuid = message[1]
+            notification = message[2]
 
-                        else:
-                            _job_uuid = _message[1]
-                            _notification = _message[2]
+            if not self.notify_callback == None:
+                self.notify_callback(message[0], job_uuid, notification)
 
-                            if not callback == None:
-                                result = callback(_message[0], _notification)
-
-                            self.report_timestamp = time.time()
-                            self.collector.send_multipart([b"DONE", _job_uuid, result])
+    def callback_done(self, job_uuid, result):
+        log.debug("Handler callback done for job %s: %r" % (job_uuid, result), level=8)
+        self.report_timestamp = time.time()
+        self.collector.send_multipart([b"DONE", job_uuid, result])
+        log.debug("Handler results sent for job %s" % (job_uuid), level=7)
+
     def terminate(self):
         log.info("[%s] shutting down", self.identity)
+        ioloop.IOLoop.instance().stop()
         self.collector.close()
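The collector's ZMQ input no longer runs its own poll loop; it registers an on_recv callback on a ZMQStream and lets the pyzmq IOLoop drive it, which is what allows results to be sent back later through callback_done(). A bare-bones sketch of that pattern (the address and the handler body are placeholders):

    import zmq
    from zmq.eventloop import ioloop, zmqstream

    context = zmq.Context()
    socket = context.socket(zmq.DEALER)
    socket.connect("tcp://localhost:5571")  # placeholder broker address

    def on_recv(message):
        # called with the list of message frames whenever something arrives
        print("received: %r" % (message,))

    stream = zmqstream.ZMQStream(socket)
    stream.on_recv(on_recv)

    try:
        ioloop.IOLoop.instance().start()  # blocks; callbacks fire from here
    except KeyboardInterrupt:
        ioloop.IOLoop.instance().stop()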
bonnie-0.2.1.tar.gz/bonnie/conf.py -> bonnie-0.2.2.tar.gz/bonnie/conf.py
Changed
@@ -171,18 +171,7 @@
                 self.read_config()
             return self.cfg_parser.get(section, key)
 
-        if hasattr(self, "get_%s_%s" % (section,key)):
-            try:
-                exec("retval = self.get_%s_%s(quiet)" % (section,key))
-            except Exception, e:
-                log.error(_("Could not execute configuration function: %s") % ("get_%s_%s(quiet=%r)" % (section,key,quiet)))
-                return None
-
-            return retval
-
-        if quiet:
-            return ""
-        else:
+        if not quiet:
             log.warning(_("Option %s/%s does not exist in config file %s, pulling from defaults") % (section, key, self.config_file))
         if hasattr(self.defaults, "%s_%s" % (section,key)):
             return getattr(self.defaults, "%s_%s" % (section,key))
@@ -192,10 +181,10 @@
                     return _dict[key]
                 else:
                     log.warning(_("Option does not exist in defaults."))
-                    return None
             else:
                 log.warning(_("Option does not exist in defaults."))
-                return None
+
+        return default
 
     def load_config(self, config):
         """
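The practical effect is that the config getter now falls back to a caller-supplied default when neither the config file nor the built-in defaults define an option (its def line is outside the hunk shown, but the new return default implies a third default argument). This is what the new conf.get(section, key, default) calls elsewhere in this revision rely on. A usage sketch, assuming the bonnie package is importable:

    import bonnie

    conf = bonnie.getConf()

    # a missing option now yields the supplied default instead of None/"",
    # so numeric settings can be read and converted in one line
    poller_timeout = int(conf.get('broker', 'zmq_poller_timeout', 100))
    num_threads = int(conf.get('collector', 'num_threads', 5))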
bonnie-0.2.1.tar.gz/bonnie/worker/handlers/logout.py -> bonnie-0.2.2.tar.gz/bonnie/worker/handlers/logout.py
Changed
@@ -78,6 +78,6 @@
             time.sleep(1) # wait for storage and try again
 
             # push back into the job queue, the corresponding Login event may not yet have been processed.
-            return (notification, [b"PUSHBACK"])
+            return (notification, [b"POSTPONE"])
 
         return super(LogoutHandler, self).run(notification)
\ No newline at end of file
bonnie-0.2.1.tar.gz/bonnie/worker/inputs/zmq_input.py -> bonnie-0.2.2.tar.gz/bonnie/worker/inputs/zmq_input.py
Changed
@@ -92,9 +92,11 @@
             self.lastping = time.time()
         self.report_state()
 
+        poller_timeout = int(conf.get('worker', 'zmq_poller_timeout', 100))
+
         while self.running:
             try:
-                sockets = dict(self.poller.poll(1))
+                sockets = dict(self.poller.poll(poller_timeout))
             except KeyboardInterrupt, e:
                 log.info("zmq.Poller KeyboardInterrupt")
                 break
@@ -143,7 +145,7 @@
                     self.controller.send_multipart([b"DONE", self.job_uuid])
                 else:
                     log.debug("[%s] Has jobs: %r" % (self.identity, jobs), level=8)
-                    self.controller.send_multipart([b"COLLECT", self.job_uuid, b" ".join(jobs)])
+                    self.controller.send_multipart([b" ".join(jobs), self.job_uuid])
 
                 self.set_state_ready()
 
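The wire format changes with this: instead of a literal b"COLLECT" frame, the worker now sends the aggregated collector commands themselves as the first frame, and the broker recognises a COLLECT request by matching those commands against the collector interests it has registered (see the _cb_wcr_recv_multipart change above). Purely illustrative frames, with hypothetical command names and a placeholder UUID:

    jobs = [b'FETCH', b'HEADERS']        # hypothetical collector commands
    job_uuid = b'8a6e...'                # placeholder job identifier

    frames_0_2_1 = [b'COLLECT', job_uuid, b' '.join(jobs)]   # old framing
    frames_0_2_2 = [b' '.join(jobs), job_uuid]               # new framing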
bonnie-0.2.1.tar.gz/bonnie/worker/outputs/elasticsearch_output.py -> bonnie-0.2.2.tar.gz/bonnie/worker/outputs/elasticsearch_output.py
Changed
@@ -8,6 +8,7 @@
 
 import bonnie
 conf = bonnie.getConf()
+log = bonnie.getLogger('bonnie.worker.ElasticSearchOutput')
 
 class ElasticSearchOutput(object):
     def __init__(self, *args, **kw):
@@ -79,6 +80,7 @@
 
         notification['@timestamp'] = datetime.datetime.strftime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
         index = 'logstash-%s' % (datetime.datetime.strftime(timestamp, "%Y.%m.%d"))
+        jobs = []
 
         # for notifications concerning multiple messages, create entries for each message
        if notification.has_key('messageHeaders') and isinstance(notification['messageHeaders'], dict) and len(notification['messageHeaders']) > 0:
@@ -94,16 +96,26 @@
                 # remove vnd.cmu.envelope if we have headers
                 notification.pop('vnd.cmu.envelope', None)
 
+                try:
+                    self.es.create(
+                        index=index,
+                        doc_type='logs',
+                        body=self.notification2log(notification)
+                    )
+                except Exception, e:
+                    log.warning("ES create exception: %r", e)
+                    jobs.append(b'POSTPONE')
+                    break
+
+        else:
+            try:
                 self.es.create(
                     index=index,
                     doc_type='logs',
                     body=self.notification2log(notification)
                 )
-        else:
-            self.es.create(
-                index=index,
-                doc_type='logs',
-                body=self.notification2log(notification)
-            )
+            except Exception, e:
+                log.warning("ES create exception: %r", e)
+                jobs.append(b'POSTPONE')
 
-        return (notification, [])
+        return (notification, jobs)
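Together with the logout handler change above, this makes failed storage attempts non-fatal: when an Elasticsearch write raises, the handler logs a warning and returns b'POSTPONE' so the broker re-queues the job (see the POSTPONED handling in job.py earlier in this diff). A reduced sketch of that error path, with an illustrative function name, mirroring the es.create() call used above:

    def store_notification(es, index, doc):
        # returns (doc, jobs); a non-empty jobs list asks the broker to act on it
        jobs = []
        try:
            es.create(index=index, doc_type='logs', body=doc)
        except Exception as e:
            print("ES create exception: %r" % (e,))
            jobs.append(b'POSTPONE')   # broker retries, then fails after MAX_RETRIES
        return (doc, jobs)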
bonnie-0.2.1.tar.gz/conf/bonnie.conf -> bonnie-0.2.2.tar.gz/conf/bonnie.conf
Changed
@@ -3,11 +3,14 @@
 zmq_collector_router_bind_address = tcp://*:5571
 zmq_worker_controller_router_bind_address = tcp://*:5572
 zmq_worker_router_bind_address = tcp://*:5573
-persistence_sql_uri = sqlite://
+zmq_poller_timeout = 100
+state_sql_uri = sqlite:////var/lib/bonnie/state.db
 
 [collector]
+num_threads = 5
 input_modules = zmq_input
 zmq_broker_address = tcp://localhost:5571
+zmq_poller_timeout = 100
 
 [dealer]
 output_modules = zmq_output
@@ -24,6 +27,7 @@
 output_exclude_events = MessageExpunge
 zmq_controller_address = tcp://localhost:5572
 zmq_worker_router_address = tcp://localhost:5573
+zmq_poller_timeout = 100
 elasticsearch_output_address = localhost
 elasticsearch_storage_address = localhost
 