Projects
Kolab:3.4
bonnie
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
Expand all
Collapse all
Changes of Revision 35
View file
bonnie.spec
Changed
@@ -17,7 +17,7 @@ %global bonnie_group_id 415 Name: bonnie -Version: 0.1.3 +Version: 0.2.0 Release: 1%{?dist} Summary: Bonnie for Kolab Groupware @@ -60,7 +60,7 @@ Summary: The Bonnie broker Group: Applications/System Requires: %{name} = %{?epoch:%{epoch}:}%{version}-%{release} -Requires: python-sqlalchemy +Requires: python-sqlalchemy >= 0.8.0 Requires: python-tornado Requires: python-zmq @@ -71,7 +71,6 @@ Summary: The Bonnie collector Group: Applications/System Requires: %{name} = %{?epoch:%{epoch}:}%{version}-%{release} -Requires: python-tornado Requires: python-zmq %description collector @@ -92,7 +91,6 @@ Group: Applications/System Requires: %{name} = %{?epoch:%{epoch}:}%{version}-%{release} Requires: python-elasticsearch >= 1.0 -Requires: python-tornado Requires: python-zmq %description worker
View file
bonnie-0.1.3.tar.gz/bonnie/broker/persistence.py
Deleted
@@ -1,128 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) -# -# Thomas Bruederli <bruederli at kolabsys.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; version 3 or, at your option, any later -# version. -# -# This program is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Library General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, -# USA. -# - -import sqlalchemy as db -from sqlalchemy.orm import sessionmaker, relationship -from sqlalchemy.orm.util import has_identity -from sqlalchemy.orm.attributes import init_collection -from sqlalchemy.ext.declarative import declarative_base - -import bonnie -conf = bonnie.getConf() -log = bonnie.getLogger('bonnie.broker.persistence') - -PersistentBase = declarative_base() - -# an db engine, which the Session will use for connection resources -engine = db.create_engine(conf.get('broker', 'persistence_sql_uri', 'sqlite://')) - -# create a configured "Session" class -Session = sessionmaker(bind=engine) -session = Session() - - -class PlistCollection(list): - """ - Extended list collection to add some handy utility methods - """ - def delete(self, item): - self.remove(item) - if has_identity(item): - session.delete(item) - else: - session.expunge(item) - -class PersistentList(PersistentBase): - """ - Container class representing a persistent list object - """ - __tablename__ = 'plists' - listname = db.Column(db.String(32), primary_key=True) - # items = relationship('PlistItem') - - def __init__(self, name): - self.listname = name - - -#### module functions - -__list_classes = {} -__list_instances = {} - -def List(name, _type): - """ - Factory function to return a list-like collection with persistent storage capabilities - """ - if __list_instances.has_key(name): - return __list_instances[name].items - - # create new list class to handle items (relations) of the given type - _class_name = 'PersistentList' + _type.__name__ - - if __list_classes.has_key(_class_name): - _plistclass = __list_classes[_class_name] - else: - # determine foreign key type - if hasattr(_type, '__key__') and _type.__table__.columns.has_key(_type.__key__): - reltype = _type.__table__.columns[_type.__key__].type - elif hasattr(_type.__table__, 'primary_key'): - for col in _type.__table__.primary_key.columns: - _type.__key__ = col.name - reltype = col.type - break - else: - _type.__key__ = 'id' - reltype = db.String(256) - - # we establish a many-to-many relation using an association table - association_table = db.Table('plistitems_' + _type.__tablename__, PersistentBase.metadata, - db.Column('listname', db.Integer, db.ForeignKey('plists.listname'), index=True), - db.Column('ref', reltype, db.ForeignKey(_type.__tablename__ + '.' + _type.__key__), index=True) - ) - - # create a list container class with a relationship to the list item type - _plistclass = type(_class_name, (PersistentList,), { - 'items': relationship(_type, secondary=association_table, backref='list', collection_class=PlistCollection) - }) - __list_classes[_class_name] = _plistclass - - PersistentBase.metadata.create_all(engine) - - try: - lst = session.query(_plistclass).filter(PersistentList.listname == name).one() - except: - lst = _plistclass(name) - session.add(lst) - - # remember this instance for later calls and to avoid garbage collection - __list_instances[name] = lst - - # return the collection og list items - return lst.items - - -def syncronize(): - """ - Synchronize to persistent storage - """ - if len(session.new) > 0 or len(session.dirty) > 0 or len(session.deleted) > 0: - log.debug("session.commit(); new=%r; dirty=%r; deleted=%r" % (session.new, session.dirty, session.deleted), level=9) - session.commit()
View file
bonnie-0.1.3.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py -> bonnie-0.2.0.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py
Changed
@@ -23,345 +23,473 @@ This is the ZMQ broker implementation for Bonnie. """ -import copy +from multiprocessing import Process import random import re import sys +import threading import time import zmq +from zmq.eventloop import ioloop, zmqstream import bonnie conf = bonnie.getConf() log = bonnie.getLogger('bonnie.broker.ZMQBroker') -from job import Job -from collector import Collector -from worker import Worker -from bonnie.broker import persistence +import collector +import job +import worker class ZMQBroker(object): - MAX_RETRIES = 5 - running = False - def __init__(self): - self.worker_jobs = persistence.List('worker_jobs', Job) - self.collect_jobs = persistence.List('collect_jobs', Job) - self.collectors = {} - self.workers = {} - self.collector_interests = [] + """ + A ZMQ Broker for Bonnie. + """ + self.context = zmq.Context(io_threads=128) + self.poller = zmq.Poller() + + self.routers = {} def register(self, callback): callback({ '_all': self.run }) - def collector_add(self, _collector_id, _state, _interests): - log.debug("Adding collector %s for %r" % (_collector_id, _interests), level=5) + def create_router(self, name, default, **kw): + """ + Create a regular router that remains + available for the base ZMQBroker. + """ + bind_address = conf.get( + 'broker', + 'zmq_%s_router_bind_address' % (name) + ) - collector = Collector(_collector_id, _state, _interests) - self.collectors[_collector_id] = collector + if bind_address == None: + bind_address = default - # regisrer the reported interests - if len(_interests) > 0: - _interests.extend(self.collector_interests) - self.collector_interests = list(set(_interests)) + router = self.context.socket(zmq.ROUTER) + router.bind(bind_address) - def collect_job_allocate(self, _collector_id): - jobs = [x for x in self.collect_jobs if x.collector_id == _collector_id and x.state == b"PENDING"] + self.routers[name] = { + 'router': router, + 'callbacks': dict(kw) + } - if len(jobs) < 1: - return + self.poller.register(router, zmq.POLLIN) - job = jobs.pop() - job.set_status(b"ALLOC") - return job + return router - def collect_jobs_with_status(self, _state, collector_id=None): - return [x for x in self.collect_jobs if x.state == _state and x.collector_id == collector_id] + def create_router_process(self, name, default, callback): + """ + Create a router process (that is no + longer available to the base ZMQBroker). + """ - def collector_set_status(self, _collector_id, _state, _interests): - if not self.collectors.has_key(_collector_id): - self.collector_add(_collector_id, _state, _interests) - else: - self.collectors[_collector_id].set_status(_state) + bind_address = conf.get( + 'broker', + 'zmq_%s_router_bind_address' % (name) + ) - def collectors_with_status(self, _state): - return [collector_id for collector_id, collector in self.collectors.iteritems() if collector.state == _state] + if bind_address == None: + bind_address = default - def worker_job_add(self, _notification, client_id=None, collector_id=None): - """ - Add a new job. - """ - job = Job( - notification=_notification, - state=b"PENDING", - client_id=client_id, - collector_id=collector_id, + setattr( + self, + '%s_router' % (name), + Process( + target=self._run_router_process, + args=(callback, bind_address) + ) ) - if not job.uuid in [x.uuid for x in self.worker_jobs]: - self.worker_jobs.append(job) + getattr(self, '%s_router' % (name)).start() - log.debug("New worker job: %s; client=%s, collector=%s" % (job.uuid, client_id, collector_id), level=8) + def find_router(self, socket): + for name, attrs in self.routers.iteritems(): + if attrs['router'] == socket: + return name, attrs['router'], attrs['callbacks'] - def worker_job_allocate(self, _worker_id): - """ - Allocate a job to a worker, should a job be available. - """ + def run(self): + self.create_router( + 'collector', + 'tcp://*:5571', + recv_multipart = self._cb_cr_recv_multipart + ) - if len(self.worker_jobs) < 1: - return None + self.create_router_process( + 'dealer', + 'tcp://*:5570', + self._cb_dr_on_recv_stream + ) - jobs = self.worker_jobs_with_status(b"PENDING") - if len(jobs) < 1: - return None + self.create_router( + 'worker', + 'tcp://*:5573', + recv_multipart = self._cb_wr_recv_multipart + ) - # take the first job in the queue - job = jobs[0] + self.create_router( + 'worker_controller', + 'tcp://*:5572', + recv_multipart = self._cb_wcr_recv_multipart + ) - job.set_status(b"ALLOC") - job.set_worker(_worker_id) + last_expire = time.time() + last_state = time.time() + + while True: + # TODO: adjust polling timout according to the number of pending jobs + sockets = dict(self.poller.poll(100)) + + for socket, event in sockets.iteritems(): + if event == zmq.POLLIN: + name, router, callbacks = self.find_router(socket) + + for callforward, callback in callbacks.iteritems(): + result = getattr(router, '%s' % callforward)() + callback(router, result) + + for _collector in collector.select_by_state(b'READY'): + self._send_collector_job(_collector.identity) + + for _worker in worker.select_by_state(b'READY'): + self._send_worker_job(_worker.identity) + + if last_expire < (time.time() - 30): + collector.expire() + worker.expire() + job.unlock() + job.expire() + last_expire = time.time() + + if last_state < (time.time() - 10): + stats_start = time.time() + jcp = job.count_by_type_and_state('collector', b'PENDING') + jwp = job.count_by_type_and_state('worker', b'PENDING') + jca = job.count_by_type_and_state('collector', b'ALLOC') + jwa = job.count_by_type_and_state('worker', b'ALLOC') + + stats = { + 'cb': collector.count_by_state(b'BUSY'), + 'cr': collector.count_by_state(b'READY'), + 'cs': collector.count_by_state(b'STALE'), + 'ja': sum([jca, jwa]), + 'jca': jca, + 'jcp': jcp, + 'jd': job.count_by_state(b'DONE'), + 'jp': sum([jcp, jwp]), + 'jwa': jwa, + 'jwp': jwp, + 'jo': job.count_by_state(b'ORPHANED'), + 'wb': worker.count_by_state(b'BUSY'), + 'wr': worker.count_by_state(b'READY'), + 'ws': worker.count_by_state(b'STALE'), + } + stats_end = time.time() + + stats['duration'] = "%.4f" % (stats_end - stats_start) + + log.info(""" + Jobs: done=%(jd)d, pending=%(jp)d, alloc=%(ja)d, + orphaned=%(jo)d. + Workers: ready=%(wr)d, busy=%(wb)d, stale=%(ws)d, + pending=%(jwp)d, alloc=%(jwa)d. + Collectors: ready=%(cr)d, busy=%(cb)d, stale=%(cs)d, + pending=%(jcp)d, alloc=%(jca)d. + Took: seconds=%(duration)s.""" % stats + + log.info(info) + + last_state = time.time() + + def _cb_cr_recv_multipart(self, router, message): + """ + Receive a message on the Collector Router. + """ + log.debug("Collector Router Message: %r" % (message), level=8) + collector_identity = message[0] + cmd = message[1] - return job.uuid + if not hasattr(self, '_handle_cr_%s' % (cmd)): + log.error("Unhandled CR cmd %s" % (cmd)) - def worker_job_done(self, _job_uuid): - for job in [x for x in self.worker_jobs if x.uuid == _job_uuid]: - self.worker_jobs.delete(job) - log.debug("Worker job done: %s;" % (_job_uuid), level=8) + handler = getattr(self, '_handle_cr_%s' % (cmd)) + handler(router, collector_identity, message[2:]) - def worker_job_free(self, _job_uuid, pushback=False): - for job in [x for x in self.worker_jobs if x.uuid == _job_uuid]: - job.set_status(b"PENDING") - job.set_worker(None) + def _cb_dr_on_recv_stream(self, stream, message): + """ + Callback on the Dealer Router process. - if pushback: - # increment retry count on pushback - job.retries += 1 - log.debug("Push back job %s for %d. time" % (_job_uuid, job.retries), level=8) - if job.retries > self.MAX_RETRIES: - # delete job after MAX retries - self.worker_jobs.delete(job) - log.info("Delete pushed back job %s" % (_job_uuid)) - else: - # move it to the end of the job queue - self.worker_jobs.remove(job) - self.worker_jobs.append(job) + Responds as fast as possible. + """ + log.debug("Dealer Router Message: %r" % (message), level=8) + dealer_identity = message[0] + notification = message[1] + stream.send_multipart([dealer_identity, b'ACK']) - def worker_job_send(self, _job_uuid, _worker_id): - # TODO: Sanity check on job state, worker assignment, etc. - for job in [x for x in self.worker_jobs if x.uuid == _job_uuid]: - self.worker_router.send_multipart([_worker_id, b"JOB", _job_uuid, job.notification]) + job.add(dealer_identity, notification) - log.debug("Sent job %s to worker %s;" % (_job_uuid, _worker_id), level=8) + def _cb_wr_recv_multipart(self, router, message): + log.debug("Worker Router Message: %r" % (message), level=8) + worker_identity = message[0] + cmd = message[1] - def worker_jobs_with_status(self, _state): - return [x for x in self.worker_jobs if x.state == _state] + if not hasattr(self, '_handle_wr_%s' % (cmd)): + log.error("Unhandled WR cmd %s" % (cmd)) + return - def worker_jobs_with_worker(self, _worker_id): - return [x for x in self.worker_jobs if x.worker_id == _worker_id] + handler = getattr(self, '_handle_wr_%s' % (cmd)) + handler(router, worker_identity, message[2:]) - def worker_add(self, _worker_id, _state): - log.debug("Adding worker %s (%s)" % (_worker_id, _state), level=5) + def _cb_wcr_recv_multipart(self, router, message): + log.debug("Worker Controller Router Message: %r" % (message), level=8) + worker_identity = message[0] + cmd = message[1] - worker = Worker(_worker_id, _state) - self.workers[_worker_id] = worker + if not hasattr(self, '_handle_wcr_%s' % (cmd)): + log.error("Unhandled WCR cmd %s" % (cmd)) + self._handle_wcr_unknown(router, identity, message[2:]) + return - def worker_set_status(self, _worker_id, _state): - if not self.workers.has_key(_worker_id): - self.worker_add(_worker_id, _state) - else: - self.workers[_worker_id].set_status(_state) + handler = getattr(self, '_handle_wcr_%s' % (cmd)) + handler(router, worker_identity, message[2:]) - def workers_expire(self): - delete_workers = [] - for worker_id, worker in self.workers.iteritems(): - if worker.timestamp < (time.time() - 10): - self.controller.send_multipart([worker_id, b"STATE"]) + ## + ## Collector Router command functions + ## - if worker.timestamp < (time.time() - 30): - if worker.state == b"READY": - self.worker_set_status(worker_id, b"STALE") - elif worker.state == b"BUSY": - self.worker_set_status(worker_id, b"STALE") - else: - delete_workers.append(worker_id) + def _handle_cr_DONE(self, router, identity, message): + """ + A collector has indicated it is done performing a job, or + actually a command for a job. + """ + if conf.debuglevel > 7: + log.debug("Handling DONE for identity %s (message: %r)" % (identity, message), level=8) + elif conf.debuglevel > 6: + log.debug("Handling DONE for identity %s (message: %r)" % (identity, message[:-1]), level=7) + + job_uuid = message[0] + log.info("Job %s DONE by %s" % (job_uuid, identity)) + + notification = message[1] + + job.update( + job_uuid, + state = b'PENDING', + job_type = 'worker', + notification = notification, + cmd = None + ) - for job in self.worker_jobs_with_worker(worker_id): - self.worker_job_free(job) + collector.update( + identity, + state = b'READY', + job = None + ) - for worker in delete_workers: - log.debug("Deleting worker %s" % (worker), level=5) - del self.workers[worker] + self._send_collector_job(identity) - def workers_with_status(self, _state): - return [worker_id for worker_id, worker in self.workers.iteritems() if worker.state == _state] + def _handle_cr_STATE(self, router, identity, message): + """ + A collector is reporting its state. + """ + log.debug("Handling STATE for identity %s (message: %r)" % (identity, message), level=7) - def run(self): - log.info("Starting") - self.running = True + state = message[0] + interests = message[1].split(",") - context = zmq.Context() + collector.set_state(identity, state, interests) - dealer_router_bind_address = conf.get('broker', 'zmq_dealer_router_bind_address') + if state == b'READY': + self._send_collector_job(identity) - if dealer_router_bind_address == None: - dealer_router_bind_address = "tcp://*:5570" + ## + ## Worker Controller Router command functions + ## - dealer_router = context.socket(zmq.ROUTER) - dealer_router.bind(dealer_router_bind_address) + def _handle_wcr_DONE(self, router, identity, message): + log.debug("Handing DONE for identity %s (message: %r)" % (identity, message), level=7) + job_uuid = message[0] + log.info("Job %s DONE by %s" % (job_uuid, identity)) + job.update( + job_uuid, + state = b'DONE' + ) - collector_router_bind_address = conf.get('broker', 'zmq_collector_router_bind_address') + worker.update( + identity, + state = b'READY', + job = None + ) - if collector_router_bind_address == None: - collector_router_bind_address = "tcp://*:5571" + self._send_worker_job(identity) - self.collector_router = context.socket(zmq.ROUTER) - self.collector_router.bind(collector_router_bind_address) + def _handle_wcr_COLLECT(self, router, identity, message): + log.debug("Handing COLLECT for identity %s (message: %r)" % (identity, message), level=7) + job_uuid = message[0] + log.info("Job %s COLLECT by %s" % (job_uuid, identity)) - controller_bind_address = conf.get('broker', 'zmq_controller_bind_address') + updates = dict( + cmd = message[1], + state = b'PENDING', + job_type = 'collector' + ) - if controller_bind_address == None: - controller_bind_address = "tcp://*:5572" + if len(message) > 2: + updates['notification'] = message[2] - self.controller = context.socket(zmq.ROUTER) - self.controller.bind(controller_bind_address) + job.update( + job_uuid, + **updates + ) - worker_router_bind_address = conf.get('broker', 'zmq_worker_router_bind_address') + worker.update( + identity, + state = b'READY', + job = None + ) - if worker_router_bind_address == None: - worker_router_bind_address = "tcp://*:5573" + self._send_worker_job(identity) - self.worker_router = context.socket(zmq.ROUTER) - self.worker_router.bind(worker_router_bind_address) + def _handle_wcr_PUSHBACK(self, router, identity, message): + log.debug("Handing PUSHBACK for identity %s (message: %r)" % (identity, message), level=8) + job_uuid = message[0] + job_ = job.select(job_uuid) - poller = zmq.Poller() - poller.register(dealer_router, zmq.POLLIN) - poller.register(self.collector_router, zmq.POLLIN) - poller.register(self.worker_router, zmq.POLLIN) - poller.register(self.controller, zmq.POLLIN) + if job_ is not None and job_.pushbacks < 5: + job.update( + job_uuid, + state = b'PENDING', + pushbacks = job_.pushbacks + 1 + ) + else: + log.error("Job %s pushed back too many times" % (job_uuid)) + job.update( + job_uuid, + state = b'ORPHANED' + ) + + worker.update( + identity, + state = b'READY', + job = None + ) - # reset existing jobs in self.worker_jobs and self.collect_jobs to status PENDING (?) - # this will re-assign them to workers and collectors after a broker restart - for job in self.worker_jobs: - job.set_status(b"PENDING") + self._send_worker_job(identity) - for job in self.collect_jobs: - job.set_status(b"PENDING") + def _handle_wcr_STATE(self, router, identity, message): + log.debug("Handing STATE for identity %s (message: %r)" % (identity, message), level=7) + state = message[0] - persistence.syncronize() + if state == b'BUSY': + job_uuid = message[1] + _job = job.select(job_uuid) - while self.running: - try: - sockets = dict(poller.poll(1000)) - except KeyboardInterrupt, e: - log.info("zmq.Poller KeyboardInterrupt") - break - except Exception, e: - log.error("zmq.Poller error: %r", e) - sockets = dict() + if not _job == None: + _job_id = _job.id + else: + _job_id = None + state = b'READY' - self.workers_expire() + worker.update( + identity, + state = state, + job = _job_id + ) + else: + worker.update( + identity, + state = state, + job = None + ) + + if state == b'READY': + self._send_worker_job(identity) + + def _handle_wcr_unknown(self, router, identity, message): + job_uuid = message[0] + job.update( + job_uuid, + state = b'ORPHANED' + ) - if self.controller in sockets: - if sockets[self.controller] == zmq.POLLIN: - _message = self.controller.recv_multipart() - log.debug("Controller message: %r" % (_message), level=9) + worker.update( + identity, + state = b'READY', + job = None + ) - if _message[1] == b"STATE": - _worker_id = _message[0] - _state = _message[2] - self.worker_set_status(_worker_id, _state) + ## + ## Worker Router command functions + ## + + def _handle_wr_GET(self, router, identity, message): + log.debug("Handing GET for worker %s (message: %r)" % (identity, message), level=7) + job_uuid = message[0] + _job = job.select(job_uuid) + + router.send_multipart( + [ + identity, + b'JOB', + (_job.uuid).encode('ascii'), + (_job.notification).encode('ascii') + ] + ) - if _message[1] == b"DONE": - self.worker_job_done(_message[2]) + def _run_router_process(self, callback, bind_address): + """ + Run a multiprocessing.Process with this function + as a target. + """ - if _message[1] == b"PUSHBACK": - self.worker_job_free(_message[2], True) + router = zmq.Context().socket(zmq.ROUTER) + router.bind(bind_address) - if _message[1] in self.collector_interests: - _job_uuid = _message[2] - self.transit_job_collect(_job_uuid, _message[1]) + stream = zmqstream.ZMQStream(router) + stream.on_recv_stream(callback) + ioloop.IOLoop.instance().start() - if dealer_router in sockets: - if sockets[dealer_router] == zmq.POLLIN: - _message = dealer_router.recv_multipart() - log.debug("Dealer message: %r" % (_message), level=9) + def _send_collector_job(self, identity): + _job = job.select_for_collector(identity) - _client_id = _message[0] - _notification = _message[1] - _collector_id = _client_id.replace('Dealer', 'Collector') - _collector_id = re.sub(r'-[0-9]+$', '', _collector_id) - self.worker_job_add(_notification, client_id=_client_id, collector_id=_collector_id) + if _job == None: + return - dealer_router.send_multipart([_message[0], b"ACK"]) + collector.update( + identity, + state = b'BUSY', + job = _job.id + ) + + log.debug("Sending %s to %s" % (_job.uuid, identity), level=7) - if self.collector_router in sockets: - if sockets[self.collector_router] == zmq.POLLIN: - _message = self.collector_router.recv_multipart() - log.debug("Collector message: %r" % (_message), level=9) + self.routers['collector']['router'].send_multipart( + [ + (_job.collector).encode('ascii'), + (_job.cmd).encode('ascii'), + (_job.uuid).encode('ascii'), + (_job.notification).encode('ascii') + ] + ) - if _message[1] == b"STATE": - _collector_id = _message[0] - _state = _message[2] - _interests = _message[3] - self.collector_set_status(_collector_id, _state, _interests.split(",")) + def _send_worker_job(self, identity): + _job = job.select_for_worker(identity) - if _message[1] == b"DONE": - _collector_id = _message[0] - _job_uuid = _message[2] - _notification = _message[3] - self.transit_job_worker(_job_uuid, _notification=_notification) + if _job == None: + return - if self.worker_router in sockets: - if sockets[self.worker_router] == zmq.POLLIN: - _message = self.worker_router.recv_multipart() - log.debug("Worker message: %r" % (_message), level=9) + worker.update( + identity, + state = b'BUSY', + job = _job.id + ) - _worker_id = _message[0] - _command = _message[1] - _job_uuid = _message[2] - self.worker_job_send(_job_uuid, _worker_id) - - ready_workers = self.workers_with_status(b"READY") - pending_jobs = self.worker_jobs_with_status(b"PENDING") - - if len(pending_jobs) > 0 and len(ready_workers) > 0: - _worker_id = random.choice(ready_workers) - _job_uuid = self.worker_job_allocate(_worker_id) - - if not _job_uuid == None: - self.controller.send_multipart([_worker_id, b"TAKE", _job_uuid]) - - ready_collectors = self.collectors_with_status(b"READY") - - for collector in ready_collectors: - pending_jobs = self.collect_jobs_with_status(b"PENDING", collector_id=collector) - if len(pending_jobs) > 0: - job = self.collect_job_allocate(collector) - self.collector_router.send_multipart([job.collector_id, job.command, job.uuid, job.notification]) - - # synchronize job lists to persistent storage - persistence.syncronize() - - - log.info("Shutting down") - - persistence.syncronize() - dealer_router.close() - self.controller.close() - self.collector_router.close() - self.worker_router.close() - context.term() - - def transit_job_collect(self, _job_uuid, _command): - for job in [x for x in self.worker_jobs if x.uuid == _job_uuid]: - job.set_status(b"PENDING") - job.set_command(_command) - self.collect_jobs.append(job) - self.worker_jobs.remove(job) - - def transit_job_worker(self, _job_uuid, _notification): - for job in [x for x in self.collect_jobs if x.uuid == _job_uuid]: - job.set_status(b"PENDING") - job.notification = _notification - self.worker_jobs.append(job) - self.collect_jobs.remove(job) + log.debug("Sending %s to %s" % (_job.uuid, identity), level=7) + self.routers['worker_controller']['router'].send_multipart( + [ + (identity).encode('ascii'), + b'TAKE', + (_job.uuid).encode('ascii') + ] + )
View file
bonnie-0.1.3.tar.gz/bonnie/broker/brokers/zmq_broker/collector.py -> bonnie-0.2.0.tar.gz/bonnie/broker/brokers/zmq_broker/collector.py
Changed
@@ -1,15 +1,120 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) +# +# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3 or, at your option, any later +# version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, +# USA. +# + +import datetime import time -class Collector(object): - state = None - job = None +import bonnie +conf = bonnie.getConf() +log = bonnie.getLogger('bonnie.broker.ZMQBroker') + +from bonnie.broker.state import init_db, Collector, Job, Interest + +def add(identity, state = b'READY', interests = []): + db = init_db('collectors') + db.add(Collector(identity, state)) + db.commit() + collector = db.query(Collector).filter_by(identity=identity).first() + for cmd in interests: + interest = db.query(Interest).filter_by(cmd=cmd).first() + + if interest == None: + db.add(Interest(cmd)) + db.commit() + interest = db.query(Interest).filter_by(cmd=cmd).first() + + collector.interests.append(interest) + + db.commit() + +def count(): + db = init_db('collectors') + result = db.query(Collector).count() + return result + +def count_by_state(state): + db = init_db('collectors') + result = db.query(Collector).filter_by(state=state).count() + return result + +def select(identity): + db = init_db('collectors') + collector = db.query(Collector).filter_by(identity=identity).first() + return collector + +def select_by_state(state): + db = init_db('collectors') + collectors = db.query(Collector).filter_by(state=state).all() + return collectors + +def set_state(identity, state, interests = []): + db = init_db('collectors') + + collector = db.query(Collector).filter_by(identity=identity).first() + + if collector == None: + db.add(Collector(identity, state)) + db.commit() + collector = db.query(Collector).filter_by(identity=identity).first() + else: + collector.state = state + collector.timestamp = datetime.datetime.utcnow() + db.commit() + + if state == b'READY': + collector.job = None + + for cmd in interests: + interest = db.query(Interest).filter_by(cmd=cmd).first() + + if interest == None: + db.add(Interest(cmd)) + db.commit() + interest = db.query(Interest).filter_by(cmd=cmd).first() + + collector.interests.append(interest) + + db.commit() + +def update(identity, **kw): + db = init_db('collectors') + + collector = db.query(Collector).filter_by(identity=identity).first() + + for attr, value in kw.iteritems(): + setattr(collector, attr, value) + + db.commit() + +def expire(): + db = init_db('collectors') - def __init__(self, collector_id=None, state=b"READY", interests=[]): - self.collector_id = collector_id - self.state = state - self.interests = interests - self.timestamp = time.time() + for collector in db.query(Collector).filter(Collector.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 90)), Collector.state != b'STALE').all(): + log.debug("Marking collector %s as stale" % (collector.identity), level=7) + if not collector.job == None: + _job = db.query(Job).filter_by(id=collector.job).first() + if not _job == None: + _job.state = b'PENDING' + _job.timestamp = datetime.datetime.utcnow() - def set_status(self, state): - self.state = state - self.timestamp = time.time() + collector.state = b'STALE' + collector.timestamp = datetime.datetime.utcnow() + db.commit()
View file
bonnie-0.1.3.tar.gz/bonnie/broker/brokers/zmq_broker/job.py -> bonnie-0.2.0.tar.gz/bonnie/broker/brokers/zmq_broker/job.py
Changed
@@ -19,47 +19,188 @@ # USA. # -import hashlib +import datetime import time -from bonnie.broker.persistence import db, PersistentBase - -class Job(PersistentBase): - __tablename__ = 'jobs' - # use binary types because ZMQ requires binary strings - uuid = db.Column(db.LargeBinary(128), primary_key=True) - type = db.Column(db.String(16)) - state = db.Column(db.String(16)) - timestamp = db.Column(db.Float) - notification = db.Column(db.LargeBinary) - worker_id = db.Column(db.String(64)) - client_id = db.Column(db.String(64)) - collector_id = db.Column(db.LargeBinary(64)) - command = db.Column(db.LargeBinary(32)) - - def __init__(self, state=None, notification=None, worker_id=None, client_id=None, collector_id=None): - self.uuid = "%s.%s" % (hashlib.sha224(notification).hexdigest(), time.time()) - self.state = state - self.notification = notification - self.worker_id = worker_id - self.client_id = client_id - self.collector_id = collector_id - self.timestamp = time.time() - self.command = None - self.retries = 0 - - if self.client_id == None: - if self.collector_id == None: - self.type = None - else: - self.type = 'Collector' - else: - self.type = 'Dealer' - - def set_status(self, state): - self.state = state - - def set_worker(self, worker_id): - self.worker_id = worker_id - - def set_command(self, cmd): - self.command = cmd + +from sqlalchemy.exc import IntegrityError + +import bonnie +conf = bonnie.getConf() +log = bonnie.getLogger('bonnie.broker.ZMQBroker') + +from bonnie.broker.state import init_db, Collector, Job, Worker + +def add(dealer, notification, job_type='worker'): + """ + Add a new job. + """ + db = init_db('jobs') + try: + db.add(Job(dealer, notification, job_type)) + db.commit() + except IntegrityError, errmsg: + db.rollback() + +def count(): + db = init_db('jobs') + result = db.query(Job).count() + return result + +def count_by_state(state): + db = init_db('jobs') + result = db.query(Job).filter_by(state=state).count() + return result + +def count_by_type(job_type): + db = init_db('jobs') + result = db.query(Job).filter_by(job_type=job_type).all() + return result + +def count_by_type_and_state(job_type, state): + db = init_db('jobs') + result = db.query(Job).filter_by(job_type=job_type, state=state).count() + return result + +def select(job_uuid): + db = init_db('jobs') + result = db.query(Job).filter_by(uuid=job_uuid).first() + return result + +def select_all(): + db = init_db('jobs') + result = db.query(Job).all() + return result + +def select_by_state(state): + db = init_db('jobs') + result = db.query(Job).filter_by(state=state).all() + return result + +def select_by_type(job_type): + db = init_db('jobs') + result = db.query(Job).filter_by(job_type=job_type).all() + return result + +def select_for_collector(identity): + db = init_db('jobs') + + collector = db.query(Collector).filter(Collector.identity==identity, Collector.job == None, Collector.state == b'READY').first() + + if collector == None: + return + + # This is influenced by .update(), which resets the .timestamp to + # .utcnow(), effectively pushing all updated jobs to the back of the + # queue. + # + # Practical result is a massive amount of metadata gathering + # followed by a sudden surge of jobs getting DONE. + #job = db.query(Job).filter_by(collector=identity, job_type='collector', state=b'PENDING').order_by(Job.timestamp).first() + + # This would result in "most recent first, work your way backwards." + #job = db.query(Job).filter_by(collector=identity, job_type='collector', state=b'PENDING').order_by(Job.timestamp.desc()).first() + + # This results in following the storage order and is by far the + # fastest methodology. + job = db.query(Job).filter_by(collector=identity, job_type='collector', state=b'PENDING').first() + + if not job == None: + job.state = b'ALLOC' + job.timestamp = datetime.datetime.utcnow() + db.commit() + + return job + +def select_for_worker(identity): + db = init_db('jobs') + + worker = db.query(Worker).filter(Worker.identity == identity, Worker.job == None, Worker.state == b'READY').first() + + if worker == None: + return + + # This is influenced by .update(), which resets the .timestamp to + # .utcnow(), effectively pushing all updated jobs to the back of the + # queue. + # + # Practical result is a massive amount of metadata gathering + # followed by a sudden surge of jobs getting DONE. + #job = db.query(Job).filter_by(job_type='worker', state=b'PENDING').order_by(Job.timestamp).first() + + # This would result in "most recent first, work your way backwards." + #job = db.query(Job).filter_by(job_type='worker', state=b'PENDING').order_by(Job.timestamp.desc()).first() + + # This results in following the storage order and is by far the + # fastest methodology. + job = db.query(Job).filter_by(job_type='worker', state=b'PENDING').first() + + if not job == None: + job.state = b'ALLOC' + job.timestamp = datetime.datetime.utcnow() + db.commit() + + return job + +def set_state(uuid, state): + db = init_db('jobs') + for job in db.query(Job).filter_by(uuid=uuid).all(): + job.state = state + job.timestamp = datetime.datetime.utcnow() + db.commit() + +def set_job_type(uuid, job_type): + db = init_db('jobs') + job = db.query(Job).filter_by(uuid=uuid).first() + job.job_type = job_type + job.timestamp = datetime.datetime.utcnow() + db.commit() + +def update(job_uuid, **kw): + db = init_db('jobs') + job = db.query(Job).filter_by(uuid=job_uuid).first() + + if job == None: + return + + for attr, value in kw.iteritems(): + setattr(job, attr, value) + + job.timestamp = datetime.datetime.utcnow() + + db.commit() + +def expire(): + """ + Unlock jobs that have been allocated to some worker way too long + ago. + """ + db = init_db('jobs') + + for job in db.query(Job).filter(Job.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 300)), Job.state == b'DONE').all(): + log.debug("Purging job %s" % (job.uuid), level=7) + db.delete(job) + + db.commit() + +def unlock(): + """ + Unlock jobs that have been allocated to some worker way too long + ago. + """ + db = init_db('jobs') + + for job in db.query(Job).filter( + Job.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 120)), + Job.state == b'ALLOC' + ).all(): + + log.debug("Unlocking %s job %s" % (job.job_type, job.uuid), level=7) + job.state = b'PENDING' + + for collector in db.query(Collector).filter_by(job=job.id).all(): + collector.job = None + + for worker in db.query(Worker).filter_by(job=job.id).all(): + worker.job = None + + db.commit()
View file
bonnie-0.1.3.tar.gz/bonnie/broker/brokers/zmq_broker/worker.py -> bonnie-0.2.0.tar.gz/bonnie/broker/brokers/zmq_broker/worker.py
Changed
@@ -1,15 +1,119 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) +# +# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3 or, at your option, any later +# version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, +# USA. +# + +import datetime import time -class Worker(object): - state = None - job = None +from bonnie.broker.state import init_db, Job, Worker + +import bonnie +conf = bonnie.getConf() +log = bonnie.getLogger('bonnie.broker.ZMQBroker') + +def add(identity, state = b'READY'): + db = init_db('workers') + db.add(Worker(identity, state)) + db.commit() + +def count(): + db = init_db('workers') + result = db.query(Worker).count() + return result + +def count_by_state(state): + db = init_db('workers') + result = db.query(Worker).filter_by(state=state).count() + return result + +def select_by_state(state): + db = init_db('workers') + result = db.query(Worker).filter_by(state=state).all() + return result + +def select(identity): + db = init_db('workers') + result = db.query(Worker).filter_by(identity=identity).first() + return result + +def set_job(identity, job_uuid): + db = init_db('workers') + job = db.query(Job).filter_by(uuid=job_uuid).first() + + if job == None: + return + + worker = db.query(Worker).filter_by(identity=identity).first() + worker.job = job.id + job.worker = worker.id + db.commit() + +def set_state(identity, state): + db = init_db('workers') + worker = db.query(Worker).filter_by(identity=identity).first() + + if worker == None: + db.add(Worker(identity, state)) + db.commit() + + else: + worker.state = state + worker.timestamp = datetime.datetime.utcnow() + db.commit() + +def update(identity, **kw): + db = init_db('identity') + worker = db.query(Worker).filter_by(identity=identity).first() + + if worker == None: + db.add(Worker(identity, b'READY')) + else: + for attr, value in kw.iteritems(): + setattr(worker, attr, value) + + worker.timestamp = datetime.datetime.utcnow() + + db.commit() + +def expire(): + db = init_db('workers') + for worker in db.query(Worker).filter(Worker.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 90)), Worker.state == b'STALE').all(): + log.debug("Purging worker %s as very stale" % (worker.identity), level=7) + + if not worker.job == None: + _job = db.query(Job).filter_by(id=worker.job).first() + if not _job == None: + _job.state = b'PENDING' + _job.timestamp = datetime.datetime.utcnow() + + db.delete(worker) + db.commit() - def __init__(self, worker_id=None, state=b"READY", job=None): - self.worker_id = worker_id - self.state = state - self.job = job - self.timestamp = time.time() + for worker in db.query(Worker).filter(Worker.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 90)), Worker.state != b'STALE').all(): + log.debug("Marking worker %s as stale" % (worker.identity), level=7) + if not worker.job == None: + _job = db.query(Job).filter_by(id=worker.job).first() + if not _job == None: + _job.state = b'PENDING' + _job.timestamp = datetime.datetime.utcnow() - def set_status(self, state): - self.state = state - self.timestamp = time.time() + worker.state = b'STALE' + worker.timestamp = datetime.datetime.utcnow() + db.commit()
View file
bonnie-0.2.0.tar.gz/bonnie/broker/state.py
Added
@@ -0,0 +1,164 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) +# +# Thomas Bruederli <bruederli at kolabsys.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3 or, at your option, any later +# version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, +# USA. +# + +import datetime +import hashlib +import re + +import sqlalchemy + +from sqlalchemy import Column +from sqlalchemy import DateTime +from sqlalchemy import ForeignKey +from sqlalchemy import Integer +from sqlalchemy import MetaData +from sqlalchemy import String +from sqlalchemy import Table +from sqlalchemy import Text + +from sqlalchemy import create_engine +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import relationship + +try: + from sqlalchemy.orm import relationship +except: + from sqlalchemy.orm import relation as relationship + +try: + from sqlalchemy.orm import sessionmaker +except: + from sqlalchemy.orm import create_session + +import bonnie +conf = bonnie.getConf() +log = bonnie.getLogger('bonnie.broker.state') + +DeclarativeBase = declarative_base() + +db = None + +collector_interest_table = Table( + 'collector_interest', + DeclarativeBase.metadata, + Column('collector_id', Integer, ForeignKey('collector.id')), + Column('interest_id', Integer, ForeignKey('interest.id')) + ) + +## +## Classes +## + +class Collector(DeclarativeBase): + __tablename__ = 'collector' + + id = Column(Integer, primary_key=True) + identity = Column(String(128)) + state = Column(String(16)) + job = Column(Integer, ForeignKey('job.id')) + timestamp = Column(DateTime, default=datetime.datetime.utcnow()) + interests = relationship( + "Interest", + secondary=collector_interest_table + ) + + def __init__(self, identity, state = b'READY'): + DeclarativeBase.__init__(self) + self.identity = identity + self.state = state + +class Interest(DeclarativeBase): + __tablename__ = 'interest' + + id = Column(Integer, primary_key=True) + cmd = Column(String(16), nullable=False) + + def __init__(self, cmd): + self.cmd = cmd + +class Job(DeclarativeBase): + __tablename__ = 'job' + + id = Column(Integer, primary_key=True) + uuid = Column(String(128), nullable=False, unique=True) + dealer = Column(String(128), nullable=False) + collector = Column(String(128), nullable=False) + notification = Column(Text) + job_type = Column(String(16), default='worker') + state = Column(String(16)) + cmd = Column(String(256)) + timestamp = Column(DateTime, default=datetime.datetime.utcnow()) + pushbacks = Column(Integer, default=0) + + def __init__(self, dealer, notification, job_type='worker'): + DeclarativeBase.__init__(self) + self.uuid = hashlib.sha224(notification).hexdigest() + self.dealer = dealer + self.collector = re.sub(r'-[0-9]+$', '', dealer).replace('Dealer', 'Collector') + self.notification = notification + self.state = b'PENDING' + self.job_type = job_type + self.timestamp = datetime.datetime.utcnow() + self.cmd = None + self.pushbacks = 0 + +class Worker(DeclarativeBase): + __tablename__ = 'worker' + + id = Column(Integer, primary_key=True) + identity = Column(String(128)) + state = Column(String(16)) + timestamp = Column(DateTime, default=datetime.datetime.utcnow()) + job = Column(Integer, ForeignKey('job.id')) + + def __init__(self, identity, state): + DeclarativeBase.__init__(self) + self.identity = identity + self.state = state + +def init_db(name, reinit=False): + """ + Returns a SQLAlchemy Session() instance. + """ + global db + + if not db == None and not reinit: + return db + + if reinit: + import os + if os.path.isfile('sqlite:////var/lib/bonnie/state.db'): + os.unlink('sqlite:////var/lib/bonnie/state.db') + + db_uri = 'sqlite:////var/lib/bonnie/state.db' + echo = conf.debuglevel > 8 + + try: + engine = create_engine(db_uri, echo=echo) + DeclarativeBase.metadata.create_all(engine) + except: + engine = create_engine('sqlite:////tmp/state.db') + DeclarativeBase.metadata.create_all(engine) + + Session = sessionmaker(bind=engine) + db = Session() + + return db
View file
bonnie-0.1.3.tar.gz/bonnie/collector/__init__.py -> bonnie-0.2.0.tar.gz/bonnie/collector/__init__.py
Changed
@@ -43,15 +43,16 @@ self.handler_interests = {} self.handler_modules = {} - def execute(self, command, notification): + def execute(self, commands, notification): """ Dispatch collector job to the according handler(s) """ - log.debug("Executing collection command %s" % (command), level=8) + log.debug("Executing collection for %s" % (commands), level=8) - if self.handler_interests.has_key(command): - for interest in self.handler_interests[command]: - notification = interest['callback'](notification=notification) + for command in commands.split(): + if self.handler_interests.has_key(command): + for interest in self.handler_interests[command]: + notification = interest['callback'](notification=notification) return notification
View file
bonnie-0.1.3.tar.gz/bonnie/collector/handlers/ldapdata.py -> bonnie-0.2.0.tar.gz/bonnie/collector/handlers/ldapdata.py
Changed
@@ -40,7 +40,7 @@ self.pykolab_conf.finalize_conf(fatal=False) self.ldap = Auth() - self.connections = 0 + self.ldap.connect() def register(self, callback): interests = { @@ -54,10 +54,7 @@ log.debug("GETUSERDATA for %r" % (notification), level=9) if notification.has_key('user'): - self.connections += 1 - try: - self.ldap.connect() user_dn = self.ldap.find_user_dn(notification['user'], True) log.debug("User DN for %s: %r" % (notification['user'], user_dn), level=8) except Exception, e: @@ -78,9 +75,4 @@ notification['user_data'] = user_rec - self.connections -= 1 - - if self.connections == 0: - self.ldap.disconnect() - return json.dumps(notification)
View file
bonnie-0.1.3.tar.gz/bonnie/collector/inputs/zmq_input.py -> bonnie-0.2.0.tar.gz/bonnie/collector/inputs/zmq_input.py
Changed
@@ -65,7 +65,7 @@ def report_state(self, interests=[]): log.debug("[%s] Reporting state %s, %r" % (self.identity, self.state, self.interests), level=9) - self.collector.send_multipart([b"STATE", self.state, ",".join(self.interests)]) + self.collector.send_multipart([b"STATE", self.state, " ".join(self.interests)]) self.report_timestamp = time.time() def run(self, callback=None, interests=[]): @@ -79,7 +79,7 @@ while self.running: try: - sockets = dict(self.poller.poll(1000)) + sockets = dict(self.poller.poll(1)) except KeyboardInterrupt, e: log.info("zmq.Poller KeyboardInterrupt") break @@ -108,6 +108,7 @@ if not callback == None: result = callback(_message[0], _notification) + self.report_timestamp = time.time() self.collector.send_multipart([b"DONE", _job_uuid, result]) log.info("[%s] shutting down", self.identity)
View file
bonnie-0.1.3.tar.gz/bonnie/worker/__init__.py -> bonnie-0.2.0.tar.gz/bonnie/worker/__init__.py
Changed
@@ -177,12 +177,11 @@ jobs.extend(_jobs) # trigger storage modules which registered interest in particular notification properties - if len(jobs) == 0: - for prop,storage_interests in self.storage_interests.iteritems(): - if notification.has_key(prop): - for interest in storage_interests: - (notification, _jobs) = self.interest_callback(interest, notification) - jobs.extend(_jobs) + for prop,storage_interests in self.storage_interests.iteritems(): + if notification.has_key(prop): + for interest in storage_interests: + (notification, _jobs) = self.interest_callback(interest, notification) + jobs.extend(_jobs) # finally send notification to output handlers if no jobs remaining if len(jobs) == 0 and not notification.has_key('_suppress_output') and not event in self.output_exclude_events: @@ -196,6 +195,8 @@ (notification, _jobs) = self.interest_callback(interest, notification) jobs.extend(_jobs) + log.debug("Event notification %r processed. Jobs = %r" % (event, jobs), level=7) + return notification, list(set(jobs)) def input_report(self):
View file
bonnie-0.1.3.tar.gz/bonnie/worker/inputs/zmq_input.py -> bonnie-0.2.0.tar.gz/bonnie/worker/inputs/zmq_input.py
Changed
@@ -40,7 +40,7 @@ def __init__(self, *args, **kw): self.state = b"READY" - self.job_id = None + self.job_uuid = None self.lastping = 0 self.report_timestamp = 0 @@ -52,7 +52,12 @@ def report_state(self): log.debug("[%s] reporting state: %s" % (self.identity, self.state), level=8) - self.controller.send_multipart([b"STATE", self.state]) + message = [b"STATE", self.state] + + if not self.job_uuid == None and self.state == b'BUSY': + message.append(self.job_uuid) + + self.controller.send_multipart(message) self.report_timestamp = time.time() def run(self, callback=None, report=None): @@ -89,7 +94,7 @@ while self.running: try: - sockets = dict(self.poller.poll(1000)) + sockets = dict(self.poller.poll(1)) except KeyboardInterrupt, e: log.info("zmq.Poller KeyboardInterrupt") break @@ -115,8 +120,8 @@ self.report_state() else: - _job_id = _message[1] - self.take_job(_job_id) + self.job_uuid = _message[1] + self.take_job(self.job_uuid) if self.worker in sockets: if sockets[self.worker] == zmq.POLLIN: @@ -124,22 +129,21 @@ log.debug("[%s] Worker message: %r" % (self.identity, _message), level=9) if _message[0] == "JOB": - _job_uuid = _message[1] + self.job_uuid = _message[1] # TODO: Sanity checking - #if _message[1] == self.job_id: + #if _message[1] == self.job_uuid: + #jobs = [] if not callback == None: (notification, jobs) = callback(_message[2]) else: jobs = [] if len(jobs) == 0: - self.controller.send_multipart([b"DONE", _job_uuid]) + self.controller.send_multipart([b"DONE", self.job_uuid]) else: log.debug("[%s] Has jobs: %r" % (self.identity, jobs), level=8) - - for job in jobs: - self.controller.send_multipart([job, _job_uuid]) + self.controller.send_multipart([b"COLLECT", self.job_uuid, b" ".join(jobs)]) self.set_state_ready() @@ -150,20 +154,19 @@ log.info("[%s] shutting down", self.identity) self.worker.close() - def set_state_busy(self): + def set_state_busy(self, _job_id): log.debug("[%s] Set state to BUSY" % (self.identity), level=9) - self.controller.send_multipart([b"STATE", b"BUSY", b"%s" % (self.job_id)]) + self.report_timestamp = time.time() self.state = b"BUSY" + self.job_uuid = _job_id def set_state_ready(self): log.debug("[%s] Set state to READY" % (self.identity), level=9) - self.controller.send_multipart([b"STATE", b"READY"]) + self.report_timestamp = time.time() self.state = b"READY" - self.job_id = None + self.job_uuid = None def take_job(self, _job_id): log.debug("[%s] Accept job %s" % (self.identity, _job_id), level=9) - self.set_state_busy() + self.set_state_busy(_job_id) self.worker.send_multipart([b"GET", _job_id]) - self.job_id = _job_id -
View file
bonnie-0.1.3.tar.gz/bonnie/worker/outputs/elasticsearch_output.py -> bonnie-0.2.0.tar.gz/bonnie/worker/outputs/elasticsearch_output.py
Changed
@@ -34,27 +34,28 @@ Convert the given event notification record into a valid log entry """ keymap = { - 'timestamp': None, - 'clientIP': 'client_ip', - 'clientPort': None, - 'serverPort': None, - 'serverDomain': 'domain', - 'aclRights': 'acl_rights', - 'aclSubject': 'acl_subject', - 'mailboxID': 'mailbox_id', - 'messageSize': 'message_size', - 'messageHeaders': None, - 'messageContent': None, - 'bodyStructure': None, - 'metadata': None, - 'acl': None, - 'flagNames': 'flag_names', - 'diskUsed': 'disk_used', - 'vnd.cmu.oldUidset': 'olduidset', - 'vnd.cmu.sessionId': 'session_id', - 'vnd.cmu.midset': 'message_id', - 'vnd.cmu.unseenMessages': 'unseen_messages', - } + 'timestamp': None, + 'clientIP': 'client_ip', + 'clientPort': None, + 'serverPort': None, + 'serverDomain': 'domain', + 'aclRights': 'acl_rights', + 'aclSubject': 'acl_subject', + 'mailboxID': 'mailbox_id', + 'messageSize': 'message_size', + 'messageHeaders': None, + 'messageContent': None, + 'bodyStructure': None, + 'metadata': None, + 'acl': None, + 'flagNames': 'flag_names', + 'diskUsed': 'disk_used', + 'vnd.cmu.oldUidset': 'olduidset', + 'vnd.cmu.sessionId': 'session_id', + 'vnd.cmu.midset': 'message_id', + 'vnd.cmu.unseenMessages': 'unseen_messages', + } + log = { '@version': bonnie.API_VERSION } for key,val in notification.iteritems(): newkey = keymap[key] if keymap.has_key(key) else key
View file
bonnie-0.1.3.tar.gz/bonnie/worker/storage/elasticsearch_storage.py -> bonnie-0.2.0.tar.gz/bonnie/worker/storage/elasticsearch_storage.py
Changed
@@ -364,6 +364,7 @@ now = int(time.time()) base_uri = re.sub(';.+$', '', notification[attrib]) + jobs = [] log.debug("Resolve folder for %r = %r" % (attrib, base_uri), level=8) @@ -375,12 +376,16 @@ # mailbox resolving requires metadata if not notification.has_key('metadata'): log.debug("Adding GETMETADATA job", level=8) - return (notification, [ b"GETMETADATA" ]) + jobs.append(b"GETMETADATA") # before creating a folder entry, we should collect folder ACLs if not notification.has_key('acl'): log.debug("Adding GETACL", level=8) - return (notification, [ b"GETACL" ]) + jobs.append(b"GETACL") + + # reject notification with additional collector jobs + if len(jobs) > 0: + return (notification, jobs) # extract folder properties and a unique identifier from the notification folder = self.notificaton2folder(notification)
View file
bonnie-0.1.3.tar.gz/conf/bonnie.conf -> bonnie-0.2.0.tar.gz/conf/bonnie.conf
Changed
@@ -1,7 +1,7 @@ [broker] zmq_dealer_router_bind_address = tcp://*:5570 zmq_collector_router_bind_address = tcp://*:5571 -zmq_controller_bind_address = tcp://*:5572 +zmq_worker_controller_router_bind_address = tcp://*:5572 zmq_worker_router_bind_address = tcp://*:5573 persistence_sql_uri = sqlite://
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.