Kolab:3.4:Updates
bonnie
Changes of Revision 37
bonnie.spec
Changed
@@ -17,7 +17,7 @@
 %global bonnie_group_id 415
 
 Name: bonnie
-Version: 0.2.1
+Version: 0.2.2
 Release: 1%{?dist}
 Summary: Bonnie for Kolab Groupware
bonnie-0.2.1.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py -> bonnie-0.2.2.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py
Changed
@@ -24,8 +24,10 @@
 """
 
 from multiprocessing import Process
+import os
 import random
 import re
+import signal
 import sys
 import threading
 import time
@@ -41,6 +43,8 @@
 import worker
 
 class ZMQBroker(object):
+    running = False
+
     def __init__(self):
         """
             A ZMQ Broker for Bonnie.
@@ -49,6 +53,8 @@
         self.poller = zmq.Poller()
         self.routers = {}
+        self.router_processes = {}
+        self.collector_interests = []
 
     def register(self, callback):
         callback({ '_all': self.run })
@@ -92,16 +98,12 @@
         if bind_address == None:
             bind_address = default
 
-        setattr(
-            self,
-            '%s_router' % (name),
-            Process(
-                target=self._run_router_process,
-                args=(callback, bind_address)
-            )
+        self.router_processes[name] = Process(
+            target=self._run_router_process,
+            args=(callback, bind_address)
         )
 
-        getattr(self, '%s_router' % (name)).start()
+        self.router_processes[name].start()
 
     def find_router(self, socket):
         for name, attrs in self.routers.iteritems():
@@ -133,12 +135,21 @@
             recv_multipart = self._cb_wcr_recv_multipart
         )
 
+        self.running = True
         last_expire = time.time()
         last_state = time.time()
-
-        while True:
-            # TODO: adjust polling timout according to the number of pending jobs
-            sockets = dict(self.poller.poll(100))
+        poller_timeout = int(conf.get('broker', 'zmq_poller_timeout', 100))
+
+        while self.running:
+            try:
+                # TODO: adjust polling timout according to the number of pending jobs
+                sockets = dict(self.poller.poll(poller_timeout))
+            except KeyboardInterrupt, e:
+                log.info("zmq.Poller KeyboardInterrupt")
+                break
+            except Exception, e:
+                log.error("zmq.Poller error: %r", e)
+                sockets = dict()
 
             for socket, event in sockets.iteritems():
                 if event == zmq.POLLIN:
@@ -155,6 +166,9 @@
                     self._send_worker_job(_worker.identity)
 
             if last_expire < (time.time() - 30):
+                for _collector in collector.select_by_state(b'READY'):
+                    self._request_collector_state(_collector.identity)
+
                 collector.expire()
                 worker.expire()
                 job.unlock()
@@ -179,7 +193,8 @@
                     'jp': sum([jcp, jwp]),
                     'jwa': jwa,
                     'jwp': jwp,
-                    'jo': job.count_by_state(b'ORPHANED'),
+                    'jf': job.count_by_state(b'FAILED'),
+                    'jo': job.count_by_state(b'POSTPONED'),
                     'wb': worker.count_by_state(b'BUSY'),
                     'wr': worker.count_by_state(b'READY'),
                     'ws': worker.count_by_state(b'STALE'),
@@ -190,15 +205,28 @@
                 log.info("""
                     Jobs: done=%(jd)d, pending=%(jp)d, alloc=%(ja)d,
-                        orphaned=%(jo)d.
+                        postponed=%(jo)d, failed=%(jf)d.
                     Workers: ready=%(wr)d, busy=%(wb)d, stale=%(ws)d,
                         pending=%(jwp)d, alloc=%(jwa)d.
                     Collectors: ready=%(cr)d, busy=%(cb)d, stale=%(cs)d,
                         pending=%(jcp)d, alloc=%(jca)d.
                     Took: seconds=%(duration)s.""" % stats)
 
+                self._write_stats(stats)
+                last_state = time.time()
+
+        log.info("Shutting down")
+
+        for attrs in self.routers.values():
+            attrs['router'].close()
+
+        for proc in self.router_processes.values():
+            proc.terminate()
+
+        self.context.term()
+
     def _cb_cr_recv_multipart(self, router, message):
         """
             Receive a message on the Collector Router.
@@ -241,15 +269,23 @@
     def _cb_wcr_recv_multipart(self, router, message):
         log.debug("Worker Controller Router Message: %r" % (message), level=8)
         worker_identity = message[0]
-        cmd = message[1]
+        commands = message[1].split(" ")
 
-        if not hasattr(self, '_handle_wcr_%s' % (cmd)):
-            log.error("Unhandled WCR cmd %s" % (cmd))
-            self._handle_wcr_unknown(router, identity, message[2:])
-            return
+        # check for aggregated collector commands
+        collector_commands = [c for c in commands if c in self.collector_interests]
 
-        handler = getattr(self, '_handle_wcr_%s' % (cmd))
-        handler(router, worker_identity, message[2:])
+        if len(collector_commands) == len(commands):
+            self._handle_wcr_COLLECT(router, worker_identity, message[1], message[2:])
+        else:
+            cmd = message[1]
+
+            if not hasattr(self, '_handle_wcr_%s' % (cmd)):
+                log.error("Unknown WCR cmd %s for job %s" % (cmd, message[2]))
+                self._handle_wcr_UNKNOWN(router, worker_identity, message[2:])
+                return
+
+            handler = getattr(self, '_handle_wcr_%s' % (cmd))
+            handler(router, worker_identity, message[2:])
 
     ##
     ## Collector Router command functions
@@ -293,7 +329,13 @@
         log.debug("Handling STATE for identity %s (message: %r)" % (identity, message), level=7)
 
         state = message[0]
-        interests = message[1].split(",")
+        interests = message[1].split(" ")
+
+        # register the reported interests
+        if len(interests) > 0:
+            _interests = list(interests)
+            _interests.extend(self.collector_interests)
+            self.collector_interests = list(set(_interests))
 
         collector.set_state(identity, state, interests)
@@ -321,19 +363,19 @@
             self._send_worker_job(identity)
 
-    def _handle_wcr_COLLECT(self, router, identity, message):
-        log.debug("Handing COLLECT for identity %s (message: %r)" % (identity, message), level=7)
+    def _handle_wcr_COLLECT(self, router, identity, commands, message):
+        log.debug("Handing COLLECT for identity %s (commands: %r, message: %r)" % (identity, commands, message), level=7)
 
         job_uuid = message[0]
 
-        log.info("Job %s COLLECT by %s" % (job_uuid, identity))
+        log.info("Job %s COLLECT (%r) by %s" % (job_uuid, commands, identity))
 
         updates = dict(
-            cmd = message[1],
+            cmd = commands,
            state = b'PENDING',
            job_type = 'collector'
        )
 
-        if len(message) > 2:
-            updates['notification'] = message[2]
+        if len(message) > 1:
+            updates['notification'] = message[1]
 
         job.update(
             job_uuid,
@@ -348,23 +390,15 @@
            self._send_worker_job(identity)
 
-    def _handle_wcr_PUSHBACK(self, router, identity, message):
-        log.debug("Handing PUSHBACK for identity %s (message: %r)" % (identity, message), level=8)
+    def _handle_wcr_POSTPONE(self, router, identity, message):
+        log.debug("Handing POSTPONE for identity %s (message: %r)" % (identity, message), level=7)
 
         job_uuid = message[0]
 
-        _job = job.select(job_uuid)
+        log.info("Job %s POSTPONE by %s" % (job_uuid, identity))
 
-        if _job is not None and _job.pushbacks < 5:
-            job.update(
-                job_uuid,
-                state = b'PENDING',
-                pushbacks = _job.pushbacks + 1
-            )
-        else:
-            log.error("Job %s pushed back too many times" % (job_uuid))
-            job.update(
-                job_uuid,
-                state = b'ORPHANED'
-            )
+        job.update(
+            job_uuid,
+            state = b'POSTPONED'
+        )
 
         worker.update(
             identity,
@@ -403,11 +437,11 @@
        if state == b'READY':
            self._send_worker_job(identity)
 
-    def _handle_wcr_unknown(self, router, identity, message):
+    def _handle_wcr_UNKNOWN(self, router, identity, message):
        job_uuid = message[0]
 
        job.update(
            job_uuid,
-            state = b'ORPHANED'
+            state = b'FAILED'
        )
 
        worker.update(
@@ -445,7 +479,21 @@
        stream = zmqstream.ZMQStream(router)
        stream.on_recv_stream(callback)
 
-        ioloop.IOLoop.instance().start()
+
+        # catch sigterm and terminate the ioloop
+        def _terminate(*args, **kw):
+            log.info("ioloop.IOLoop shutting down")
+            ioloop.IOLoop.instance().stop()
+
+        signal.signal(signal.SIGTERM, _terminate)
+
+        try:
+            ioloop.IOLoop.instance().start()
+        except KeyboardInterrupt, e:
+            log.info("ioloop.IOLoop KeyboardInterrupt")
+        except Exception, e:
+            log.error("ioloop.IOLoop error: %r", e)
+
     def _send_collector_job(self, identity):
        _job = job.select_for_collector(identity)
@@ -491,3 +539,36 @@
                (_job.uuid).encode('ascii')
            ]
        )
+
+    def _write_stats(self, stats):
+        if os.access("/var/lib/bonnie/state.state", os.W_OK):
+            try:
+                fp = open("/var/lib/bonnie/state.stats", "w")
+                fp.write("""# Source this file in your script
+jobs_done=%(jd)d
+jobs_pending=%(jp)d
+jobs_alloc=%(ja)d
+jobs_orphaned=%(jo)d
+workers_ready=%(wr)d
+workers_busy=%(wb)d
+workers_stale=%(ws)d
+collectors_ready=%(cr)d
+collectors_busy=%(cb)d
+collectors_stale=%(cs)d
+collector_jobs_pending=%(jcp)d
+collector_jobs_alloc=%(jcp)d
+worker_jobs_pending=%(jwp)d
+worker_jobs_alloc=%(jwa)d
+""" % stats)
+                fp.close()
+
+            except Exception, errmsg:
+                log.error(
+                    "An error occurred writing out the stats file:\n%s" % (
+                        errmsg
+                    )
+                )
+
+    def _request_collector_state(self, identity):
+        log.debug("Requesting state from %s" % (identity), level=7)
+        self.routers['collector']['router'].send_multipart([identity.encode('ascii'), b"STATE"])
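The shutdown path above closes the routers, terminates the router processes and tears down the ZMQ context once running is cleared, and the new _write_stats() dumps the periodic statistics to /var/lib/bonnie/state.stats as shell-style key=value pairs. A minimal sketch of how an external check might read that file; read_broker_stats is an illustrative name, not part of bonnie:

import os

def read_broker_stats(path="/var/lib/bonnie/state.stats"):
    # parse the key=value lines written by ZMQBroker._write_stats()
    stats = {}
    if not os.path.exists(path):
        return stats
    with open(path) as fp:
        for line in fp:
            line = line.strip()
            if not line or line.startswith("#"):
                continue  # skip the comment header and blank lines
            key, _, value = line.partition("=")
            stats[key] = int(value)
    return stats

if __name__ == "__main__":
    stats = read_broker_stats()
    print("pending jobs: %d" % stats.get("jobs_pending", 0))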
bonnie-0.2.1.tar.gz/bonnie/broker/brokers/zmq_broker/job.py -> bonnie-0.2.2.tar.gz/bonnie/broker/brokers/zmq_broker/job.py
Changed
@@ -30,6 +30,8 @@
 
 from bonnie.broker.state import init_db, Collector, Job, Worker
 
+MAX_RETRIES = 5
+
 def add(dealer, notification, job_type='worker'):
     """
         Add a new job.
@@ -203,4 +205,18 @@
         for worker in db.query(Worker).filter_by(job=job.id).all():
             worker.job = None
 
+    # process postponed jobs
+    for job in db.query(Job).filter(
+            Job.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 30)),
+            Job.state == b'POSTPONED'
+        ).all():
+
+        if job.pushbacks >= MAX_RETRIES:
+            log.error("Too many pushbacks for job %s" % (job.uuid))
+            job.state = b'FAILED'
+        else:
+            log.debug("Re-activating postponed job %s" % (job.uuid), level=7)
+            job.state = b'PENDING'
+            job.pushbacks += 1
+
     db.commit()
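Taken together with the broker changes above, the retry policy is: a POSTPONED job older than 30 seconds is re-queued as PENDING and its pushbacks counter incremented; once pushbacks reaches MAX_RETRIES (5) the job is marked FAILED instead. A standalone sketch of that decision, assuming the same constants; next_state and RETRY_DELAY are illustrative names, not bonnie's:

import datetime

MAX_RETRIES = 5                               # mirrors MAX_RETRIES above
RETRY_DELAY = datetime.timedelta(seconds=30)  # mirrors timedelta(0, 30) above

def next_state(job_timestamp, pushbacks, now=None):
    # return the state a POSTPONED job should move to, or None to leave it alone
    now = now or datetime.datetime.utcnow()
    if job_timestamp > now - RETRY_DELAY:
        return None                           # not old enough to retry yet
    if pushbacks >= MAX_RETRIES:
        return b'FAILED'                      # retried too often, give up
    return b'PENDING'                         # re-queue; caller bumps pushbacks

# example: a job postponed 45 seconds ago with one previous pushback
print(next_state(datetime.datetime.utcnow() - datetime.timedelta(seconds=45), 1))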
bonnie-0.2.1.tar.gz/bonnie/broker/state.py -> bonnie-0.2.2.tar.gz/bonnie/broker/state.py
Changed
@@ -134,21 +134,20 @@ self.identity = identity self.state = state -def init_db(name, reinit=False): +def init_db(name): """ Returns a SQLAlchemy Session() instance. """ global db - if not db == None and not reinit: + if not db == None: return db - if reinit: - import os - if os.path.isfile('sqlite:////var/lib/bonnie/state.db'): - os.unlink('sqlite:////var/lib/bonnie/state.db') + db_uri = conf.get('broker', 'state_sql_uri') + + if not db_uri: + db_uri = 'sqlite:////var/lib/bonnie/state.db' - db_uri = 'sqlite:////var/lib/bonnie/state.db' echo = conf.debuglevel > 8 try:
bonnie-0.2.1.tar.gz/bonnie/collector/__init__.py -> bonnie-0.2.2.tar.gz/bonnie/collector/__init__.py
Changed
@@ -21,8 +21,11 @@
 #
 
 import os
+import sys
 import inputs
 import handlers
+import multiprocessing
+from distutils import version
 
 from bonnie.utils import parse_imap_uri
 from bonnie.daemon import BonnieDaemon
@@ -41,20 +44,42 @@
         self.input_modules = {}
         self.handler_interests = {}
-        self.handler_modules = {}
 
-    def execute(self, commands, notification):
+        num_threads = int(conf.get('collector', 'num_threads', 5))
+
+        if version.StrictVersion(sys.version[:3]) >= version.StrictVersion("2.7"):
+            self.pool = multiprocessing.Pool(num_threads, self._worker_process_start, (), 1)
+        else:
+            self.pool = multiprocessing.Pool(num_threads, self._worker_process_start, ())
+
+    def _worker_process_start(self, *args, **kw):
+        log.debug("Worker process %s initializing" % (multiprocessing.current_process().name), level=1)
+
+    def execute(self, commands, job_uuid, notification):
         """
             Dispatch collector job to the according handler(s)
         """
         log.debug("Executing collection for %s" % (commands), level=8)
 
-        for command in commands.split():
-            if self.handler_interests.has_key(command):
-                for interest in self.handler_interests[command]:
-                    notification = interest['callback'](notification=notification)
-
-        return notification
+        # execute this asynchronously in a child process
+        self.pool.apply_async(
+            async_execute_handlers,
+            (
+                commands.split(),
+                notification,
+                job_uuid
+            ),
+            callback = self._execute_callback
+        )
+
+    def _execute_callback(self, result):
+        (notification, job_uuid) = result
+
+        # pass result back to input module(s)
+        input_modules = conf.get('collector', 'input_modules').split(',')
+        for _input in self.input_modules.values():
+            if _input.name() in input_modules:
+                _input.callback_done(job_uuid, notification)
@@ -67,17 +92,14 @@
             self.handler_interests[interest].append(how)
 
     def run(self):
-        # TODO: read active input module from config collector.input_modules
         for _class in inputs.list_classes():
             module = _class()
             module.register(callback=self.register_input)
             self.input_modules[_class] = module
 
-        # TODO: read active handler module from config collector.handler_modules
         for _class in handlers.list_classes():
             handler = _class()
             handler.register(callback=self.register_handler)
-            self.handler_modules[_class] = handler
 
         input_modules = conf.get('collector', 'input_modules').split(',')
         for _input in self.input_modules.values():
@@ -91,3 +113,35 @@
             else:
                 _input.running = False
 
+        self.pool.close()
+
+
+def async_execute_handlers(commands, notification, job_uuid):
+    """
+        Routine to execute handlers for the given commands and notification
+
+        To be run an an asynchronous child process.
+    """
+    # register handlers with the interrests again in this subprocess
+    handler_interests = {}
+
+    def register_handler(interests={}):
+        for interest,how in interests.iteritems():
+            if not handler_interests.has_key(interest):
+                handler_interests[interest] = []
+
+            handler_interests[interest].append(how)
+
+    for _class in handlers.list_classes():
+        handler = _class()
+        handler.register(callback=register_handler)
+
+    log.debug("async_execute_handlers %r for job %r" % (commands, job_uuid), level=8)
+
+    for command in commands:
+        if handler_interests.has_key(command):
+            for interest in handler_interests[command]:
+                notification = interest['callback'](notification=notification)
+
+    return (notification, job_uuid)
+
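The collector now executes handlers in a multiprocessing.Pool: execute() hands the job to a child process with apply_async() and _execute_callback() returns the result to the input module once the child finishes (the extra maxtasksperchild argument is only passed on Python 2.7+, where it exists). A self-contained sketch of that submit-and-callback pattern; the function names and example arguments below are illustrative, not bonnie's:

import multiprocessing

def process_start():
    # runs once in every child process, like _worker_process_start() above
    pass

def handle_job(commands, notification, job_uuid):
    # stands in for async_execute_handlers(); must be a module-level function
    # so it can be pickled and shipped to the child process
    return ("handled %s: %s" % (" ".join(commands), notification), job_uuid)

def job_done(result):
    # runs in the parent once the child returns, like _execute_callback()
    notification, job_uuid = result
    print("job %s finished: %r" % (job_uuid, notification))

if __name__ == "__main__":
    pool = multiprocessing.Pool(5, process_start)
    pool.apply_async(handle_job,
                     (["EXAMPLE_COMMAND"], "example notification", "1234-uuid"),
                     callback=job_done)
    pool.close()
    pool.join()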
bonnie-0.2.1.tar.gz/bonnie/collector/inputs/zmq_input.py -> bonnie-0.2.2.tar.gz/bonnie/collector/inputs/zmq_input.py
Changed
@@ -31,6 +31,7 @@
 import socket
 import time
 import zmq
+from zmq.eventloop import ioloop, zmqstream
 
 import bonnie
 conf = bonnie.getConf()
@@ -38,7 +39,6 @@
 
 class ZMQInput(object):
     state = b"READY"
-    running = False
 
     def __init__(self, *args, **kw):
         self.interests = []
@@ -54,8 +54,7 @@
         self.collector.identity = (self.identity).encode('ascii')
         self.collector.connect(zmq_broker_address)
 
-        self.poller = zmq.Poller()
-        self.poller.register(self.collector, zmq.POLLIN)
+        self.stream = zmqstream.ZMQStream(self.collector)
 
     def name(self):
         return 'zmq_input'
@@ -71,45 +70,37 @@
     def run(self, callback=None, interests=[]):
         log.info("[%s] starting", self.identity)
 
-        self.running = True
-
         # report READY state with interests
         self.interests = interests
+        self.notify_callback = callback
         self.report_state()
 
-        while self.running:
-            try:
-                sockets = dict(self.poller.poll(1))
-            except KeyboardInterrupt, e:
-                log.info("zmq.Poller KeyboardInterrupt")
-                break
-            except Exception, e:
-                log.error("zmq.Poller error: %r", e)
-                sockets = dict()
-
-            if self.report_timestamp < (time.time() - 60):
-                self.report_state()
-
-            if self.collector in sockets:
-                if sockets[self.collector] == zmq.POLLIN:
-                    _message = self.collector.recv_multipart()
+        self.stream.on_recv(self._cb_on_recv_multipart)
+        ioloop.IOLoop.instance().start()
 
-                    if _message[0] == b"STATE":
-                        self.report_state()
+    def _cb_on_recv_multipart(self, message):
+        """
+            Receive a message on the Collector Router.
+        """
+        log.debug("Collector Router Message: %r" % (message), level=8)
+        collector_identity = message[0]
 
-                    else:
-                        if not self.state == b"READY":
-                            self.report_state()
+        if message[0] == b"STATE" or not self.state == b"READY":
+            self.report_state()
+        else:
+            job_uuid = message[1]
+            notification = message[2]
 
-                    else:
-                        _job_uuid = _message[1]
-                        _notification = _message[2]
+            if not self.notify_callback == None:
+                self.notify_callback(message[0], job_uuid, notification)
 
-                        if not callback == None:
-                            result = callback(_message[0], _notification)
-
-                        self.report_timestamp = time.time()
-                        self.collector.send_multipart([b"DONE", _job_uuid, result])
+    def callback_done(self, job_uuid, result):
+        log.debug("Handler callback done for job %s: %r" % (job_uuid, result), level=8)
+        self.report_timestamp = time.time()
+        self.collector.send_multipart([b"DONE", job_uuid, result])
+        log.debug("Handler results sent for job %s" % (job_uuid), level=7)
+
     def terminate(self):
         log.info("[%s] shutting down", self.identity)
+        ioloop.IOLoop.instance().stop()
         self.collector.close()
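The collector input replaces its hand-rolled zmq.Poller loop with a ZMQStream hooked into pyzmq's IOLoop, and handler results are now delivered asynchronously via callback_done() instead of inline. A minimal sketch of that event-driven receive pattern, using the same pyzmq eventloop imports as above; the address and identity are placeholders taken from the default bonnie.conf:

import zmq
from zmq.eventloop import ioloop, zmqstream

context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.identity = b"example-collector"
socket.connect("tcp://localhost:5571")   # default collector router address

stream = zmqstream.ZMQStream(socket)

def on_recv(message):
    # invoked by the IOLoop for each incoming multipart message,
    # replacing the old poll()/recv_multipart() loop
    print("received: %r" % (message,))

stream.on_recv(on_recv)

try:
    ioloop.IOLoop.instance().start()
except KeyboardInterrupt:
    ioloop.IOLoop.instance().stop()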
bonnie-0.2.1.tar.gz/bonnie/conf.py -> bonnie-0.2.2.tar.gz/bonnie/conf.py
Changed
@@ -171,18 +171,7 @@
                 self.read_config()
             return self.cfg_parser.get(section, key)
 
-        if hasattr(self, "get_%s_%s" % (section,key)):
-            try:
-                exec("retval = self.get_%s_%s(quiet)" % (section,key))
-            except Exception, e:
-                log.error(_("Could not execute configuration function: %s") % ("get_%s_%s(quiet=%r)" % (section,key,quiet)))
-                return None
-
-            return retval
-
-        if quiet:
-            return ""
-        else:
+        if not quiet:
             log.warning(_("Option %s/%s does not exist in config file %s, pulling from defaults") % (section, key, self.config_file))
 
         if hasattr(self.defaults, "%s_%s" % (section,key)):
             return getattr(self.defaults, "%s_%s" % (section,key))
@@ -192,10 +181,10 @@
                 return _dict[key]
             else:
                 log.warning(_("Option does not exist in defaults."))
-                return None
         else:
             log.warning(_("Option does not exist in defaults."))
-            return None
+
+        return default
 
     def load_config(self, config):
         """
bonnie-0.2.1.tar.gz/bonnie/worker/handlers/logout.py -> bonnie-0.2.2.tar.gz/bonnie/worker/handlers/logout.py
Changed
@@ -78,6 +78,6 @@
             time.sleep(1) # wait for storage and try again
 
             # push back into the job queue, the corresponding Login event may not yet have been processed.
-            return (notification, [b"PUSHBACK"])
+            return (notification, [b"POSTPONE"])
 
         return super(LogoutHandler, self).run(notification)
\ No newline at end of file
bonnie-0.2.1.tar.gz/bonnie/worker/inputs/zmq_input.py -> bonnie-0.2.2.tar.gz/bonnie/worker/inputs/zmq_input.py
Changed
@@ -92,9 +92,11 @@
         self.lastping = time.time()
         self.report_state()
 
+        poller_timeout = int(conf.get('worker', 'zmq_poller_timeout', 100))
+
         while self.running:
             try:
-                sockets = dict(self.poller.poll(1))
+                sockets = dict(self.poller.poll(poller_timeout))
             except KeyboardInterrupt, e:
                 log.info("zmq.Poller KeyboardInterrupt")
                 break
@@ -143,7 +145,7 @@
             self.controller.send_multipart([b"DONE", self.job_uuid])
         else:
             log.debug("[%s] Has jobs: %r" % (self.identity, jobs), level=8)
-            self.controller.send_multipart([b"COLLECT", self.job_uuid, b" ".join(jobs)])
+            self.controller.send_multipart([b" ".join(jobs), self.job_uuid])
 
         self.set_state_ready()
bonnie-0.2.1.tar.gz/bonnie/worker/outputs/elasticsearch_output.py -> bonnie-0.2.2.tar.gz/bonnie/worker/outputs/elasticsearch_output.py
Changed
@@ -8,6 +8,7 @@
 import bonnie
 
 conf = bonnie.getConf()
+log = bonnie.getLogger('bonnie.worker.ElasticSearchOutput')
 
 class ElasticSearchOutput(object):
     def __init__(self, *args, **kw):
@@ -79,6 +80,7 @@
         notification['@timestamp'] = datetime.datetime.strftime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
         index = 'logstash-%s' % (datetime.datetime.strftime(timestamp, "%Y.%m.%d"))
+        jobs = []
 
         # for notifications concerning multiple messages, create entries for each message
         if notification.has_key('messageHeaders') and isinstance(notification['messageHeaders'], dict) and len(notification['messageHeaders']) > 0:
@@ -94,16 +96,26 @@
                 # remove vnd.cmu.envelope if we have headers
                 notification.pop('vnd.cmu.envelope', None)
 
+                try:
+                    self.es.create(
+                        index=index,
+                        doc_type='logs',
+                        body=self.notification2log(notification)
+                    )
+                except Exception, e:
+                    log.warning("ES create exception: %r", e)
+                    jobs.append(b'POSTPONE')
+                    break
+
+        else:
+            try:
                 self.es.create(
                     index=index,
                     doc_type='logs',
                     body=self.notification2log(notification)
                 )
-        else:
-            self.es.create(
-                index=index,
-                doc_type='logs',
-                body=self.notification2log(notification)
-            )
+            except Exception, e:
+                log.warning("ES create exception: %r", e)
+                jobs.append(b'POSTPONE')
 
-        return (notification, [])
+        return (notification, jobs)
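With this change a failed es.create() no longer raises out of the worker: the handler appends b'POSTPONE' to the jobs it returns, and the broker re-queues the notification just like the logout handler change above. A stripped-down sketch of that return contract; run_handler and write_to_storage are illustrative names, not bonnie's API:

def run_handler(notification, write_to_storage):
    jobs = []
    try:
        write_to_storage(notification)   # stands in for self.es.create(...)
    except Exception:
        # storage is unavailable: ask the broker to postpone instead of failing
        jobs.append(b'POSTPONE')
    return (notification, jobs)

def broken_storage(notification):
    raise IOError("elasticsearch unreachable")

notification, jobs = run_handler({'event': 'MessageAppend'}, broken_storage)
assert jobs == [b'POSTPONE']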
bonnie-0.2.1.tar.gz/conf/bonnie.conf -> bonnie-0.2.2.tar.gz/conf/bonnie.conf
Changed
@@ -3,11 +3,14 @@
 zmq_collector_router_bind_address = tcp://*:5571
 zmq_worker_controller_router_bind_address = tcp://*:5572
 zmq_worker_router_bind_address = tcp://*:5573
-persistence_sql_uri = sqlite://
+zmq_poller_timeout = 100
+state_sql_uri = sqlite:////var/lib/bonnie/state.db
 
 [collector]
+num_threads = 5
 input_modules = zmq_input
 zmq_broker_address = tcp://localhost:5571
+zmq_poller_timeout = 100
 
 [dealer]
 output_modules = zmq_output
@@ -24,6 +27,7 @@
 output_exclude_events = MessageExpunge
 zmq_controller_address = tcp://localhost:5572
 zmq_worker_router_address = tcp://localhost:5573
+zmq_poller_timeout = 100
 elasticsearch_output_address = localhost
 elasticsearch_storage_address = localhost