Kolab:3.4:Updates / bonnie
Note: we truncated the diff of some files because they were too big.
Changes of Revision 35
bonnie.spec
Changed
@@ -17,7 +17,7 @@
 %global bonnie_group_id 415
 
 Name: bonnie
-Version: 0.1.3
+Version: 0.2.0
 Release: 1%{?dist}
 Summary: Bonnie for Kolab Groupware
@@ -60,7 +60,7 @@
 Summary: The Bonnie broker
 Group: Applications/System
 Requires: %{name} = %{?epoch:%{epoch}:}%{version}-%{release}
-Requires: python-sqlalchemy
+Requires: python-sqlalchemy >= 0.8.0
 Requires: python-tornado
 Requires: python-zmq
@@ -71,7 +71,6 @@
 Summary: The Bonnie collector
 Group: Applications/System
 Requires: %{name} = %{?epoch:%{epoch}:}%{version}-%{release}
-Requires: python-tornado
 Requires: python-zmq
 
 %description collector
@@ -92,7 +91,6 @@
 Group: Applications/System
 Requires: %{name} = %{?epoch:%{epoch}:}%{version}-%{release}
 Requires: python-elasticsearch >= 1.0
-Requires: python-tornado
 Requires: python-zmq
 
 %description worker
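The broker subpackage now requires python-sqlalchemy 0.8.0 or newer, in line with the declarative mapping used by the new bonnie/broker/state.py further down, while the collector and worker subpackages drop python-tornado, which they no longer import. A quick sanity check of the SQLAlchemy floor, as a sketch rather than part of this changeset:

# Hedged sketch: assert the SQLAlchemy version floor the spec now encodes.
from distutils.version import LooseVersion

import sqlalchemy

assert LooseVersion(sqlalchemy.__version__) >= LooseVersion('0.8.0'), \
    "bonnie-broker 0.2.0 expects python-sqlalchemy >= 0.8.0"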
bonnie-0.1.3.tar.gz/bonnie/broker/persistence.py
Deleted
@@ -1,128 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com)
-#
-# Thomas Bruederli <bruederli at kolabsys.com>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; version 3 or, at your option, any later
-# version.
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Library General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
-# USA.
-#
-
-import sqlalchemy as db
-from sqlalchemy.orm import sessionmaker, relationship
-from sqlalchemy.orm.util import has_identity
-from sqlalchemy.orm.attributes import init_collection
-from sqlalchemy.ext.declarative import declarative_base
-
-import bonnie
-conf = bonnie.getConf()
-log = bonnie.getLogger('bonnie.broker.persistence')
-
-PersistentBase = declarative_base()
-
-# an db engine, which the Session will use for connection resources
-engine = db.create_engine(conf.get('broker', 'persistence_sql_uri', 'sqlite://'))
-
-# create a configured "Session" class
-Session = sessionmaker(bind=engine)
-session = Session()
-
-
-class PlistCollection(list):
-    """
-        Extended list collection to add some handy utility methods
-    """
-    def delete(self, item):
-        self.remove(item)
-        if has_identity(item):
-            session.delete(item)
-        else:
-            session.expunge(item)
-
-class PersistentList(PersistentBase):
-    """
-        Container class representing a persistent list object
-    """
-    __tablename__ = 'plists'
-    listname = db.Column(db.String(32), primary_key=True)
-    # items = relationship('PlistItem')
-
-    def __init__(self, name):
-        self.listname = name
-
-
-#### module functions
-
-__list_classes = {}
-__list_instances = {}
-
-def List(name, _type):
-    """
-        Factory function to return a list-like collection with persistent storage capabilities
-    """
-    if __list_instances.has_key(name):
-        return __list_instances[name].items
-
-    # create new list class to handle items (relations) of the given type
-    _class_name = 'PersistentList' + _type.__name__
-
-    if __list_classes.has_key(_class_name):
-        _plistclass = __list_classes[_class_name]
-    else:
-        # determine foreign key type
-        if hasattr(_type, '__key__') and _type.__table__.columns.has_key(_type.__key__):
-            reltype = _type.__table__.columns[_type.__key__].type
-        elif hasattr(_type.__table__, 'primary_key'):
-            for col in _type.__table__.primary_key.columns:
-                _type.__key__ = col.name
-                reltype = col.type
-                break
-        else:
-            _type.__key__ = 'id'
-            reltype = db.String(256)
-
-        # we establish a many-to-many relation using an association table
-        association_table = db.Table('plistitems_' + _type.__tablename__, PersistentBase.metadata,
-            db.Column('listname', db.Integer, db.ForeignKey('plists.listname'), index=True),
-            db.Column('ref', reltype, db.ForeignKey(_type.__tablename__ + '.' + _type.__key__), index=True)
-        )
-
-        # create a list container class with a relationship to the list item type
-        _plistclass = type(_class_name, (PersistentList,), {
-            'items': relationship(_type, secondary=association_table, backref='list', collection_class=PlistCollection)
-        })
-        __list_classes[_class_name] = _plistclass
-
-    PersistentBase.metadata.create_all(engine)
-
-    try:
-        lst = session.query(_plistclass).filter(PersistentList.listname == name).one()
-    except:
-        lst = _plistclass(name)
-        session.add(lst)
-
-    # remember this instance for later calls and to avoid garbage collection
-    __list_instances[name] = lst
-
-    # return the collection og list items
-    return lst.items
-
-
-def syncronize():
-    """
-        Synchronize to persistent storage
-    """
-    if len(session.new) > 0 or len(session.dirty) > 0 or len(session.deleted) > 0:
-        log.debug("session.commit(); new=%r; dirty=%r; deleted=%r" % (session.new, session.dirty, session.deleted), level=9)
-        session.commit()
bonnie-0.1.3.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py -> bonnie-0.2.0.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py
Changed
@@ -23,345 +23,473 @@
     This is the ZMQ broker implementation for Bonnie.
 """
 
-import copy
+from multiprocessing import Process
 import random
 import re
 import sys
+import threading
 import time
 import zmq
+from zmq.eventloop import ioloop, zmqstream
 
 import bonnie
 conf = bonnie.getConf()
 log = bonnie.getLogger('bonnie.broker.ZMQBroker')
 
-from job import Job
-from collector import Collector
-from worker import Worker
-from bonnie.broker import persistence
+import collector
+import job
+import worker
 
 class ZMQBroker(object):
-    MAX_RETRIES = 5
     running = False
 
     def __init__(self):
-        self.worker_jobs = persistence.List('worker_jobs', Job)
-        self.collect_jobs = persistence.List('collect_jobs', Job)
-        self.collectors = {}
-        self.workers = {}
-        self.collector_interests = []
+        """
+            A ZMQ Broker for Bonnie.
+        """
+        self.context = zmq.Context(io_threads=128)
+        self.poller = zmq.Poller()
+
+        self.routers = {}
 
     def register(self, callback):
         callback({ '_all': self.run })
 
-    def collector_add(self, _collector_id, _state, _interests):
-        log.debug("Adding collector %s for %r" % (_collector_id, _interests), level=5)
+    def create_router(self, name, default, **kw):
+        """
+            Create a regular router that remains
+            available for the base ZMQBroker.
+        """
+        bind_address = conf.get(
+            'broker',
+            'zmq_%s_router_bind_address' % (name)
+        )
 
-        collector = Collector(_collector_id, _state, _interests)
-        self.collectors[_collector_id] = collector
+        if bind_address == None:
+            bind_address = default
 
-        # regisrer the reported interests
-        if len(_interests) > 0:
-            _interests.extend(self.collector_interests)
-            self.collector_interests = list(set(_interests))
+        router = self.context.socket(zmq.ROUTER)
+        router.bind(bind_address)
 
-    def collect_job_allocate(self, _collector_id):
-        jobs = [x for x in self.collect_jobs if x.collector_id == _collector_id and x.state == b"PENDING"]
+        self.routers[name] = {
+            'router': router,
+            'callbacks': dict(kw)
+        }
 
-        if len(jobs) < 1:
-            return
+        self.poller.register(router, zmq.POLLIN)
 
-        job = jobs.pop()
-        job.set_status(b"ALLOC")
-        return job
+        return router
 
-    def collect_jobs_with_status(self, _state, collector_id=None):
-        return [x for x in self.collect_jobs if x.state == _state and x.collector_id == collector_id]
+    def create_router_process(self, name, default, callback):
+        """
+            Create a router process (that is no
+            longer available to the base ZMQBroker).
+        """
 
-    def collector_set_status(self, _collector_id, _state, _interests):
-        if not self.collectors.has_key(_collector_id):
-            self.collector_add(_collector_id, _state, _interests)
-        else:
-            self.collectors[_collector_id].set_status(_state)
+        bind_address = conf.get(
+            'broker',
+            'zmq_%s_router_bind_address' % (name)
+        )
 
-    def collectors_with_status(self, _state):
-        return [collector_id for collector_id, collector in self.collectors.iteritems() if collector.state == _state]
+        if bind_address == None:
+            bind_address = default
 
-    def worker_job_add(self, _notification, client_id=None, collector_id=None):
-        """
-            Add a new job.
-        """
-        job = Job(
-            notification=_notification,
-            state=b"PENDING",
-            client_id=client_id,
-            collector_id=collector_id,
+        setattr(
+            self,
+            '%s_router' % (name),
+            Process(
+                target=self._run_router_process,
+                args=(callback, bind_address)
+            )
         )
 
-        if not job.uuid in [x.uuid for x in self.worker_jobs]:
-            self.worker_jobs.append(job)
+        getattr(self, '%s_router' % (name)).start()
 
-            log.debug("New worker job: %s; client=%s, collector=%s" % (job.uuid, client_id, collector_id), level=8)
+    def find_router(self, socket):
+        for name, attrs in self.routers.iteritems():
+            if attrs['router'] == socket:
+                return name, attrs['router'], attrs['callbacks']
 
-    def worker_job_allocate(self, _worker_id):
-        """
-            Allocate a job to a worker, should a job be available.
-        """
+    def run(self):
+        self.create_router(
+            'collector',
+            'tcp://*:5571',
+            recv_multipart = self._cb_cr_recv_multipart
+        )
 
-        if len(self.worker_jobs) < 1:
-            return None
+        self.create_router_process(
+            'dealer',
+            'tcp://*:5570',
+            self._cb_dr_on_recv_stream
+        )
 
-        jobs = self.worker_jobs_with_status(b"PENDING")
-        if len(jobs) < 1:
-            return None
+        self.create_router(
+            'worker',
+            'tcp://*:5573',
+            recv_multipart = self._cb_wr_recv_multipart
+        )
 
-        # take the first job in the queue
-        job = jobs[0]
+        self.create_router(
+            'worker_controller',
+            'tcp://*:5572',
+            recv_multipart = self._cb_wcr_recv_multipart
+        )
 
-        job.set_status(b"ALLOC")
-        job.set_worker(_worker_id)
+        last_expire = time.time()
+        last_state = time.time()
+
+        while True:
+            # TODO: adjust polling timout according to the number of pending jobs
+            sockets = dict(self.poller.poll(100))
+
+            for socket, event in sockets.iteritems():
+                if event == zmq.POLLIN:
+                    name, router, callbacks = self.find_router(socket)
+
+                    for callforward, callback in callbacks.iteritems():
+                        result = getattr(router, '%s' % callforward)()
+                        callback(router, result)
+
+            for _collector in collector.select_by_state(b'READY'):
+                self._send_collector_job(_collector.identity)
+
+            for _worker in worker.select_by_state(b'READY'):
+                self._send_worker_job(_worker.identity)
+
+            if last_expire < (time.time() - 30):
+                collector.expire()
+                worker.expire()
bonnie-0.1.3.tar.gz/bonnie/broker/brokers/zmq_broker/collector.py -> bonnie-0.2.0.tar.gz/bonnie/broker/brokers/zmq_broker/collector.py
Changed
@@ -1,15 +1,120 @@
+# -*- coding: utf-8 -*-
+# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com)
+#
+# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 3 or, at your option, any later
+# version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Library General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
+# USA.
+#
+
+import datetime
 import time
 
-class Collector(object):
-    state = None
-    job = None
+import bonnie
+conf = bonnie.getConf()
+log = bonnie.getLogger('bonnie.broker.ZMQBroker')
+
+from bonnie.broker.state import init_db, Collector, Job, Interest
+
+def add(identity, state = b'READY', interests = []):
+    db = init_db('collectors')
+    db.add(Collector(identity, state))
+    db.commit()
+
+    collector = db.query(Collector).filter_by(identity=identity).first()
+
+    for cmd in interests:
+        interest = db.query(Interest).filter_by(cmd=cmd).first()
+
+        if interest == None:
+            db.add(Interest(cmd))
+            db.commit()
+            interest = db.query(Interest).filter_by(cmd=cmd).first()
+
+        collector.interests.append(interest)
+
+    db.commit()
+
+def count():
+    db = init_db('collectors')
+    result = db.query(Collector).count()
+    return result
+
+def count_by_state(state):
+    db = init_db('collectors')
+    result = db.query(Collector).filter_by(state=state).count()
+    return result
+
+def select(identity):
+    db = init_db('collectors')
+    collector = db.query(Collector).filter_by(identity=identity).first()
+    return collector
+
+def select_by_state(state):
+    db = init_db('collectors')
+    collectors = db.query(Collector).filter_by(state=state).all()
+    return collectors
+
+def set_state(identity, state, interests = []):
+    db = init_db('collectors')
+
+    collector = db.query(Collector).filter_by(identity=identity).first()
+
+    if collector == None:
+        db.add(Collector(identity, state))
+        db.commit()
+        collector = db.query(Collector).filter_by(identity=identity).first()
+    else:
+        collector.state = state
+        collector.timestamp = datetime.datetime.utcnow()
+        db.commit()
+
+    if state == b'READY':
+        collector.job = None
+
+    for cmd in interests:
+        interest = db.query(Interest).filter_by(cmd=cmd).first()
+
+        if interest == None:
+            db.add(Interest(cmd))
+            db.commit()
+            interest = db.query(Interest).filter_by(cmd=cmd).first()
+
+        collector.interests.append(interest)
+
+    db.commit()
+
+def update(identity, **kw):
+    db = init_db('collectors')
+
+    collector = db.query(Collector).filter_by(identity=identity).first()
+
+    for attr, value in kw.iteritems():
+        setattr(collector, attr, value)
+
+    db.commit()
+
+def expire():
+    db = init_db('collectors')
 
-    def __init__(self, collector_id=None, state=b"READY", interests=[]):
-        self.collector_id = collector_id
-        self.state = state
-        self.interests = interests
-        self.timestamp = time.time()
+    for collector in db.query(Collector).filter(Collector.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 90)), Collector.state != b'STALE').all():
+        log.debug("Marking collector %s as stale" % (collector.identity), level=7)
+        if not collector.job == None:
+            _job = db.query(Job).filter_by(id=collector.job).first()
+            if not _job == None:
+                _job.state = b'PENDING'
+                _job.timestamp = datetime.datetime.utcnow()
 
-    def set_status(self, state):
-        self.state = state
-        self.timestamp = time.time()
+        collector.state = b'STALE'
+        collector.timestamp = datetime.datetime.utcnow()
+
+    db.commit()
bonnie-0.1.3.tar.gz/bonnie/broker/brokers/zmq_broker/job.py -> bonnie-0.2.0.tar.gz/bonnie/broker/brokers/zmq_broker/job.py
Changed
@@ -19,47 +19,188 @@
 # USA.
 #
 
-import hashlib
+import datetime
 import time
 
-from bonnie.broker.persistence import db, PersistentBase
-
-class Job(PersistentBase):
-    __tablename__ = 'jobs'
-
-    # use binary types because ZMQ requires binary strings
-    uuid = db.Column(db.LargeBinary(128), primary_key=True)
-    type = db.Column(db.String(16))
-    state = db.Column(db.String(16))
-    timestamp = db.Column(db.Float)
-    notification = db.Column(db.LargeBinary)
-    worker_id = db.Column(db.String(64))
-    client_id = db.Column(db.String(64))
-    collector_id = db.Column(db.LargeBinary(64))
-    command = db.Column(db.LargeBinary(32))
-
-    def __init__(self, state=None, notification=None, worker_id=None, client_id=None, collector_id=None):
-        self.uuid = "%s.%s" % (hashlib.sha224(notification).hexdigest(), time.time())
-        self.state = state
-        self.notification = notification
-        self.worker_id = worker_id
-        self.client_id = client_id
-        self.collector_id = collector_id
-        self.timestamp = time.time()
-        self.command = None
-        self.retries = 0
-
-        if self.client_id == None:
-            if self.collector_id == None:
-                self.type = None
-            else:
-                self.type = 'Collector'
-        else:
-            self.type = 'Dealer'
-
-    def set_status(self, state):
-        self.state = state
-
-    def set_worker(self, worker_id):
-        self.worker_id = worker_id
-
-    def set_command(self, cmd):
-        self.command = cmd
+
+from sqlalchemy.exc import IntegrityError
+
+import bonnie
+conf = bonnie.getConf()
+log = bonnie.getLogger('bonnie.broker.ZMQBroker')
+
+from bonnie.broker.state import init_db, Collector, Job, Worker
+
+def add(dealer, notification, job_type='worker'):
+    """
+        Add a new job.
+    """
+    db = init_db('jobs')
+
+    try:
+        db.add(Job(dealer, notification, job_type))
+        db.commit()
+    except IntegrityError, errmsg:
+        db.rollback()
+
+def count():
+    db = init_db('jobs')
+    result = db.query(Job).count()
+    return result
+
+def count_by_state(state):
+    db = init_db('jobs')
+    result = db.query(Job).filter_by(state=state).count()
+    return result
+
+def count_by_type(job_type):
+    db = init_db('jobs')
+    result = db.query(Job).filter_by(job_type=job_type).all()
+    return result
+
+def count_by_type_and_state(job_type, state):
+    db = init_db('jobs')
+    result = db.query(Job).filter_by(job_type=job_type, state=state).count()
+    return result
+
+def select(job_uuid):
+    db = init_db('jobs')
+    result = db.query(Job).filter_by(uuid=job_uuid).first()
+    return result
+
+def select_all():
+    db = init_db('jobs')
+    result = db.query(Job).all()
+    return result
+
+def select_by_state(state):
+    db = init_db('jobs')
+    result = db.query(Job).filter_by(state=state).all()
+    return result
+
+def select_by_type(job_type):
+    db = init_db('jobs')
+    result = db.query(Job).filter_by(job_type=job_type).all()
+    return result
+
+def select_for_collector(identity):
+    db = init_db('jobs')
+
+    collector = db.query(Collector).filter(Collector.identity == identity, Collector.job == None, Collector.state == b'READY').first()
+
+    if collector == None:
+        return
+
+    # This is influenced by .update(), which resets the .timestamp to
+    # .utcnow(), effectively pushing all updated jobs to the back of the
+    # queue.
+    #
+    # Practical result is a massive amount of metadata gathering
+    # followed by a sudden surge of jobs getting DONE.
+    #job = db.query(Job).filter_by(collector=identity, job_type='collector', state=b'PENDING').order_by(Job.timestamp).first()
+
+    # This would result in "most recent first, work your way backwards."
+    #job = db.query(Job).filter_by(collector=identity, job_type='collector', state=b'PENDING').order_by(Job.timestamp.desc()).first()
+
+    # This results in following the storage order and is by far the
+    # fastest methodology.
+    job = db.query(Job).filter_by(collector=identity, job_type='collector', state=b'PENDING').first()
+
+    if not job == None:
+        job.state = b'ALLOC'
+        job.timestamp = datetime.datetime.utcnow()
+        db.commit()
+
+    return job
+
+def select_for_worker(identity):
+    db = init_db('jobs')
+
+    worker = db.query(Worker).filter(Worker.identity == identity, Worker.job == None, Worker.state == b'READY').first()
+
+    if worker == None:
+        return
+
+    # This is influenced by .update(), which resets the .timestamp to
+    # .utcnow(), effectively pushing all updated jobs to the back of the
+    # queue.
+    #
+    # Practical result is a massive amount of metadata gathering
+    # followed by a sudden surge of jobs getting DONE.
+    #job = db.query(Job).filter_by(job_type='worker', state=b'PENDING').order_by(Job.timestamp).first()
+
+    # This would result in "most recent first, work your way backwards."
+    #job = db.query(Job).filter_by(job_type='worker', state=b'PENDING').order_by(Job.timestamp.desc()).first()
+
+    # This results in following the storage order and is by far the
+    # fastest methodology.
+    job = db.query(Job).filter_by(job_type='worker', state=b'PENDING').first()
+
+    if not job == None:
+        job.state = b'ALLOC'
+        job.timestamp = datetime.datetime.utcnow()
+        db.commit()
+
+    return job
+
+def set_state(uuid, state):
+    db = init_db('jobs')
+
+    for job in db.query(Job).filter_by(uuid=uuid).all():
+        job.state = state
+        job.timestamp = datetime.datetime.utcnow()
+
+    db.commit()
+
+def set_job_type(uuid, job_type):
+    db = init_db('jobs')
+    job = db.query(Job).filter_by(uuid=uuid).first()
+    job.job_type = job_type
+    job.timestamp = datetime.datetime.utcnow()
+    db.commit()
+
+def update(job_uuid, **kw):
+    db = init_db('jobs')
+    job = db.query(Job).filter_by(uuid=job_uuid).first()
+
+    if job == None:
+        return
+
+    for attr, value in kw.iteritems():
+        setattr(job, attr, value)
+
+    job.timestamp = datetime.datetime.utcnow()
+
+    db.commit()
+
+def expire():
+    """
+        Unlock jobs that have been allocated to some worker way too long
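The Job class methods from 0.1.3 become module-level functions against the shared state database. A hedged sketch of the lifecycle they imply, with invented identities, assuming a READY and idle 'Worker-5678' row already exists:

import job

# A dealer submits a notification; duplicate uuids roll back quietly.
job.add('Dealer-1234', '{"event": "MessageAppend"}')

# The broker hands the first PENDING job (storage order) to an idle,
# READY worker; select_for_worker() flips it to ALLOC in the same commit.
_job = job.select_for_worker('Worker-5678')

if _job is not None:
    job.set_state(_job.uuid, b'DONE')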
bonnie-0.1.3.tar.gz/bonnie/broker/brokers/zmq_broker/worker.py -> bonnie-0.2.0.tar.gz/bonnie/broker/brokers/zmq_broker/worker.py
Changed
@@ -1,15 +1,119 @@
+# -*- coding: utf-8 -*-
+# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com)
+#
+# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 3 or, at your option, any later
+# version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Library General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
+# USA.
+#
+
+import datetime
 import time
 
-class Worker(object):
-    state = None
-    job = None
+from bonnie.broker.state import init_db, Job, Worker
+
+import bonnie
+conf = bonnie.getConf()
+log = bonnie.getLogger('bonnie.broker.ZMQBroker')
+
+def add(identity, state = b'READY'):
+    db = init_db('workers')
+    db.add(Worker(identity, state))
+    db.commit()
+
+def count():
+    db = init_db('workers')
+    result = db.query(Worker).count()
+    return result
+
+def count_by_state(state):
+    db = init_db('workers')
+    result = db.query(Worker).filter_by(state=state).count()
+    return result
+
+def select_by_state(state):
+    db = init_db('workers')
+    result = db.query(Worker).filter_by(state=state).all()
+    return result
+
+def select(identity):
+    db = init_db('workers')
+    result = db.query(Worker).filter_by(identity=identity).first()
+    return result
+
+def set_job(identity, job_uuid):
+    db = init_db('workers')
+    job = db.query(Job).filter_by(uuid=job_uuid).first()
+
+    if job == None:
+        return
+
+    worker = db.query(Worker).filter_by(identity=identity).first()
+    worker.job = job.id
+    job.worker = worker.id
+    db.commit()
+
+def set_state(identity, state):
+    db = init_db('workers')
+    worker = db.query(Worker).filter_by(identity=identity).first()
+
+    if worker == None:
+        db.add(Worker(identity, state))
+        db.commit()
+
+    else:
+        worker.state = state
+        worker.timestamp = datetime.datetime.utcnow()
+        db.commit()
+
+def update(identity, **kw):
+    db = init_db('identity')
+    worker = db.query(Worker).filter_by(identity=identity).first()
+
+    if worker == None:
+        db.add(Worker(identity, b'READY'))
+    else:
+        for attr, value in kw.iteritems():
+            setattr(worker, attr, value)
+
+        worker.timestamp = datetime.datetime.utcnow()
+
+    db.commit()
+
+def expire():
+    db = init_db('workers')
+
+    for worker in db.query(Worker).filter(Worker.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 90)), Worker.state == b'STALE').all():
+        log.debug("Purging worker %s as very stale" % (worker.identity), level=7)
+
+        if not worker.job == None:
+            _job = db.query(Job).filter_by(id=worker.job).first()
+            if not _job == None:
+                _job.state = b'PENDING'
+                _job.timestamp = datetime.datetime.utcnow()
+
+        db.delete(worker)
+        db.commit()
 
-    def __init__(self, worker_id=None, state=b"READY", job=None):
-        self.worker_id = worker_id
-        self.state = state
-        self.job = job
-        self.timestamp = time.time()
+    for worker in db.query(Worker).filter(Worker.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 90)), Worker.state != b'STALE').all():
+        log.debug("Marking worker %s as stale" % (worker.identity), level=7)
+        if not worker.job == None:
+            _job = db.query(Job).filter_by(id=worker.job).first()
+            if not _job == None:
+                _job.state = b'PENDING'
+                _job.timestamp = datetime.datetime.utcnow()
 
-    def set_status(self, state):
-        self.state = state
-        self.timestamp = time.time()
+        worker.state = b'STALE'
+        worker.timestamp = datetime.datetime.utcnow()
+
+    db.commit()
bonnie-0.2.0.tar.gz/bonnie/broker/state.py
Added
@@ -0,0 +1,164 @@
+# -*- coding: utf-8 -*-
+# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com)
+#
+# Thomas Bruederli <bruederli at kolabsys.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 3 or, at your option, any later
+# version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Library General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
+# USA.
+#
+
+import datetime
+import hashlib
+import re
+
+import sqlalchemy
+
+from sqlalchemy import Column
+from sqlalchemy import DateTime
+from sqlalchemy import ForeignKey
+from sqlalchemy import Integer
+from sqlalchemy import MetaData
+from sqlalchemy import String
+from sqlalchemy import Table
+from sqlalchemy import Text
+
+from sqlalchemy import create_engine
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship
+
+try:
+    from sqlalchemy.orm import relationship
+except:
+    from sqlalchemy.orm import relation as relationship
+
+try:
+    from sqlalchemy.orm import sessionmaker
+except:
+    from sqlalchemy.orm import create_session
+
+import bonnie
+conf = bonnie.getConf()
+log = bonnie.getLogger('bonnie.broker.state')
+
+DeclarativeBase = declarative_base()
+
+db = None
+
+collector_interest_table = Table(
+        'collector_interest',
+        DeclarativeBase.metadata,
+        Column('collector_id', Integer, ForeignKey('collector.id')),
+        Column('interest_id', Integer, ForeignKey('interest.id'))
+    )
+
+##
+## Classes
+##
+
+class Collector(DeclarativeBase):
+    __tablename__ = 'collector'
+
+    id = Column(Integer, primary_key=True)
+    identity = Column(String(128))
+    state = Column(String(16))
+    job = Column(Integer, ForeignKey('job.id'))
+    timestamp = Column(DateTime, default=datetime.datetime.utcnow())
+    interests = relationship(
+            "Interest",
+            secondary=collector_interest_table
+        )
+
+    def __init__(self, identity, state = b'READY'):
+        DeclarativeBase.__init__(self)
+        self.identity = identity
+        self.state = state
+
+class Interest(DeclarativeBase):
+    __tablename__ = 'interest'
+
+    id = Column(Integer, primary_key=True)
+    cmd = Column(String(16), nullable=False)
+
+    def __init__(self, cmd):
+        self.cmd = cmd
+
+class Job(DeclarativeBase):
+    __tablename__ = 'job'
+
+    id = Column(Integer, primary_key=True)
+    uuid = Column(String(128), nullable=False, unique=True)
+    dealer = Column(String(128), nullable=False)
+    collector = Column(String(128), nullable=False)
+    notification = Column(Text)
+    job_type = Column(String(16), default='worker')
+    state = Column(String(16))
+    cmd = Column(String(256))
+    timestamp = Column(DateTime, default=datetime.datetime.utcnow())
+    pushbacks = Column(Integer, default=0)
+
+    def __init__(self, dealer, notification, job_type='worker'):
+        DeclarativeBase.__init__(self)
+        self.uuid = hashlib.sha224(notification).hexdigest()
+        self.dealer = dealer
+        self.collector = re.sub(r'-[0-9]+$', '', dealer).replace('Dealer', 'Collector')
+        self.notification = notification
+        self.state = b'PENDING'
+        self.job_type = job_type
+        self.timestamp = datetime.datetime.utcnow()
+        self.cmd = None
+        self.pushbacks = 0
+
+class Worker(DeclarativeBase):
+    __tablename__ = 'worker'
+
+    id = Column(Integer, primary_key=True)
+    identity = Column(String(128))
+    state = Column(String(16))
+    timestamp = Column(DateTime, default=datetime.datetime.utcnow())
+    job = Column(Integer, ForeignKey('job.id'))
+
+    def __init__(self, identity, state):
+        DeclarativeBase.__init__(self)
+        self.identity = identity
+        self.state = state
+
+def init_db(name, reinit=False):
+    """
+        Returns a SQLAlchemy Session() instance.
+    """
+    global db
+
+    if not db == None and not reinit:
+        return db
+
+    if reinit:
+        import os
+        if os.path.isfile('sqlite:////var/lib/bonnie/state.db'):
+            os.unlink('sqlite:////var/lib/bonnie/state.db')
+
+    db_uri = 'sqlite:////var/lib/bonnie/state.db'
+    echo = conf.debuglevel > 8
+
+    try:
+        engine = create_engine(db_uri, echo=echo)
+        DeclarativeBase.metadata.create_all(engine)
+    except:
+        engine = create_engine('sqlite:////tmp/state.db')
+        DeclarativeBase.metadata.create_all(engine)
+
+    Session = sessionmaker(bind=engine)
+    db = Session()
+
+    return db
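The state module replaces broker/persistence.py: one module-global SQLAlchemy Session, created on the first init_db() call, is shared by the collector, job and worker modules above. A minimal usage sketch, with invented identities, assuming the SQLite path is writable:

from bonnie.broker.state import init_db, Job, Worker

db = init_db('example')  # every caller gets the same Session

db.add(Worker(b'Worker-1', b'READY'))
db.add(Job('Dealer-1', '{"event": "Login"}'))
db.commit()

print(db.query(Job).filter_by(state=b'PENDING').count())  # 1

One design detail worth knowing: the DateTime columns pass default=datetime.datetime.utcnow(), which is evaluated once at import time rather than per row; the explicit timestamp assignments in the constructors and in the update/expire paths are what actually keep the timestamps current.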
bonnie-0.1.3.tar.gz/bonnie/collector/__init__.py -> bonnie-0.2.0.tar.gz/bonnie/collector/__init__.py
Changed
@@ -43,15 +43,16 @@
         self.handler_interests = {}
         self.handler_modules = {}
 
-    def execute(self, command, notification):
+    def execute(self, commands, notification):
         """
             Dispatch collector job to the according handler(s)
         """
-        log.debug("Executing collection command %s" % (command), level=8)
+        log.debug("Executing collection for %s" % (commands), level=8)
 
-        if self.handler_interests.has_key(command):
-            for interest in self.handler_interests[command]:
-                notification = interest['callback'](notification=notification)
+        for command in commands.split():
+            if self.handler_interests.has_key(command):
+                for interest in self.handler_interests[command]:
+                    notification = interest['callback'](notification=notification)
 
         return notification
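execute() now takes a whitespace-separated string of commands instead of a single command, matching the broker's new COLLECT message format. A small illustration with an invented handler registry:

# Invented handler registry; each command dispatches independently.
handler_interests = {
    'GETMETADATA': [{'callback': lambda notification: notification + ['metadata']}],
    'GETACL': [{'callback': lambda notification: notification + ['acl']}],
}

notification = []
for command in "GETMETADATA GETACL".split():
    for interest in handler_interests.get(command, []):
        notification = interest['callback'](notification=notification)

print(notification)  # ['metadata', 'acl']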
bonnie-0.1.3.tar.gz/bonnie/collector/handlers/ldapdata.py -> bonnie-0.2.0.tar.gz/bonnie/collector/handlers/ldapdata.py
Changed
@@ -40,7 +40,7 @@
         self.pykolab_conf.finalize_conf(fatal=False)
 
         self.ldap = Auth()
-        self.connections = 0
+        self.ldap.connect()
 
     def register(self, callback):
         interests = {
@@ -54,10 +54,7 @@
         log.debug("GETUSERDATA for %r" % (notification), level=9)
 
         if notification.has_key('user'):
-            self.connections += 1
-
             try:
-                self.ldap.connect()
                 user_dn = self.ldap.find_user_dn(notification['user'], True)
                 log.debug("User DN for %s: %r" % (notification['user'], user_dn), level=8)
             except Exception, e:
@@ -78,9 +75,4 @@
 
             notification['user_data'] = user_rec
 
-        self.connections -= 1
-
-        if self.connections == 0:
-            self.ldap.disconnect()
-
         return json.dumps(notification)
bonnie-0.1.3.tar.gz/bonnie/collector/inputs/zmq_input.py -> bonnie-0.2.0.tar.gz/bonnie/collector/inputs/zmq_input.py
Changed
@@ -65,7 +65,7 @@
 
     def report_state(self, interests=[]):
         log.debug("[%s] Reporting state %s, %r" % (self.identity, self.state, self.interests), level=9)
-        self.collector.send_multipart([b"STATE", self.state, ",".join(self.interests)])
+        self.collector.send_multipart([b"STATE", self.state, " ".join(self.interests)])
         self.report_timestamp = time.time()
 
     def run(self, callback=None, interests=[]):
@@ -79,7 +79,7 @@
 
         while self.running:
             try:
-                sockets = dict(self.poller.poll(1000))
+                sockets = dict(self.poller.poll(1))
             except KeyboardInterrupt, e:
                 log.info("zmq.Poller KeyboardInterrupt")
                 break
@@ -108,6 +108,7 @@
 
                     if not callback == None:
                         result = callback(_message[0], _notification)
+                        self.report_timestamp = time.time()
                         self.collector.send_multipart([b"DONE", _job_uuid, result])
 
         log.info("[%s] shutting down", self.identity)
bonnie-0.1.3.tar.gz/bonnie/worker/__init__.py -> bonnie-0.2.0.tar.gz/bonnie/worker/__init__.py
Changed
@@ -177,12 +177,11 @@
                         jobs.extend(_jobs)
 
         # trigger storage modules which registered interest in particular notification properties
-        if len(jobs) == 0:
-            for prop,storage_interests in self.storage_interests.iteritems():
-                if notification.has_key(prop):
-                    for interest in storage_interests:
-                        (notification, _jobs) = self.interest_callback(interest, notification)
-                        jobs.extend(_jobs)
+        for prop,storage_interests in self.storage_interests.iteritems():
+            if notification.has_key(prop):
+                for interest in storage_interests:
+                    (notification, _jobs) = self.interest_callback(interest, notification)
+                    jobs.extend(_jobs)
 
         # finally send notification to output handlers if no jobs remaining
        if len(jobs) == 0 and not notification.has_key('_suppress_output') and not event in self.output_exclude_events:
@@ -196,6 +195,8 @@
                     (notification, _jobs) = self.interest_callback(interest, notification)
                     jobs.extend(_jobs)
 
+        log.debug("Event notification %r processed. Jobs = %r" % (event, jobs), level=7)
+
         return notification, list(set(jobs))
 
     def input_report(self):
bonnie-0.1.3.tar.gz/bonnie/worker/inputs/zmq_input.py -> bonnie-0.2.0.tar.gz/bonnie/worker/inputs/zmq_input.py
Changed
@@ -40,7 +40,7 @@
 
     def __init__(self, *args, **kw):
         self.state = b"READY"
-        self.job_id = None
+        self.job_uuid = None
         self.lastping = 0
         self.report_timestamp = 0
 
@@ -52,7 +52,12 @@
 
     def report_state(self):
         log.debug("[%s] reporting state: %s" % (self.identity, self.state), level=8)
-        self.controller.send_multipart([b"STATE", self.state])
+        message = [b"STATE", self.state]
+
+        if not self.job_uuid == None and self.state == b'BUSY':
+            message.append(self.job_uuid)
+
+        self.controller.send_multipart(message)
         self.report_timestamp = time.time()
 
     def run(self, callback=None, report=None):
@@ -89,7 +94,7 @@
 
         while self.running:
             try:
-                sockets = dict(self.poller.poll(1000))
+                sockets = dict(self.poller.poll(1))
             except KeyboardInterrupt, e:
                 log.info("zmq.Poller KeyboardInterrupt")
                 break
@@ -115,8 +120,8 @@
                         self.report_state()
 
                 else:
-                    _job_id = _message[1]
-                    self.take_job(_job_id)
+                    self.job_uuid = _message[1]
+                    self.take_job(self.job_uuid)
 
             if self.worker in sockets:
                 if sockets[self.worker] == zmq.POLLIN:
@@ -124,22 +129,21 @@
                    log.debug("[%s] Worker message: %r" % (self.identity, _message), level=9)
 
                    if _message[0] == "JOB":
-                        _job_uuid = _message[1]
+                        self.job_uuid = _message[1]
 
                        # TODO: Sanity checking
-                        #if _message[1] == self.job_id:
+                        #if _message[1] == self.job_uuid:
+                        #jobs = []
 
                        if not callback == None:
                            (notification, jobs) = callback(_message[2])
                        else:
                            jobs = []
 
                        if len(jobs) == 0:
-                            self.controller.send_multipart([b"DONE", _job_uuid])
+                            self.controller.send_multipart([b"DONE", self.job_uuid])
                        else:
                            log.debug("[%s] Has jobs: %r" % (self.identity, jobs), level=8)
-
-                            for job in jobs:
-                                self.controller.send_multipart([job, _job_uuid])
+                            self.controller.send_multipart([b"COLLECT", self.job_uuid, b" ".join(jobs)])
 
                        self.set_state_ready()
 
@@ -150,20 +154,19 @@
         log.info("[%s] shutting down", self.identity)
         self.worker.close()
 
-    def set_state_busy(self):
+    def set_state_busy(self, _job_id):
         log.debug("[%s] Set state to BUSY" % (self.identity), level=9)
-        self.controller.send_multipart([b"STATE", b"BUSY", b"%s" % (self.job_id)])
+        self.report_timestamp = time.time()
         self.state = b"BUSY"
+        self.job_uuid = _job_id
 
     def set_state_ready(self):
         log.debug("[%s] Set state to READY" % (self.identity), level=9)
-        self.controller.send_multipart([b"STATE", b"READY"])
+        self.report_timestamp = time.time()
         self.state = b"READY"
-        self.job_id = None
+        self.job_uuid = None
 
     def take_job(self, _job_id):
         log.debug("[%s] Accept job %s" % (self.identity, _job_id), level=9)
-        self.set_state_busy()
+        self.set_state_busy(_job_id)
         self.worker.send_multipart([b"GET", _job_id])
-        self.job_id = _job_id
-
bonnie-0.1.3.tar.gz/bonnie/worker/outputs/elasticsearch_output.py -> bonnie-0.2.0.tar.gz/bonnie/worker/outputs/elasticsearch_output.py
Changed
@@ -34,27 +34,28 @@
             Convert the given event notification record into a valid log entry
         """
         keymap = {
-            'timestamp': None,
-            'clientIP': 'client_ip',
-            'clientPort': None,
-            'serverPort': None,
-            'serverDomain': 'domain',
-            'aclRights': 'acl_rights',
-            'aclSubject': 'acl_subject',
-            'mailboxID': 'mailbox_id',
-            'messageSize': 'message_size',
-            'messageHeaders': None,
-            'messageContent': None,
-            'bodyStructure': None,
-            'metadata': None,
-            'acl': None,
-            'flagNames': 'flag_names',
-            'diskUsed': 'disk_used',
-            'vnd.cmu.oldUidset': 'olduidset',
-            'vnd.cmu.sessionId': 'session_id',
-            'vnd.cmu.midset': 'message_id',
-            'vnd.cmu.unseenMessages': 'unseen_messages',
-        }
+                'timestamp': None,
+                'clientIP': 'client_ip',
+                'clientPort': None,
+                'serverPort': None,
+                'serverDomain': 'domain',
+                'aclRights': 'acl_rights',
+                'aclSubject': 'acl_subject',
+                'mailboxID': 'mailbox_id',
+                'messageSize': 'message_size',
+                'messageHeaders': None,
+                'messageContent': None,
+                'bodyStructure': None,
+                'metadata': None,
+                'acl': None,
+                'flagNames': 'flag_names',
+                'diskUsed': 'disk_used',
+                'vnd.cmu.oldUidset': 'olduidset',
+                'vnd.cmu.sessionId': 'session_id',
+                'vnd.cmu.midset': 'message_id',
+                'vnd.cmu.unseenMessages': 'unseen_messages',
+            }
+
         log = { '@version': bonnie.API_VERSION }
 
         for key,val in notification.iteritems():
             newkey = keymap[key] if keymap.has_key(key) else key
bonnie-0.1.3.tar.gz/bonnie/worker/storage/elasticsearch_storage.py -> bonnie-0.2.0.tar.gz/bonnie/worker/storage/elasticsearch_storage.py
Changed
@@ -364,6 +364,7 @@
         now = int(time.time())
 
         base_uri = re.sub(';.+$', '', notification[attrib])
+        jobs = []
 
         log.debug("Resolve folder for %r = %r" % (attrib, base_uri), level=8)
 
@@ -375,12 +376,16 @@
 
         # mailbox resolving requires metadata
         if not notification.has_key('metadata'):
             log.debug("Adding GETMETADATA job", level=8)
-            return (notification, [ b"GETMETADATA" ])
+            jobs.append(b"GETMETADATA")
 
         # before creating a folder entry, we should collect folder ACLs
         if not notification.has_key('acl'):
             log.debug("Adding GETACL", level=8)
-            return (notification, [ b"GETACL" ])
+            jobs.append(b"GETACL")
+
+        # reject notification with additional collector jobs
+        if len(jobs) > 0:
+            return (notification, jobs)
 
         # extract folder properties and a unique identifier from the notification
         folder = self.notificaton2folder(notification)
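Instead of returning after the first missing property, the storage module now collects every outstanding collector command and returns them as one batch; combined with the worker's single COLLECT message and the collector's commands.split() above, one round trip can fetch both metadata and ACLs. A sketch of the message flow, with an invented job uuid:

# Sketch of the batched round trip (job uuid invented).
job_uuid = b"a1b2c3"
jobs = [b"GETMETADATA", b"GETACL"]

# worker/inputs/zmq_input.py sends one COLLECT message to the broker:
message = [b"COLLECT", job_uuid, b" ".join(jobs)]

# collector/__init__.py later splits the commands back apart:
for command in message[2].split():
    print(command)  # GETMETADATA, then GETACL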
bonnie-0.1.3.tar.gz/conf/bonnie.conf -> bonnie-0.2.0.tar.gz/conf/bonnie.conf
Changed
@@ -1,7 +1,7 @@
 [broker]
 zmq_dealer_router_bind_address = tcp://*:5570
 zmq_collector_router_bind_address = tcp://*:5571
-zmq_controller_bind_address = tcp://*:5572
+zmq_worker_controller_router_bind_address = tcp://*:5572
 zmq_worker_router_bind_address = tcp://*:5573
 
 persistence_sql_uri = sqlite://
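The renamed option follows from create_router(), which derives the configuration key from the router name; the router registered as worker_controller therefore reads zmq_worker_controller_router_bind_address. A one-line derivation mirroring the broker's lookup:

# Sketch: how the broker maps a router name to its config option.
def option_for(name):
    return 'zmq_%s_router_bind_address' % (name)

print(option_for('worker_controller'))  # zmq_worker_controller_router_bind_address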