Projects
Kolab:3.4:Updates
bonnie
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
Expand all
Collapse all
Changes of Revision 23
View file
bonnie.spec
Changed
@@ -1,5 +1,5 @@ Name: bonnie -Version: 0.1 +Version: 0.1.0 Release: 1%{?dist} Summary: Bonnie for Kolab Groupware
View file
bonnie-0.1.tar.gz/bonnie/broker/__init__.py -> bonnie-0.1.0.tar.gz/bonnie/broker/__init__.py
Changed
@@ -22,14 +22,19 @@ """ This is the broker for Bonnie. """ - import brokers -class BonnieBroker(object): - broker_interests = {} - broker_modules = {} +from bonnie.daemon import BonnieDaemon + +class BonnieBroker(BonnieDaemon): + pidfile = "/var/run/bonnie/broker.pid" def __init__(self, *args, **kw): + super(BonnieBroker, self).__init__(*args, **kw) + + self.broker_interests = {} + self.broker_modules = {} + for _class in brokers.list_classes(): module = _class() module.register(callback=self.register_broker)
View file
bonnie-0.1.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py -> bonnie-0.1.0.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py
Changed
@@ -39,6 +39,7 @@ from bonnie.broker import persistence class ZMQBroker(object): + MAX_RETRIES = 5 running = False def __init__(self): @@ -69,7 +70,7 @@ return job = jobs.pop() - job.set_state(b"ALLOC") + job.set_status(b"ALLOC") return job def collect_jobs_with_status(self, _state, collector_id=None): @@ -112,9 +113,10 @@ if len(jobs) < 1: return None - job = jobs.pop() + # take the first job in the queue + job = jobs[0] - job.set_state(b"ALLOC") + job.set_status(b"ALLOC") job.set_worker(_worker_id) return job.uuid @@ -124,11 +126,24 @@ self.worker_jobs.delete(job) log.debug("Worker job done: %s;" % (_job_uuid), level=8) - def worker_job_free(self, _job_uuid): + def worker_job_free(self, _job_uuid, pushback=False): for job in [x for x in self.worker_jobs if x.uuid == _job_uuid]: job.set_status(b"PENDING") job.set_worker(None) + if pushback: + # increment retry count on pushback + job.retries += 1 + log.debug("Push back job %s for %d. time" % (_job_uuid, job.retries), level=8) + if job.retries > self.MAX_RETRIES: + # delete job after MAX retries + self.worker_jobs.delete(job) + log.info("Delete pushed back job %s" % (_job_uuid)) + else: + # move it to the end of the job queue + self.worker_jobs.remove(job) + self.worker_jobs.append(job) + def worker_job_send(self, _job_uuid, _worker_id): # TODO: Sanity check on job state, worker assignment, etc. for job in [x for x in self.worker_jobs if x.uuid == _job_uuid]: @@ -225,16 +240,19 @@ # reset existing jobs in self.worker_jobs and self.collect_jobs to status PENDING (?) 
# this will re-assign them to workers and collectors after a broker restart for job in self.worker_jobs: - job.set_state(b"PENDING") + job.set_status(b"PENDING") for job in self.collect_jobs: - job.set_state(b"PENDING") + job.set_status(b"PENDING") persistence.syncronize() while self.running: try: sockets = dict(poller.poll(1000)) + except KeyboardInterrupt, e: + log.info("zmq.Poller KeyboardInterrupt") + break except Exception, e: log.error("zmq.Poller error: %r", e) sockets = dict() @@ -254,6 +272,9 @@ if _message[1] == b"DONE": self.worker_job_done(_message[2]) + if _message[1] == b"PUSHBACK": + self.worker_job_free(_message[2], True) + if _message[1] in self.collector_interests: _job_uuid = _message[2] self.transit_job_collect(_job_uuid, _message[1]) @@ -330,14 +351,14 @@ def transit_job_collect(self, _job_uuid, _command): for job in [x for x in self.worker_jobs if x.uuid == _job_uuid]: - job.set_state(b"PENDING") + job.set_status(b"PENDING") job.set_command(_command) self.collect_jobs.append(job) self.worker_jobs.remove(job) def transit_job_worker(self, _job_uuid, _notification): for job in [x for x in self.collect_jobs if x.uuid == _job_uuid]: - job.set_state(b"PENDING") + job.set_status(b"PENDING") job.notification = _notification self.worker_jobs.append(job) self.collect_jobs.remove(job)
View file
bonnie-0.1.tar.gz/bonnie/broker/brokers/zmq_broker/job.py -> bonnie-0.1.0.tar.gz/bonnie/broker/brokers/zmq_broker/job.py
Changed
@@ -45,6 +45,7 @@ self.collector_id = collector_id self.timestamp = time.time() self.command = None + self.retries = 0 if self.client_id == None: if self.collector_id == None: @@ -54,7 +55,7 @@ else: self.type = 'Dealer' - def set_state(self, state): + def set_status(self, state): self.state = state def set_worker(self, worker_id):
View file
bonnie-0.1.tar.gz/bonnie/collector/__init__.py -> bonnie-0.1.0.tar.gz/bonnie/collector/__init__.py
Changed
@@ -24,32 +24,24 @@ import inputs import handlers -import bonnie from bonnie.utils import parse_imap_uri +from bonnie.daemon import BonnieDaemon +import bonnie conf = bonnie.getConf() log = bonnie.getLogger('bonnie.collector') -class BonnieCollector(object): - input_interests = {} - input_modules = {} - - handler_interests = {} - handler_modules = {} - +class BonnieCollector(BonnieDaemon): + pidfile = "/var/run/bonnie/collector.pid" def __init__(self, *args, **kw): - # TODO: read active input module from config collector.input_modules - for _class in inputs.list_classes(): - module = _class() - module.register(callback=self.register_input) - self.input_modules[_class] = module + super(BonnieCollector, self).__init__(*args, **kw) - # TODO: read active handler module from config collector.handler_modules - for _class in handlers.list_classes(): - handler = _class() - handler.register(callback=self.register_handler) - self.handler_modules[_class] = handler + self.input_interests = {} + self.input_modules = {} + + self.handler_interests = {} + self.handler_modules = {} def execute(self, command, notification): """ @@ -74,6 +66,18 @@ self.handler_interests[interest].append(how) def run(self): + # TODO: read active input module from config collector.input_modules + for _class in inputs.list_classes(): + module = _class() + module.register(callback=self.register_input) + self.input_modules[_class] = module + + # TODO: read active handler module from config collector.handler_modules + for _class in handlers.list_classes(): + handler = _class() + handler.register(callback=self.register_handler) + self.handler_modules[_class] = handler + input_modules = conf.get('collector', 'input_modules').split(',') for _input in self.input_modules.values(): if _input.name() in input_modules:
View file
bonnie-0.1.tar.gz/bonnie/collector/handlers/__init__.py -> bonnie-0.1.0.tar.gz/bonnie/collector/handlers/__init__.py
Changed
@@ -1,13 +1,16 @@ from messagedata import MessageDataHandler from imapdata import IMAPDataHandler +from ldapdata import LDAPDataHandler __all__ = [ 'MessageDataHandler', - 'IMAPDataHandler' + 'IMAPDataHandler', + 'LDAPDataHandler' ] def list_classes(): return [ MessageDataHandler, - IMAPDataHandler + IMAPDataHandler, + LDAPDataHandler ] \ No newline at end of file
View file
bonnie-0.1.tar.gz/bonnie/collector/handlers/imapdata.py -> bonnie-0.1.0.tar.gz/bonnie/collector/handlers/imapdata.py
Changed
@@ -37,7 +37,7 @@ # load pykolab conf conf = pykolab.getConf() if not hasattr(conf, 'defaults'): - conf.finalize_conf() + conf.finalize_conf(fatal=False) self.imap = IMAP()
View file
bonnie-0.1.0.tar.gz/bonnie/collector/handlers/ldapdata.py
Added
@@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) +# +# Thomas Bruederli <bruederli at kolabsys.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3 or, at your option, any later version +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# + +import json +import pykolab +from pykolab.auth import Auth + +import bonnie +from bonnie.utils import parse_imap_uri +from bonnie.utils import imap_folder_path + +conf = bonnie.getConf() +log = bonnie.getLogger('bonnie.collector.LDAPDataHandler') + +class LDAPDataHandler(object): + """ + Collector handler to provide user data from LDAP + """ + + def __init__(self, *args, **kw): + # load pykolab conf + self.pykolab_conf = pykolab.getConf() + if not hasattr(self.pykolab_conf, 'defaults'): + self.pykolab_conf.finalize_conf(fatal=False) + + self.ldap = Auth() + self.connections = 0 + + def register(self, callback): + interests = { + 'GETUSERDATA': { 'callback': self.get_user_data } + } + + callback(interests) + + def get_user_data(self, notification): + notification = json.loads(notification) + log.debug("GETUSERDATA for %r" % (notification), level=9) + + if notification.has_key('user'): + self.connections += 1 + + try: + self.ldap.connect() + user_dn = self.ldap.find_user_dn(notification['user'], True) + log.debug("User DN for %s: %r" % (notification['user'], user_dn), level=8) + except Exception, e: + log.error("LDAP 
connection error: %r", e) + user_dn = None + + if user_dn: + unique_attr = self.pykolab_conf.get('ldap', 'unique_attribute', 'nsuniqueid') + user_rec = self.ldap.get_entry_attributes(None, user_dn, [unique_attr, 'cn']) + log.debug("User attributes: %r" % (user_rec), level=8) + + if user_rec and user_rec.has_key(unique_attr): + user_rec['dn'] = user_dn + user_rec['id'] = user_rec[unique_attr] + del user_rec[unique_attr] + else: + user_rec = None + + notification['user_data'] = user_rec + + self.connections -= 1 + + if self.connections == 0: + self.ldap.disconnect() + + return json.dumps(notification)
View file
bonnie-0.1.tar.gz/bonnie/collector/inputs/zmq_input.py -> bonnie-0.1.0.tar.gz/bonnie/collector/inputs/zmq_input.py
Changed
@@ -41,6 +41,7 @@ running = False def __init__(self, *args, **kw): + self.interests = [] self.context = zmq.Context() zmq_broker_address = conf.get('collector', 'zmq_broker_address') @@ -63,8 +64,8 @@ pass def report_state(self, interests=[]): - log.debug("[%s] Reporting state %s, %r" % (self.identity, self.state, interests), level=9) - self.collector.send_multipart([b"STATE", self.state, ",".join(interests)]) + log.debug("[%s] Reporting state %s, %r" % (self.identity, self.state, self.interests), level=9) + self.collector.send_multipart([b"STATE", self.state, ",".join(self.interests)]) self.report_timestamp = time.time() def run(self, callback=None, interests=[]): @@ -73,11 +74,15 @@ self.running = True # report READY state with interests - self.report_state(interests) + self.interests = interests + self.report_state() while self.running: try: sockets = dict(self.poller.poll(1000)) + except KeyboardInterrupt, e: + log.info("zmq.Poller KeyboardInterrupt") + break except Exception, e: log.error("zmq.Poller error: %r", e) sockets = dict()
View file
bonnie-0.1.0.tar.gz/bonnie/daemon.py
Added
@@ -0,0 +1,251 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) +# +# Thomas Bruederli <bruederli at kolabsys.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3 or, at your option, any later +# version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, +# USA. +# + +import os +import sys +import grp +import pwd +import signal +import traceback +import bonnie +conf = bonnie.getConf() +log = bonnie.getLogger('bonnie') + +class BonnieDaemon(object): + pidfile = "/var/run/bonnie/bonnie.pid" + + def __init__(self, *args, **kw): + daemon_group = conf.add_cli_parser_option_group("Daemon Options") + + daemon_group.add_option( + "--fork", + dest = "fork_mode", + action = "store_true", + default = False, + help = "Fork to the background." + ) + + daemon_group.add_option( + "-p", + "--pid-file", + dest = "pidfile", + action = "store", + default = self.pidfile, + help = "Path to the PID file to use." 
+ ) + + daemon_group.add_option( + "-u", + "--user", + dest = "process_username", + action = "store", + default = "kolab", + help = "Run as user USERNAME", + metavar = "USERNAME" + ) + + daemon_group.add_option( + "-g", + "--group", + dest = "process_groupname", + action = "store", + default = "kolab", + help = "Run as group GROUPNAME", + metavar = "GROUPNAME" + ) + + conf.finalize_conf() + + def run(self, *args, **kw): + """ + The daemon main loop + """ + pass + + def start(self, *args, **kw): + """ + Start the daemon + """ + exitcode = 0 + terminate = True + + if conf.fork_mode: + self.drop_privileges() + + try: + pid = 1 + if conf.fork_mode: + pid = daemonize() + + if pid == 0: + self.write_pid() + self.signal_handlers() + self.run(*args, **kw) + elif not conf.fork_mode: + self.signal_handlers() + self.run(*args, **kw) + else: + terminate = False + + except SystemExit, errcode: + terminate = False + exitcode = errcode + + except KeyboardInterrupt: + exitcode = 1 + log.info("Interrupted by user") + + except (AttributeError, TypeError) as errmsg: + exitcode = 1 + traceback.print_exc() + print >> sys.stderr, "Traceback occurred, please report a " + \ + "bug at https://issues.kolab.org" + + except: + exitcode = 2 + traceback.print_exc() + print >> sys.stderr, "Traceback occurred, please report a " + \ + "bug at https://issues.kolab.org" + + if terminate: + self.terminate() + + sys.exit(exitcode) + + def terminate(self, *args, **kw): + """ + Daemon shutdown function + """ + self.remove_pid() + + def signal_handlers(self): + """ + Register process signal handlers + """ + signal.signal(signal.SIGTERM, self.terminate) + + def write_pid(self): + """ + Write the process ID to the configured pid file + """ + pid = os.getpid() + fp = open(conf.pidfile, 'w') + fp.write("%d\n" % (pid)) + fp.close() + + def remove_pid(self, *args, **kw): + """ + Remove our PID file. 
+ """ + if os.access(conf.pidfile, os.R_OK): + try: + os.remove(conf.pidfile) + except: + pass + + raise SystemExit + + def drop_privileges(self): + try: + try: + (ruid, euid, suid) = os.getresuid() + (rgid, egid, sgid) = os.getresgid() + except AttributeError, errmsg: + ruid = os.getuid() + rgid = os.getgid() + + if ruid == 0: + # Means we can setreuid() / setregid() / setgroups() + if rgid == 0: + # Get group entry details + try: + ( + group_name, + group_password, + group_gid, + group_members + ) = grp.getgrnam(conf.process_groupname) + + except KeyError: + print >> sys.stderr, "Group %s does not exist" % (conf.process_groupname) + sys.exit(1) + + # Set real and effective group if not the same as current. + if not group_gid == rgid: + log.debug("Switching real and effective group id to %d" % (group_gid), level=8) + os.setregid(group_gid, group_gid) + + if ruid == 0: + # Means we haven't switched yet. + try: + ( + user_name, + user_password, + user_uid, + user_gid, + user_gecos, + user_homedir, + user_shell + ) = pwd.getpwnam(conf.process_username) + + except KeyError: + print >> sys.stderr, "User %s does not exist" % (conf.process_username) + sys.exit(1) + + + # Set real and effective user if not the same as current. + if not user_uid == ruid: + log.debug("Switching real and effective user id to %d" % (user_uid), level=8) + os.setreuid(user_uid, user_uid) + + except: + log.error("Could not change real and effective uid and/or gid") + + +def daemonize(): + """ + This forks the current process into a daemon. + """ + # do the UNIX double-fork magic, see Stevens' "Advanced + # Programming in the UNIX Environment" for details (ISBN 0201563177) + try: + pid = os.fork() + if pid > 0: + sys.exit(0) # exit first parent. + except OSError, e: + print >> sys.stderr, "Fork #1 failed: (%d) %s" % (e.errno, e.strerror) + sys.exit(1) + + # Decouple from parent environment. + # os.chdir("/") + os.umask(0) + os.setsid() + + # Do second fork. 
+ try: + pid = os.fork() + if pid > 0: + sys.exit(0) # exit second parent. + except OSError, e: + print >> sys.stderr, "Fork #2 failed: (%d) %s" % (e.errno, e.strerror) + sys.exit(1) + + return pid
View file
bonnie-0.1.tar.gz/bonnie/dealer/__init__.py -> bonnie-0.1.0.tar.gz/bonnie/dealer/__init__.py
Changed
@@ -28,23 +28,28 @@ log = bonnie.getLogger('bonnie.dealer') class BonnieDealer(object): - output_modules = {} - def __init__(self, *args, **kw): + self.output_modules = {} + for _class in outputs.list_classes(): __class = _class() self.output_modules[__class] = __class.register(callback=self.register_output) def register_output(self, interests): - self.output_interests = interests + pass def accept_notification(self, notification): parsed = json.loads(notification) event = parsed['event'] user = parsed['user'] if parsed.has_key('user') else None - blacklist_events = conf.get('dealer', 'blacklist_events').split(',') - blacklist_users = conf.get('dealer', 'blacklist_users').split(',') + # ignore globally excluded events + exclude_events = conf.get('dealer', 'input_exclude_events', '').split(',') + if event in exclude_events: + return False + + blacklist_events = conf.get('dealer', 'blacklist_events', '').split(',') + blacklist_users = conf.get('dealer', 'blacklist_users', '').split(',') # ignore blacklisted events for blacklisted users if event in blacklist_events and user is not None and user in blacklist_users:
View file
bonnie-0.1.tar.gz/bonnie/dealer/outputs/zmq_output.py -> bonnie-0.1.0.tar.gz/bonnie/dealer/outputs/zmq_output.py
Changed
@@ -48,7 +48,7 @@ return 'zmq_output' def register(self, *args, **kw): - return [ 'MailboxCreate' ] + return self.run def run(self, notification): log.debug("[%s] Notification received: %r" % (self.dealer.identity, notification), level=9)
View file
bonnie-0.1.tar.gz/bonnie/logger.py -> bonnie-0.1.0.tar.gz/bonnie/logger.py
Changed
@@ -36,36 +36,51 @@ """ debuglevel = 0 fork = False + logfile = '/var/log/bonnie/bonnie.log' loglevel = logging.CRITICAL if hasattr(sys, 'argv'): for arg in sys.argv: + value = None + if '=' in arg: + (arg,value) = arg.split('=')[0:2] + + if '-d' == arg or '--debug' == arg: + debuglevel = value if value is not None else -1 + continue + if debuglevel == -1: try: debuglevel = int(arg) except ValueError, errmsg: + debuglevel = 0 continue loglevel = logging.DEBUG - break - - if '-d' == arg: - debuglevel = -1 continue if '-l' == arg: loglevel = -1 continue - if '--fork' == arg: - fork = True - if loglevel == -1: if hasattr(logging,arg.upper()): loglevel = getattr(logging,arg.upper()) else: loglevel = logging.DEBUG + if '--logfile' == arg: + logfile = value + continue + + if logfile is None: + logfile = arg + continue + + if '--fork' == arg: + fork = True + continue + def __init__(self, *args, **kw): if kw.has_key('name'): name = kw['name'] @@ -86,9 +101,11 @@ if kw.has_key('logfile'): self.logfile = kw['logfile'] - else: + elif self.logfile is None: self.logfile = '/var/log/bonnie/bonnie.log' + self.setLevel(self.loglevel) + # Make sure (read: attempt to change) the permissions try: (ruid, euid, suid) = os.getresuid() @@ -137,8 +154,16 @@ self.console_stdout.close() self.removeHandler(self.console_stdout) + def info(self, msg, *args): + # Suppress info messages from other applications according to debug level + if self.name.startswith('sqlalchemy') and self.debuglevel < 9: + return + if not self.name.startswith('bonnie') and self.debuglevel < 8: + return + + self.log(logging.INFO, '[%d]: %s' % (os.getpid(), msg) % args) + def debug(self, msg, level=1, *args): - self.setLevel(self.loglevel) # Work around other applications not using various levels of debugging if not self.name.startswith('bonnie') and not self.debuglevel == 9: return
View file
bonnie-0.1.tar.gz/bonnie/translate.py -> bonnie-0.1.0.tar.gz/bonnie/translate.py
Changed
@@ -26,7 +26,10 @@ import os N_ = lambda x: x -_ = lambda x: gettext.ldgettext(domain, x) +_ = lambda x: current.lgettext(x) + +localedir = '/usr/local/share/locale' +current = gettext.translation(domain, localedir, fallback=True) def getDefaultLangs(): languages = []
View file
bonnie-0.1.tar.gz/bonnie/utils.py -> bonnie-0.1.0.tar.gz/bonnie/utils.py
Changed
@@ -221,4 +221,4 @@ mailbox_path = os.path.join(mailbox_path, folder_name) return mailbox_path - \ No newline at end of file +
View file
bonnie-0.1.tar.gz/bonnie/worker/__init__.py -> bonnie-0.1.0.tar.gz/bonnie/worker/__init__.py
Changed
@@ -21,17 +21,93 @@ # import json - +import time import handlers import inputs import outputs import storage +import signal + +from bonnie.translate import _ +from bonnie.daemon import BonnieDaemon +from multiprocessing import Process import bonnie conf = bonnie.getConf() -from bonnie.translate import _ +log = bonnie.getLogger('bonnie.worker') + +class BonnieWorker(BonnieDaemon): + pidfile = "/var/run/bonnie/worker.pid" + + def __init__(self, *args, **kw): + worker_group = conf.add_cli_parser_option_group("Worker Options") + + worker_group.add_option( + "-n", + "--num-childs", + dest = "num_childs", + action = "store", + default = None, + help = "Number of child processes to spawn" + ) + + super(BonnieWorker, self).__init__(*args, **kw) + + self.childs = [] + self.manager = False + self.running = False + + def run(self): + """ + Daemon main loop + """ + num_childs = conf.num_childs or conf.get('worker', 'num_childs') + if num_childs is not None: + num_childs = int(num_childs) + + if num_childs is None or num_childs < 1: + main = BonnieWorkerProcess() + self.childs.append(main) + main.run() # blocking + else: + conf.fork_mode = False + self.manager = True + self.running = True + + while self.running: + # (re)start child worker processes + while len(self.childs) < num_childs: + p = Process(target=self.run_child) + self.childs.append(p) + p.start() + + # check states of child processes + for p in self.childs: + if not p.is_alive(): + log.info("Restarting dead worker process %r", p.pid) + self.childs.remove(p) + + time.sleep(10) + + log.info("Shutting down worker manager") + + def run_child(self): + """ + This method is being run in a separate process + """ + BonnieWorkerProcess(as_child=True).run() -class BonnieWorker(object): + def terminate(self, *args, **kw): + self.running = False + for p in self.childs: + p.terminate() + + if self.manager: + for p in self.childs: + p.join() + + +class BonnieWorkerProcess(object): handler_interests = { '_all': [] } input_interests 
= {} storage_interests = {} @@ -42,28 +118,9 @@ storage_modules = {} output_modules = {} - def __init__(self, *args, **kw): - - daemon_group = conf.add_cli_parser_option_group(_("Daemon Options")) - - daemon_group.add_option( - "--fork", - dest = "fork_mode", - action = "store_true", - default = False, - help = _("Fork to the background.") - ) - - daemon_group.add_option( - "-p", - "--pid-file", - dest = "pidfile", - action = "store", - default = "/var/run/bonnie/worker.pid", - help = _("Path to the PID file to use.") - ) - - conf.finalize_conf() + def __init__(self, as_child=False, *args, **kw): + if as_child: + signal.signal(signal.SIGTERM, self.terminate) for _class in handlers.list_classes(): __class = _class() @@ -90,6 +147,8 @@ self.storage_modules[_class] = _storage self.storage = _storage + self.output_exclude_events = conf.get('worker', 'output_exclude_events', '').split(',') + def event_notification(self, notification): """ Input an event notification in to our process. @@ -123,7 +182,7 @@ jobs.extend(_jobs) # finally send notification to output handlers if no jobs remaining - if len(jobs) == 0 and not notification.has_key('_suppress_output'): + if len(jobs) == 0 and not notification.has_key('_suppress_output') and not event in self.output_exclude_events: if self.output_interests.has_key(event): for interest in self.output_interests[event]: (notification, _jobs) = self.interest_callback(interest, notification) @@ -136,6 +195,23 @@ return notification, list(set(jobs)) + def input_report(self): + """ + Periodic callbacks from the input main loop. 
+ Forward to all handler, storage and output modules + """ + for _handler in self.handler_modules.values(): + if hasattr(_handler, 'report'): + _handler.report() + + for _storage in self.storage_modules.values(): + if hasattr(_storage, 'report'): + _storage.report() + + for _storage in self.storage_modules.values(): + if hasattr(_storage, 'report'): + _storage.report() + def interest_callback(self, interest, notification): """ Helper method to call an interest callback @@ -156,7 +232,6 @@ { 'MessageAppend': { 'callback': self.run } } """ - print 'register_handler', interests for interest,how in interests.iteritems(): if not self.handler_interests.has_key(interest): self.handler_interests[interest] = [] @@ -191,7 +266,7 @@ input_modules = conf.get('worker', 'input_modules').split(',') for _input in self.input_modules.values(): if _input.name() in input_modules: - _input.run(callback=self.event_notification) + _input.run(callback=self.event_notification, report=self.input_report) def terminate(self, *args, **kw): for _input in self.input_modules.values():
View file
bonnie-0.1.tar.gz/bonnie/worker/handlers/__init__.py -> bonnie-0.1.0.tar.gz/bonnie/worker/handlers/__init__.py
Changed
@@ -6,6 +6,8 @@ import bonnie from base import HandlerBase +from mailboxbase import MailboxHandlerBase +from messagebase import MessageHandlerBase from aclchange import AclChangeHandler from flagsclear import FlagsClearHandler @@ -17,7 +19,6 @@ from mailboxrename import MailboxRenameHandler from mailboxsubscribe import MailboxSubscribeHandler from mailboxunsubscribe import MailboxUnsubscribeHandler -from messagebase import MessageHandlerBase from messageappend import MessageAppendHandler from messagecopy import MessageCopyHandler from messageexpire import MessageExpireHandler @@ -37,6 +38,7 @@ 'FlagsSetHandler', 'LoginHandler', 'LogoutHandler', + 'MailboxHandlerBase', 'MailboxCreateHandler', 'MailboxDeleteHandler', 'MailboxRenameHandler',
View file
bonnie-0.1.tar.gz/bonnie/worker/handlers/aclchange.py -> bonnie-0.1.0.tar.gz/bonnie/worker/handlers/aclchange.py
Changed
@@ -21,10 +21,10 @@ Base handler for an event notification of type 'AclChange' """ -from bonnie.worker.handlers import HandlerBase +from bonnie.worker.handlers import MailboxHandlerBase -class AclChangeHandler(HandlerBase): +class AclChangeHandler(MailboxHandlerBase): event = 'AclChange' def __init__(self, *args, **kw): - HandlerBase.__init__(self, *args, **kw) + MailboxHandlerBase.__init__(self, *args, **kw)
View file
bonnie-0.1.tar.gz/bonnie/worker/handlers/base.py -> bonnie-0.1.0.tar.gz/bonnie/worker/handlers/base.py
Changed
@@ -12,7 +12,18 @@ self.worker = callback(interests) def run(self, notification): + # resolve user_id from storage if notification.has_key('user') and not notification.has_key('user_id'): - notification['user_id'] = self.worker.storage.resolve_username(notification['user']) + user_data = notification['user_data'] if notification.has_key('user_data') else None + notification['user_id'] = self.worker.storage.resolve_username(notification['user'], user_data, force=notification.has_key('user_data')) + + # if storage has no entry, fetch user record from collector + if notification.has_key('user') and notification['user_id'] is None and not notification.has_key('user_data'): + notification['user_data'] = None # avoid endless loop if GETUSERDATA fails + return (notification, [ b"GETUSERDATA" ]) + + # don't store user data in notification + if notification.has_key('user_data'): + del notification['user_data'] return (notification, [])
View file
bonnie-0.1.tar.gz/bonnie/worker/handlers/changelog.py -> bonnie-0.1.0.tar.gz/bonnie/worker/handlers/changelog.py
Changed
@@ -29,8 +29,8 @@ log = bonnie.getLogger('bonnie.worker.changelog') -# timestamp (* 100) at the year 2010 -REVBASE = 94668480000 +# timestamp (* 10) at the year 2014 +REVBASE = 13885344000 class ChangelogHandler(HandlerBase): events = ['MessageAppend','vnd.cmu.MessageMove'] @@ -59,7 +59,7 @@ # assign a revision number based on the current time if object_type is not None: - notification['revision'] = int(round(time.time() * 100 - REVBASE)) + notification['revision'] = int(round(time.time() * 10 - REVBASE)) # TODO: save object type and UUID in separate fields? # These are translated into headers.X-Kolab-Type and headers.Subject by the output module
View file
bonnie-0.1.tar.gz/bonnie/worker/handlers/logout.py -> bonnie-0.1.0.tar.gz/bonnie/worker/handlers/logout.py
Changed
@@ -39,7 +39,7 @@ # and suppress separate logging of this event with notification['_suppress_output'] = True if notification.has_key('vnd.cmu.sessionId'): now = datetime.datetime.now(tzutc()) - attempts = 5 + attempts = 4 while attempts > 0: results = self.worker.storage.select( query=[ @@ -51,7 +51,7 @@ fields='user,@timestamp', limit=1 ) - if results['total'] > 0: + if results and results['total'] > 0: login_event = results['hits'][0] try: @@ -77,4 +77,7 @@ attempts -= 1 time.sleep(1) # wait for storage and try again + # push back into the job queue, the corresponding Login event may not yet have been processed. + return (notification, [b"PUSHBACK"]) + return super(LogoutHandler, self).run(notification) \ No newline at end of file
View file
bonnie-0.1.0.tar.gz/bonnie/worker/handlers/mailboxbase.py
Added
@@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) +# +# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3 or, at your option, any later version +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# + +""" + Base handler for an event notification of type 'MailboxRename' +""" + +from bonnie.worker.handlers import HandlerBase + +class MailboxHandlerBase(HandlerBase): + event = None + + def __init__(self, *args, **kw): + HandlerBase.__init__(self, *args, **kw) + + def run(self, notification): + # call super for some basic notification processing + (notification, jobs) = super(MailboxHandlerBase, self).run(notification) + + # mailbox notifications require metadata + if not notification.has_key('metadata'): + jobs.append(b"GETMETADATA") + return (notification, jobs) + + # extract uniqueid from metadata -> triggers the storage module + if notification['metadata'].has_key('/shared/vendor/cmu/cyrus-imapd/uniqueid'): + notification['folder_uniqueid'] = notification['metadata']['/shared/vendor/cmu/cyrus-imapd/uniqueid'] + + return (notification, jobs)
View file
bonnie-0.1.tar.gz/bonnie/worker/handlers/mailboxcreate.py -> bonnie-0.1.0.tar.gz/bonnie/worker/handlers/mailboxcreate.py
Changed
@@ -22,27 +22,10 @@ """ import bonnie -from bonnie.worker.handlers import HandlerBase +from bonnie.worker.handlers import MailboxHandlerBase -class MailboxCreateHandler(HandlerBase): +class MailboxCreateHandler(MailboxHandlerBase): event = 'MailboxCreate' def __init__(self, *args, **kw): - HandlerBase.__init__(self, *args, **kw) - self.log = bonnie.getLogger('bonnie.worker.' + self.event) - - def run(self, notification): - # call super for some basic notification processing - (notification, jobs) = super(MailboxCreateHandler, self).run(notification) - - # mailbox notifications require metadata - if not notification.has_key('metadata'): - self.log.debug("Adding GETMETADATA job for " + self.event, level=8) - jobs.append(b"GETMETADATA") - return (notification, jobs) - - # extract uniqueid from metadata -> triggers the storage module - if notification['metadata'].has_key('/shared/vendor/cmu/cyrus-imapd/uniqueid'): - notification['folder_uniqueid'] = notification['metadata']['/shared/vendor/cmu/cyrus-imapd/uniqueid'] - - return (notification, jobs) + MailboxHandlerBase.__init__(self, *args, **kw)
View file
bonnie-0.1.tar.gz/bonnie/worker/handlers/mailboxrename.py -> bonnie-0.1.0.tar.gz/bonnie/worker/handlers/mailboxrename.py
Changed
@@ -21,25 +21,10 @@ Base handler for an event notification of type 'MailboxRename' """ -from bonnie.worker.handlers import HandlerBase +from bonnie.worker.handlers import MailboxHandlerBase -class MailboxRenameHandler(HandlerBase): +class MailboxRenameHandler(MailboxHandlerBase): event = 'MailboxRename' def __init__(self, *args, **kw): - HandlerBase.__init__(self, *args, **kw) - - def run(self, notification): - # call super for some basic notification processing - (notification, jobs) = super(MailboxRenameHandler, self).run(notification) - - # mailbox notifications require metadata - if not notification.has_key('metadata'): - jobs.append(b"GETMETADATA") - return (notification, jobs) - - # extract uniqueid from metadata -> triggers the storage module - if notification['metadata'].has_key('/shared/vendor/cmu/cyrus-imapd/uniqueid'): - notification['folder_uniqueid'] = notification['metadata']['/shared/vendor/cmu/cyrus-imapd/uniqueid'] - - return (notification, jobs) + MailboxHandlerBase.__init__(self, *args, **kw)
View file
bonnie-0.1.tar.gz/bonnie/worker/inputs/zmq_input.py -> bonnie-0.1.0.tar.gz/bonnie/worker/inputs/zmq_input.py
Changed
@@ -39,7 +39,25 @@ running = False def __init__(self, *args, **kw): + self.state = b"READY" + self.job_id = None + self.lastping = 0 + self.report_timestamp = 0 + + def name(self): + return 'zmq_input' + + def register(self, *args, **kw): + pass + + def report_state(self): + log.debug("[%s] reporting state: %s" % (self.identity, self.state), level=8) + self.controller.send_multipart([b"STATE", self.state]) + self.report_timestamp = time.time() + + def run(self, callback=None, report=None): self.identity = u"Worker-%s-%d" % (socket.getfqdn(),os.getpid()) + log.info("[%s] starting", self.identity) self.context = zmq.Context() @@ -65,35 +83,23 @@ self.poller.register(self.controller, zmq.POLLIN) self.poller.register(self.worker, zmq.POLLIN) - self.state = b"READY" - self.job_id = None - self.report_timestamp = time.time() - - def name(self): - return 'zmq_input' - - def register(self, *args, **kw): - pass - - def report_state(self): - print "reporting state", self.state - self.controller.send_multipart([b"STATE", self.state]) - self.report_timestamp = time.time() - - def run(self, callback=None): - log.info("[%s] starting", self.identity) - self.running = True + self.lastping = time.time() self.report_state() while self.running: try: sockets = dict(self.poller.poll(1000)) + except KeyboardInterrupt, e: + log.info("zmq.Poller KeyboardInterrupt") + break except Exception, e: log.error("zmq.Poller error: %r", e) sockets = dict() - if self.report_timestamp < (time.time() - 60): + now = time.time() + + if self.report_timestamp < (now - 60): self.report_state() if self.controller in sockets: @@ -118,21 +124,29 @@ log.debug("[%s] Worker message: %r" % (self.identity, _message), level=9) if _message[0] == "JOB": + _job_uuid = _message[1] + # TODO: Sanity checking #if _message[1] == self.job_id: if not callback == None: - (status, jobs) = callback(_message[2]) + (notification, jobs) = callback(_message[2]) + else: + jobs = [] if len(jobs) == 0: - 
self.controller.send_multipart([b"DONE", _message[1]]) + self.controller.send_multipart([b"DONE", _job_uuid]) else: log.debug("[%s] Has jobs: %r" % (self.identity, jobs), level=8) for job in jobs: - self.controller.send_multipart([job, _message[1]]) + self.controller.send_multipart([job, _job_uuid]) self.set_state_ready() + if report is not None and self.lastping < (now - 60): + report() + self.lastping = now + log.info("[%s] shutting down", self.identity) self.worker.close()
View file
bonnie-0.1.tar.gz/bonnie/worker/outputs/elasticsearch_output.py -> bonnie-0.1.0.tar.gz/bonnie/worker/outputs/elasticsearch_output.py
Changed
@@ -52,6 +52,8 @@ 'diskUsed': 'disk_used', 'vnd.cmu.oldUidset': 'olduidset', 'vnd.cmu.sessionId': 'session_id', + 'vnd.cmu.midset': 'message_id', + 'vnd.cmu.unseenMessages': 'unseen_messages', } log = { '@version': bonnie.API_VERSION } for key,val in notification.iteritems(): @@ -85,6 +87,11 @@ notification['message'] = None if notification.has_key('messageContent') and notification['messageContent'].has_key(uid): notification['message'] = notification['messageContent'][uid] + # no need for bodystructure if we have the real message content + notification.pop('bodyStructure', None) + + # remove vnd.cmu.envelope if we have headers + notification.pop('vnd.cmu.envelope', None) self.es.create( index=index,
View file
bonnie-0.1.tar.gz/bonnie/worker/storage/__init__.py -> bonnie-0.1.0.tar.gz/bonnie/worker/storage/__init__.py
Changed
@@ -1,6 +1,8 @@ +from caching import CachedDict from elasticsearch_storage import ElasticSearchStorage __all__ = [ + 'CachedDict', 'ElasticSearchStorage' ]
View file
bonnie-0.1.0.tar.gz/bonnie/worker/storage/caching.py
Added
# -*- coding: utf-8 -*-
# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com)
#
# Thomas Bruederli <bruederli at kolabsys.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 or, at your option, any later
# version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
# USA.
#

import time

class CachedDict(object):
    """
        dict-like class which drops items after the given TTL

        Every value is stored in ``self.data`` as a ``(value, expires)``
        tuple where ``expires`` is a unix timestamp in whole seconds.
        Expired entries are hidden from keys(), values(), items(),
        has_key(), ``in`` and len(); they are only physically removed
        by expunge().
    """
    def __init__(self, ttl=60):
        # TODO: use memcache for distributed memory-based caching
        self.ttl = ttl
        self.data = {}

    def remove(self, key):
        """
            Delete the entry stored under key (expired or not).
            Raises KeyError if there is no such entry.
        """
        # dict has no remove() method; the original call always failed
        del self.data[key]

    def pop(self, key, default=None):
        """
            Remove key and return its value, or default if key is not stored.
            Like __getitem__, this does not check the expiry time.
        """
        item = self.data.pop(key, None)
        return item[0] if item is not None else default

    def update(self, other):
        """
            Store all items of the mapping other, each with a fresh TTL
        """
        expire = int(time.time()) + self.ttl
        self.data.update(dict((k, (v, expire)) for k, v in other.items()))

    def keys(self):
        """
            List the keys of all entries which have not expired
        """
        now = int(time.time())
        return [k for k, v in self.data.items() if v[1] > now]

    def values(self):
        """
            List the values of all entries which have not expired
        """
        now = int(time.time())
        return [v[0] for v in self.data.values() if v[1] > now]

    def items(self):
        """
            List (key, value) pairs of all entries which have not expired
        """
        now = int(time.time())
        return [(k, v[0]) for k, v in self.data.items() if v[1] > now]

    def iteritems(self):
        """
            Iterate over (key, value) pairs of all entries which have not expired
        """
        # items() returns a list, which has no iteritems() of its own
        return iter(self.items())

    def has_key(self, key):
        """
            True if key is stored and has not expired yet
        """
        return key in self.data and self.data[key][1] > int(time.time())

    def expunge(self):
        """
            Physically drop all expired entries
        """
        now = int(time.time())
        self.data = dict((k, v) for k, v in self.data.items() if v[1] > now)

    def __getitem__(self, key):
        # deliberately no expiry check: callers test has_key() first and
        # must not fail if the entry expires between the two calls
        return self.data[key][0]

    def __setitem__(self, key, value):
        self.data[key] = (value, int(time.time()) + self.ttl)

    def __contains__(self, key):
        return self.has_key(key)

    def __len__(self):
        return len(self.keys())

    def __iter__(self):
        # NOTE(review): yields (key, value) pairs whereas a real dict yields
        # keys; kept unchanged for backward compatibility
        return iter(self.items())
View file
bonnie-0.1.tar.gz/bonnie/worker/storage/elasticsearch_storage.py -> bonnie-0.1.0.tar.gz/bonnie/worker/storage/elasticsearch_storage.py
Changed
@@ -24,11 +24,14 @@ import json import urllib import hashlib +import random +import time import datetime import elasticsearch from dateutil.tz import tzutc from bonnie.utils import parse_imap_uri +from bonnie.worker.storage import CachedDict import bonnie conf = bonnie.getConf() @@ -42,6 +45,8 @@ default_doctype = 'object' folders_index = 'objects' folders_doctype = 'folder' + users_index = 'objects' + users_doctype = 'user' def __init__(self, *args, **kw): elasticsearch_output_address = conf.get('worker', 'elasticsearch_storage_address') @@ -53,6 +58,10 @@ host=elasticsearch_output_address ) + # use dicts with automatic expiration for caching user/folder lookups + self.user_id_cache = CachedDict(300) + self.folder_id_cache = CachedDict(120) + def name(self): return 'elasticsearch_storage' @@ -186,7 +195,7 @@ log.debug("ES select result for %r: %r" % (args['q'] or args['body'], res), level=8) except elasticsearch.exceptions.NotFoundError, e: - log.debug("ES entry not found for key %s: %r", key, e) + log.debug("ES entry not found for %r: %r", args['q'] or args['body'], e) res = None except Exception, e: @@ -241,20 +250,55 @@ result['_id'] = res['_id'] result['_index'] = res['_index'] result['_doctype'] = res['_type'] - result['_score'] = res['_score'] + + if res.has_key('_score'): + result['_score'] = res['_score'] return result - def resolve_username(self, user): + def resolve_username(self, user, user_data=None, force=False): """ Resovle the given username to the corresponding nsuniqueid from LDAP """ if not '@' in user: return user - # TODO: resolve with storage data - # return md5 sum of the username to make usernames work as fields/keys in elasticsearch - return hashlib.md5(user).hexdigest() + # return id cached in memory + if self.user_id_cache.has_key(user): + return self.user_id_cache[user] + + user_id = None + + # find existing entry in our storage backend + result = self.select( + [ ('user', '=', user) ], + index=self.users_index, + 
doctype=self.users_doctype, + sortby='@timestamp:desc', + limit=1 + ) + + if result and result['total'] > 0: + user_id = result['hits'][0]['_id'] + + elif user_data and user_data.has_key('id'): + # user data (from LDAP) is provided + user_id = user_data['id'] + + # insert a user record into our database + del user_data['id'] + user_data['user'] = user + user_data['@timestamp'] = datetime.datetime.now(tzutc()).strftime("%Y-%m-%dT%H:%M:%S.%fZ") + self.set(user_id, user_data, index=self.users_index, doctype=self.users_doctype) + + elif force: + user_id = hashlib.md5(user).hexdigest() + + # cache this for 5 minutes + if user_id is not None: + self.user_id_cache[user] = user_id + + return user_id def notificaton2folder(self, notification, attrib='uri'): @@ -286,7 +330,7 @@ '@timestamp': datetime.datetime.now(tzutc()).strftime("%Y-%m-%dT%H:%M:%S.%fZ"), 'uniqueid': notification['folder_uniqueid'], 'metadata': notification['metadata'], - 'acl': dict((self.resolve_username(k),v) for k,v in notification['acl'].iteritems()), + 'acl': dict((self.resolve_username(k, force=True),v) for k,v in notification['acl'].iteritems()), 'type': notification['metadata']['/shared/vendor/kolab/folder-type'] if notification['metadata'].has_key('/shared/vendor/kolab/folder-type') else 'mail', 'owner': uri['user'] + '@' + uri['domain'] if uri['user'] is not None else 'nobody', 'server': uri['host'], @@ -318,7 +362,15 @@ if not notification.has_key(attrib) or notification.has_key('folder_id'): return (notification, []) - log.debug("Resolve folder for %r = %r" % (attrib, notification[attrib]), level=8) + now = int(time.time()) + base_uri = re.sub(';.+$', '', notification[attrib]) + + log.debug("Resolve folder for %r = %r" % (attrib, base_uri), level=8) + + # return id cached in memory + if not notification.has_key('metadata') and self.folder_id_cache.has_key(base_uri): + notification['folder_id'] = self.folder_id_cache[base_uri] + return (notification, []) # mailbox resolving requires metadata if 
not notification.has_key('metadata'): @@ -382,6 +434,15 @@ # add reference to internal folder_id if folder is not None: + self.folder_id_cache[base_uri] = folder['id'] notification['folder_id'] = folder['id'] return (notification, []) + + def report(self): + """ + Callback from the worker main loop to trigger periodic jobs + """ + # clean-up in-memory caches from time to time + self.user_id_cache.expunge() + self.folder_id_cache.expunge()
View file
bonnie-0.1.tar.gz/broker.py -> bonnie-0.1.0.tar.gz/broker.py
Changed
@@ -1,9 +1,7 @@ #!/usr/bin/python -import signal from bonnie.broker import BonnieBroker if __name__ == "__main__": broker = BonnieBroker() - signal.signal(signal.SIGTERM, broker.terminate) - broker.run() + broker.start()
View file
bonnie-0.1.tar.gz/collector.py -> bonnie-0.1.0.tar.gz/collector.py
Changed
@@ -1,9 +1,7 @@ #!/usr/bin/python -import signal from bonnie.collector import BonnieCollector if __name__ == "__main__": collector = BonnieCollector() - signal.signal(signal.SIGTERM, collector.terminate) - collector.run() + collector.start()
View file
bonnie-0.1.tar.gz/conf/bonnie.conf -> bonnie-0.1.0.tar.gz/conf/bonnie.conf
Changed
@@ -13,12 +13,15 @@ output_modules = zmq_output zmq_broker_address = tcp://localhost:5570 blacklist_users = cyrus-admin -blacklist_events = Login,Logout +blacklist_events = Login,Logout,AclChange +input_exclude_events = [worker] +num_childs = 0 input_modules = zmq_input storage_modules = elasticsearch_storage output_modules = elasticsearch_output +output_exclude_events = MessageExpunge zmq_controller_address = tcp://localhost:5572 zmq_worker_router_address = tcp://localhost:5573 elasticsearch_output_address = localhost
View file
bonnie-0.1.0.tar.gz/tests/README.md
Added
@@ -0,0 +1,27 @@ +Running Bonnie Tests +==================== + +The test scripts are based on the `twisted trial` package which can be +installed from the `python-twisted-core` RPM package. + +Run the tests from the bonnie root directory with the following commands: + +``` +$ export PYTHONPATH=.:/usr/lib/python2.6/site-packages +$ trial tests.{unit|functional}.<test-file-name>.<testClassName> +``` + +For example: + +``` +$ trial tests.unit.test-001-utils.TestBonnieUtils +$ trial tests.functional.test_001_login.TestBonnieLogin +``` + +Both the unit tests and the functional tests make the following +assumptions regarding the Kolab environment on the host they're run on: + + * Kolab standard single-host setup running the domain 'example.org' + * A Kolab user named John Doe <john.doe@example.org> exists + * The Elasticsearch service is running + * No Bonnie processes are running or connected to ZMQ
View file
bonnie-0.1.0.tar.gz/tests/functional
Added
+(directory)
View file
bonnie-0.1.0.tar.gz/tests/functional/__init__.py
Added
@@ -0,0 +1,9 @@ +import sys + +sys.path = [ '.', '../..' ] + sys.path + +from testbase import TestBonnieFunctional +from test_001_login import TestBonnieLogin +from test_002_mailboxes import TestBonnieMailboxes +from test_003_messages import TestBonnieMessageEvents +from test_004_changelog import TestBonnieChangelog
View file
bonnie-0.1.0.tar.gz/tests/functional/start.sh
Added
#!/bin/bash
#
# Start the Bonnie broker, worker and collector in the background.
#
# Usage: start.sh [BASEDIR]
#   BASEDIR  directory containing broker.py, worker.py and collector.py
#            (defaults to the current directory)

BASEDIR=${1:-.}

# /tmp/bonnie.pid is only a marker file created at the end of this script
if [ -f /tmp/bonnie.pid ]; then
    echo "Bonnie already running (/tmp/bonnie.pid exists)"
    exit
fi

# get rid of stray processes from an earlier run
killall -q broker.py worker.py collector.py

# quote the path (it may contain spaces) and give up if it is not accessible:
# otherwise the daemons would be looked up in the wrong directory and the
# marker file would be created although nothing was started
cd "$BASEDIR" || exit 1

./broker.py -d 8 2>/dev/null > /dev/null &
./worker.py -d 8 2>/dev/null > /dev/null &
./collector.py -d 8 2>/dev/null > /dev/null &

touch /tmp/bonnie.pid
View file
bonnie-0.1.0.tar.gz/tests/functional/stop.sh
Added
@@ -0,0 +1,4 @@ +#!/bin/bash + +killall -q broker.py worker.py collector.py +rm -f /tmp/bonnie.pid \ No newline at end of file
View file
bonnie-0.1.0.tar.gz/tests/functional/test_001_login.py
Added
import json
import time

from . import TestBonnieFunctional
from bonnie.dealer import BonnieDealer

import bonnie
conf = bonnie.getConf()

class TestBonnieLogin(TestBonnieFunctional):
    """
        Functional test for the Login/Logout notification pair
    """

    def test_001_login(self):
        """
            Submit a Login followed by a Logout of the same session and
            check the logged Login event as well as the stored user record.
        """
        login = {
            'event': 'Login',
            'user': 'john.doe@example.org',
            'vnd.cmu.sessionId': 'kolab-sess-test-12345',
            'clientIP': '::1',
            'serverDomain': 'example.org',
        }

        dealer = BonnieDealer()
        dealer.run(json.dumps(login))

        events = self.query_log([('event','=','Login')])
        self.assertEqual(len(events), 1)

        event = events[0]
        self.assertTrue(event.has_key('user_id'))
        # assertTrue() would take the second argument as failure message
        # and never compare the two values
        self.assertEqual(event['session_id'], login['vnd.cmu.sessionId'])
        self.assertEqual(event['@version'], bonnie.API_VERSION)

        del dealer
        time.sleep(1)

        # same session id as the login above
        logout = {
            'event': 'Logout',
            'user': 'john.doe@example.org',
            'vnd.cmu.sessionId': 'kolab-sess-test-12345',
            'clientIP': '::1'
        }

        dealer = BonnieDealer()
        dealer.run(json.dumps(logout))

        # the Logout is expected to show up as logout_time/duration
        # on the Login event
        events = self.query_log([('event','=','Login'), ('logout_time','=','*')])
        self.assertEqual(len(events), 1)

        event = events[0]
        self.assertTrue(event.has_key('logout_time'))
        self.assertTrue(event.has_key('duration'))

        # check objects/users entry
        user = self.storage_get(event['user_id'], index='objects', doctype='user')

        self.assertIsInstance(user, dict)
        self.assertEqual(user['user'], login['user'])
        self.assertEqual(user['cn'], 'John Doe')
View file
bonnie-0.1.0.tar.gz/tests/functional/test_002_mailboxes.py
Added
import json

from . import TestBonnieFunctional
from bonnie.dealer import BonnieDealer

import bonnie
conf = bonnie.getConf()

class TestBonnieMailboxes(TestBonnieFunctional):
    """
        Functional test for mailbox notifications
    """

    def test_mailboxcreate(self):
        """
            Submit a MailboxCreate notification and verify the logged event
            together with the folder and user records it refers to.
        """
        dealer = BonnieDealer()

        notification = {
            'event': 'MailboxCreate',
            'user': 'john.doe@example.org',
            'uri': 'imap://john.doe@example.org@kolab.example.org/Calendar;UIDVALIDITY=12345'
        }

        payload = json.dumps(notification)
        dealer.run(payload)

        # exactly one event must have been logged
        logged = self.query_log([('event','=','MailboxCreate')])
        self.assertEqual(len(logged), 1)

        event = logged[0]
        for required in ('folder_id', 'folder_uniqueid', 'user_id'):
            self.assertTrue(event.has_key(required))

        # check objects/folder entry
        folder = self.storage_get(event['folder_id'], index='objects', doctype='folder')
        self.assertIsInstance(folder, dict)

        self.assertEqual(folder['uniqueid'], event['folder_uniqueid'])
        self.assertEqual(folder['name'], 'Calendar')
        self.assertEqual(folder['type'], 'event')
        self.assertEqual(folder['owner'], 'john.doe@example.org')

        self.assertIsInstance(folder['metadata'], dict)
        acl = folder['acl']
        self.assertIsInstance(acl, dict)

        # the acting user must be listed in the folder ACL
        user_id = event['user_id']
        self.assertTrue(acl.has_key(user_id))
        rights = acl[user_id]
        self.assertTrue(rights.startswith('lrswi'))

        # check objects/user entry
        user = self.storage_get(event['user_id'], index='objects', doctype='user')
        self.assertIsInstance(user, dict)

        self.assertEqual(user['user'], notification['user'])
        self.assertEqual(user['cn'], 'John Doe')
View file
bonnie-0.1.0.tar.gz/tests/functional/test_003_messages.py
Added
import json

from . import TestBonnieFunctional
from bonnie.dealer import BonnieDealer
from email import message_from_string

import bonnie
conf = bonnie.getConf()

class TestBonnieMessageEvents(TestBonnieFunctional):
    """
        Functional tests for message-related notifications. Each test
        submits one notification and inspects what was logged for it.
    """

    def test_001_messagenew(self):
        """
            MessageNew with headers and content already attached
        """
        # we assume "messageHeaders" and "messageContent" payload already being collected
        messagenew = {
            "event": "MessageNew",
            "messageSize": 976,
            "messages": 6,
            "modseq": 20,
            "pid": 2340,
            "service": "lmtpunix",
            "timestamp": "2014-10-20T13:34:14.966+02:00",
            "uidnext": 7,
            "uidset": "6",
            "uri": "imap://john.doe@example.org@kolab.example.org/INBOX;UIDVALIDITY=1411487714/;UID=6",
            "user": "john.doe@example.org",
            "vnd.cmu.midset": [ "<a8486f5db6ec207de9b9f069850546ee@example.org>" ],
            "vnd.cmu.sessionId": "kolab.example.org-2340-1413804854-1",
            "vnd.cmu.unseenMessages": 3,
            "messageHeaders": {
                "6": {
                    "Content-Transfer-Encoding": "7bit",
                    "Content-Type": "text/plain",
                    "Date": "2014-10-20T11:32:41Z",
                    "From": [ "Br\u00fcederli, Thomas <john.doe@example.org>" ],
                    "MIME-Version": "1.0",
                    "Message-ID": "<a8486f5db6ec207de9b9f069850546ee@example.org>",
                    "Received": "from kolab.example.org ([unix socket])\r\n\t by kolab.example.org (Cyrus git2.5+0-Kolab-2.5-67.el6.kolab_3.4) with LMTPA;\r\n\t Mon, 20 Oct 2014 13:34:14 +0200",
                    "Return-Path": "<john.doe@example.org>",
                    "Subject": "MessageNew event test",
                    "To": [ "Doe, John <john.doe@example.org>" ],
                    "X-Sender": "john.doe@example.org",
                    "X-Sieve": "CMU Sieve 2.4",
                    "X-Spam-Flag": "NO",
                    "X-Spam-Level": "",
                    "X-Spam-Score": "-0.002",
                    "X-Spam-Status": "No, score=-0.002 tagged_above=-10 required=6.2\r\n\ttests=[NO_RECEIVED=-0.001, NO_RELAYS=-0.001] autolearn=ham",
                    "X-Virus-Scanned": "amavisd-new at example.org"
                }
            },
            "messageContent": {
                "6": "Return-Path: <john.doe@example.org>\r\nReceived: from kolab.example.org ([unix socket])\r\n\t by kolab.example.org (Cyrus git2.5+0-Kolab-2.5-67.el6.kolab_3.4) with LMTPA;\r\n\t Mon, 20 Oct 2014 13:34:14 +0200\r\nX-Sieve: CMU Sieve 2.4\r\nX-Virus-Scanned: amavisd-new at example.org\r\nX-Spam-Flag: NO\r\nX-Spam-Score: -0.002\r\nX-Spam-Level: \r\nX-Spam-Status: No, score=-0.002 tagged_above=-10 required=6.2\r\n\ttests=[NO_RECEIVED=-0.001, NO_RELAYS=-0.001] autolearn=ham\r\nMIME-Version: 1.0\r\nContent-Type: text/plain; charset=US-ASCII;\r\n format=flowed\r\nContent-Transfer-Encoding: 7bit\r\nDate: Mon, 20 Oct 2014 13:32:41 +0200\r\nFrom: =?UTF-8?Q?Br=C3=BCederli=2C_Thomas?= <john.doe@example.org>\r\nTo: \"Doe, John\" <john.doe@example.org>\r\nSubject: MessageNew event test\r\nMessage-ID: <a8486f5db6ec207de9b9f069850546ee@example.org>\r\nX-Sender: john.doe@example.org\r\n\r\nThis message should trigger the MessageNew event for john.doe...\r\n...and MessageAppend to /Sent for the sender.\r\n"
            }
        }

        dealer = BonnieDealer()
        dealer.run(json.dumps(messagenew))

        events = self.query_log([('event','=','MessageNew')])
        self.assertEqual(len(events), 1)

        event = events[0]
        self.assertEqual(event['uidset'], '6')
        self.assertEqual(event['service'], 'lmtpunix')
        self.assertEqual(event['session_id'], messagenew['vnd.cmu.sessionId'])
        # the +02:00 input timestamp is expected back in UTC
        self.assertEqual(event['@timestamp'], '2014-10-20T11:34:14.966000Z')
        self.assertEqual(len(event['message_id']), 1)

        self.assertIsInstance(event['headers'], dict)
        self.assertTrue(event['headers']['Message-ID'] in event['message_id'])
        self.assertTrue(event['headers']['Subject'] in messagenew['messageHeaders']['6']['Subject'])

        # check if message payload is parsable
        message = message_from_string(event['message'].encode('utf8','replace'))
        self.assertEqual(message['Subject'], event['headers']['Subject'])

        # check objects/folder entry
        self.assertTrue(event.has_key('folder_id'))
        folder = self.storage_get(event['folder_id'], index='objects', doctype='folder')

        self.assertIsInstance(folder, dict)
        self.assertEqual(folder['uniqueid'], event['folder_uniqueid'])
        self.assertEqual(folder['name'], 'INBOX')
        self.assertEqual(folder['owner'], 'john.doe@example.org')

        # check objects/user entry
        self.assertTrue(event.has_key('user_id'))
        user = self.storage_get(event['user_id'], index='objects', doctype='user')

        self.assertIsInstance(user, dict)
        self.assertEqual(user['user'], messagenew['user'])


    def test_002_messageappend(self):
        """
            MessageAppend which already carries folder_id/folder_uniqueid
        """
        messageappend = {
            "event": "MessageAppend",
            # backslash escaped explicitly; "\s" is not a valid escape sequence
            "flagNames": "\\seen",
            "messageSize": 555,
            "messages": 6,
            "modseq": 12,
            "pid": 2222,
            "service": "imap",
            "timestamp": "2014-10-20T13:33:27.062+02:00",
            "uidnext": 9,
            "uidset": "8",
            "uri": "imap://john.doe@example.org@kolab.example.org/Sent;UIDVALIDITY=1411487701/;UID=8",
            "user": "john.doe@example.org",
            "vnd.cmu.envelope": "(\"Mon, 20 Oct 2014 13:33:26 +0200\" \"MessageNew event test\" ((\"=?UTF-8?Q?Br=C3=BCederli=2C_Thomas?=\" NIL \"john.doe\" \"example.org\")) ((\"=?UTF-8?Q?Br=C3=BCederli=2C_Thomas?=\" NIL \"john.doe\" \"example.org\")) ((\"=?UTF-8?Q?Br=C3=BCederli=2C_Thomas?=\" NIL \"john.doe\" \"example.org\")) ((\"Doe, John\" NIL \"john.doe\" \"example.org\")) NIL NIL NIL \"<20f46a82b8584c1518fbeac7bad5f05b@example.org>\")",
            "vnd.cmu.midset": [ "<20f46a82b8584c1518fbeac7bad5f05b@example.org>" ],
            "vnd.cmu.sessionId": "kolab.example.org-2222-1413804806-1",
            "vnd.cmu.unseenMessages": 0,
            "folder_id": "76b8cd8f85bb435d17fe28d576db64a7",
            "folder_uniqueid": "f356a1a9-f897-454f-9ada-5646fe4c4117",
            "messageHeaders": {
                "8": {
                    "Content-Transfer-Encoding": "7bit",
                    "Content-Type": "text/plain",
                    "Date": "2014-10-20T11:31:11Z",
                    "From": [ "Br\u00fcederli, Thomas <john.doe@example.org>" ],
                    "MIME-Version": "1.0",
                    "Message-ID": "<20f46a82b8584c1518fbeac7bad5f05b@example.org>",
                    "Subject": "MessageNew event test",
                    "To": [ "Doe, John <john.doe@example.org>" ],
                    "User-Agent": "Kolab 3.1/Roundcube 1.1-git",
                    "X-Sender": "john.doe@example.org"
                }
            },
            "messageContent": {
                # NOTE(review): the Message-ID in this raw message differs from
                # the one in messageHeaders/vnd.cmu.midset - confirm this is intended
                "8": "MIME-Version: 1.0\r\nContent-Type: text/plain; charset=US-ASCII;\r\n format=flowed\r\nContent-Transfer-Encoding: 7bit\r\nDate: Mon, 20 Oct 2014 13:31:11 +0200\r\nFrom: =?UTF-8?Q?Br=C3=BCederli=2C_Thomas?= <john.doe@example.org>\r\nTo: \"Doe, John\" <john.doe@example.org>\r\nSubject: MessageNew event test\r\nMessage-ID: <44ef83beb911cb9cd82e8dc7a29467a9@example.org>\r\nX-Sender: john.doe@example.org\r\nUser-Agent: Kolab 3.1/Roundcube 1.1-git\r\n\r\nThis message should trigger the MessageNew event for john.doe...\r\n...and MessageAppend to /Sent for the sender."
            }
        }

        dealer = BonnieDealer()
        dealer.run(json.dumps(messageappend))

        events = self.query_log([('event','=','MessageAppend')])
        self.assertEqual(len(events), 1)

        event = events[0]
        self.assertTrue(event.has_key('user_id'))
        self.assertIsInstance(event['headers'], dict)
        self.assertEqual(len(event['headers']['To']), 1)
        self.assertEqual(event['headers']['Content-Type'], 'text/plain')
        self.assertTrue(event['headers']['Message-ID'] in event['message_id'])


    def test_003_messageread(self):
        """
            MessageRead without message payload, looked up by message-ID
        """
        messageread = {
            "event": "MessageRead",
            "messages": 3,
            "modseq": 64,
            "pid": 802,
            "service": "imap",
            "timestamp": "2014-10-20T13:04:09.077+02:00",
            "uidnext": 7,
            "uidset": "4",
            "uri": "imap://john.doe@example.org@kolab.example.org/INBOX;UIDVALIDITY=1411487701",
            "user": "john.doe@example.org",
            "vnd.cmu.midset": [ "<e0ffe5d5a1569a35c1b62791390a48d5@example.org>" ],
            "vnd.cmu.sessionId": "kolab.example.org-802-1413803049-1",
            "vnd.cmu.unseenMessages": 0
        }

        dealer = BonnieDealer()
        dealer.run(json.dumps(messageread))

        # query by message-ID
        events = self.query_log([('message_id','=','<e0ffe5d5a1569a35c1b62791390a48d5@example.org>')])
        self.assertEqual(len(events), 1)

        event = events[0]
        self.assertEqual(event['uidset'], '4')
        self.assertTrue(event.has_key('user_id'))
        self.assertTrue(event.has_key('folder_id'))

    def test_004_flagsclear(self):
        """
            FlagsClear, checking the renamed flag_names/unseen_messages fields
        """
        flagsclear = {
            "event": "FlagsClear",
            # backslash escaped explicitly; "\S" is not a valid escape sequence
            "flagNames": "\\Seen",
            "messages": 3,
            "modseq": 47,
            "pid": 489,
            "service": "imap",
            "timestamp": "2014-10-20T13:03:31.348+02:00",
            "uidnext": 7,
            "uidset": "4",
            "uri": "imap://john.doe@example.org@kolab.example.org/INBOX;UIDVALIDITY=1411487701",
            "user": "john.doe@example.org",
            "vnd.cmu.midset": [ "<e0ffe5d5a1569a35c1b62791390a48d5@example.org>" ],
            "vnd.cmu.sessionId": "kolab.example.org-489-1413803011-1",
            "vnd.cmu.unseenMessages": 1
        }

        dealer = BonnieDealer()
        dealer.run(json.dumps(flagsclear))

        # query by message-ID
        events = self.query_log([('message_id','=','<e0ffe5d5a1569a35c1b62791390a48d5@example.org>')])
        self.assertEqual(len(events), 1)

        event = events[0]
        self.assertEqual(event['uidset'], '4')
        self.assertEqual(event['flag_names'], '\\Seen')
        self.assertEqual(event['unseen_messages'], 1)
        self.assertTrue(event.has_key('user_id'))
        self.assertTrue(event.has_key('folder_id'))

    def test_005_messagetrash(self):
        """
            MessageTrash of a Kolab object whose message-ID set is "NIL"
        """
        messagetrash = {
            "event": "MessageTrash",
            "messages": 2,
            "modseq": 104,
            "pid": 1248,
            "service": "imap",
            "timestamp": "2014-10-20T13:10:59.546+02:00",
            "uidnext": 38,
            "uidset": "36",
            "uri": "imap://john.doe@example.org@kolab.example.org/Calendar;UIDVALIDITY=1411487702",
            "user": "john.doe@example.org",
            "vnd.cmu.midset": [ "NIL" ],
            "vnd.cmu.sessionId": "kolab.example.org-1248-1413803459-1",
            "vnd.cmu.unseenMessages": 2,
            "messageHeaders": {
                "36": {
                    "Content-Type": "multipart/mixed",
                    "Date": "2014-10-20T11:23:40Z",
                    "From": [ " <thomas.bruederli@example.org>" ],
                    "MIME-Version": "1.0",
                    "Subject": "253E800C973E9FB99D174669001DB19B-FCBB6C4091F28CA0",
                    "To": [ " <thomas.bruederli@example.org>" ],
                    "User-Agent": "Kolab 3.1/Roundcube 1.1-git",
                    "X-Kolab-Mime-Version": "3.0",
                    "X-Kolab-Type": "application/x-vnd.kolab.event"
                }
            }
        }

        dealer = BonnieDealer()
        dealer.run(json.dumps(messagetrash))

        events = self.query_log([('event','=','MessageTrash')])
        self.assertEqual(len(events), 1)

        event = events[0]
        self.assertTrue(event['headers']['Subject'] in messagetrash['messageHeaders']['36']['Subject'])
        # the "NIL" placeholder must not be logged as a message id
        self.assertEqual(len(event['message_id']), 0)
View file
bonnie-0.1.0.tar.gz/tests/functional/test_004_changelog.py
Added
@@ -0,0 +1,61 @@ +import json + +from . import TestBonnieFunctional +from bonnie.dealer import BonnieDealer + +import bonnie +conf = bonnie.getConf() + +class TestBonnieChangelog(TestBonnieFunctional): + + def test_001_changelog(self): + # we assume "messageHeaders" and "messageContent" payload already being collected + messageappend = { + "event": "MessageAppend", + "messageSize": 2932, + "messages": 2, + "modseq": 107, + "pid": 1248, + "service": "imap", + "timestamp": "2014-10-20T13:10:59.516+02:00", + "uidnext": 38, + "uidset": "37", + "uri": "imap://john.doe@example.org@kolab.example.org/Calendar;UIDVALIDITY=1411487702/;UID=37", + "user": "john.doe@example.org", + "vnd.cmu.envelope": "(\"Mon, 20 Oct 2014 13:10:59 +0200\" \"253E800C973E9FB99D174669001DB19B-FCBB6C4091F28CA0\" ((NIL NIL \"john.doe\" \"example.org\")) ((NIL NIL \"john.doe\" \"example.org\")) ((NIL NIL \"john.doe\" \"example.org\")) ((NIL NIL \"john.doe\" \"example.org\")) NIL NIL NIL NIL)", + "vnd.cmu.midset": [ "NIL" ], + "vnd.cmu.sessionId": "kolab.example.org-1248-1413803459-1", + "vnd.cmu.unseenMessages": 2, + "messageHeaders": { + "37": { + "Content-Type": "multipart/mixed", + "Date": "2014-10-20T11:23:40Z", + "From": [ " <thomas.bruederli@example.org>" ], + "MIME-Version": "1.0", + "Subject": "253E800C973E9FB99D174669001DB19B-FCBB6C4091F28CA0", + "To": [ " <thomas.bruederli@example.org>" ], + "User-Agent": "Kolab 3.1/Roundcube 1.1-git", + "X-Kolab-Mime-Version": "3.0", + "X-Kolab-Type": "application/x-vnd.kolab.event" + } + }, + "messageContent": { + "37": "MIME-Version: 1.0\r\nContent-Type: multipart/mixed;\r\n boundary=\"=_46bc539ab7a6c0a8bd4d2ddbf553df00\"\r\nFrom: thomas.bruederli@example.org\r\nTo: thomas.bruederli@example.org\r\nDate: Mon, 20 Oct 2014 13:23:40 +0200\r\nX-Kolab-Type: application/x-vnd.kolab.event\r\nX-Kolab-Mime-Version: 3.0\r\nSubject: 253E800C973E9FB99D174669001DB19B-FCBB6C4091F28CA0\r\nUser-Agent: Kolab 3.1/Roundcube 
1.1-git\r\n\r\n--=_46bc539ab7a6c0a8bd4d2ddbf553df00\r\nContent-Transfer-Encoding: quoted-printable\r\nContent-Type: text/plain; charset=ISO-8859-1\r\n\r\nThis is a Kolab Groupware object....\r\n\r\n--=_46bc539ab7a6c0a8bd4d2ddbf553df00\r\nContent-Transfer-Encoding: 8bit\r\nContent-Type: application/calendar+xml; charset=UTF-8;\r\n name=kolab.xml\r\nContent-Disposition: attachment;\r\n filename=kolab.xml;\r\n size=1954\r\n\r\n<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\" ?>\r\n<icalendar xmlns=\"urn:ietf:params:xml:ns:icalendar-2.0\">\r\n\r\n <vcalendar>\r\n <properties>\r\n <prodid>\r\n <text>Roundcube-libkolab-1.1 Libkolabxml-1.1</text>\r\n </prodid>\r\n <version>\r\n <text>2.0</text>\r\n </version>\r\n <x-kolab-version>\r\n <text>3.1.0</text>\r\n </x-kolab-version>\r\n </properties>\r\n <components>\r\n <vevent>\r\n <properties>\r\n <uid>\r\n <text>253E800C973E9FB99D174669001DB19B-FCBB6C4091F28CA0</text>\r\n </uid>\r\n <created>\r\n <date-time>2014-09-23T23:31:23Z</date-time>\r\n </created>\r\n <dtstamp>\r\n <date-time>2014-10-20T11:23:40Z</date-time>\r\n </dtstamp>\r\n <sequence>\r\n <integer>28</integer>\r\n </sequence>\r\n <class>\r\n <text>PUBLIC</text>\r\n </class>\r\n <dtstart>\r\n <parameters>\r\n <tzid>\r\n <text>/kolab.org/Europe/Berlin</text>\r\n </tzid>\r\n </parameters>\r\n <date-time>2014-10-20T14:00:00</date-time>\r\n </dtstart>\r\n <dtend>\r\n <parameters>\r\n <tzid>\r\n <text>/kolab.org/Europe/Berlin</text>\r\n </tzid>\r\n </parameters>\r\n <date-time>2014-10-20T16:00:00</date-time>\r\n </dtend>\r\n <summary>\r\n <text>Today</text>\r\n </summary>\r\n <description>\r\n <text>(new revision)</text>\r\n </description>\r\n <organizer>\r\n <parameters>\r\n <cn>\r\n <text>Br\u00fcederli, Thomas</text>\r\n </cn>\r\n </parameters>\r\n <cal-address>mailto:%3Cthomas.bruederli%40example.org%3E</cal-address>\r\n </organizer>\r\n </properties>\r\n </vevent>\r\n </components>\r\n 
</vcalendar>\r\n\r\n</icalendar>\r\n\r\n--=_46bc539ab7a6c0a8bd4d2ddbf553df00--\r\n" + } + } + + dealer = BonnieDealer() + dealer.run(json.dumps(messageappend)) + + # query by subject (i.e. object UUID) + events = self.query_log([('headers.Subject','=','253E800C973E9FB99D174669001DB19B-FCBB6C4091F28CA0')]) + self.assertEqual(len(events), 1) + + event = events[0] + self.assertIsInstance(event['headers'], dict) + self.assertEqual(event['headers']['X-Kolab-Mime-Version'], '3.0') + self.assertEqual(event['headers']['X-Kolab-Type'], 'application/x-vnd.kolab.event') + + self.assertTrue(event.has_key('user_id')) + self.assertTrue(event.has_key('revision')) +
View file
bonnie-0.1.0.tar.gz/tests/functional/testbase.py
Added
@@ -0,0 +1,56 @@ +import os +import json +import time + +from twisted.trial import unittest +from subprocess import call +from bonnie.dealer import BonnieDealer +from bonnie.worker.storage import ElasticSearchStorage + +pwd = os.path.dirname(__file__) +basedir = os.path.join(pwd, '..', '..') + +import bonnie +conf = bonnie.getConf() +conf.finalize_conf() + +class TestBonnieFunctional(unittest.TestCase): + attempts = 12 + + def setUp(self): + self.storage = ElasticSearchStorage() + self.storage.es.indices.delete(index='logstash-*', ignore=[400, 404]) + self.storage.es.indices.delete(index='objects', ignore=[400, 404]) + + call([os.path.join(pwd, 'start.sh'), basedir]) + time.sleep(1) + + def tearDown(self): + call([os.path.join(pwd, 'stop.sh')]) + time.sleep(2) + + def query_log(self, query): + attempts = self.attempts + while attempts > 0: + attempts -= 1 + res = self.storage.select(query, index='logstash-*', doctype='logs', sortby='@timestamp:desc') + if res and res['total'] > 0: + return res['hits'] + + time.sleep(1) + # print "query retry", attempts + + return None + + def storage_get(self, key, index, doctype): + attempts = self.attempts + while attempts > 0: + attempts -= 1 + res = self.storage.get(key, index=index, doctype=doctype) + if res is not None: + return res + time.sleep(1) + # print "get retry", attempts + + return None +
View file
bonnie-0.1.tar.gz/tests/unit/__init__.py -> bonnie-0.1.0.tar.gz/tests/unit/__init__.py
Changed
@@ -1,3 +1,11 @@ import sys -sys.path = [ '.', '..' ] + sys.path +sys.path = [ '.', '../..' ] + sys.path + +from test_001_utils import TestBonnieUtils +from test_002_collector import TestBonnieCollector +from test_003_dealer import TestBonnieDealer +from test_004_worker import TestBonnieWorker +from test_005_persistence import TestBonniePersistence +from test_006_caching import TestBonnieCaching +
View file
bonnie-0.1.0.tar.gz/tests/unit/test_001_utils.py
Added
@@ -0,0 +1,78 @@ +import os +import json +from bonnie.utils import expand_uidset +from bonnie.utils import parse_imap_uri +from bonnie.utils import mail_message2dict +from bonnie.utils import decode_message_headers +from bonnie.utils import imap_folder_path +from bonnie.utils import imap_mailbox_fs_path +from email import message_from_string +from twisted.trial import unittest + + +class TestBonnieUtils(unittest.TestCase): + + def setUp(self): + pass + + def _get_resource(self, filename): + pwd = os.path.dirname(__file__) + filepath = os.path.join(pwd, 'resources', filename) + fp = open(filepath, 'r') + data = fp.read() + fp.close() + return data + + def test_expand_uidset(self): + self.assertEqual(expand_uidset('3'), ['3']) + self.assertEqual(expand_uidset('3,5'), ['3','5']) + self.assertEqual(expand_uidset('3:5'), ['3','4','5']) + + def test_parse_imap_uri(self): + url = parse_imap_uri("imap://john.doe@example.org@kolab.example.org/Calendar/Personal%20Calendar;UIDVALIDITY=1411487702/;UID=3") + self.assertEqual(url['host'], 'kolab.example.org') + self.assertEqual(url['user'], 'john.doe') + self.assertEqual(url['domain'], 'example.org') + self.assertEqual(url['path'], 'Calendar/Personal Calendar') + self.assertEqual(url['UID'], '3') + + def test_decode_message_headers(self): + message = message_from_string(self._get_resource('3.')) + headers = decode_message_headers(message) + + self.assertEqual(len(headers['From']), 1) + self.assertEqual(len(headers['To']), 2) + self.assertEqual(headers['To'][0], u'Br\u00fcderli, Thomas <thomas.bruederli@example.org>') + self.assertEqual(headers['Content-Type'], 'text/plain') + self.assertEqual(headers['Date'], '2014-09-24T04:52:00Z') + self.assertEqual(headers['Subject'], 'Test') + + def test_mail_message2dict(self): + message = mail_message2dict(self._get_resource('event_mime_message.eml')) + + self.assertIsInstance(message, dict) + self.assertEqual(message['Subject'], '253E800C973E9FB99D174669001DB19B-FCBB6C4091F28CA0') + 
self.assertEqual(message['X-Kolab-Type'], 'application/x-vnd.kolab.event') + self.assertEqual(len(message['@parts']), 2) + + xmlpart = message['@parts'][1] + self.assertEqual(xmlpart['Content-Type'], 'application/calendar+xml; charset=UTF-8; name=kolab.xml') + + message2 = mail_message2dict("FOO") + self.assertIsInstance(message2, dict) + self.assertEqual(message2['@body'], "FOO") + + def test_imap_folder_path(self): + p1 = imap_folder_path("imap://john.doe@example.org@kolab.example.org/Calendar;UID=3") + self.assertEqual(p1, "user/john.doe/Calendar@example.org") + + p2 = imap_folder_path("imap://john.doe@example.org@kolab.example.org/INBOX;UIDVALIDITY=1411487702") + self.assertEqual(p2, "user/john.doe@example.org") + + # test shared folders (but how are they referred in the uri?) + p3 = imap_folder_path("imap://kolab.example.org/Shared%20Folders/shared/Project-X%40example.org;UIDVALIDITY=1412093781/;UID=2") + self.assertEqual(p3, "shared/Project-X@example.org") + + def test_imap_mailbox_fs_path(self): + path = imap_mailbox_fs_path("imap://john.doe@example.org@kolab.example.org/Calendar/Personal%20Calendar;UID=3") + self.assertEqual(path, "/var/spool/imap/domain/e/example.org/j/user/john^doe/Calendar/Personal Calendar")
View file
bonnie-0.1.0.tar.gz/tests/unit/test_002_collector.py
Changed
(renamed from tests/unit/test-002-collector.py)
View file
bonnie-0.1.0.tar.gz/tests/unit/test_003_dealer.py
Changed
(renamed from tests/unit/test-003-dealer.py)
View file
bonnie-0.1.0.tar.gz/tests/unit/test_004_worker.py
Changed
(renamed from tests/unit/test-004-worker.py)
View file
bonnie-0.1.0.tar.gz/tests/unit/test_005_persistence.py
Added
@@ -0,0 +1,100 @@ +import os +import json +import bonnie +from bonnie.broker import persistence +from bonnie.broker.persistence import db, PersistentBase +from bonnie.broker.brokers.zmq_broker.job import Job +from twisted.trial import unittest + +class PlistItem(PersistentBase): + __tablename__ = 'plisttest' + id = db.Column(db.Integer, primary_key=True) + value = db.Column(db.String) + + def __init__(self, value, id=None): + self.value = value + self.id = id + + def __repr__(self): + return '<PlistItem %s:%r>' % (self.id, self.value) + + +class TestBonniePersistence(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + # TODO: clear database + # PersistentBase.metadata.drop_all(bonnie.broker.persistence.engine) + pass + + def test_001_base_list(self): + plist = persistence.List('base', PlistItem) + plist.append(PlistItem("One")) + plist.append(PlistItem("Two")) + item3 = PlistItem("Three") + plist.append(item3) + + self.assertEqual(len(plist), 3) + self.assertEqual(plist[2], item3) + self.assertEqual(2, plist.index(item3)) + self.assertTrue(item3 in plist) + + plist.append(PlistItem("Five")) + plist.append(PlistItem("Six")) + + plist[4] = PlistItem("Five.5") + self.assertEqual(plist[4].value, "Five.5") + + del plist[plist.index(item3)] + plist.pop(3) + plist.pop() + self.assertEqual(len(plist), 2) + + i = 0 + for item in plist: + i += 1 + self.assertTrue(isinstance(item, PlistItem)) + + self.assertEqual(i, 2) + + + def test_003_broker_jobs(self): + worker_jobs = persistence.List('worker', Job) + collector_jobs = persistence.List('collector', Job) + one = Job(state='PENDING', notification='{"state":"pending","event":"test"}', collector_id='C.1') + two = Job(state='PENDING', notification='{"state":"pending","event":"other"}', collector_id='C.1') + done = Job(state='DONE', notification='{"state":"done","event":"passed"}', collector_id='C.1') + worker_jobs.append(one) + worker_jobs.append(two) + worker_jobs.append(done) + + 
self.assertEqual(len(worker_jobs), 3) + self.assertEqual(len(collector_jobs), 0) + self.assertTrue(one.uuid in [x.uuid for x in worker_jobs]) + pending = [x for x in worker_jobs if x.state == 'PENDING' and x.collector_id == 'C.1'] + self.assertEqual(len(pending), 2) + job = pending.pop() + + self.assertEqual(job, two) + self.assertEqual(len(pending), 1) + self.assertEqual(len(worker_jobs), 3) + + # move job to another list + collector_jobs.append(one) + self.assertTrue(one in collector_jobs) + self.assertTrue(one in worker_jobs) + + worker_jobs.remove(one) + self.assertFalse(one in worker_jobs) + + self.assertEqual(len(worker_jobs), 2) + self.assertEqual(len(collector_jobs), 1) + + # move job to the end of the queue + worker_jobs.remove(two) + worker_jobs.append(two) + + self.assertEqual(worker_jobs[0], done) + self.assertEqual(worker_jobs[-1], two) \ No newline at end of file
View file
bonnie-0.1.0.tar.gz/tests/unit/test_006_caching.py
Added
@@ -0,0 +1,51 @@ +import time + +from twisted.trial import unittest +from bonnie.worker.storage import CachedDict + +class TestBonnieCaching(unittest.TestCase): + + def test_cached_dict(self): + # dict with 10 seconds TTL + d = CachedDict(10) + d['one'] = 'ONE' + d['two'] = 'TWO' + + self.assertEqual(len(d), 2) + self.assertTrue(d.has_key('one')) + self.assertFalse(d.has_key('none')) + self.assertEqual(d['one'], 'ONE') + # internal sorting is influenced by the expiry time + # but sorting in dicts is irrelevant in most cases + self.assertEqual(sorted(d.keys()), ['one','two']) + self.assertEqual(sorted(d.values()), ['ONE','TWO']) + self.assertEqual(sorted(d.items()), [ ('one','ONE'), ('two','TWO') ]) + + time.sleep(5) + d['five'] = 'FIVE' + time.sleep(1) + d['six'] = 'SIX' + self.assertEqual(len(d), 4) + self.assertEqual(d.pop('five'), 'FIVE') + self.assertEqual(len(d), 3) + + # let the first two items expire + time.sleep(5) + self.assertEqual(len(d), 1) + self.assertFalse(d.has_key('one')) + + # test iterator + # 'five' was popped, thus only 'six' remains + for k,v in d: + self.assertEqual(k, 'six') + self.assertEqual(v, 'SIX') + + # all expired + time.sleep(5) + self.assertEqual(len(d), 0) + self.assertEqual(len(d.data), 3) + + # expunge internal cache + d.expunge() + self.assertEqual(len(d.data), 0) + \ No newline at end of file
View file
bonnie-0.1.tar.gz/worker.py -> bonnie-0.1.0.tar.gz/worker.py
Changed
@@ -1,10 +1,8 @@ #!/usr/bin/python -import signal from bonnie.worker import BonnieWorker if __name__ == "__main__": worker = BonnieWorker() - signal.signal(signal.SIGTERM, worker.terminate) - worker.run() + worker.start()
View file
bonnie-0.1.tar.gz/tests/unit/test-001-utils.py
Deleted
@@ -1,78 +0,0 @@ -import os -import json -from bonnie.utils import expand_uidset -from bonnie.utils import parse_imap_uri -from bonnie.utils import mail_message2dict -from bonnie.utils import decode_message_headers -from bonnie.utils import imap_folder_path -from bonnie.utils import imap_mailbox_fs_path -from email import message_from_string -from twisted.trial import unittest - - -class TestBonnieUtils(unittest.TestCase): - - def setUp(self): - pass - - def _get_resource(self, filename): - pwd = os.path.dirname(__file__) - filepath = os.path.join(pwd, 'resources', filename) - fp = open(filepath, 'r') - data = fp.read() - fp.close() - return data - - def test_expand_uidset(self): - self.assertEqual(expand_uidset('3'), ['3']) - self.assertEqual(expand_uidset('3,5'), ['3','5']) - self.assertEqual(expand_uidset('3:5'), ['3','4','5']) - - def test_parse_imap_uri(self): - url = parse_imap_uri("imap://john.doe@example.org@kolab33.example.org/Calendar/Personal%20Calendar;UIDVALIDITY=1411487702/;UID=3") - self.assertEqual(url['host'], 'kolab33.example.org') - self.assertEqual(url['user'], 'john.doe') - self.assertEqual(url['domain'], 'example.org') - self.assertEqual(url['path'], 'Calendar/Personal Calendar') - self.assertEqual(url['UID'], '3') - - def test_decode_message_headers(self): - message = message_from_string(self._get_resource('3.')) - headers = decode_message_headers(message) - - self.assertEqual(len(headers['From']), 1) - self.assertEqual(len(headers['To']), 2) - self.assertEqual(headers['To'][0], u'Br\u00fcderli, Thomas <thomas.bruederli@example.org>') - self.assertEqual(headers['Content-Type'], 'text/plain') - self.assertEqual(headers['Date'], '2014-09-24T04:52:00Z') - self.assertEqual(headers['Subject'], 'Test') - - def test_mail_message2dict(self): - message = mail_message2dict(self._get_resource('event_mime_message.eml')) - - self.assertIsInstance(message, dict) - self.assertEqual(message['Subject'], '253E800C973E9FB99D174669001DB19B-FCBB6C4091F28CA0') - 
self.assertEqual(message['X-Kolab-Type'], 'application/x-vnd.kolab.event') - self.assertEqual(len(message['@parts']), 2) - - xmlpart = message['@parts'][1] - self.assertEqual(xmlpart['Content-Type'], 'application/calendar+xml; charset=UTF-8; name=kolab.xml') - - message2 = mail_message2dict("FOO") - self.assertIsInstance(message2, dict) - self.assertEqual(message2['@body'], "FOO") - - def test_imap_folder_path(self): - p1 = imap_folder_path("imap://john.doe@example.org@kolab.example.org/Calendar;UID=3") - self.assertEqual(p1, "user/john.doe/Calendar@example.org") - - p2 = imap_folder_path("imap://john.doe@example.org@kolab.example.org/INBOX;UIDVALIDITY=1411487702") - self.assertEqual(p2, "user/john.doe@example.org") - - # test shared folders (but how are they referred in the uri?) - p3 = imap_folder_path("imap://kolab33.example.org/Shared%20Folders/shared/Project-X%40example.org;UIDVALIDITY=1412093781/;UID=2") - self.assertEqual(p3, "shared/Project-X@example.org") - - def test_imap_mailbox_fs_path(self): - path = imap_mailbox_fs_path("imap://john.doe@example.org@kolab.example.org/Calendar/Personal%20Calendar;UID=3") - self.assertEqual(path, "/var/spool/imap/domain/e/example.org/j/user/john^doe/Calendar/Personal Calendar")
View file
bonnie-0.1.tar.gz/tests/unit/test-005-persistence.py
Deleted
@@ -1,94 +0,0 @@ -import os -import json -import bonnie -from bonnie.broker import persistence -from bonnie.broker.persistence import db, PersistentBase -from bonnie.broker.brokers.zmq_broker.job import Job -from twisted.trial import unittest - -class PlistItem(PersistentBase): - __tablename__ = 'plisttest' - id = db.Column(db.Integer, primary_key=True) - value = db.Column(db.String) - - def __init__(self, value, id=None): - self.value = value - self.id = id - - def __repr__(self): - return '<PlistItem %s:%r>' % (self.id, self.value) - - -class TestBonniePersistence(unittest.TestCase): - - def setUp(self): - pass - - def tearDown(self): - # TODO: clear database - # PersistentBase.metadata.drop_all(bonnie.broker.persistence.engine) - pass - - def test_001_base_list(self): - plist = persistence.List('base', PlistItem) - plist.append(PlistItem("One")) - plist.append(PlistItem("Two")) - item3 = PlistItem("Three") - plist.append(item3) - - self.assertEqual(len(plist), 3) - self.assertEqual(plist[2], item3) - self.assertEqual(2, plist.index(item3)) - self.assertTrue(item3 in plist) - - plist.append(PlistItem("Five")) - plist.append(PlistItem("Six")) - - plist[4] = PlistItem("Five.5") - self.assertEqual(plist[4].value, "Five.5") - - del plist[plist.index(item3)] - plist.pop(3) - plist.pop() - self.assertEqual(len(plist), 2) - - i = 0 - for item in plist: - i += 1 - self.assertTrue(isinstance(item, PlistItem)) - - self.assertEqual(i, 2) - - - def test_003_broker_jobs(self): - worker_jobs = persistence.List('worker', Job) - collector_jobs = persistence.List('collector', Job) - one = Job(state='PENDING', notification='{"state":"pending","event":"test"}', collector_id='C.1') - two = Job(state='PENDING', notification='{"state":"pending","event":"other"}', collector_id='C.1') - done = Job(state='DONE', notification='{"state":"done","event":"passed"}', collector_id='C.1') - worker_jobs.append(one) - worker_jobs.append(two) - worker_jobs.append(done) - - 
self.assertEqual(len(worker_jobs), 3) - self.assertEqual(len(collector_jobs), 0) - self.assertTrue(one.uuid in [x.uuid for x in worker_jobs]) - pending = [x for x in worker_jobs if x.state == 'PENDING' and x.collector_id == 'C.1'] - self.assertEqual(len(pending), 2) - job = pending.pop() - - self.assertEqual(job, two) - self.assertEqual(len(pending), 1) - self.assertEqual(len(worker_jobs), 3) - - # move job to another list - collector_jobs.append(one) - self.assertTrue(one in collector_jobs) - self.assertTrue(one in worker_jobs) - - worker_jobs.remove(one) - self.assertFalse(one in worker_jobs) - - self.assertEqual(len(worker_jobs), 2) - self.assertEqual(len(collector_jobs), 1) - \ No newline at end of file
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS) is an openSUSE project.