Projects
Kolab:3.4
bonnie
Log In
Username
Password
We truncated the diff of some files because they were too big. If you want to see the full diff for every file, click here.
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
Expand all
Collapse all
Changes of Revision 35
View file
bonnie.spec
Changed
@@ -17,7 +17,7 @@ %global bonnie_group_id 415 Name: bonnie -Version: 0.1.3 +Version: 0.2.0 Release: 1%{?dist} Summary: Bonnie for Kolab Groupware @@ -60,7 +60,7 @@ Summary: The Bonnie broker Group: Applications/System Requires: %{name} = %{?epoch:%{epoch}:}%{version}-%{release} -Requires: python-sqlalchemy +Requires: python-sqlalchemy >= 0.8.0 Requires: python-tornado Requires: python-zmq @@ -71,7 +71,6 @@ Summary: The Bonnie collector Group: Applications/System Requires: %{name} = %{?epoch:%{epoch}:}%{version}-%{release} -Requires: python-tornado Requires: python-zmq %description collector @@ -92,7 +91,6 @@ Group: Applications/System Requires: %{name} = %{?epoch:%{epoch}:}%{version}-%{release} Requires: python-elasticsearch >= 1.0 -Requires: python-tornado Requires: python-zmq %description worker
View file
bonnie-0.1.3.tar.gz/bonnie/broker/persistence.py
Deleted
@@ -1,128 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) -# -# Thomas Bruederli <bruederli at kolabsys.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; version 3 or, at your option, any later -# version. -# -# This program is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Library General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, -# USA. -# - -import sqlalchemy as db -from sqlalchemy.orm import sessionmaker, relationship -from sqlalchemy.orm.util import has_identity -from sqlalchemy.orm.attributes import init_collection -from sqlalchemy.ext.declarative import declarative_base - -import bonnie -conf = bonnie.getConf() -log = bonnie.getLogger('bonnie.broker.persistence') - -PersistentBase = declarative_base() - -# an db engine, which the Session will use for connection resources -engine = db.create_engine(conf.get('broker', 'persistence_sql_uri', 'sqlite://')) - -# create a configured "Session" class -Session = sessionmaker(bind=engine) -session = Session() - - -class PlistCollection(list): - """ - Extended list collection to add some handy utility methods - """ - def delete(self, item): - self.remove(item) - if has_identity(item): - session.delete(item) - else: - session.expunge(item) - -class PersistentList(PersistentBase): - """ - Container class representing a persistent list object - """ - __tablename__ = 'plists' - listname = db.Column(db.String(32), primary_key=True) - # items = relationship('PlistItem') - - def __init__(self, name): - 
self.listname = name - - -#### module functions - -__list_classes = {} -__list_instances = {} - -def List(name, _type): - """ - Factory function to return a list-like collection with persistent storage capabilities - """ - if __list_instances.has_key(name): - return __list_instances[name].items - - # create new list class to handle items (relations) of the given type - _class_name = 'PersistentList' + _type.__name__ - - if __list_classes.has_key(_class_name): - _plistclass = __list_classes[_class_name] - else: - # determine foreign key type - if hasattr(_type, '__key__') and _type.__table__.columns.has_key(_type.__key__): - reltype = _type.__table__.columns[_type.__key__].type - elif hasattr(_type.__table__, 'primary_key'): - for col in _type.__table__.primary_key.columns: - _type.__key__ = col.name - reltype = col.type - break - else: - _type.__key__ = 'id' - reltype = db.String(256) - - # we establish a many-to-many relation using an association table - association_table = db.Table('plistitems_' + _type.__tablename__, PersistentBase.metadata, - db.Column('listname', db.Integer, db.ForeignKey('plists.listname'), index=True), - db.Column('ref', reltype, db.ForeignKey(_type.__tablename__ + '.' 
+ _type.__key__), index=True) - ) - - # create a list container class with a relationship to the list item type - _plistclass = type(_class_name, (PersistentList,), { - 'items': relationship(_type, secondary=association_table, backref='list', collection_class=PlistCollection) - }) - __list_classes[_class_name] = _plistclass - - PersistentBase.metadata.create_all(engine) - - try: - lst = session.query(_plistclass).filter(PersistentList.listname == name).one() - except: - lst = _plistclass(name) - session.add(lst) - - # remember this instance for later calls and to avoid garbage collection - __list_instances[name] = lst - - # return the collection og list items - return lst.items - - -def syncronize(): - """ - Synchronize to persistent storage - """ - if len(session.new) > 0 or len(session.dirty) > 0 or len(session.deleted) > 0: - log.debug("session.commit(); new=%r; dirty=%r; deleted=%r" % (session.new, session.dirty, session.deleted), level=9) - session.commit()
View file
bonnie-0.1.3.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py -> bonnie-0.2.0.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py
Changed
@@ -23,345 +23,473 @@ This is the ZMQ broker implementation for Bonnie. """ -import copy +from multiprocessing import Process import random import re import sys +import threading import time import zmq +from zmq.eventloop import ioloop, zmqstream import bonnie conf = bonnie.getConf() log = bonnie.getLogger('bonnie.broker.ZMQBroker') -from job import Job -from collector import Collector -from worker import Worker -from bonnie.broker import persistence +import collector +import job +import worker class ZMQBroker(object): - MAX_RETRIES = 5 - running = False - def __init__(self): - self.worker_jobs = persistence.List('worker_jobs', Job) - self.collect_jobs = persistence.List('collect_jobs', Job) - self.collectors = {} - self.workers = {} - self.collector_interests = [] + """ + A ZMQ Broker for Bonnie. + """ + self.context = zmq.Context(io_threads=128) + self.poller = zmq.Poller() + + self.routers = {} def register(self, callback): callback({ '_all': self.run }) - def collector_add(self, _collector_id, _state, _interests): - log.debug("Adding collector %s for %r" % (_collector_id, _interests), level=5) + def create_router(self, name, default, **kw): + """ + Create a regular router that remains + available for the base ZMQBroker. 
+ """ + bind_address = conf.get( + 'broker', + 'zmq_%s_router_bind_address' % (name) + ) - collector = Collector(_collector_id, _state, _interests) - self.collectors[_collector_id] = collector + if bind_address == None: + bind_address = default - # regisrer the reported interests - if len(_interests) > 0: - _interests.extend(self.collector_interests) - self.collector_interests = list(set(_interests)) + router = self.context.socket(zmq.ROUTER) + router.bind(bind_address) - def collect_job_allocate(self, _collector_id): - jobs = [x for x in self.collect_jobs if x.collector_id == _collector_id and x.state == b"PENDING"] + self.routers[name] = { + 'router': router, + 'callbacks': dict(kw) + } - if len(jobs) < 1: - return + self.poller.register(router, zmq.POLLIN) - job = jobs.pop() - job.set_status(b"ALLOC") - return job + return router - def collect_jobs_with_status(self, _state, collector_id=None): - return [x for x in self.collect_jobs if x.state == _state and x.collector_id == collector_id] + def create_router_process(self, name, default, callback): + """ + Create a router process (that is no + longer available to the base ZMQBroker). + """ - def collector_set_status(self, _collector_id, _state, _interests): - if not self.collectors.has_key(_collector_id): - self.collector_add(_collector_id, _state, _interests) - else: - self.collectors[_collector_id].set_status(_state) + bind_address = conf.get( + 'broker', + 'zmq_%s_router_bind_address' % (name) + ) - def collectors_with_status(self, _state): - return [collector_id for collector_id, collector in self.collectors.iteritems() if collector.state == _state] + if bind_address == None: + bind_address = default - def worker_job_add(self, _notification, client_id=None, collector_id=None): - """ - Add a new job. 
- """ - job = Job( - notification=_notification, - state=b"PENDING", - client_id=client_id, - collector_id=collector_id, + setattr( + self, + '%s_router' % (name), + Process( + target=self._run_router_process, + args=(callback, bind_address) + ) ) - if not job.uuid in [x.uuid for x in self.worker_jobs]: - self.worker_jobs.append(job) + getattr(self, '%s_router' % (name)).start() - log.debug("New worker job: %s; client=%s, collector=%s" % (job.uuid, client_id, collector_id), level=8) + def find_router(self, socket): + for name, attrs in self.routers.iteritems(): + if attrs['router'] == socket: + return name, attrs['router'], attrs['callbacks'] - def worker_job_allocate(self, _worker_id): - """ - Allocate a job to a worker, should a job be available. - """ + def run(self): + self.create_router( + 'collector', + 'tcp://*:5571', + recv_multipart = self._cb_cr_recv_multipart + ) - if len(self.worker_jobs) < 1: - return None + self.create_router_process( + 'dealer', + 'tcp://*:5570', + self._cb_dr_on_recv_stream + ) - jobs = self.worker_jobs_with_status(b"PENDING") - if len(jobs) < 1: - return None + self.create_router( + 'worker', + 'tcp://*:5573', + recv_multipart = self._cb_wr_recv_multipart + ) - # take the first job in the queue - job = jobs[0] + self.create_router( + 'worker_controller', + 'tcp://*:5572', + recv_multipart = self._cb_wcr_recv_multipart + ) - job.set_status(b"ALLOC") - job.set_worker(_worker_id) + last_expire = time.time() + last_state = time.time() + + while True: + # TODO: adjust polling timout according to the number of pending jobs + sockets = dict(self.poller.poll(100)) + + for socket, event in sockets.iteritems(): + if event == zmq.POLLIN: + name, router, callbacks = self.find_router(socket) + + for callforward, callback in callbacks.iteritems(): + result = getattr(router, '%s' % callforward)() + callback(router, result) + + for _collector in collector.select_by_state(b'READY'): + self._send_collector_job(_collector.identity) + + for _worker in 
worker.select_by_state(b'READY'): + self._send_worker_job(_worker.identity) + + if last_expire < (time.time() - 30): + collector.expire() + worker.expire()
View file
bonnie-0.1.3.tar.gz/bonnie/broker/brokers/zmq_broker/collector.py -> bonnie-0.2.0.tar.gz/bonnie/broker/brokers/zmq_broker/collector.py
Changed
@@ -1,15 +1,120 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) +# +# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3 or, at your option, any later +# version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, +# USA. +# + +import datetime import time -class Collector(object): - state = None - job = None +import bonnie +conf = bonnie.getConf() +log = bonnie.getLogger('bonnie.broker.ZMQBroker') + +from bonnie.broker.state import init_db, Collector, Job, Interest + +def add(identity, state = b'READY', interests = []): + db = init_db('collectors') + db.add(Collector(identity, state)) + db.commit() + collector = db.query(Collector).filter_by(identity=identity).first() + for cmd in interests: + interest = db.query(Interest).filter_by(cmd=cmd).first() + + if interest == None: + db.add(Interest(cmd)) + db.commit() + interest = db.query(Interest).filter_by(cmd=cmd).first() + + collector.interests.append(interest) + + db.commit() + +def count(): + db = init_db('collectors') + result = db.query(Collector).count() + return result + +def count_by_state(state): + db = init_db('collectors') + result = db.query(Collector).filter_by(state=state).count() + return result + +def select(identity): + db = init_db('collectors') + collector = db.query(Collector).filter_by(identity=identity).first() + return collector + +def 
select_by_state(state): + db = init_db('collectors') + collectors = db.query(Collector).filter_by(state=state).all() + return collectors + +def set_state(identity, state, interests = []): + db = init_db('collectors') + + collector = db.query(Collector).filter_by(identity=identity).first() + + if collector == None: + db.add(Collector(identity, state)) + db.commit() + collector = db.query(Collector).filter_by(identity=identity).first() + else: + collector.state = state + collector.timestamp = datetime.datetime.utcnow() + db.commit() + + if state == b'READY': + collector.job = None + + for cmd in interests: + interest = db.query(Interest).filter_by(cmd=cmd).first() + + if interest == None: + db.add(Interest(cmd)) + db.commit() + interest = db.query(Interest).filter_by(cmd=cmd).first() + + collector.interests.append(interest) + + db.commit() + +def update(identity, **kw): + db = init_db('collectors') + + collector = db.query(Collector).filter_by(identity=identity).first() + + for attr, value in kw.iteritems(): + setattr(collector, attr, value) + + db.commit() + +def expire(): + db = init_db('collectors') - def __init__(self, collector_id=None, state=b"READY", interests=[]): - self.collector_id = collector_id - self.state = state - self.interests = interests - self.timestamp = time.time() + for collector in db.query(Collector).filter(Collector.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 90)), Collector.state != b'STALE').all(): + log.debug("Marking collector %s as stale" % (collector.identity), level=7) + if not collector.job == None: + _job = db.query(Job).filter_by(id=collector.job).first() + if not _job == None: + _job.state = b'PENDING' + _job.timestamp = datetime.datetime.utcnow() - def set_status(self, state): - self.state = state - self.timestamp = time.time() + collector.state = b'STALE' + collector.timestamp = datetime.datetime.utcnow() + db.commit()
View file
bonnie-0.1.3.tar.gz/bonnie/broker/brokers/zmq_broker/job.py -> bonnie-0.2.0.tar.gz/bonnie/broker/brokers/zmq_broker/job.py
Changed
@@ -19,47 +19,188 @@ # USA. # -import hashlib +import datetime import time -from bonnie.broker.persistence import db, PersistentBase - -class Job(PersistentBase): - __tablename__ = 'jobs' - # use binary types because ZMQ requires binary strings - uuid = db.Column(db.LargeBinary(128), primary_key=True) - type = db.Column(db.String(16)) - state = db.Column(db.String(16)) - timestamp = db.Column(db.Float) - notification = db.Column(db.LargeBinary) - worker_id = db.Column(db.String(64)) - client_id = db.Column(db.String(64)) - collector_id = db.Column(db.LargeBinary(64)) - command = db.Column(db.LargeBinary(32)) - - def __init__(self, state=None, notification=None, worker_id=None, client_id=None, collector_id=None): - self.uuid = "%s.%s" % (hashlib.sha224(notification).hexdigest(), time.time()) - self.state = state - self.notification = notification - self.worker_id = worker_id - self.client_id = client_id - self.collector_id = collector_id - self.timestamp = time.time() - self.command = None - self.retries = 0 - - if self.client_id == None: - if self.collector_id == None: - self.type = None - else: - self.type = 'Collector' - else: - self.type = 'Dealer' - - def set_status(self, state): - self.state = state - - def set_worker(self, worker_id): - self.worker_id = worker_id - - def set_command(self, cmd): - self.command = cmd + +from sqlalchemy.exc import IntegrityError + +import bonnie +conf = bonnie.getConf() +log = bonnie.getLogger('bonnie.broker.ZMQBroker') + +from bonnie.broker.state import init_db, Collector, Job, Worker + +def add(dealer, notification, job_type='worker'): + """ + Add a new job. 
+ """ + db = init_db('jobs') + try: + db.add(Job(dealer, notification, job_type)) + db.commit() + except IntegrityError, errmsg: + db.rollback() + +def count(): + db = init_db('jobs') + result = db.query(Job).count() + return result + +def count_by_state(state): + db = init_db('jobs') + result = db.query(Job).filter_by(state=state).count() + return result + +def count_by_type(job_type): + db = init_db('jobs') + result = db.query(Job).filter_by(job_type=job_type).all() + return result + +def count_by_type_and_state(job_type, state): + db = init_db('jobs') + result = db.query(Job).filter_by(job_type=job_type, state=state).count() + return result + +def select(job_uuid): + db = init_db('jobs') + result = db.query(Job).filter_by(uuid=job_uuid).first() + return result + +def select_all(): + db = init_db('jobs') + result = db.query(Job).all() + return result + +def select_by_state(state): + db = init_db('jobs') + result = db.query(Job).filter_by(state=state).all() + return result + +def select_by_type(job_type): + db = init_db('jobs') + result = db.query(Job).filter_by(job_type=job_type).all() + return result + +def select_for_collector(identity): + db = init_db('jobs') + + collector = db.query(Collector).filter(Collector.identity==identity, Collector.job == None, Collector.state == b'READY').first() + + if collector == None: + return + + # This is influenced by .update(), which resets the .timestamp to + # .utcnow(), effectively pushing all updated jobs to the back of the + # queue. + # + # Practical result is a massive amount of metadata gathering + # followed by a sudden surge of jobs getting DONE. + #job = db.query(Job).filter_by(collector=identity, job_type='collector', state=b'PENDING').order_by(Job.timestamp).first() + + # This would result in "most recent first, work your way backwards." 
+ #job = db.query(Job).filter_by(collector=identity, job_type='collector', state=b'PENDING').order_by(Job.timestamp.desc()).first() + + # This results in following the storage order and is by far the + # fastest methodology. + job = db.query(Job).filter_by(collector=identity, job_type='collector', state=b'PENDING').first() + + if not job == None: + job.state = b'ALLOC' + job.timestamp = datetime.datetime.utcnow() + db.commit() + + return job + +def select_for_worker(identity): + db = init_db('jobs') + + worker = db.query(Worker).filter(Worker.identity == identity, Worker.job == None, Worker.state == b'READY').first() + + if worker == None: + return + + # This is influenced by .update(), which resets the .timestamp to + # .utcnow(), effectively pushing all updated jobs to the back of the + # queue. + # + # Practical result is a massive amount of metadata gathering + # followed by a sudden surge of jobs getting DONE. + #job = db.query(Job).filter_by(job_type='worker', state=b'PENDING').order_by(Job.timestamp).first() + + # This would result in "most recent first, work your way backwards." + #job = db.query(Job).filter_by(job_type='worker', state=b'PENDING').order_by(Job.timestamp.desc()).first() + + # This results in following the storage order and is by far the + # fastest methodology. 
+ job = db.query(Job).filter_by(job_type='worker', state=b'PENDING').first() + + if not job == None: + job.state = b'ALLOC' + job.timestamp = datetime.datetime.utcnow() + db.commit() + + return job + +def set_state(uuid, state): + db = init_db('jobs') + for job in db.query(Job).filter_by(uuid=uuid).all(): + job.state = state + job.timestamp = datetime.datetime.utcnow() + db.commit() + +def set_job_type(uuid, job_type): + db = init_db('jobs') + job = db.query(Job).filter_by(uuid=uuid).first() + job.job_type = job_type + job.timestamp = datetime.datetime.utcnow() + db.commit() + +def update(job_uuid, **kw): + db = init_db('jobs') + job = db.query(Job).filter_by(uuid=job_uuid).first() + + if job == None: + return + + for attr, value in kw.iteritems(): + setattr(job, attr, value) + + job.timestamp = datetime.datetime.utcnow() + + db.commit() + +def expire(): + """ + Unlock jobs that have been allocated to some worker way too long
View file
bonnie-0.1.3.tar.gz/bonnie/broker/brokers/zmq_broker/worker.py -> bonnie-0.2.0.tar.gz/bonnie/broker/brokers/zmq_broker/worker.py
Changed
@@ -1,15 +1,119 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) +# +# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3 or, at your option, any later +# version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, +# USA. +# + +import datetime import time -class Worker(object): - state = None - job = None +from bonnie.broker.state import init_db, Job, Worker + +import bonnie +conf = bonnie.getConf() +log = bonnie.getLogger('bonnie.broker.ZMQBroker') + +def add(identity, state = b'READY'): + db = init_db('workers') + db.add(Worker(identity, state)) + db.commit() + +def count(): + db = init_db('workers') + result = db.query(Worker).count() + return result + +def count_by_state(state): + db = init_db('workers') + result = db.query(Worker).filter_by(state=state).count() + return result + +def select_by_state(state): + db = init_db('workers') + result = db.query(Worker).filter_by(state=state).all() + return result + +def select(identity): + db = init_db('workers') + result = db.query(Worker).filter_by(identity=identity).first() + return result + +def set_job(identity, job_uuid): + db = init_db('workers') + job = db.query(Job).filter_by(uuid=job_uuid).first() + + if job == None: + return + + worker = db.query(Worker).filter_by(identity=identity).first() + worker.job = job.id + job.worker = worker.id + db.commit() + +def 
set_state(identity, state): + db = init_db('workers') + worker = db.query(Worker).filter_by(identity=identity).first() + + if worker == None: + db.add(Worker(identity, state)) + db.commit() + + else: + worker.state = state + worker.timestamp = datetime.datetime.utcnow() + db.commit() + +def update(identity, **kw): + db = init_db('identity') + worker = db.query(Worker).filter_by(identity=identity).first() + + if worker == None: + db.add(Worker(identity, b'READY')) + else: + for attr, value in kw.iteritems(): + setattr(worker, attr, value) + + worker.timestamp = datetime.datetime.utcnow() + + db.commit() + +def expire(): + db = init_db('workers') + for worker in db.query(Worker).filter(Worker.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 90)), Worker.state == b'STALE').all(): + log.debug("Purging worker %s as very stale" % (worker.identity), level=7) + + if not worker.job == None: + _job = db.query(Job).filter_by(id=worker.job).first() + if not _job == None: + _job.state = b'PENDING' + _job.timestamp = datetime.datetime.utcnow() + + db.delete(worker) + db.commit() - def __init__(self, worker_id=None, state=b"READY", job=None): - self.worker_id = worker_id - self.state = state - self.job = job - self.timestamp = time.time() + for worker in db.query(Worker).filter(Worker.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 90)), Worker.state != b'STALE').all(): + log.debug("Marking worker %s as stale" % (worker.identity), level=7) + if not worker.job == None: + _job = db.query(Job).filter_by(id=worker.job).first() + if not _job == None: + _job.state = b'PENDING' + _job.timestamp = datetime.datetime.utcnow() - def set_status(self, state): - self.state = state - self.timestamp = time.time() + worker.state = b'STALE' + worker.timestamp = datetime.datetime.utcnow() + db.commit()
View file
bonnie-0.2.0.tar.gz/bonnie/broker/state.py
Added
@@ -0,0 +1,164 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) +# +# Thomas Bruederli <bruederli at kolabsys.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3 or, at your option, any later +# version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, +# USA. +# + +import datetime +import hashlib +import re + +import sqlalchemy + +from sqlalchemy import Column +from sqlalchemy import DateTime +from sqlalchemy import ForeignKey +from sqlalchemy import Integer +from sqlalchemy import MetaData +from sqlalchemy import String +from sqlalchemy import Table +from sqlalchemy import Text + +from sqlalchemy import create_engine +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import relationship + +try: + from sqlalchemy.orm import relationship +except: + from sqlalchemy.orm import relation as relationship + +try: + from sqlalchemy.orm import sessionmaker +except: + from sqlalchemy.orm import create_session + +import bonnie +conf = bonnie.getConf() +log = bonnie.getLogger('bonnie.broker.state') + +DeclarativeBase = declarative_base() + +db = None + +collector_interest_table = Table( + 'collector_interest', + DeclarativeBase.metadata, + Column('collector_id', Integer, ForeignKey('collector.id')), + Column('interest_id', Integer, ForeignKey('interest.id')) + ) + +## +## Classes +## + +class Collector(DeclarativeBase): + __tablename__ = 'collector' + + id = 
Column(Integer, primary_key=True) + identity = Column(String(128)) + state = Column(String(16)) + job = Column(Integer, ForeignKey('job.id')) + timestamp = Column(DateTime, default=datetime.datetime.utcnow()) + interests = relationship( + "Interest", + secondary=collector_interest_table + ) + + def __init__(self, identity, state = b'READY'): + DeclarativeBase.__init__(self) + self.identity = identity + self.state = state + +class Interest(DeclarativeBase): + __tablename__ = 'interest' + + id = Column(Integer, primary_key=True) + cmd = Column(String(16), nullable=False) + + def __init__(self, cmd): + self.cmd = cmd + +class Job(DeclarativeBase): + __tablename__ = 'job' + + id = Column(Integer, primary_key=True) + uuid = Column(String(128), nullable=False, unique=True) + dealer = Column(String(128), nullable=False) + collector = Column(String(128), nullable=False) + notification = Column(Text) + job_type = Column(String(16), default='worker') + state = Column(String(16)) + cmd = Column(String(256)) + timestamp = Column(DateTime, default=datetime.datetime.utcnow()) + pushbacks = Column(Integer, default=0) + + def __init__(self, dealer, notification, job_type='worker'): + DeclarativeBase.__init__(self) + self.uuid = hashlib.sha224(notification).hexdigest() + self.dealer = dealer + self.collector = re.sub(r'-[0-9]+$', '', dealer).replace('Dealer', 'Collector') + self.notification = notification + self.state = b'PENDING' + self.job_type = job_type + self.timestamp = datetime.datetime.utcnow() + self.cmd = None + self.pushbacks = 0 + +class Worker(DeclarativeBase): + __tablename__ = 'worker' + + id = Column(Integer, primary_key=True) + identity = Column(String(128)) + state = Column(String(16)) + timestamp = Column(DateTime, default=datetime.datetime.utcnow()) + job = Column(Integer, ForeignKey('job.id')) + + def __init__(self, identity, state): + DeclarativeBase.__init__(self) + self.identity = identity + self.state = state + +def init_db(name, reinit=False): + """ + 
Returns a SQLAlchemy Session() instance. + """ + global db + + if not db == None and not reinit: + return db + + if reinit: + import os + if os.path.isfile('sqlite:////var/lib/bonnie/state.db'): + os.unlink('sqlite:////var/lib/bonnie/state.db') + + db_uri = 'sqlite:////var/lib/bonnie/state.db' + echo = conf.debuglevel > 8 + + try: + engine = create_engine(db_uri, echo=echo) + DeclarativeBase.metadata.create_all(engine) + except: + engine = create_engine('sqlite:////tmp/state.db') + DeclarativeBase.metadata.create_all(engine) + + Session = sessionmaker(bind=engine) + db = Session() + + return db
View file
bonnie-0.1.3.tar.gz/bonnie/collector/__init__.py -> bonnie-0.2.0.tar.gz/bonnie/collector/__init__.py
Changed
@@ -43,15 +43,16 @@ self.handler_interests = {} self.handler_modules = {} - def execute(self, command, notification): + def execute(self, commands, notification): """ Dispatch collector job to the according handler(s) """ - log.debug("Executing collection command %s" % (command), level=8) + log.debug("Executing collection for %s" % (commands), level=8) - if self.handler_interests.has_key(command): - for interest in self.handler_interests[command]: - notification = interest['callback'](notification=notification) + for command in commands.split(): + if self.handler_interests.has_key(command): + for interest in self.handler_interests[command]: + notification = interest['callback'](notification=notification) return notification
View file
bonnie-0.1.3.tar.gz/bonnie/collector/handlers/ldapdata.py -> bonnie-0.2.0.tar.gz/bonnie/collector/handlers/ldapdata.py
Changed
@@ -40,7 +40,7 @@ self.pykolab_conf.finalize_conf(fatal=False) self.ldap = Auth() - self.connections = 0 + self.ldap.connect() def register(self, callback): interests = { @@ -54,10 +54,7 @@ log.debug("GETUSERDATA for %r" % (notification), level=9) if notification.has_key('user'): - self.connections += 1 - try: - self.ldap.connect() user_dn = self.ldap.find_user_dn(notification['user'], True) log.debug("User DN for %s: %r" % (notification['user'], user_dn), level=8) except Exception, e: @@ -78,9 +75,4 @@ notification['user_data'] = user_rec - self.connections -= 1 - - if self.connections == 0: - self.ldap.disconnect() - return json.dumps(notification)
View file
bonnie-0.1.3.tar.gz/bonnie/collector/inputs/zmq_input.py -> bonnie-0.2.0.tar.gz/bonnie/collector/inputs/zmq_input.py
Changed
@@ -65,7 +65,7 @@ def report_state(self, interests=[]): log.debug("[%s] Reporting state %s, %r" % (self.identity, self.state, self.interests), level=9) - self.collector.send_multipart([b"STATE", self.state, ",".join(self.interests)]) + self.collector.send_multipart([b"STATE", self.state, " ".join(self.interests)]) self.report_timestamp = time.time() def run(self, callback=None, interests=[]): @@ -79,7 +79,7 @@ while self.running: try: - sockets = dict(self.poller.poll(1000)) + sockets = dict(self.poller.poll(1)) except KeyboardInterrupt, e: log.info("zmq.Poller KeyboardInterrupt") break @@ -108,6 +108,7 @@ if not callback == None: result = callback(_message[0], _notification) + self.report_timestamp = time.time() self.collector.send_multipart([b"DONE", _job_uuid, result]) log.info("[%s] shutting down", self.identity)
View file
bonnie-0.1.3.tar.gz/bonnie/worker/__init__.py -> bonnie-0.2.0.tar.gz/bonnie/worker/__init__.py
Changed
@@ -177,12 +177,11 @@ jobs.extend(_jobs) # trigger storage modules which registered interest in particular notification properties - if len(jobs) == 0: - for prop,storage_interests in self.storage_interests.iteritems(): - if notification.has_key(prop): - for interest in storage_interests: - (notification, _jobs) = self.interest_callback(interest, notification) - jobs.extend(_jobs) + for prop,storage_interests in self.storage_interests.iteritems(): + if notification.has_key(prop): + for interest in storage_interests: + (notification, _jobs) = self.interest_callback(interest, notification) + jobs.extend(_jobs) # finally send notification to output handlers if no jobs remaining if len(jobs) == 0 and not notification.has_key('_suppress_output') and not event in self.output_exclude_events: @@ -196,6 +195,8 @@ (notification, _jobs) = self.interest_callback(interest, notification) jobs.extend(_jobs) + log.debug("Event notification %r processed. Jobs = %r" % (event, jobs), level=7) + return notification, list(set(jobs)) def input_report(self):
View file
bonnie-0.1.3.tar.gz/bonnie/worker/inputs/zmq_input.py -> bonnie-0.2.0.tar.gz/bonnie/worker/inputs/zmq_input.py
Changed
@@ -40,7 +40,7 @@ def __init__(self, *args, **kw): self.state = b"READY" - self.job_id = None + self.job_uuid = None self.lastping = 0 self.report_timestamp = 0 @@ -52,7 +52,12 @@ def report_state(self): log.debug("[%s] reporting state: %s" % (self.identity, self.state), level=8) - self.controller.send_multipart([b"STATE", self.state]) + message = [b"STATE", self.state] + + if not self.job_uuid == None and self.state == b'BUSY': + message.append(self.job_uuid) + + self.controller.send_multipart(message) self.report_timestamp = time.time() def run(self, callback=None, report=None): @@ -89,7 +94,7 @@ while self.running: try: - sockets = dict(self.poller.poll(1000)) + sockets = dict(self.poller.poll(1)) except KeyboardInterrupt, e: log.info("zmq.Poller KeyboardInterrupt") break @@ -115,8 +120,8 @@ self.report_state() else: - _job_id = _message[1] - self.take_job(_job_id) + self.job_uuid = _message[1] + self.take_job(self.job_uuid) if self.worker in sockets: if sockets[self.worker] == zmq.POLLIN: @@ -124,22 +129,21 @@ log.debug("[%s] Worker message: %r" % (self.identity, _message), level=9) if _message[0] == "JOB": - _job_uuid = _message[1] + self.job_uuid = _message[1] # TODO: Sanity checking - #if _message[1] == self.job_id: + #if _message[1] == self.job_uuid: + #jobs = [] if not callback == None: (notification, jobs) = callback(_message[2]) else: jobs = [] if len(jobs) == 0: - self.controller.send_multipart([b"DONE", _job_uuid]) + self.controller.send_multipart([b"DONE", self.job_uuid]) else: log.debug("[%s] Has jobs: %r" % (self.identity, jobs), level=8) - - for job in jobs: - self.controller.send_multipart([job, _job_uuid]) + self.controller.send_multipart([b"COLLECT", self.job_uuid, b" ".join(jobs)]) self.set_state_ready() @@ -150,20 +154,19 @@ log.info("[%s] shutting down", self.identity) self.worker.close() - def set_state_busy(self): + def set_state_busy(self, _job_id): log.debug("[%s] Set state to BUSY" % (self.identity), level=9) - 
self.controller.send_multipart([b"STATE", b"BUSY", b"%s" % (self.job_id)]) + self.report_timestamp = time.time() self.state = b"BUSY" + self.job_uuid = _job_id def set_state_ready(self): log.debug("[%s] Set state to READY" % (self.identity), level=9) - self.controller.send_multipart([b"STATE", b"READY"]) + self.report_timestamp = time.time() self.state = b"READY" - self.job_id = None + self.job_uuid = None def take_job(self, _job_id): log.debug("[%s] Accept job %s" % (self.identity, _job_id), level=9) - self.set_state_busy() + self.set_state_busy(_job_id) self.worker.send_multipart([b"GET", _job_id]) - self.job_id = _job_id -
View file
bonnie-0.1.3.tar.gz/bonnie/worker/outputs/elasticsearch_output.py -> bonnie-0.2.0.tar.gz/bonnie/worker/outputs/elasticsearch_output.py
Changed
@@ -34,27 +34,28 @@ Convert the given event notification record into a valid log entry """ keymap = { - 'timestamp': None, - 'clientIP': 'client_ip', - 'clientPort': None, - 'serverPort': None, - 'serverDomain': 'domain', - 'aclRights': 'acl_rights', - 'aclSubject': 'acl_subject', - 'mailboxID': 'mailbox_id', - 'messageSize': 'message_size', - 'messageHeaders': None, - 'messageContent': None, - 'bodyStructure': None, - 'metadata': None, - 'acl': None, - 'flagNames': 'flag_names', - 'diskUsed': 'disk_used', - 'vnd.cmu.oldUidset': 'olduidset', - 'vnd.cmu.sessionId': 'session_id', - 'vnd.cmu.midset': 'message_id', - 'vnd.cmu.unseenMessages': 'unseen_messages', - } + 'timestamp': None, + 'clientIP': 'client_ip', + 'clientPort': None, + 'serverPort': None, + 'serverDomain': 'domain', + 'aclRights': 'acl_rights', + 'aclSubject': 'acl_subject', + 'mailboxID': 'mailbox_id', + 'messageSize': 'message_size', + 'messageHeaders': None, + 'messageContent': None, + 'bodyStructure': None, + 'metadata': None, + 'acl': None, + 'flagNames': 'flag_names', + 'diskUsed': 'disk_used', + 'vnd.cmu.oldUidset': 'olduidset', + 'vnd.cmu.sessionId': 'session_id', + 'vnd.cmu.midset': 'message_id', + 'vnd.cmu.unseenMessages': 'unseen_messages', + } + log = { '@version': bonnie.API_VERSION } for key,val in notification.iteritems(): newkey = keymap[key] if keymap.has_key(key) else key
View file
bonnie-0.1.3.tar.gz/bonnie/worker/storage/elasticsearch_storage.py -> bonnie-0.2.0.tar.gz/bonnie/worker/storage/elasticsearch_storage.py
Changed
@@ -364,6 +364,7 @@ now = int(time.time()) base_uri = re.sub(';.+$', '', notification[attrib]) + jobs = [] log.debug("Resolve folder for %r = %r" % (attrib, base_uri), level=8) @@ -375,12 +376,16 @@ # mailbox resolving requires metadata if not notification.has_key('metadata'): log.debug("Adding GETMETADATA job", level=8) - return (notification, [ b"GETMETADATA" ]) + jobs.append(b"GETMETADATA") # before creating a folder entry, we should collect folder ACLs if not notification.has_key('acl'): log.debug("Adding GETACL", level=8) - return (notification, [ b"GETACL" ]) + jobs.append(b"GETACL") + + # reject notification with additional collector jobs + if len(jobs) > 0: + return (notification, jobs) # extract folder properties and a unique identifier from the notification folder = self.notificaton2folder(notification)
View file
bonnie-0.1.3.tar.gz/conf/bonnie.conf -> bonnie-0.2.0.tar.gz/conf/bonnie.conf
Changed
@@ -1,7 +1,7 @@ [broker] zmq_dealer_router_bind_address = tcp://*:5570 zmq_collector_router_bind_address = tcp://*:5571 -zmq_controller_bind_address = tcp://*:5572 +zmq_worker_controller_router_bind_address = tcp://*:5572 zmq_worker_router_bind_address = tcp://*:5573 persistence_sql_uri = sqlite://
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.