Projects
Kolab:3.4:Updates
bonnie
Log In
Username
Password
We truncated the diff of some files because they were too big. If you want to see the full diff for every file,
click here
.
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
Expand all
Collapse all
Changes of Revision 23
View file
bonnie.spec
Changed
@@ -1,5 +1,5 @@ Name: bonnie -Version: 0.1 +Version: 0.1.0 Release: 1%{?dist} Summary: Bonnie for Kolab Groupware
View file
bonnie-0.1.tar.gz/bonnie/broker/__init__.py -> bonnie-0.1.0.tar.gz/bonnie/broker/__init__.py
Changed
@@ -22,14 +22,19 @@ """ This is the broker for Bonnie. """ - import brokers -class BonnieBroker(object): - broker_interests = {} - broker_modules = {} +from bonnie.daemon import BonnieDaemon + +class BonnieBroker(BonnieDaemon): + pidfile = "/var/run/bonnie/broker.pid" def __init__(self, *args, **kw): + super(BonnieBroker, self).__init__(*args, **kw) + + self.broker_interests = {} + self.broker_modules = {} + for _class in brokers.list_classes(): module = _class() module.register(callback=self.register_broker)
View file
bonnie-0.1.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py -> bonnie-0.1.0.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py
Changed
@@ -39,6 +39,7 @@ from bonnie.broker import persistence class ZMQBroker(object): + MAX_RETRIES = 5 running = False def __init__(self): @@ -69,7 +70,7 @@ return job = jobs.pop() - job.set_state(b"ALLOC") + job.set_status(b"ALLOC") return job def collect_jobs_with_status(self, _state, collector_id=None): @@ -112,9 +113,10 @@ if len(jobs) < 1: return None - job = jobs.pop() + # take the first job in the queue + job = jobs[0] - job.set_state(b"ALLOC") + job.set_status(b"ALLOC") job.set_worker(_worker_id) return job.uuid @@ -124,11 +126,24 @@ self.worker_jobs.delete(job) log.debug("Worker job done: %s;" % (_job_uuid), level=8) - def worker_job_free(self, _job_uuid): + def worker_job_free(self, _job_uuid, pushback=False): for job in [x for x in self.worker_jobs if x.uuid == _job_uuid]: job.set_status(b"PENDING") job.set_worker(None) + if pushback: + # increment retry count on pushback + job.retries += 1 + log.debug("Push back job %s for %d. time" % (_job_uuid, job.retries), level=8) + if job.retries > self.MAX_RETRIES: + # delete job after MAX retries + self.worker_jobs.delete(job) + log.info("Delete pushed back job %s" % (_job_uuid)) + else: + # move it to the end of the job queue + self.worker_jobs.remove(job) + self.worker_jobs.append(job) + def worker_job_send(self, _job_uuid, _worker_id): # TODO: Sanity check on job state, worker assignment, etc. for job in [x for x in self.worker_jobs if x.uuid == _job_uuid]: @@ -225,16 +240,19 @@ # reset existing jobs in self.worker_jobs and self.collect_jobs to status PENDING (?) # this will re-assign them to workers and collectors after a broker restart for job in self.worker_jobs: - job.set_state(b"PENDING") + job.set_status(b"PENDING") for job in self.collect_jobs: - job.set_state(b"PENDING") + job.set_status(b"PENDING") persistence.syncronize() while self.running: try: sockets = dict(poller.poll(1000)) + except KeyboardInterrupt, e: + log.info("zmq.Poller KeyboardInterrupt") + break except Exception, e: log.error("zmq.Poller error: %r", e) sockets = dict() @@ -254,6 +272,9 @@ if _message[1] == b"DONE": self.worker_job_done(_message[2]) + if _message[1] == b"PUSHBACK": + self.worker_job_free(_message[2], True) + if _message[1] in self.collector_interests: _job_uuid = _message[2] self.transit_job_collect(_job_uuid, _message[1]) @@ -330,14 +351,14 @@ def transit_job_collect(self, _job_uuid, _command): for job in [x for x in self.worker_jobs if x.uuid == _job_uuid]: - job.set_state(b"PENDING") + job.set_status(b"PENDING") job.set_command(_command) self.collect_jobs.append(job) self.worker_jobs.remove(job) def transit_job_worker(self, _job_uuid, _notification): for job in [x for x in self.collect_jobs if x.uuid == _job_uuid]: - job.set_state(b"PENDING") + job.set_status(b"PENDING") job.notification = _notification self.worker_jobs.append(job) self.collect_jobs.remove(job)
View file
bonnie-0.1.tar.gz/bonnie/broker/brokers/zmq_broker/job.py -> bonnie-0.1.0.tar.gz/bonnie/broker/brokers/zmq_broker/job.py
Changed
@@ -45,6 +45,7 @@ self.collector_id = collector_id self.timestamp = time.time() self.command = None + self.retries = 0 if self.client_id == None: if self.collector_id == None: @@ -54,7 +55,7 @@ else: self.type = 'Dealer' - def set_state(self, state): + def set_status(self, state): self.state = state def set_worker(self, worker_id):
View file
bonnie-0.1.tar.gz/bonnie/collector/__init__.py -> bonnie-0.1.0.tar.gz/bonnie/collector/__init__.py
Changed
@@ -24,32 +24,24 @@ import inputs import handlers -import bonnie from bonnie.utils import parse_imap_uri +from bonnie.daemon import BonnieDaemon +import bonnie conf = bonnie.getConf() log = bonnie.getLogger('bonnie.collector') -class BonnieCollector(object): - input_interests = {} - input_modules = {} - - handler_interests = {} - handler_modules = {} - +class BonnieCollector(BonnieDaemon): + pidfile = "/var/run/bonnie/collector.pid" def __init__(self, *args, **kw): - # TODO: read active input module from config collector.input_modules - for _class in inputs.list_classes(): - module = _class() - module.register(callback=self.register_input) - self.input_modules[_class] = module + super(BonnieCollector, self).__init__(*args, **kw) - # TODO: read active handler module from config collector.handler_modules - for _class in handlers.list_classes(): - handler = _class() - handler.register(callback=self.register_handler) - self.handler_modules[_class] = handler + self.input_interests = {} + self.input_modules = {} + + self.handler_interests = {} + self.handler_modules = {} def execute(self, command, notification): """ @@ -74,6 +66,18 @@ self.handler_interests[interest].append(how) def run(self): + # TODO: read active input module from config collector.input_modules + for _class in inputs.list_classes(): + module = _class() + module.register(callback=self.register_input) + self.input_modules[_class] = module + + # TODO: read active handler module from config collector.handler_modules + for _class in handlers.list_classes(): + handler = _class() + handler.register(callback=self.register_handler) + self.handler_modules[_class] = handler + input_modules = conf.get('collector', 'input_modules').split(',') for _input in self.input_modules.values(): if _input.name() in input_modules:
View file
bonnie-0.1.tar.gz/bonnie/collector/handlers/__init__.py -> bonnie-0.1.0.tar.gz/bonnie/collector/handlers/__init__.py
Changed
@@ -1,13 +1,16 @@ from messagedata import MessageDataHandler from imapdata import IMAPDataHandler +from ldapdata import LDAPDataHandler __all__ = [ 'MessageDataHandler', - 'IMAPDataHandler' + 'IMAPDataHandler', + 'LDAPDataHandler' ] def list_classes(): return [ MessageDataHandler, - IMAPDataHandler + IMAPDataHandler, + LDAPDataHandler ] \ No newline at end of file
View file
bonnie-0.1.tar.gz/bonnie/collector/handlers/imapdata.py -> bonnie-0.1.0.tar.gz/bonnie/collector/handlers/imapdata.py
Changed
@@ -37,7 +37,7 @@ # load pykolab conf conf = pykolab.getConf() if not hasattr(conf, 'defaults'): - conf.finalize_conf() + conf.finalize_conf(fatal=False) self.imap = IMAP()
View file
bonnie-0.1.0.tar.gz/bonnie/collector/handlers/ldapdata.py
Added
@@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) +# +# Thomas Bruederli <bruederli at kolabsys.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3 or, at your option, any later version +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# + +import json +import pykolab +from pykolab.auth import Auth + +import bonnie +from bonnie.utils import parse_imap_uri +from bonnie.utils import imap_folder_path + +conf = bonnie.getConf() +log = bonnie.getLogger('bonnie.collector.LDAPDataHandler') + +class LDAPDataHandler(object): + """ + Collector handler to provide user data from LDAP + """ + + def __init__(self, *args, **kw): + # load pykolab conf + self.pykolab_conf = pykolab.getConf() + if not hasattr(self.pykolab_conf, 'defaults'): + self.pykolab_conf.finalize_conf(fatal=False) + + self.ldap = Auth() + self.connections = 0 + + def register(self, callback): + interests = { + 'GETUSERDATA': { 'callback': self.get_user_data } + } + + callback(interests) + + def get_user_data(self, notification): + notification = json.loads(notification) + log.debug("GETUSERDATA for %r" % (notification), level=9) + + if notification.has_key('user'): + self.connections += 1 + + try: + self.ldap.connect() + user_dn = self.ldap.find_user_dn(notification['user'], True) + log.debug("User DN for %s: %r" % (notification['user'], user_dn), level=8) + except Exception, e: + log.error("LDAP connection error: %r", e) + user_dn = None + + if user_dn: + unique_attr = self.pykolab_conf.get('ldap', 'unique_attribute', 'nsuniqueid') + user_rec = self.ldap.get_entry_attributes(None, user_dn, [unique_attr, 'cn']) + log.debug("User attributes: %r" % (user_rec), level=8) + + if user_rec and user_rec.has_key(unique_attr): + user_rec['dn'] = user_dn + user_rec['id'] = user_rec[unique_attr] + del user_rec[unique_attr] + else: + user_rec = None + + notification['user_data'] = user_rec + + self.connections -= 1 + + if self.connections == 0: + self.ldap.disconnect() + + return json.dumps(notification)
View file
bonnie-0.1.tar.gz/bonnie/collector/inputs/zmq_input.py -> bonnie-0.1.0.tar.gz/bonnie/collector/inputs/zmq_input.py
Changed
@@ -41,6 +41,7 @@ running = False def __init__(self, *args, **kw): + self.interests = [] self.context = zmq.Context() zmq_broker_address = conf.get('collector', 'zmq_broker_address') @@ -63,8 +64,8 @@ pass def report_state(self, interests=[]): - log.debug("[%s] Reporting state %s, %r" % (self.identity, self.state, interests), level=9) - self.collector.send_multipart([b"STATE", self.state, ",".join(interests)]) + log.debug("[%s] Reporting state %s, %r" % (self.identity, self.state, self.interests), level=9) + self.collector.send_multipart([b"STATE", self.state, ",".join(self.interests)]) self.report_timestamp = time.time() def run(self, callback=None, interests=[]): @@ -73,11 +74,15 @@ self.running = True # report READY state with interests - self.report_state(interests) + self.interests = interests + self.report_state() while self.running: try: sockets = dict(self.poller.poll(1000)) + except KeyboardInterrupt, e: + log.info("zmq.Poller KeyboardInterrupt") + break except Exception, e: log.error("zmq.Poller error: %r", e) sockets = dict()
View file
bonnie-0.1.0.tar.gz/bonnie/daemon.py
Added
@@ -0,0 +1,251 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) +# +# Thomas Bruederli <bruederli at kolabsys.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3 or, at your option, any later +# version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, +# USA. +# + +import os +import sys +import grp +import pwd +import signal +import traceback +import bonnie +conf = bonnie.getConf() +log = bonnie.getLogger('bonnie') + +class BonnieDaemon(object): + pidfile = "/var/run/bonnie/bonnie.pid" + + def __init__(self, *args, **kw): + daemon_group = conf.add_cli_parser_option_group("Daemon Options") + + daemon_group.add_option( + "--fork", + dest = "fork_mode", + action = "store_true", + default = False, + help = "Fork to the background." + ) + + daemon_group.add_option( + "-p", + "--pid-file", + dest = "pidfile", + action = "store", + default = self.pidfile, + help = "Path to the PID file to use." + ) + + daemon_group.add_option( + "-u", + "--user", + dest = "process_username", + action = "store", + default = "kolab", + help = "Run as user USERNAME", + metavar = "USERNAME" + ) + + daemon_group.add_option( + "-g", + "--group", + dest = "process_groupname", + action = "store", + default = "kolab", + help = "Run as group GROUPNAME", + metavar = "GROUPNAME" + ) + + conf.finalize_conf() + + def run(self, *args, **kw): + """ + The daemon main loop + """ + pass + + def start(self, *args, **kw): + """ + Start the daemon + """ + exitcode = 0 + terminate = True + + if conf.fork_mode: + self.drop_privileges() + + try: + pid = 1 + if conf.fork_mode: + pid = daemonize() + + if pid == 0: + self.write_pid() + self.signal_handlers() + self.run(*args, **kw) + elif not conf.fork_mode: + self.signal_handlers() + self.run(*args, **kw) + else: + terminate = False + + except SystemExit, errcode: + terminate = False + exitcode = errcode + + except KeyboardInterrupt: + exitcode = 1 + log.info("Interrupted by user") + + except (AttributeError, TypeError) as errmsg: + exitcode = 1 + traceback.print_exc() + print >> sys.stderr, "Traceback occurred, please report a " + \ + "bug at https://issues.kolab.org" + + except: + exitcode = 2 + traceback.print_exc() + print >> sys.stderr, "Traceback occurred, please report a " + \ + "bug at https://issues.kolab.org" + + if terminate: + self.terminate() + + sys.exit(exitcode) + + def terminate(self, *args, **kw): + """ + Daemon shutdown function + """ + self.remove_pid() + + def signal_handlers(self): + """ + Register process signal handlers + """ + signal.signal(signal.SIGTERM, self.terminate) + + def write_pid(self): + """ + Write the process ID to the configured pid file + """ + pid = os.getpid() + fp = open(conf.pidfile, 'w') + fp.write("%d\n" % (pid)) + fp.close() + + def remove_pid(self, *args, **kw): + """ + Remove our PID file. + """ + if os.access(conf.pidfile, os.R_OK): + try: + os.remove(conf.pidfile) + except: + pass + + raise SystemExit + + def drop_privileges(self): + try: + try: + (ruid, euid, suid) = os.getresuid() + (rgid, egid, sgid) = os.getresgid() + except AttributeError, errmsg: + ruid = os.getuid() + rgid = os.getgid() + + if ruid == 0: + # Means we can setreuid() / setregid() / setgroups() + if rgid == 0: + # Get group entry details + try: + ( + group_name, + group_password, + group_gid, + group_members + ) = grp.getgrnam(conf.process_groupname) + + except KeyError: + print >> sys.stderr, "Group %s does not exist" % (conf.process_groupname) + sys.exit(1) + + # Set real and effective group if not the same as current. + if not group_gid == rgid: + log.debug("Switching real and effective group id to %d" % (group_gid), level=8) + os.setregid(group_gid, group_gid) + + if ruid == 0: + # Means we haven't switched yet. + try: + (
View file
bonnie-0.1.tar.gz/bonnie/dealer/__init__.py -> bonnie-0.1.0.tar.gz/bonnie/dealer/__init__.py
Changed
@@ -28,23 +28,28 @@ log = bonnie.getLogger('bonnie.dealer') class BonnieDealer(object): - output_modules = {} - def __init__(self, *args, **kw): + self.output_modules = {} + for _class in outputs.list_classes(): __class = _class() self.output_modules[__class] = __class.register(callback=self.register_output) def register_output(self, interests): - self.output_interests = interests + pass def accept_notification(self, notification): parsed = json.loads(notification) event = parsed['event'] user = parsed['user'] if parsed.has_key('user') else None - blacklist_events = conf.get('dealer', 'blacklist_events').split(',') - blacklist_users = conf.get('dealer', 'blacklist_users').split(',') + # ignore globally excluded events + exclude_events = conf.get('dealer', 'input_exclude_events', '').split(',') + if event in exclude_events: + return False + + blacklist_events = conf.get('dealer', 'blacklist_events', '').split(',') + blacklist_users = conf.get('dealer', 'blacklist_users', '').split(',') # ignore blacklisted events for blacklisted users if event in blacklist_events and user is not None and user in blacklist_users:
View file
bonnie-0.1.tar.gz/bonnie/dealer/outputs/zmq_output.py -> bonnie-0.1.0.tar.gz/bonnie/dealer/outputs/zmq_output.py
Changed
@@ -48,7 +48,7 @@ return 'zmq_output' def register(self, *args, **kw): - return [ 'MailboxCreate' ] + return self.run def run(self, notification): log.debug("[%s] Notification received: %r" % (self.dealer.identity, notification), level=9)
View file
bonnie-0.1.tar.gz/bonnie/logger.py -> bonnie-0.1.0.tar.gz/bonnie/logger.py
Changed
@@ -36,36 +36,51 @@ """ debuglevel = 0 fork = False + logfile = '/var/log/bonnie/bonnie.log' loglevel = logging.CRITICAL if hasattr(sys, 'argv'): for arg in sys.argv: + value = None + if '=' in arg: + (arg,value) = arg.split('=')[0:2] + + if '-d' == arg or '--debug' == arg: + debuglevel = value if value is not None else -1 + continue + if debuglevel == -1: try: debuglevel = int(arg) except ValueError, errmsg: + debuglevel = 0 continue loglevel = logging.DEBUG - break - - if '-d' == arg: - debuglevel = -1 continue if '-l' == arg: loglevel = -1 continue - if '--fork' == arg: - fork = True - if loglevel == -1: if hasattr(logging,arg.upper()): loglevel = getattr(logging,arg.upper()) else: loglevel = logging.DEBUG + if '--logfile' == arg: + logfile = value + continue + + if logfile is None: + logfile = arg + continue + + if '--fork' == arg: + fork = True + continue + def __init__(self, *args, **kw): if kw.has_key('name'): name = kw['name'] @@ -86,9 +101,11 @@ if kw.has_key('logfile'): self.logfile = kw['logfile'] - else: + elif self.logfile is None: self.logfile = '/var/log/bonnie/bonnie.log' + self.setLevel(self.loglevel) + # Make sure (read: attempt to change) the permissions try: (ruid, euid, suid) = os.getresuid() @@ -137,8 +154,16 @@ self.console_stdout.close() self.removeHandler(self.console_stdout) + def info(self, msg, *args): + # Suppress info messages from other applications according to debug level + if self.name.startswith('sqlalchemy') and self.debuglevel < 9: + return + if not self.name.startswith('bonnie') and self.debuglevel < 8: + return + + self.log(logging.INFO, '[%d]: %s' % (os.getpid(), msg) % args) + def debug(self, msg, level=1, *args): - self.setLevel(self.loglevel) # Work around other applications not using various levels of debugging if not self.name.startswith('bonnie') and not self.debuglevel == 9: return
View file
bonnie-0.1.tar.gz/bonnie/translate.py -> bonnie-0.1.0.tar.gz/bonnie/translate.py
Changed
@@ -26,7 +26,10 @@ import os N_ = lambda x: x -_ = lambda x: gettext.ldgettext(domain, x) +_ = lambda x: current.lgettext(x) + +localedir = '/usr/local/share/locale' +current = gettext.translation(domain, localedir, fallback=True) def getDefaultLangs(): languages = []
View file
bonnie-0.1.tar.gz/bonnie/utils.py -> bonnie-0.1.0.tar.gz/bonnie/utils.py
Changed
@@ -221,4 +221,4 @@ mailbox_path = os.path.join(mailbox_path, folder_name) return mailbox_path - \ No newline at end of file +
View file
bonnie-0.1.tar.gz/bonnie/worker/__init__.py -> bonnie-0.1.0.tar.gz/bonnie/worker/__init__.py
Changed
@@ -21,17 +21,93 @@ # import json - +import time import handlers import inputs import outputs import storage +import signal + +from bonnie.translate import _ +from bonnie.daemon import BonnieDaemon +from multiprocessing import Process import bonnie conf = bonnie.getConf() -from bonnie.translate import _ +log = bonnie.getLogger('bonnie.worker') + +class BonnieWorker(BonnieDaemon): + pidfile = "/var/run/bonnie/worker.pid" + + def __init__(self, *args, **kw): + worker_group = conf.add_cli_parser_option_group("Worker Options") + + worker_group.add_option( + "-n", + "--num-childs", + dest = "num_childs", + action = "store", + default = None, + help = "Number of child processes to spawn" + ) + + super(BonnieWorker, self).__init__(*args, **kw) + + self.childs = [] + self.manager = False + self.running = False + + def run(self): + """ + Daemon main loop + """ + num_childs = conf.num_childs or conf.get('worker', 'num_childs') + if num_childs is not None: + num_childs = int(num_childs) + + if num_childs is None or num_childs < 1: + main = BonnieWorkerProcess() + self.childs.append(main) + main.run() # blocking + else: + conf.fork_mode = False + self.manager = True + self.running = True + + while self.running: + # (re)start child worker processes + while len(self.childs) < num_childs: + p = Process(target=self.run_child) + self.childs.append(p) + p.start() + + # check states of child processes + for p in self.childs: + if not p.is_alive(): + log.info("Restarting dead worker process %r", p.pid) + self.childs.remove(p) + + time.sleep(10) + + log.info("Shutting down worker manager") + + def run_child(self): + """ + This method is being run in a separate process + """ + BonnieWorkerProcess(as_child=True).run() -class BonnieWorker(object): + def terminate(self, *args, **kw): + self.running = False + for p in self.childs: + p.terminate() + + if self.manager: + for p in self.childs: + p.join() + + +class BonnieWorkerProcess(object): handler_interests = { '_all': [] } input_interests = {} storage_interests = {} @@ -42,28 +118,9 @@ storage_modules = {} output_modules = {} - def __init__(self, *args, **kw): - - daemon_group = conf.add_cli_parser_option_group(_("Daemon Options")) - - daemon_group.add_option( - "--fork", - dest = "fork_mode", - action = "store_true", - default = False, - help = _("Fork to the background.") - ) - - daemon_group.add_option( - "-p", - "--pid-file", - dest = "pidfile", - action = "store", - default = "/var/run/bonnie/worker.pid", - help = _("Path to the PID file to use.") - ) - - conf.finalize_conf() + def __init__(self, as_child=False, *args, **kw): + if as_child: + signal.signal(signal.SIGTERM, self.terminate) for _class in handlers.list_classes(): __class = _class() @@ -90,6 +147,8 @@ self.storage_modules[_class] = _storage self.storage = _storage + self.output_exclude_events = conf.get('worker', 'output_exclude_events', '').split(',') + def event_notification(self, notification): """ Input an event notification in to our process. @@ -123,7 +182,7 @@ jobs.extend(_jobs) # finally send notification to output handlers if no jobs remaining - if len(jobs) == 0 and not notification.has_key('_suppress_output'): + if len(jobs) == 0 and not notification.has_key('_suppress_output') and not event in self.output_exclude_events: if self.output_interests.has_key(event): for interest in self.output_interests[event]: (notification, _jobs) = self.interest_callback(interest, notification) @@ -136,6 +195,23 @@ return notification, list(set(jobs)) + def input_report(self): + """ + Periodic callbacks from the input main loop. + Forward to all handler, storage and output modules + """ + for _handler in self.handler_modules.values(): + if hasattr(_handler, 'report'): + _handler.report() + + for _storage in self.storage_modules.values(): + if hasattr(_storage, 'report'): + _storage.report() + + for _storage in self.storage_modules.values(): + if hasattr(_storage, 'report'): + _storage.report() + def interest_callback(self, interest, notification): """ Helper method to call an interest callback @@ -156,7 +232,6 @@ { 'MessageAppend': { 'callback': self.run } } """ - print 'register_handler', interests for interest,how in interests.iteritems(): if not self.handler_interests.has_key(interest): self.handler_interests[interest] = [] @@ -191,7 +266,7 @@ input_modules = conf.get('worker', 'input_modules').split(',') for _input in self.input_modules.values(): if _input.name() in input_modules: - _input.run(callback=self.event_notification) + _input.run(callback=self.event_notification, report=self.input_report) def terminate(self, *args, **kw): for _input in self.input_modules.values():
View file
bonnie-0.1.tar.gz/bonnie/worker/handlers/__init__.py -> bonnie-0.1.0.tar.gz/bonnie/worker/handlers/__init__.py
Changed
@@ -6,6 +6,8 @@ import bonnie from base import HandlerBase +from mailboxbase import MailboxHandlerBase +from messagebase import MessageHandlerBase from aclchange import AclChangeHandler from flagsclear import FlagsClearHandler @@ -17,7 +19,6 @@ from mailboxrename import MailboxRenameHandler from mailboxsubscribe import MailboxSubscribeHandler from mailboxunsubscribe import MailboxUnsubscribeHandler -from messagebase import MessageHandlerBase from messageappend import MessageAppendHandler from messagecopy import MessageCopyHandler from messageexpire import MessageExpireHandler @@ -37,6 +38,7 @@ 'FlagsSetHandler', 'LoginHandler', 'LogoutHandler', + 'MailboxHandlerBase', 'MailboxCreateHandler', 'MailboxDeleteHandler', 'MailboxRenameHandler',
View file
bonnie-0.1.tar.gz/bonnie/worker/handlers/aclchange.py -> bonnie-0.1.0.tar.gz/bonnie/worker/handlers/aclchange.py
Changed
@@ -21,10 +21,10 @@ Base handler for an event notification of type 'AclChange' """ -from bonnie.worker.handlers import HandlerBase +from bonnie.worker.handlers import MailboxHandlerBase -class AclChangeHandler(HandlerBase): +class AclChangeHandler(MailboxHandlerBase): event = 'AclChange' def __init__(self, *args, **kw): - HandlerBase.__init__(self, *args, **kw) + MailboxHandlerBase.__init__(self, *args, **kw)
View file
bonnie-0.1.tar.gz/bonnie/worker/handlers/base.py -> bonnie-0.1.0.tar.gz/bonnie/worker/handlers/base.py
Changed
@@ -12,7 +12,18 @@ self.worker = callback(interests) def run(self, notification): + # resolve user_id from storage if notification.has_key('user') and not notification.has_key('user_id'): - notification['user_id'] = self.worker.storage.resolve_username(notification['user']) + user_data = notification['user_data'] if notification.has_key('user_data') else None + notification['user_id'] = self.worker.storage.resolve_username(notification['user'], user_data, force=notification.has_key('user_data')) + + # if storage has no entry, fetch user record from collector + if notification.has_key('user') and notification['user_id'] is None and not notification.has_key('user_data'): + notification['user_data'] = None # avoid endless loop if GETUSERDATA fails + return (notification, [ b"GETUSERDATA" ]) + + # don't store user data in notification + if notification.has_key('user_data'): + del notification['user_data'] return (notification, [])
View file
bonnie-0.1.tar.gz/bonnie/worker/handlers/changelog.py -> bonnie-0.1.0.tar.gz/bonnie/worker/handlers/changelog.py
Changed
@@ -29,8 +29,8 @@ log = bonnie.getLogger('bonnie.worker.changelog') -# timestamp (* 100) at the year 2010 -REVBASE = 94668480000 +# timestamp (* 10) at the year 2014 +REVBASE = 13885344000 class ChangelogHandler(HandlerBase): events = ['MessageAppend','vnd.cmu.MessageMove'] @@ -59,7 +59,7 @@ # assign a revision number based on the current time if object_type is not None: - notification['revision'] = int(round(time.time() * 100 - REVBASE)) + notification['revision'] = int(round(time.time() * 10 - REVBASE)) # TODO: save object type and UUID in separate fields? # These are translated into headers.X-Kolab-Type and headers.Subject by the output module
View file
bonnie-0.1.tar.gz/bonnie/worker/handlers/logout.py -> bonnie-0.1.0.tar.gz/bonnie/worker/handlers/logout.py
Changed
@@ -39,7 +39,7 @@ # and suppress separate logging of this event with notification['_suppress_output'] = True if notification.has_key('vnd.cmu.sessionId'): now = datetime.datetime.now(tzutc()) - attempts = 5 + attempts = 4 while attempts > 0: results = self.worker.storage.select( query=[ @@ -51,7 +51,7 @@ fields='user,@timestamp', limit=1 ) - if results['total'] > 0: + if results and results['total'] > 0: login_event = results['hits'][0] try: @@ -77,4 +77,7 @@ attempts -= 1 time.sleep(1) # wait for storage and try again + # push back into the job queue, the corresponding Login event may not yet have been processed. + return (notification, [b"PUSHBACK"]) + return super(LogoutHandler, self).run(notification) \ No newline at end of file
View file
bonnie-0.1.0.tar.gz/bonnie/worker/handlers/mailboxbase.py
Added
@@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) +# +# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3 or, at your option, any later version +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# + +""" + Base handler for an event notification of type 'MailboxRename' +""" + +from bonnie.worker.handlers import HandlerBase + +class MailboxHandlerBase(HandlerBase): + event = None + + def __init__(self, *args, **kw): + HandlerBase.__init__(self, *args, **kw) + + def run(self, notification): + # call super for some basic notification processing + (notification, jobs) = super(MailboxHandlerBase, self).run(notification) + + # mailbox notifications require metadata + if not notification.has_key('metadata'): + jobs.append(b"GETMETADATA") + return (notification, jobs) + + # extract uniqueid from metadata -> triggers the storage module + if notification['metadata'].has_key('/shared/vendor/cmu/cyrus-imapd/uniqueid'): + notification['folder_uniqueid'] = notification['metadata']['/shared/vendor/cmu/cyrus-imapd/uniqueid'] + + return (notification, jobs)
View file
bonnie-0.1.tar.gz/bonnie/worker/handlers/mailboxcreate.py -> bonnie-0.1.0.tar.gz/bonnie/worker/handlers/mailboxcreate.py
Changed
@@ -22,27 +22,10 @@ """ import bonnie -from bonnie.worker.handlers import HandlerBase +from bonnie.worker.handlers import MailboxHandlerBase -class MailboxCreateHandler(HandlerBase): +class MailboxCreateHandler(MailboxHandlerBase): event = 'MailboxCreate' def __init__(self, *args, **kw): - HandlerBase.__init__(self, *args, **kw) - self.log = bonnie.getLogger('bonnie.worker.' + self.event) - - def run(self, notification): - # call super for some basic notification processing - (notification, jobs) = super(MailboxCreateHandler, self).run(notification) - - # mailbox notifications require metadata - if not notification.has_key('metadata'): - self.log.debug("Adding GETMETADATA job for " + self.event, level=8) - jobs.append(b"GETMETADATA") - return (notification, jobs) - - # extract uniqueid from metadata -> triggers the storage module - if notification['metadata'].has_key('/shared/vendor/cmu/cyrus-imapd/uniqueid'): - notification['folder_uniqueid'] = notification['metadata']['/shared/vendor/cmu/cyrus-imapd/uniqueid'] - - return (notification, jobs) + MailboxHandlerBase.__init__(self, *args, **kw)
View file
bonnie-0.1.tar.gz/bonnie/worker/handlers/mailboxrename.py -> bonnie-0.1.0.tar.gz/bonnie/worker/handlers/mailboxrename.py
Changed
@@ -21,25 +21,10 @@ Base handler for an event notification of type 'MailboxRename' """ -from bonnie.worker.handlers import HandlerBase +from bonnie.worker.handlers import MailboxHandlerBase -class MailboxRenameHandler(HandlerBase): +class MailboxRenameHandler(MailboxHandlerBase): event = 'MailboxRename' def __init__(self, *args, **kw): - HandlerBase.__init__(self, *args, **kw) - - def run(self, notification): - # call super for some basic notification processing - (notification, jobs) = super(MailboxRenameHandler, self).run(notification) - - # mailbox notifications require metadata - if not notification.has_key('metadata'): - jobs.append(b"GETMETADATA") - return (notification, jobs) - - # extract uniqueid from metadata -> triggers the storage module - if notification['metadata'].has_key('/shared/vendor/cmu/cyrus-imapd/uniqueid'): - notification['folder_uniqueid'] = notification['metadata']['/shared/vendor/cmu/cyrus-imapd/uniqueid'] - - return (notification, jobs) + MailboxHandlerBase.__init__(self, *args, **kw)
View file
bonnie-0.1.tar.gz/bonnie/worker/inputs/zmq_input.py -> bonnie-0.1.0.tar.gz/bonnie/worker/inputs/zmq_input.py
Changed
@@ -39,7 +39,25 @@ running = False def __init__(self, *args, **kw): + self.state = b"READY" + self.job_id = None + self.lastping = 0 + self.report_timestamp = 0 + + def name(self): + return 'zmq_input' + + def register(self, *args, **kw): + pass + + def report_state(self): + log.debug("[%s] reporting state: %s" % (self.identity, self.state), level=8) + self.controller.send_multipart([b"STATE", self.state]) + self.report_timestamp = time.time() + + def run(self, callback=None, report=None): self.identity = u"Worker-%s-%d" % (socket.getfqdn(),os.getpid()) + log.info("[%s] starting", self.identity) self.context = zmq.Context() @@ -65,35 +83,23 @@ self.poller.register(self.controller, zmq.POLLIN) self.poller.register(self.worker, zmq.POLLIN) - self.state = b"READY" - self.job_id = None - self.report_timestamp = time.time() - - def name(self): - return 'zmq_input' - - def register(self, *args, **kw): - pass - - def report_state(self): - print "reporting state", self.state - self.controller.send_multipart([b"STATE", self.state]) - self.report_timestamp = time.time() - - def run(self, callback=None): - log.info("[%s] starting", self.identity) - self.running = True + self.lastping = time.time() self.report_state() while self.running: try: sockets = dict(self.poller.poll(1000)) + except KeyboardInterrupt, e: + log.info("zmq.Poller KeyboardInterrupt") + break except Exception, e: log.error("zmq.Poller error: %r", e) sockets = dict() - if self.report_timestamp < (time.time() - 60): + now = time.time() + + if self.report_timestamp < (now - 60): self.report_state() if self.controller in sockets: @@ -118,21 +124,29 @@ log.debug("[%s] Worker message: %r" % (self.identity, _message), level=9) if _message[0] == "JOB": + _job_uuid = _message[1] + # TODO: Sanity checking #if _message[1] == self.job_id: if not callback == None: - (status, jobs) = callback(_message[2]) + (notification, jobs) = callback(_message[2]) + else: + jobs = [] if len(jobs) == 0: - self.controller.send_multipart([b"DONE", _message[1]]) + self.controller.send_multipart([b"DONE", _job_uuid]) else: log.debug("[%s] Has jobs: %r" % (self.identity, jobs), level=8) for job in jobs: - self.controller.send_multipart([job, _message[1]]) + self.controller.send_multipart([job, _job_uuid]) self.set_state_ready() + if report is not None and self.lastping < (now - 60): + report() + self.lastping = now + log.info("[%s] shutting down", self.identity) self.worker.close()
View file
bonnie-0.1.tar.gz/bonnie/worker/outputs/elasticsearch_output.py -> bonnie-0.1.0.tar.gz/bonnie/worker/outputs/elasticsearch_output.py
Changed
@@ -52,6 +52,8 @@ 'diskUsed': 'disk_used', 'vnd.cmu.oldUidset': 'olduidset', 'vnd.cmu.sessionId': 'session_id', + 'vnd.cmu.midset': 'message_id', + 'vnd.cmu.unseenMessages': 'unseen_messages', } log = { '@version': bonnie.API_VERSION } for key,val in notification.iteritems(): @@ -85,6 +87,11 @@ notification['message'] = None if notification.has_key('messageContent') and notification['messageContent'].has_key(uid): notification['message'] = notification['messageContent'][uid] + # no need for bodystructure if we have the real message content + notification.pop('bodyStructure', None) + + # remove vnd.cmu.envelope if we have headers + notification.pop('vnd.cmu.envelope', None) self.es.create( index=index,
View file
bonnie-0.1.tar.gz/bonnie/worker/storage/__init__.py -> bonnie-0.1.0.tar.gz/bonnie/worker/storage/__init__.py
Changed
@@ -1,6 +1,8 @@ +from caching import CachedDict from elasticsearch_storage import ElasticSearchStorage __all__ = [ + 'CachedDict', 'ElasticSearchStorage' ]
View file
bonnie-0.1.0.tar.gz/bonnie/worker/storage/caching.py
Added
@@ -0,0 +1,79 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) +# +# Thomas Bruederli <bruederli at kolabsys.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3 or, at your option, any later +# version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, +# USA. +# + +import time + +class CachedDict(object): + """ + dict-like class which drops items after the given TTL + """ + def __init__(self, ttl=60): + # TODO: use memcache for distributed memory-based caching + self.ttl = ttl + self.data = {} + + def remove(self, key): + self.data.remove(key) + + def pop(self, key, default=None): + item = self.data.pop(key) + return item[0] if item is not None else default + + def update(self, other): + expire = int(time.time()) + self.ttl + self.data.update(dict((k, (v, expire)) for k, v in other.items())) + + def keys(self): + now = int(time.time()) + return [k for k, v in self.data.items() if v[1] > now] + + def values(self): + now = int(time.time()) + return [v[0] for v in self.data.values() if v[1] > now] + + def items(self): + now = int(time.time()) + return dict((k, v[0]) for k, v in self.data.items() if v[1] > now).items() + + def iteritems(self): + return self.items().iteritems() + + def has_key(self, key): + return self.data.has_key(key) and self.data[key][1] > int(time.time()) + + def expunge(self): + now = int(time.time()) + self.data = dict((k, v) for k, v in self.data.items() if v[1] > now) + + def __getitem__(self, key): + return self.data[key][0] + + def __setitem__(self, key, value): + self.data[key] = (value, int(time.time()) + self.ttl) + + def __contains__(self, key): + return self.has_key(key) + + def __len__(self): + return len(self.keys()) + + def __iter__(self): + return self.items().__iter__()
View file
bonnie-0.1.tar.gz/bonnie/worker/storage/elasticsearch_storage.py -> bonnie-0.1.0.tar.gz/bonnie/worker/storage/elasticsearch_storage.py
Changed
@@ -24,11 +24,14 @@ import json import urllib import hashlib +import random +import time import datetime import elasticsearch from dateutil.tz import tzutc from bonnie.utils import parse_imap_uri +from bonnie.worker.storage import CachedDict import bonnie conf = bonnie.getConf() @@ -42,6 +45,8 @@ default_doctype = 'object' folders_index = 'objects' folders_doctype = 'folder' + users_index = 'objects' + users_doctype = 'user' def __init__(self, *args, **kw): elasticsearch_output_address = conf.get('worker', 'elasticsearch_storage_address') @@ -53,6 +58,10 @@ host=elasticsearch_output_address ) + # use dicts with automatic expiration for caching user/folder lookups + self.user_id_cache = CachedDict(300) + self.folder_id_cache = CachedDict(120) + def name(self): return 'elasticsearch_storage' @@ -186,7 +195,7 @@ log.debug("ES select result for %r: %r" % (args['q'] or args['body'], res), level=8) except elasticsearch.exceptions.NotFoundError, e: - log.debug("ES entry not found for key %s: %r", key, e) + log.debug("ES entry not found for %r: %r", args['q'] or args['body'], e) res = None except Exception, e: @@ -241,20 +250,55 @@ result['_id'] = res['_id'] result['_index'] = res['_index'] result['_doctype'] = res['_type'] - result['_score'] = res['_score'] + + if res.has_key('_score'): + result['_score'] = res['_score'] return result - def resolve_username(self, user): + def resolve_username(self, user, user_data=None, force=False): """ Resovle the given username to the corresponding nsuniqueid from LDAP """ if not '@' in user: return user - # TODO: resolve with storage data - # return md5 sum of the username to make usernames work as fields/keys in elasticsearch - return hashlib.md5(user).hexdigest() + # return id cached in memory + if self.user_id_cache.has_key(user): + return self.user_id_cache[user] + + user_id = None + + # find existing entry in our storage backend + result = self.select( + [ ('user', '=', user) ], + index=self.users_index, + doctype=self.users_doctype, + sortby='@timestamp:desc', + limit=1 + ) + + if result and result['total'] > 0: + user_id = result['hits'][0]['_id'] + + elif user_data and user_data.has_key('id'): + # user data (from LDAP) is provided + user_id = user_data['id'] + + # insert a user record into our database + del user_data['id'] + user_data['user'] = user + user_data['@timestamp'] = datetime.datetime.now(tzutc()).strftime("%Y-%m-%dT%H:%M:%S.%fZ") + self.set(user_id, user_data, index=self.users_index, doctype=self.users_doctype) + + elif force: + user_id = hashlib.md5(user).hexdigest() + + # cache this for 5 minutes + if user_id is not None: + self.user_id_cache[user] = user_id + + return user_id def notificaton2folder(self, notification, attrib='uri'): @@ -286,7 +330,7 @@ '@timestamp': datetime.datetime.now(tzutc()).strftime("%Y-%m-%dT%H:%M:%S.%fZ"), 'uniqueid': notification['folder_uniqueid'], 'metadata': notification['metadata'], - 'acl': dict((self.resolve_username(k),v) for k,v in notification['acl'].iteritems()), + 'acl': dict((self.resolve_username(k, force=True),v) for k,v in notification['acl'].iteritems()), 'type': notification['metadata']['/shared/vendor/kolab/folder-type'] if notification['metadata'].has_key('/shared/vendor/kolab/folder-type') else 'mail', 'owner': uri['user'] + '@' + uri['domain'] if uri['user'] is not None else 'nobody', 'server': uri['host'], @@ -318,7 +362,15 @@ if not notification.has_key(attrib) or notification.has_key('folder_id'): return (notification, []) - log.debug("Resolve folder for %r = %r" % (attrib, notification[attrib]), level=8) + now = int(time.time()) + base_uri = re.sub(';.+$', '', notification[attrib]) + + log.debug("Resolve folder for %r = %r" % (attrib, base_uri), level=8) + + # return id cached in memory + if not notification.has_key('metadata') and self.folder_id_cache.has_key(base_uri): + notification['folder_id'] = self.folder_id_cache[base_uri] + return (notification, []) # mailbox resolving requires metadata if not notification.has_key('metadata'): @@ -382,6 +434,15 @@ # add reference to internal folder_id if folder is not None: + self.folder_id_cache[base_uri] = folder['id'] notification['folder_id'] = folder['id'] return (notification, []) + + def report(self): + """ + Callback from the worker main loop to trigger periodic jobs + """ + # clean-up in-memory caches from time to time + self.user_id_cache.expunge() + self.folder_id_cache.expunge()
View file
bonnie-0.1.tar.gz/broker.py -> bonnie-0.1.0.tar.gz/broker.py
Changed
@@ -1,9 +1,7 @@ #!/usr/bin/python -import signal from bonnie.broker import BonnieBroker if __name__ == "__main__": broker = BonnieBroker() - signal.signal(signal.SIGTERM, broker.terminate) - broker.run() + broker.start()
View file
bonnie-0.1.tar.gz/collector.py -> bonnie-0.1.0.tar.gz/collector.py
Changed
@@ -1,9 +1,7 @@ #!/usr/bin/python -import signal from bonnie.collector import BonnieCollector if __name__ == "__main__": collector = BonnieCollector() - signal.signal(signal.SIGTERM, collector.terminate) - collector.run() + collector.start()
View file
bonnie-0.1.tar.gz/conf/bonnie.conf -> bonnie-0.1.0.tar.gz/conf/bonnie.conf
Changed
@@ -13,12 +13,15 @@ output_modules = zmq_output zmq_broker_address = tcp://localhost:5570 blacklist_users = cyrus-admin -blacklist_events = Login,Logout +blacklist_events = Login,Logout,AclChange +input_exclude_events = [worker] +num_childs = 0 input_modules = zmq_input storage_modules = elasticsearch_storage output_modules = elasticsearch_output +output_exclude_events = MessageExpunge zmq_controller_address = tcp://localhost:5572 zmq_worker_router_address = tcp://localhost:5573 elasticsearch_output_address = localhost
View file
bonnie-0.1.0.tar.gz/tests/README.md
Added
@@ -0,0 +1,27 @@ +Running Bonnie Tests +==================== + +The test scripts are based on the `twisted trial` package which can be +installed from the `python-twisted-core` RPM package. + +Run the tests from the bonnie root directory with the following command: + +``` +$ export PYTHONPATH=.:/usr/lib/python2.6/site-packages +$ trial tests.{unit|functional}.<test-file-name>.<testClassName> +``` + +So for example + +``` +$ trial tests.unit.test-001-utils.TestBonnieUtils +$ trial tests.functional.test-001-login.TestBonnieLogin +``` + +Both the unit tests as well as the functional tests make the following +assumptions regarding the Kolab environment on the host they're run: + + * Kolab standard single-host setup running the domain 'example.org' + * A Kolab user named John Doe <john.doe@example.org> exists + * The Elasticsearch service is running + * No Bonnie processes are running or connected to ZMQ
View file
bonnie-0.1.0.tar.gz/tests/functional
Added
+(directory)
View file
bonnie-0.1.0.tar.gz/tests/functional/__init__.py
Added
@@ -0,0 +1,9 @@ +import sys + +sys.path = [ '.', '../..' ] + sys.path + +from testbase import TestBonnieFunctional +from test_001_login import TestBonnieLogin +from test_002_mailboxes import TestBonnieMailboxes +from test_003_messages import TestBonnieMessageEvents +from test_004_changelog import TestBonnieChangelog
View file
bonnie-0.1.0.tar.gz/tests/functional/start.sh
Added
@@ -0,0 +1,19 @@ +#!/bin/bash + +BASEDIR=$1 +: ${BASEDIR:="."} + +if [ -f /tmp/bonnie.pid ]; then + echo "Bonnie already running (/tmp/bonnie.pid exists)" + exit +fi + +killall -q broker.py worker.py collector.py + +cd $BASEDIR + +./broker.py -d 8 2>/dev/null > /dev/null & +./worker.py -d 8 2>/dev/null > /dev/null & +./collector.py -d 8 2>/dev/null > /dev/null & + +touch /tmp/bonnie.pid
View file
bonnie-0.1.0.tar.gz/tests/functional/stop.sh
Added
@@ -0,0 +1,4 @@ +#!/bin/bash + +killall -q broker.py worker.py collector.py +rm -f /tmp/bonnie.pid \ No newline at end of file
View file
bonnie-0.1.0.tar.gz/tests/functional/test_001_login.py
Added
@@ -0,0 +1,57 @@ +import json +import time + +from . import TestBonnieFunctional +from bonnie.dealer import BonnieDealer + +import bonnie +conf = bonnie.getConf() + +class TestBonnieLogin(TestBonnieFunctional): + + def test_001_login(self): + login = { + 'event': 'Login', + 'user': 'john.doe@example.org', + 'vnd.cmu.sessionId': 'kolab-sess-test-12345', + 'clientIP': '::1', + 'serverDomain': 'example.org', + } + + dealer = BonnieDealer() + dealer.run(json.dumps(login)) + + events = self.query_log([('event','=','Login')]) + self.assertEqual(len(events), 1) + + event = events[0] + self.assertTrue(event.has_key('user_id')) + self.assertTrue(event['session_id'], login['vnd.cmu.sessionId']) + self.assertEqual(event['@version'], bonnie.API_VERSION) + + del dealer + time.sleep(1) + + logout = { + 'event': 'Logout', + 'user': 'john.doe@example.org', + 'vnd.cmu.sessionId': 'kolab-sess-test-12345', + 'clientIP': '::1' + } + + dealer = BonnieDealer() + dealer.run(json.dumps(logout)) + + events = self.query_log([('event','=','Login'), ('logout_time','=','*')]) + self.assertEqual(len(events), 1) + + event = events[0] + self.assertTrue(event.has_key('logout_time')) + self.assertTrue(event.has_key('duration')) + + # check objects/users entry + user = self.storage_get(event['user_id'], index='objects', doctype='user') + + self.assertIsInstance(user, dict) + self.assertEqual(user['user'], login['user']) + self.assertEqual(user['cn'], 'John Doe') \ No newline at end of file
View file
bonnie-0.1.0.tar.gz/tests/functional/test_002_mailboxes.py
Added
@@ -0,0 +1,49 @@ +import json + +from . import TestBonnieFunctional +from bonnie.dealer import BonnieDealer + +import bonnie +conf = bonnie.getConf() + +class TestBonnieMailboxes(TestBonnieFunctional): + + def test_mailboxcreate(self): + dealer = BonnieDealer() + + notification = { + 'event': 'MailboxCreate', + 'user': 'john.doe@example.org', + 'uri': 'imap://john.doe@example.org@kolab.example.org/Calendar;UIDVALIDITY=12345' + } + + dealer.run(json.dumps(notification)) + + events = self.query_log([('event','=','MailboxCreate')]) + self.assertEqual(len(events), 1) + + event = events[0] + self.assertTrue(event.has_key('folder_id')) + self.assertTrue(event.has_key('folder_uniqueid')) + self.assertTrue(event.has_key('user_id')) + + # check objects/folder entry + folder = self.storage_get(event['folder_id'], index='objects', doctype='folder') + + self.assertIsInstance(folder, dict) + self.assertEqual(folder['uniqueid'], event['folder_uniqueid']) + self.assertEqual(folder['name'], 'Calendar') + self.assertEqual(folder['type'], 'event') + self.assertEqual(folder['owner'], 'john.doe@example.org') + + self.assertIsInstance(folder['metadata'], dict) + self.assertIsInstance(folder['acl'], dict) + self.assertTrue(folder['acl'].has_key(event['user_id'])) + self.assertTrue(folder['acl'][event['user_id']].startswith('lrswi')) + + # check objects/user entry + user = self.storage_get(event['user_id'], index='objects', doctype='user') + + self.assertIsInstance(user, dict) + self.assertEqual(user['user'], notification['user']) + self.assertEqual(user['cn'], 'John Doe')
View file
bonnie-0.1.0.tar.gz/tests/functional/test_003_messages.py
Added
@@ -0,0 +1,247 @@ +import json + +from . import TestBonnieFunctional +from bonnie.dealer import BonnieDealer +from email import message_from_string + +import bonnie +conf = bonnie.getConf() + +class TestBonnieMessageEvents(TestBonnieFunctional): + + def test_001_messagenew(self): + # we assume "messageHeaders" and "messageContent" payload already being collected + messagenew = { + "event": "MessageNew", + "messageSize": 976, + "messages": 6, + "modseq": 20, + "pid": 2340, + "service": "lmtpunix", + "timestamp": "2014-10-20T13:34:14.966+02:00", + "uidnext": 7, + "uidset": "6", + "uri": "imap://john.doe@example.org@kolab.example.org/INBOX;UIDVALIDITY=1411487714/;UID=6", + "user": "john.doe@example.org", + "vnd.cmu.midset": [ "<a8486f5db6ec207de9b9f069850546ee@example.org>" ], + "vnd.cmu.sessionId": "kolab.example.org-2340-1413804854-1", + "vnd.cmu.unseenMessages": 3, + "messageHeaders": { + "6": { + "Content-Transfer-Encoding": "7bit", + "Content-Type": "text/plain", + "Date": "2014-10-20T11:32:41Z", + "From": [ "Br\u00fcederli, Thomas <john.doe@example.org>" ], + "MIME-Version": "1.0", + "Message-ID": "<a8486f5db6ec207de9b9f069850546ee@example.org>", + "Received": "from kolab.example.org ([unix socket])\r\n\t by kolab.example.org (Cyrus git2.5+0-Kolab-2.5-67.el6.kolab_3.4) with LMTPA;\r\n\t Mon, 20 Oct 2014 13:34:14 +0200", + "Return-Path": "<john.doe@example.org>", + "Subject": "MessageNew event test", + "To": [ "Doe, John <john.doe@example.org>" ], + "X-Sender": "john.doe@example.org", + "X-Sieve": "CMU Sieve 2.4", + "X-Spam-Flag": "NO", + "X-Spam-Level": "", + "X-Spam-Score": "-0.002", + "X-Spam-Status": "No, score=-0.002 tagged_above=-10 required=6.2\r\n\ttests=[NO_RECEIVED=-0.001, NO_RELAYS=-0.001] autolearn=ham", + "X-Virus-Scanned": "amavisd-new at example.org" + } + }, + "messageContent": { + "6": "Return-Path: <john.doe@example.org>\r\nReceived: from kolab.example.org ([unix socket])\r\n\t by kolab.example.org (Cyrus git2.5+0-Kolab-2.5-67.el6.kolab_3.4) with LMTPA;\r\n\t Mon, 20 Oct 2014 13:34:14 +0200\r\nX-Sieve: CMU Sieve 2.4\r\nX-Virus-Scanned: amavisd-new at example.org\r\nX-Spam-Flag: NO\r\nX-Spam-Score: -0.002\r\nX-Spam-Level: \r\nX-Spam-Status: No, score=-0.002 tagged_above=-10 required=6.2\r\n\ttests=[NO_RECEIVED=-0.001, NO_RELAYS=-0.001] autolearn=ham\r\nMIME-Version: 1.0\r\nContent-Type: text/plain; charset=US-ASCII;\r\n format=flowed\r\nContent-Transfer-Encoding: 7bit\r\nDate: Mon, 20 Oct 2014 13:32:41 +0200\r\nFrom: =?UTF-8?Q?Br=C3=BCederli=2C_Thomas?= <john.doe@example.org>\r\nTo: \"Doe, John\" <john.doe@example.org>\r\nSubject: MessageNew event test\r\nMessage-ID: <a8486f5db6ec207de9b9f069850546ee@example.org>\r\nX-Sender: john.doe@example.org\r\n\r\nThis message should trigger the MessageNew event for john.doe...\r\n...and MessageAppend to /Sent for the sender.\r\n" + } + } + + dealer = BonnieDealer() + dealer.run(json.dumps(messagenew)) + + events = self.query_log([('event','=','MessageNew')]) + self.assertEqual(len(events), 1) + + event = events[0] + self.assertEqual(event['uidset'], '6') + self.assertEqual(event['service'], 'lmtpunix') + self.assertEqual(event['session_id'], messagenew['vnd.cmu.sessionId']) + self.assertEqual(event['@timestamp'], '2014-10-20T11:34:14.966000Z') + self.assertEqual(len(event['message_id']), 1) + + self.assertIsInstance(event['headers'], dict) + self.assertTrue(event['headers']['Message-ID'] in event['message_id']) + self.assertTrue(event['headers']['Subject'] in messagenew['messageHeaders']['6']['Subject']) + + # check if message payload is parsable + message = message_from_string(event['message'].encode('utf8','replace')) + self.assertEqual(message['Subject'], event['headers']['Subject']) + + # check objects/folder entry + self.assertTrue(event.has_key('folder_id')) + folder = self.storage_get(event['folder_id'], index='objects', doctype='folder') + + self.assertIsInstance(folder, dict) + self.assertEqual(folder['uniqueid'], event['folder_uniqueid']) + self.assertEqual(folder['name'], 'INBOX') + self.assertEqual(folder['owner'], 'john.doe@example.org') + + # check objects/user entry + self.assertTrue(event.has_key('user_id')) + user = self.storage_get(event['user_id'], index='objects', doctype='user') + + self.assertIsInstance(user, dict) + self.assertEqual(user['user'], messagenew['user']) + + + def test_002_messageappend(self): + messageappend = { + "event": "MessageAppend", + "flagNames": "\seen", + "messageSize": 555, + "messages": 6, + "modseq": 12, + "pid": 2222, + "service": "imap", + "timestamp": "2014-10-20T13:33:27.062+02:00", + "uidnext": 9, + "uidset": "8", + "uri": "imap://john.doe@example.org@kolab.example.org/Sent;UIDVALIDITY=1411487701/;UID=8", + "user": "john.doe@example.org", + "vnd.cmu.envelope": "(\"Mon, 20 Oct 2014 13:33:26 +0200\" \"MessageNew event test\" ((\"=?UTF-8?Q?Br=C3=BCederli=2C_Thomas?=\" NIL \"john.doe\" \"example.org\")) ((\"=?UTF-8?Q?Br=C3=BCederli=2C_Thomas?=\" NIL \"john.doe\" \"example.org\")) ((\"=?UTF-8?Q?Br=C3=BCederli=2C_Thomas?=\" NIL \"john.doe\" \"example.org\")) ((\"Doe, John\" NIL \"john.doe\" \"example.org\")) NIL NIL NIL \"<20f46a82b8584c1518fbeac7bad5f05b@example.org>\")", + "vnd.cmu.midset": [ "<20f46a82b8584c1518fbeac7bad5f05b@example.org>" ], + "vnd.cmu.sessionId": "kolab.example.org-2222-1413804806-1", + "vnd.cmu.unseenMessages": 0, + "folder_id": "76b8cd8f85bb435d17fe28d576db64a7", + "folder_uniqueid": "f356a1a9-f897-454f-9ada-5646fe4c4117", + "messageHeaders": { + "8": { + "Content-Transfer-Encoding": "7bit", + "Content-Type": "text/plain", + "Date": "2014-10-20T11:31:11Z", + "From": [ "Br\u00fcederli, Thomas <john.doe@example.org>" ], + "MIME-Version": "1.0", + "Message-ID": "<20f46a82b8584c1518fbeac7bad5f05b@example.org>", + "Subject": "MessageNew event test", + "To": [ "Doe, John <john.doe@example.org>" ], + "User-Agent": "Kolab 3.1/Roundcube 1.1-git", + "X-Sender": "john.doe@example.org" + } + }, + "messageContent": { + "8": "MIME-Version: 1.0\r\nContent-Type: text/plain; charset=US-ASCII;\r\n format=flowed\r\nContent-Transfer-Encoding: 7bit\r\nDate: Mon, 20 Oct 2014 13:31:11 +0200\r\nFrom: =?UTF-8?Q?Br=C3=BCederli=2C_Thomas?= <john.doe@example.org>\r\nTo: \"Doe, John\" <john.doe@example.org>\r\nSubject: MessageNew event test\r\nMessage-ID: <44ef83beb911cb9cd82e8dc7a29467a9@example.org>\r\nX-Sender: john.doe@example.org\r\nUser-Agent: Kolab 3.1/Roundcube 1.1-git\r\n\r\nThis message should trigger the MessageNew event for john.doe...\r\n...and MessageAppend to /Sent for the sender." + } + } + + dealer = BonnieDealer() + dealer.run(json.dumps(messageappend)) + + events = self.query_log([('event','=','MessageAppend')]) + self.assertEqual(len(events), 1) + + event = events[0] + self.assertTrue(event.has_key('user_id')) + self.assertIsInstance(event['headers'], dict) + self.assertEqual(len(event['headers']['To']), 1) + self.assertEqual(event['headers']['Content-Type'], 'text/plain') + self.assertTrue(event['headers']['Message-ID'] in event['message_id']) + + + def test_003_messageread(self): + messageread = { + "event": "MessageRead", + "messages": 3, + "modseq": 64, + "pid": 802, + "service": "imap", + "timestamp": "2014-10-20T13:04:09.077+02:00", + "uidnext": 7, + "uidset": "4", + "uri": "imap://john.doe@example.org@kolab.example.org/INBOX;UIDVALIDITY=1411487701", + "user": "john.doe@example.org", + "vnd.cmu.midset": [ "<e0ffe5d5a1569a35c1b62791390a48d5@example.org>" ], + "vnd.cmu.sessionId": "kolab.example.org-802-1413803049-1", + "vnd.cmu.unseenMessages": 0 + } + + dealer = BonnieDealer() + dealer.run(json.dumps(messageread)) + + # query by message-ID + events = self.query_log([('message_id','=','<e0ffe5d5a1569a35c1b62791390a48d5@example.org>')]) + self.assertEqual(len(events), 1) + + event = events[0] + self.assertEqual(event['uidset'], '4') + self.assertTrue(event.has_key('user_id')) + self.assertTrue(event.has_key('folder_id')) + + def test_004_flagsclear(self): + flagsclear = { + "event": "FlagsClear", + "flagNames": "\Seen", + "messages": 3, + "modseq": 47, + "pid": 489, + "service": "imap", + "timestamp": "2014-10-20T13:03:31.348+02:00", + "uidnext": 7, + "uidset": "4", + "uri": "imap://john.doe@example.org@kolab.example.org/INBOX;UIDVALIDITY=1411487701", + "user": "john.doe@example.org", + "vnd.cmu.midset": [ "<e0ffe5d5a1569a35c1b62791390a48d5@example.org>" ], + "vnd.cmu.sessionId": "kolab.example.org-489-1413803011-1", + "vnd.cmu.unseenMessages": 1 + } + + dealer = BonnieDealer() + dealer.run(json.dumps(flagsclear)) + + # query by message-ID + events = self.query_log([('message_id','=','<e0ffe5d5a1569a35c1b62791390a48d5@example.org>')]) + self.assertEqual(len(events), 1) +
View file
bonnie-0.1.0.tar.gz/tests/functional/test_004_changelog.py
Added
@@ -0,0 +1,61 @@ +import json + +from . import TestBonnieFunctional +from bonnie.dealer import BonnieDealer + +import bonnie +conf = bonnie.getConf() + +class TestBonnieChangelog(TestBonnieFunctional): + + def test_001_changelog(self): + # we assume "messageHeaders" and "messageContent" payload already being collected + messageappend = { + "event": "MessageAppend", + "messageSize": 2932, + "messages": 2, + "modseq": 107, + "pid": 1248, + "service": "imap", + "timestamp": "2014-10-20T13:10:59.516+02:00", + "uidnext": 38, + "uidset": "37", + "uri": "imap://john.doe@example.org@kolab.example.org/Calendar;UIDVALIDITY=1411487702/;UID=37", + "user": "john.doe@example.org", + "vnd.cmu.envelope": "(\"Mon, 20 Oct 2014 13:10:59 +0200\" \"253E800C973E9FB99D174669001DB19B-FCBB6C4091F28CA0\" ((NIL NIL \"john.doe\" \"example.org\")) ((NIL NIL \"john.doe\" \"example.org\")) ((NIL NIL \"john.doe\" \"example.org\")) ((NIL NIL \"john.doe\" \"example.org\")) NIL NIL NIL NIL)", + "vnd.cmu.midset": [ "NIL" ], + "vnd.cmu.sessionId": "kolab.example.org-1248-1413803459-1", + "vnd.cmu.unseenMessages": 2, + "messageHeaders": { + "37": { + "Content-Type": "multipart/mixed", + "Date": "2014-10-20T11:23:40Z", + "From": [ " <thomas.bruederli@example.org>" ], + "MIME-Version": "1.0", + "Subject": "253E800C973E9FB99D174669001DB19B-FCBB6C4091F28CA0", + "To": [ " <thomas.bruederli@example.org>" ], + "User-Agent": "Kolab 3.1/Roundcube 1.1-git", + "X-Kolab-Mime-Version": "3.0", + "X-Kolab-Type": "application/x-vnd.kolab.event" + } + }, + "messageContent": { + "37": "MIME-Version: 1.0\r\nContent-Type: multipart/mixed;\r\n boundary=\"=_46bc539ab7a6c0a8bd4d2ddbf553df00\"\r\nFrom: thomas.bruederli@example.org\r\nTo: thomas.bruederli@example.org\r\nDate: Mon, 20 Oct 2014 13:23:40 +0200\r\nX-Kolab-Type: application/x-vnd.kolab.event\r\nX-Kolab-Mime-Version: 3.0\r\nSubject: 253E800C973E9FB99D174669001DB19B-FCBB6C4091F28CA0\r\nUser-Agent: Kolab 3.1/Roundcube 1.1-git\r\n\r\n--=_46bc539ab7a6c0a8bd4d2ddbf553df00\r\nContent-Transfer-Encoding: quoted-printable\r\nContent-Type: text/plain; charset=ISO-8859-1\r\n\r\nThis is a Kolab Groupware object....\r\n\r\n--=_46bc539ab7a6c0a8bd4d2ddbf553df00\r\nContent-Transfer-Encoding: 8bit\r\nContent-Type: application/calendar+xml; charset=UTF-8;\r\n name=kolab.xml\r\nContent-Disposition: attachment;\r\n filename=kolab.xml;\r\n size=1954\r\n\r\n<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\" ?>\r\n<icalendar xmlns=\"urn:ietf:params:xml:ns:icalendar-2.0\">\r\n\r\n <vcalendar>\r\n <properties>\r\n <prodid>\r\n <text>Roundcube-libkolab-1.1 Libkolabxml-1.1</text>\r\n </prodid>\r\n <version>\r\n <text>2.0</text>\r\n </version>\r\n <x-kolab-version>\r\n <text>3.1.0</text>\r\n </x-kolab-version>\r\n </properties>\r\n <components>\r\n <vevent>\r\n <properties>\r\n <uid>\r\n <text>253E800C973E9FB99D174669001DB19B-FCBB6C4091F28CA0</text>\r\n </uid>\r\n <created>\r\n <date-time>2014-09-23T23:31:23Z</date-time>\r\n </created>\r\n <dtstamp>\r\n <date-time>2014-10-20T11:23:40Z</date-time>\r\n </dtstamp>\r\n <sequence>\r\n <integer>28</integer>\r\n </sequence>\r\n <class>\r\n <text>PUBLIC</text>\r\n </class>\r\n <dtstart>\r\n <parameters>\r\n <tzid>\r\n <text>/kolab.org/Europe/Berlin</text>\r\n </tzid>\r\n </parameters>\r\n <date-time>2014-10-20T14:00:00</date-time>\r\n </dtstart>\r\n <dtend>\r\n <parameters>\r\n <tzid>\r\n <text>/kolab.org/Europe/Berlin</text>\r\n </tzid>\r\n </parameters>\r\n <date-time>2014-10-20T16:00:00</date-time>\r\n </dtend>\r\n <summary>\r\n <text>Today</text>\r\n </summary>\r\n <description>\r\n <text>(new revision)</text>\r\n </description>\r\n <organizer>\r\n <parameters>\r\n <cn>\r\n <text>Br\u00fcederli, Thomas</text>\r\n </cn>\r\n </parameters>\r\n <cal-address>mailto:%3Cthomas.bruederli%40example.org%3E</cal-address>\r\n </organizer>\r\n </properties>\r\n </vevent>\r\n </components>\r\n </vcalendar>\r\n\r\n</icalendar>\r\n\r\n--=_46bc539ab7a6c0a8bd4d2ddbf553df00--\r\n" + } + } + + dealer = BonnieDealer() + dealer.run(json.dumps(messageappend)) + + # query by subject (i.e. object UUID) + events = self.query_log([('headers.Subject','=','253E800C973E9FB99D174669001DB19B-FCBB6C4091F28CA0')]) + self.assertEqual(len(events), 1) + + event = events[0] + self.assertIsInstance(event['headers'], dict) + self.assertEqual(event['headers']['X-Kolab-Mime-Version'], '3.0') + self.assertEqual(event['headers']['X-Kolab-Type'], 'application/x-vnd.kolab.event') + + self.assertTrue(event.has_key('user_id')) + self.assertTrue(event.has_key('revision')) +
View file
bonnie-0.1.0.tar.gz/tests/functional/testbase.py
Added
@@ -0,0 +1,56 @@ +import os +import json +import time + +from twisted.trial import unittest +from subprocess import call +from bonnie.dealer import BonnieDealer +from bonnie.worker.storage import ElasticSearchStorage + +pwd = os.path.dirname(__file__) +basedir = os.path.join(pwd, '..', '..') + +import bonnie +conf = bonnie.getConf() +conf.finalize_conf() + +class TestBonnieFunctional(unittest.TestCase): + attempts = 12 + + def setUp(self): + self.storage = ElasticSearchStorage() + self.storage.es.indices.delete(index='logstash-*', ignore=[400, 404]) + self.storage.es.indices.delete(index='objects', ignore=[400, 404]) + + call([os.path.join(pwd, 'start.sh'), basedir]) + time.sleep(1) + + def tearDown(self): + call([os.path.join(pwd, 'stop.sh')]) + time.sleep(2) + + def query_log(self, query): + attempts = self.attempts + while attempts > 0: + attempts -= 1 + res = self.storage.select(query, index='logstash-*', doctype='logs', sortby='@timestamp:desc') + if res and res['total'] > 0: + return res['hits'] + + time.sleep(1) + # print "query retry", attempts + + return None + + def storage_get(self, key, index, doctype): + attempts = self.attempts + while attempts > 0: + attempts -= 1 + res = self.storage.get(key, index=index, doctype=doctype) + if res is not None: + return res + time.sleep(1) + # print "get retry", attempts + + return None +
View file
bonnie-0.1.tar.gz/tests/unit/__init__.py -> bonnie-0.1.0.tar.gz/tests/unit/__init__.py
Changed
@@ -1,3 +1,11 @@ import sys -sys.path = [ '.', '..' ] + sys.path +sys.path = [ '.', '../..' ] + sys.path + +from test_001_utils import TestBonnieUtils +from test_002_collector import TestBonnieCollector +from test_003_dealer import TestBonnieDealer +from test_004_worker import TestBonnieWorker +from test_005_persistence import TestBonniePersistence +from test_006_caching import TestBonnieCaching +
View file
bonnie-0.1.0.tar.gz/tests/unit/test_001_utils.py
Added
@@ -0,0 +1,78 @@ +import os +import json +from bonnie.utils import expand_uidset +from bonnie.utils import parse_imap_uri +from bonnie.utils import mail_message2dict +from bonnie.utils import decode_message_headers +from bonnie.utils import imap_folder_path +from bonnie.utils import imap_mailbox_fs_path +from email import message_from_string +from twisted.trial import unittest + + +class TestBonnieUtils(unittest.TestCase): + + def setUp(self): + pass + + def _get_resource(self, filename): + pwd = os.path.dirname(__file__) + filepath = os.path.join(pwd, 'resources', filename) + fp = open(filepath, 'r') + data = fp.read() + fp.close() + return data + + def test_expand_uidset(self): + self.assertEqual(expand_uidset('3'), ['3']) + self.assertEqual(expand_uidset('3,5'), ['3','5']) + self.assertEqual(expand_uidset('3:5'), ['3','4','5']) + + def test_parse_imap_uri(self): + url = parse_imap_uri("imap://john.doe@example.org@kolab.example.org/Calendar/Personal%20Calendar;UIDVALIDITY=1411487702/;UID=3") + self.assertEqual(url['host'], 'kolab.example.org') + self.assertEqual(url['user'], 'john.doe') + self.assertEqual(url['domain'], 'example.org') + self.assertEqual(url['path'], 'Calendar/Personal Calendar') + self.assertEqual(url['UID'], '3') + + def test_decode_message_headers(self): + message = message_from_string(self._get_resource('3.')) + headers = decode_message_headers(message) + + self.assertEqual(len(headers['From']), 1) + self.assertEqual(len(headers['To']), 2) + self.assertEqual(headers['To'][0], u'Br\u00fcderli, Thomas <thomas.bruederli@example.org>') + self.assertEqual(headers['Content-Type'], 'text/plain') + self.assertEqual(headers['Date'], '2014-09-24T04:52:00Z') + self.assertEqual(headers['Subject'], 'Test') + + def test_mail_message2dict(self): + message = mail_message2dict(self._get_resource('event_mime_message.eml')) + + self.assertIsInstance(message, dict) + self.assertEqual(message['Subject'], '253E800C973E9FB99D174669001DB19B-FCBB6C4091F28CA0') + self.assertEqual(message['X-Kolab-Type'], 'application/x-vnd.kolab.event') + self.assertEqual(len(message['@parts']), 2) + + xmlpart = message['@parts'][1] + self.assertEqual(xmlpart['Content-Type'], 'application/calendar+xml; charset=UTF-8; name=kolab.xml') + + message2 = mail_message2dict("FOO") + self.assertIsInstance(message2, dict) + self.assertEqual(message2['@body'], "FOO") + + def test_imap_folder_path(self): + p1 = imap_folder_path("imap://john.doe@example.org@kolab.example.org/Calendar;UID=3") + self.assertEqual(p1, "user/john.doe/Calendar@example.org") + + p2 = imap_folder_path("imap://john.doe@example.org@kolab.example.org/INBOX;UIDVALIDITY=1411487702") + self.assertEqual(p2, "user/john.doe@example.org") + + # test shared folders (but how are they referred in the uri?) + p3 = imap_folder_path("imap://kolab.example.org/Shared%20Folders/shared/Project-X%40example.org;UIDVALIDITY=1412093781/;UID=2") + self.assertEqual(p3, "shared/Project-X@example.org") + + def test_imap_mailbox_fs_path(self): + path = imap_mailbox_fs_path("imap://john.doe@example.org@kolab.example.org/Calendar/Personal%20Calendar;UID=3") + self.assertEqual(path, "/var/spool/imap/domain/e/example.org/j/user/john^doe/Calendar/Personal Calendar")
View file
bonnie-0.1.0.tar.gz/tests/unit/test_002_collector.py
Changed
(renamed from tests/unit/test-002-collector.py)
View file
bonnie-0.1.0.tar.gz/tests/unit/test_003_dealer.py
Changed
(renamed from tests/unit/test-003-dealer.py)
View file
bonnie-0.1.0.tar.gz/tests/unit/test_004_worker.py
Changed
(renamed from tests/unit/test-004-worker.py)
View file
bonnie-0.1.0.tar.gz/tests/unit/test_005_persistence.py
Added
@@ -0,0 +1,100 @@ +import os +import json +import bonnie +from bonnie.broker import persistence +from bonnie.broker.persistence import db, PersistentBase +from bonnie.broker.brokers.zmq_broker.job import Job +from twisted.trial import unittest + +class PlistItem(PersistentBase): + __tablename__ = 'plisttest' + id = db.Column(db.Integer, primary_key=True) + value = db.Column(db.String) + + def __init__(self, value, id=None): + self.value = value + self.id = id + + def __repr__(self): + return '<PlistItem %s:%r>' % (self.id, self.value) + + +class TestBonniePersistence(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + # TODO: clear database + # PersistentBase.metadata.drop_all(bonnie.broker.persistence.engine) + pass + + def test_001_base_list(self): + plist = persistence.List('base', PlistItem) + plist.append(PlistItem("One")) + plist.append(PlistItem("Two")) + item3 = PlistItem("Three") + plist.append(item3) + + self.assertEqual(len(plist), 3) + self.assertEqual(plist[2], item3) + self.assertEqual(2, plist.index(item3)) + self.assertTrue(item3 in plist) + + plist.append(PlistItem("Five")) + plist.append(PlistItem("Six")) + + plist[4] = PlistItem("Five.5") + self.assertEqual(plist[4].value, "Five.5") + + del plist[plist.index(item3)] + plist.pop(3) + plist.pop() + self.assertEqual(len(plist), 2) + + i = 0 + for item in plist: + i += 1 + self.assertTrue(isinstance(item, PlistItem)) + + self.assertEqual(i, 2) + + + def test_003_broker_jobs(self): + worker_jobs = persistence.List('worker', Job) + collector_jobs = persistence.List('collector', Job) + one = Job(state='PENDING', notification='{"state":"pending","event":"test"}', collector_id='C.1') + two = Job(state='PENDING', notification='{"state":"pending","event":"other"}', collector_id='C.1') + done = Job(state='DONE', notification='{"state":"done","event":"passed"}', collector_id='C.1') + worker_jobs.append(one) + worker_jobs.append(two) + worker_jobs.append(done) + + self.assertEqual(len(worker_jobs), 3) + self.assertEqual(len(collector_jobs), 0) + self.assertTrue(one.uuid in [x.uuid for x in worker_jobs]) + pending = [x for x in worker_jobs if x.state == 'PENDING' and x.collector_id == 'C.1'] + self.assertEqual(len(pending), 2) + job = pending.pop() + + self.assertEqual(job, two) + self.assertEqual(len(pending), 1) + self.assertEqual(len(worker_jobs), 3) + + # move job to another list + collector_jobs.append(one) + self.assertTrue(one in collector_jobs) + self.assertTrue(one in worker_jobs) + + worker_jobs.remove(one) + self.assertFalse(one in worker_jobs) + + self.assertEqual(len(worker_jobs), 2) + self.assertEqual(len(collector_jobs), 1) + + # move job to the end of the queue + worker_jobs.remove(two) + worker_jobs.append(two) + + self.assertEqual(worker_jobs[0], done) + self.assertEqual(worker_jobs[-1], two) \ No newline at end of file
View file
bonnie-0.1.0.tar.gz/tests/unit/test_006_caching.py
Added
@@ -0,0 +1,51 @@ +import time + +from twisted.trial import unittest +from bonnie.worker.storage import CachedDict + +class TestBonnieCaching(unittest.TestCase): + + def test_cached_dict(self): + # dict with 10 seconds TTL + d = CachedDict(10) + d['one'] = 'ONE' + d['two'] = 'TWO' + + self.assertEqual(len(d), 2) + self.assertTrue(d.has_key('one')) + self.assertFalse(d.has_key('none')) + self.assertEqual(d['one'], 'ONE') + # internal sorting is influenced by the expiry time + # but sorting in dicts is irrelevant in most cases + self.assertEqual(sorted(d.keys()), ['one','two']) + self.assertEqual(sorted(d.values()), ['ONE','TWO']) + self.assertEqual(sorted(d.items()), [ ('one','ONE'), ('two','TWO') ]) + + time.sleep(5) + d['five'] = 'FIVE' + time.sleep(1) + d['six'] = 'SIX' + self.assertEqual(len(d), 4) + self.assertEqual(d.pop('five'), 'FIVE') + self.assertEqual(len(d), 3) + + # let the first two items expire + time.sleep(5) + self.assertEqual(len(d), 1) + self.assertFalse(d.has_key('one')) + + # test iterator + # 'five' was popped, thus only 'six' remains + for k,v in d: + self.assertEqual(k, 'six') + self.assertEqual(v, 'SIX') + + # all expired + time.sleep(5) + self.assertEqual(len(d), 0) + self.assertEqual(len(d.data), 3) + + # expunge internal cache + d.expunge() + self.assertEqual(len(d.data), 0) + \ No newline at end of file
View file
bonnie-0.1.tar.gz/worker.py -> bonnie-0.1.0.tar.gz/worker.py
Changed
@@ -1,10 +1,8 @@ #!/usr/bin/python -import signal from bonnie.worker import BonnieWorker if __name__ == "__main__": worker = BonnieWorker() - signal.signal(signal.SIGTERM, worker.terminate) - worker.run() + worker.start()
View file
bonnie-0.1.tar.gz/tests/unit/test-001-utils.py
Deleted
@@ -1,78 +0,0 @@ -import os -import json -from bonnie.utils import expand_uidset -from bonnie.utils import parse_imap_uri -from bonnie.utils import mail_message2dict -from bonnie.utils import decode_message_headers -from bonnie.utils import imap_folder_path -from bonnie.utils import imap_mailbox_fs_path -from email import message_from_string -from twisted.trial import unittest - - -class TestBonnieUtils(unittest.TestCase): - - def setUp(self): - pass - - def _get_resource(self, filename): - pwd = os.path.dirname(__file__) - filepath = os.path.join(pwd, 'resources', filename) - fp = open(filepath, 'r') - data = fp.read() - fp.close() - return data - - def test_expand_uidset(self): - self.assertEqual(expand_uidset('3'), ['3']) - self.assertEqual(expand_uidset('3,5'), ['3','5']) - self.assertEqual(expand_uidset('3:5'), ['3','4','5']) - - def test_parse_imap_uri(self): - url = parse_imap_uri("imap://john.doe@example.org@kolab33.example.org/Calendar/Personal%20Calendar;UIDVALIDITY=1411487702/;UID=3") - self.assertEqual(url['host'], 'kolab33.example.org') - self.assertEqual(url['user'], 'john.doe') - self.assertEqual(url['domain'], 'example.org') - self.assertEqual(url['path'], 'Calendar/Personal Calendar') - self.assertEqual(url['UID'], '3') - - def test_decode_message_headers(self): - message = message_from_string(self._get_resource('3.')) - headers = decode_message_headers(message) - - self.assertEqual(len(headers['From']), 1) - self.assertEqual(len(headers['To']), 2) - self.assertEqual(headers['To'][0], u'Br\u00fcderli, Thomas <thomas.bruederli@example.org>') - self.assertEqual(headers['Content-Type'], 'text/plain') - self.assertEqual(headers['Date'], '2014-09-24T04:52:00Z') - self.assertEqual(headers['Subject'], 'Test') - - def test_mail_message2dict(self): - message = mail_message2dict(self._get_resource('event_mime_message.eml')) - - self.assertIsInstance(message, dict) - self.assertEqual(message['Subject'], '253E800C973E9FB99D174669001DB19B-FCBB6C4091F28CA0') - self.assertEqual(message['X-Kolab-Type'], 'application/x-vnd.kolab.event') - self.assertEqual(len(message['@parts']), 2) - - xmlpart = message['@parts'][1] - self.assertEqual(xmlpart['Content-Type'], 'application/calendar+xml; charset=UTF-8; name=kolab.xml') - - message2 = mail_message2dict("FOO") - self.assertIsInstance(message2, dict) - self.assertEqual(message2['@body'], "FOO") - - def test_imap_folder_path(self): - p1 = imap_folder_path("imap://john.doe@example.org@kolab.example.org/Calendar;UID=3") - self.assertEqual(p1, "user/john.doe/Calendar@example.org") - - p2 = imap_folder_path("imap://john.doe@example.org@kolab.example.org/INBOX;UIDVALIDITY=1411487702") - self.assertEqual(p2, "user/john.doe@example.org") - - # test shared folders (but how are they referred in the uri?) - p3 = imap_folder_path("imap://kolab33.example.org/Shared%20Folders/shared/Project-X%40example.org;UIDVALIDITY=1412093781/;UID=2") - self.assertEqual(p3, "shared/Project-X@example.org") - - def test_imap_mailbox_fs_path(self): - path = imap_mailbox_fs_path("imap://john.doe@example.org@kolab.example.org/Calendar/Personal%20Calendar;UID=3") - self.assertEqual(path, "/var/spool/imap/domain/e/example.org/j/user/john^doe/Calendar/Personal Calendar")
View file
bonnie-0.1.tar.gz/tests/unit/test-005-persistence.py
Deleted
@@ -1,94 +0,0 @@ -import os -import json -import bonnie -from bonnie.broker import persistence -from bonnie.broker.persistence import db, PersistentBase -from bonnie.broker.brokers.zmq_broker.job import Job -from twisted.trial import unittest - -class PlistItem(PersistentBase): - __tablename__ = 'plisttest' - id = db.Column(db.Integer, primary_key=True) - value = db.Column(db.String) - - def __init__(self, value, id=None): - self.value = value - self.id = id - - def __repr__(self): - return '<PlistItem %s:%r>' % (self.id, self.value) - - -class TestBonniePersistence(unittest.TestCase): - - def setUp(self): - pass - - def tearDown(self): - # TODO: clear database - # PersistentBase.metadata.drop_all(bonnie.broker.persistence.engine) - pass - - def test_001_base_list(self): - plist = persistence.List('base', PlistItem) - plist.append(PlistItem("One")) - plist.append(PlistItem("Two")) - item3 = PlistItem("Three") - plist.append(item3) - - self.assertEqual(len(plist), 3) - self.assertEqual(plist[2], item3) - self.assertEqual(2, plist.index(item3)) - self.assertTrue(item3 in plist) - - plist.append(PlistItem("Five")) - plist.append(PlistItem("Six")) - - plist[4] = PlistItem("Five.5") - self.assertEqual(plist[4].value, "Five.5") - - del plist[plist.index(item3)] - plist.pop(3) - plist.pop() - self.assertEqual(len(plist), 2) - - i = 0 - for item in plist: - i += 1 - self.assertTrue(isinstance(item, PlistItem)) - - self.assertEqual(i, 2) - - - def test_003_broker_jobs(self): - worker_jobs = persistence.List('worker', Job) - collector_jobs = persistence.List('collector', Job) - one = Job(state='PENDING', notification='{"state":"pending","event":"test"}', collector_id='C.1') - two = Job(state='PENDING', notification='{"state":"pending","event":"other"}', collector_id='C.1') - done = Job(state='DONE', notification='{"state":"done","event":"passed"}', collector_id='C.1') - worker_jobs.append(one) - worker_jobs.append(two) - worker_jobs.append(done) - - self.assertEqual(len(worker_jobs), 3) - self.assertEqual(len(collector_jobs), 0) - self.assertTrue(one.uuid in [x.uuid for x in worker_jobs]) - pending = [x for x in worker_jobs if x.state == 'PENDING' and x.collector_id == 'C.1'] - self.assertEqual(len(pending), 2) - job = pending.pop() - - self.assertEqual(job, two) - self.assertEqual(len(pending), 1) - self.assertEqual(len(worker_jobs), 3) - - # move job to another list - collector_jobs.append(one) - self.assertTrue(one in collector_jobs) - self.assertTrue(one in worker_jobs) - - worker_jobs.remove(one) - self.assertFalse(one in worker_jobs) - - self.assertEqual(len(worker_jobs), 2) - self.assertEqual(len(collector_jobs), 1) - \ No newline at end of file
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.