Projects
Kolab:3.4:Updates
bonnie
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
Expand all
Collapse all
Changes of Revision 40
View file
bonnie.spec
Changed
@@ -17,7 +17,7 @@ %global bonnie_group_id 415 Name: bonnie -Version: 0.3.0 +Version: 0.3.1 Release: 1%{?dist} Summary: Bonnie for Kolab Groupware @@ -56,6 +56,28 @@ %description Bonnie for Kolab Groupware +%package elasticsearch +Summary: Elasticsearch meta-package for Bonnie +Group: Applications/System +Provides: %{name}(output) = %{?epoch:%{epoch}:}%{version}-%{release} +Provides: %{name}(storage) = %{?epoch:%{epoch}:}%{version}-%{release} +Requires: python-elasticsearch >= 1.0.0 + +%description +This meta-package pulls in the Elasticsearch output and storage channels +for Bonnie workers. + +%package riak +Summary: Riak meta-package for Bonnie +Group: Applications/System +Provides: %{name}(output) = %{?epoch:%{epoch}:}%{version}-%{release} +Provides: %{name}(storage) = %{?epoch:%{epoch}:}%{version}-%{release} +Requires: python-riak >= 2.1.0 + +%description +This meta-package pulls in the Riak output and storage channels +for Bonnie workers. + %package broker Summary: The Bonnie broker Group: Applications/System @@ -90,7 +112,8 @@ Summary: The Bonnie worker Group: Applications/System Requires: %{name} = %{?epoch:%{epoch}:}%{version}-%{release} -Requires: python-elasticsearch >= 1.0 +Requires: %{name}(output) = %{?epoch:%{epoch}:}%{version}-%{release} +Requires: %{name}(storage) = %{?epoch:%{epoch}:}%{version}-%{release} Requires: python-zmq %description worker @@ -99,6 +122,7 @@ %package wui Summary: The Bonnie Web UI Group: Applications/System +Requires: bonnie-flask Requires: python-flask Requires: python-flask-bootstrap Requires: python-flask-httpauth @@ -164,7 +188,7 @@ getent group %{bonnie_group} &>/dev/null || groupadd -r %{bonnie_group} -g %{bonnie_group_id} &>/dev/null getent passwd %{bonnie_user} &>/dev/null || \ useradd -r -u %{bonnie_user_id} -g %{bonnie_group} -d %{_localstatedir}/lib/%{bonnie_user} -s /sbin/nologin \ - -c "Kolab System Account" %{bonnie_user} &>/dev/null || : + -c "Bonnie Account" %{bonnie_user} &>/dev/null || : # Allow the bonnie user access to mail gpasswd -a bonnie mail >/dev/null 2>&1 @@ -284,6 +308,12 @@ %{python_sitelib}/bonnie/utils.py* %{python_sitelib}/bonnie/plugins/ +%files elasticsearch +%defattr(-,root,root,-) + +%files riak +%defattr(-,root,root,-) + %files broker %defattr(-,root,root,-) %{_sbindir}/bonnie-broker @@ -315,4 +345,6 @@ %defattr(-,root,root,-) %changelog +* Thu Dec 11 2014 Jeroen van Meeuwen <vanmeeuwen@kolabsys.com> - 0.3.1-1 +- New upstream release
View file
bonnie-0.3.0.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py -> bonnie-0.3.1.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py
Changed
@@ -180,9 +180,6 @@ # Once every 30 seconds, expire stale collectors and # workers, unlock jobs and expire those done. if last_expire < (time.time() - 30): - for _collector in collector.select_by_state(b'READY'): - self._request_collector_state(_collector.identity) - collector.expire() worker.expire() job.unlock() @@ -476,11 +473,9 @@ _job = job.select_for_collector(identity) if _job == None: - #log.info("No jobs for collector %s" % (identity)) return - log.info("Job %s ALLOC to %s" % (_job.uuid, identity)) - #log.debug("Sending %s to %s" % (_job.uuid, identity), level=7) + log.debug("Sending %s to %s" % (_job.uuid, identity), level=7) self.routers['collector']['router'].send_multipart( [ @@ -495,11 +490,9 @@ _job = job.select_for_worker(identity) if _job == None: - #log.info("No jobs for worker %s" % (identity)) return - log.info("Job %s ALLOC to %s" % (_job.uuid, identity)) - #log.debug("Sending %s to %s" % (_job.uuid, identity), level=7) + log.debug("Sending %s to %s" % (_job.uuid, identity), level=7) self.routers['worker_controller']['router'].send_multipart( [ @@ -524,26 +517,41 @@ jwa = job.count_by_type_and_state('worker', b'ALLOC') _job = job.first() + _job_notification = False if _job == None: jt = 0 else: - _job_notification = json.loads(_job.notification) - _job_timestamp = parse(_job_notification['timestamp']).astimezone(tzutc()) - now = parse( - datetime.datetime.strftime( - datetime.datetime.utcnow(), - "%Y-%m-%dT%H:%M:%S.%fZ" - ) - ).astimezone(tzutc()) - - delta = now - _job_timestamp - if hasattr(delta, 'total_seconds'): - seconds = delta.total_seconds() + while _job_notification == False: + try: + _job_notification = json.loads(_job.notification) + except Exception, errmsg: + job.set_state(_job.uuid, b'FAILED') + _job = job.first() + if _job == None: + jt = 0 + break + + _job_notification == False + + if _job_notification == False: + jt = 0 else: - seconds = (delta.days * 24 * 3600) + delta.seconds - - jt = round(seconds, 0) + _job_timestamp = parse(_job_notification['timestamp']).astimezone(tzutc()) + now = parse( + datetime.datetime.strftime( + datetime.datetime.utcnow(), + "%Y-%m-%dT%H:%M:%S.%fZ" + ) + ).astimezone(tzutc()) + + delta = now - _job_timestamp + if hasattr(delta, 'total_seconds'): + seconds = delta.total_seconds() + else: + seconds = (delta.days * 24 * 3600) + delta.seconds + + jt = round(seconds, 0) stats = { 'cb': collector.count_by_state(b'BUSY'),
View file
bonnie-0.3.0.tar.gz/bonnie/broker/brokers/zmq_broker/collector.py -> bonnie-0.3.1.tar.gz/bonnie/broker/brokers/zmq_broker/collector.py
Changed
@@ -113,22 +113,18 @@ for collector in db.query(Collector).filter(Collector.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 90)), Collector.state == b'STALE').all(): log.debug("Purging collector %s as very stale" % (collector.identity), level=7) - if not collector.job == None: - _job = db.query(Job).filter_by(id=collector.job).first() - if not _job == None: - _job.state = b'PENDING' - _job.timestamp = datetime.datetime.utcnow() + for _job in db.query(Job).filter_by(collector=collector.identity).all(): + _job.state = b'PENDING' + _job.timestamp = datetime.datetime.utcnow() db.delete(collector) db.commit() for collector in db.query(Collector).filter(Collector.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 90)), Collector.state != b'STALE').all(): log.debug("Marking collector %s as stale" % (collector.identity), level=7) - if not collector.job == None: - _job = db.query(Job).filter_by(id=collector.job).first() - if not _job == None: - _job.state = b'PENDING' - _job.timestamp = datetime.datetime.utcnow() + for _job in db.query(Job).filter_by(collector=collector.identity).all(): + _job.state = b'PENDING' + _job.timestamp = datetime.datetime.utcnow() collector.state = b'STALE' collector.timestamp = datetime.datetime.utcnow()
View file
bonnie-0.3.0.tar.gz/bonnie/broker/brokers/zmq_broker/job.py -> bonnie-0.3.1.tar.gz/bonnie/broker/brokers/zmq_broker/job.py
Changed
@@ -80,7 +80,7 @@ def first(): db = init_db('jobs') - result = db.query(Job).filter(Job.state != b'DONE').order_by(Job.id).first() + result = db.query(Job).filter(Job.state != b'DONE', Job.state != b'FAILED').order_by(Job.id).first() return result def select(job_uuid): @@ -131,8 +131,6 @@ job.state = b'ALLOC' job.timestamp = datetime.datetime.utcnow() - collector.job = job.id - collector.state = b'BUSY' db.commit() return job @@ -228,9 +226,6 @@ log.debug("Unlocking %s job %s" % (job.job_type, job.uuid), level=7) job.state = b'PENDING' - for collector in db.query(Collector).filter_by(job=job.id).all(): - collector.job = None - for worker in db.query(Worker).filter_by(job=job.id).all(): worker.job = None
View file
bonnie-0.3.0.tar.gz/bonnie/collector/__init__.py -> bonnie-0.3.1.tar.gz/bonnie/collector/__init__.py
Changed
@@ -43,41 +43,30 @@ self.handler_interests = {} - num_threads = int(conf.get('collector', 'num_threads', 5)) + self.num_threads = int(conf.get('collector', 'num_threads', 5)) + self.num_threads_busy = 0 if version.StrictVersion(sys.version[:3]) >= version.StrictVersion("2.7"): - self.pool = multiprocessing.Pool(num_threads, self._worker_process_start, (), 1) + self.pool = multiprocessing.Pool(self.num_threads, self._worker_process_start, (), 1) else: - self.pool = multiprocessing.Pool(num_threads, self._worker_process_start, ()) - - def _worker_process_start(self, *args, **kw): - log.debug("Worker process %s initializing" % (multiprocessing.current_process().name), level=1) + self.pool = multiprocessing.Pool(self.num_threads, self._worker_process_start, ()) def execute(self, commands, job_uuid, notification): """ Dispatch collector job to the according handler(s) """ - log.debug("Executing collection for %s" % (commands), level=8) + self.num_threads_busy += 1 # execute this asynchronously in a child process self.pool.apply_async( - async_execute_handlers, - ( - commands.split(), - notification, - job_uuid - ), - callback = self._execute_callback - ) - - def _execute_callback(self, result): - (notification, job_uuid) = result - - # pass result back to input module(s) - input_modules = conf.get('collector', 'input_modules').split(',') - for _input in self.input_modules.values(): - if _input.name() in input_modules: - _input.callback_done(job_uuid, notification) + async_execute_handlers, + ( + commands.split(), + notification, + job_uuid + ), + callback = self._execute_callback + ) def register_input(self, interests): self.input_interests = interests @@ -99,7 +88,13 @@ handler = _class() handler.register(callback=self.register_handler) - input_modules = conf.get('collector', 'input_modules').split(',') + input_modules = conf.get('collector', 'input_modules') + + if input_modules == None: + input_modules = "" + + input_modules = [x.strip() for x in input_modules.split(',')] + for _input in self.input_modules.values(): if _input.name() in input_modules: _input.run(callback=self.execute, interests=self.handler_interests.keys()) @@ -113,6 +108,22 @@ self.pool.close() + def _execute_callback(self, result): + (notification, job_uuid) = result + + self.num_threads_busy -= 1 + + # pass result back to input module(s) + input_modules = conf.get('collector', 'input_modules').split(',') + for _input in self.input_modules.values(): + if _input.name() in input_modules: + _input.callback_done(job_uuid, notification, threads=self._threads_available()) + + def _threads_available(self): + return (self.num_threads - self.num_threads_busy) + + def _worker_process_start(self, *args, **kw): + log.info("Worker process %s initializing" % (multiprocessing.current_process().name)) def async_execute_handlers(commands, notification, job_uuid): """ @@ -120,6 +131,9 @@ To be run an an asynchronous child process. """ + + log.info("COLLECT %r for %s by %s" % (commands, job_uuid, multiprocessing.current_process().name)) + # register handlers with the interrests again in this subprocess handler_interests = {}
View file
bonnie-0.3.0.tar.gz/bonnie/collector/inputs/zmq_input.py -> bonnie-0.3.1.tar.gz/bonnie/collector/inputs/zmq_input.py
Changed
@@ -24,6 +24,7 @@ """ +import datetime import json import os import socket @@ -36,6 +37,7 @@ log = bonnie.getLogger('bonnie.collector.ZMQInput') class ZMQInput(object): + report_timestamp = 0 state = b"READY" def __init__(self, *args, **kw): @@ -53,6 +55,7 @@ self.collector.connect(zmq_broker_address) self.stream = zmqstream.ZMQStream(self.collector) + self.ioloop = ioloop.IOLoop.instance() def name(self): return 'zmq_input' @@ -60,10 +63,18 @@ def register(self, *args, **kw): pass - def report_state(self, interests=[]): + def report_state(self, new_timeout=False): log.debug("[%s] Reporting state %s, %r" % (self.identity, self.state, self.interests), level=9) - self.collector.send_multipart([b"STATE", self.state, " ".join(self.interests)]) - self.report_timestamp = time.time() + + if self.report_timestamp < (time.time() - 10): + self.collector.send_multipart([b"STATE", self.state, " ".join(self.interests)]) + self.report_timestamp = time.time() + + if new_timeout: + self.ioloop.add_timeout(datetime.timedelta(seconds=10), self.report_state) + + def report_state_with_timeout(self): + self.report_state(new_timeout=True) def run(self, callback=None, interests=[]): log.info("[%s] starting", self.identity) @@ -71,10 +82,10 @@ # report READY state with interests self.interests = interests self.notify_callback = callback - self.report_state() self.stream.on_recv(self._cb_on_recv_multipart) - ioloop.IOLoop.instance().start() + self.ioloop.add_timeout(datetime.timedelta(seconds=10), self.report_state_with_timeout) + self.ioloop.start() def _cb_on_recv_multipart(self, message): """ @@ -85,6 +96,8 @@ if cmd == b"STATE": self.report_state() + elif self.state == b'BUSY': + self.report_state() else: job_uuid = message[1] notification = message[2] @@ -92,9 +105,15 @@ if not self.notify_callback == None: self.notify_callback(cmd, job_uuid, notification) - def callback_done(self, job_uuid, result): + def callback_done(self, job_uuid, result, threads = 0): log.debug("Handler callback done for job %s: %r" % (job_uuid, result), level=8) - self.report_timestamp = time.time() + #log.info("Threads available: %d" % (threads)) + + if threads > 0: + self.state = b'READY' + else: + self.state = b'BUSY' + self.collector.send_multipart([b"DONE", job_uuid, result]) log.info("Job %s DONE by %s" % (job_uuid, self.identity))
View file
bonnie-0.3.0.tar.gz/bonnie/worker/__init__.py -> bonnie-0.3.1.tar.gz/bonnie/worker/__init__.py
Changed
@@ -152,14 +152,15 @@ __class.register(callback=self.register_input) self.input_modules[_class] = __class - output_modules = conf.get('worker', 'output_modules').split(',') + output_modules = [x.strip() for x in conf.get('worker', 'output_modules', '').split(',')] + for _class in outputs.list_classes(): _output = _class() if _output.name() in output_modules: _output.register(callback=self.register_output) self.output_modules[_class] = _output - storage_modules = conf.get('worker', 'storage_modules').split(',') + storage_modules = [x.strip() for x in conf.get('worker', 'storage_modules', '').split(',')] for _class in storage.list_classes(): _storage = _class() if _storage.name() in storage_modules: @@ -167,9 +168,9 @@ self.storage_modules[_class] = _storage self.storage = _storage - output_exclude_events = conf.get('worker', 'output_exclude_events', None) - if not output_exclude_events == None: - self.output_exclude_events = output_exclude_events.split(',') + output_exclude_events = conf.get('worker', 'output_exclude_events', '') + + self.output_exclude_events = [x.strip() for x in output_exclude_events.split(',')] def event_notification(self, notification): """
View file
bonnie-0.3.0.tar.gz/bonnie/worker/handlers/base.py -> bonnie-0.3.1.tar.gz/bonnie/worker/handlers/base.py
Changed
@@ -18,9 +18,19 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. # +import bonnie +conf = bonnie.getConf() + class HandlerBase(object): + features = [] + def __init__(self, *args, **kw): - pass + self.features = conf.get('bonnie', 'features') + + if self.features == None: + self.features = "" + + self.features = [x.strip() for x in self.features.split(',')] def register(self, callback): interests = {
View file
bonnie-0.3.0.tar.gz/bonnie/worker/handlers/messagebase.py -> bonnie-0.3.1.tar.gz/bonnie/worker/handlers/messagebase.py
Changed
@@ -23,6 +23,8 @@ from bonnie.utils import parse_imap_uri from bonnie.worker.handlers import HandlerBase +conf = bonnie.getConf() + class MessageHandlerBase(HandlerBase): event = None @@ -34,12 +36,23 @@ # call super for some basic notification processing (notification, jobs) = super(MessageHandlerBase, self).run(notification) + relevant = False + + if 'archive' in self.features: + relevant = True + + if 'backup' in self.features: + relevant = True + + if not relevant: + return (notification, jobs) + # Insert the URI UID (if it exists) in to uidset for further handlers if notification.has_key('uri') and notification.has_key('uidset'): uri = parse_imap_uri(notification['uri']) if uri.has_key('UID'): - notification['uidset'] = uri['UID'] + notification['uidset'] = [ uri['UID'] ] # message notifications require message headers if not notification.has_key('messageHeaders'):
View file
bonnie-0.3.0.tar.gz/bonnie/worker/handlers/messagenew.py -> bonnie-0.3.1.tar.gz/bonnie/worker/handlers/messagenew.py
Changed
@@ -22,10 +22,35 @@ Base handler for an event notification of type 'MessageNew' """ +import bonnie from bonnie.worker.handlers import MessageAppendHandler +conf = bonnie.getConf() + class MessageNewHandler(MessageAppendHandler): event = 'MessageNew' def __init__(self, *args, **kw): super(MessageNewHandler, self).__init__(*args, **kw) + + def run(self, notification): + # call super for some basic notification processing + (notification, jobs) = super(MessageAppendHandler, self).run(notification) + + relevant = False + + if 'archive' in self.features: + relevant = True + + if 'backup' in self.features: + relevant = True + + if not relevant: + return (notification, jobs) + + if not notification.has_key('messageContent') or notification['messageContent'] in [None, ""]: + self.log.debug("Adding FETCH job for " + self.event, level=8) + return (notification, [ b"FETCH" ]) + + return (notification, jobs) +
View file
bonnie-0.3.0.tar.gz/bonnie/worker/inputs/zmq_input.py -> bonnie-0.3.1.tar.gz/bonnie/worker/inputs/zmq_input.py
Changed
@@ -156,7 +156,7 @@ self.set_state_ready() - if report is not None and self.lastping < (now - random.randint(55,65)): + if report is not None and self.lastping < (now - random.randint(300,600)): report() self.lastping = now
View file
bonnie-0.3.0.tar.gz/bonnie/worker/outputs/__init__.py -> bonnie-0.3.1.tar.gz/bonnie/worker/outputs/__init__.py
Changed
@@ -18,13 +18,27 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. # -from elasticsearch_output import ElasticSearchOutput +__all__ = [] -__all__ = [ - 'ElasticSearchOutput' - ] +try: + from elasticsearch_output import ElasticSearchOutput + __all__.append('ElasticSearchOutput') +except ImportError, errmsg: + pass + +try: + from riak_output import RiakOutput + __all__.append('RiakOutput') +except ImportError, errmsg: + pass def list_classes(): - return [ - ElasticSearchOutput - ] + classes = [] + + if 'ElasticSearchOutput' in __all__: + classes.append(ElasticSearchOutput) + + if 'RiakOutput' in __all__: + classes.append(RiakOutput) + + return classes
View file
bonnie-0.3.1.tar.gz/bonnie/worker/outputs/riak_output.py
Added
@@ -0,0 +1,142 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) +# +# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com> +# Thomas Bruederli (Kolab Systems) <bruederli a kolabsys.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import datetime + +from dateutil.parser import parse +from dateutil.tz import tzutc + +import riak +import json + +import bonnie +conf = bonnie.getConf() +log = bonnie.getLogger('bonnie.worker.RiakOutput') + +class RiakOutput(object): + def __init__(self, *args, **kw): + riak_output_address = conf.get( + 'worker', + 'riak_output_address' + ) + + if riak_output_address == None: + riak_output_address = 'localhost' + + self.riak = riak.Riak( + host=riak_output_address + ) + + def name(self): + return 'riak_output' + + def register(self, callback): + self.worker = callback({'_all': { 'callback': self.run }}) + + def notification2log(self, notification): + """ + Convert the given event notification record into a valid log entry + """ + keymap = { + 'timestamp': None, + 'clientIP': 'client_ip', + 'clientPort': None, + 'serverPort': None, + 'serverDomain': 'domain', + 'aclRights': 'acl_rights', + 'aclSubject': 'acl_subject', + 'mailboxID': 'mailbox_id', + 'messageSize': 'message_size', + 'messageHeaders': None, + 'messageContent': None, + 'bodyStructure': None, + 'metadata': None, + 'acl': None, + 'flagNames': 'flag_names', + 'diskUsed': 'disk_used', + 'vnd.cmu.oldUidset': 'olduidset', + 'vnd.cmu.sessionId': 'session_id', + 'vnd.cmu.midset': 'message_id', + 'vnd.cmu.unseenMessages': 'unseen_messages', + } + + log = { '@version': bonnie.API_VERSION } + + for key,val in notification.iteritems(): + newkey = keymap[key] if keymap.has_key(key) else key + if newkey is not None: + # convert NIL values into None which is more appropriate + if isinstance(val, list): + val = [x for x in val if not x == "NIL"] + elif val == "NIL": + val = None + + log[newkey] = val + + return log + + def run(self, notification): + # The output should have UTC timestamps, but gets "2014-05-16T12:55:53.870+02:00" + try: + timestamp = parse(notification['timestamp']).astimezone(tzutc()) + except: + timestamp = datetime.datetime.now(tzutc()) + + notification['@timestamp'] = datetime.datetime.strftime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ") + index = 'logstash-%s' % (datetime.datetime.strftime(timestamp, "%Y.%m.%d")) + jobs = [] + + # for notifications concerning multiple messages, create entries for each message + if notification.has_key('messageHeaders') and isinstance(notification['messageHeaders'], dict) and len(notification['messageHeaders']) > 0: + for uid,headers in notification['messageHeaders'].iteritems(): + notification['uidset'] = uid + notification['headers'] = headers + notification['message'] = None + if notification.has_key('messageContent') and notification['messageContent'].has_key(uid): + notification['message'] = notification['messageContent'][uid] + # no need for bodystructure if we have the real message content + notification.pop('bodyStructure', None) + + # remove vnd.cmu.envelope if we have headers + notification.pop('vnd.cmu.envelope', None) + + try: + self.riak.create( + index=index, + doc_type='logs', + body=self.notification2log(notification) + ) + except Exception, errmsg: + log.warning("Riak create exception: %r", e) + jobs.append(b'POSTPONE') + break + + else: + try: + self.riak.create( + index=index, + doc_type='logs', + body=self.notification2log(notification) + ) + except Exception, errmsg: + log.warning("Riak create exception: %r", e) + jobs.append(b'POSTPONE') + + return (notification, jobs)
View file
bonnie-0.3.0.tar.gz/bonnie/worker/storage/__init__.py -> bonnie-0.3.1.tar.gz/bonnie/worker/storage/__init__.py
Changed
@@ -19,14 +19,30 @@ # from caching import CachedDict -from elasticsearch_storage import ElasticSearchStorage __all__ = [ - 'CachedDict', - 'ElasticSearchStorage' + 'CachedDict' ] +try: + from elasticsearch_storage import ElasticSearchStorage + __all__.append('ElasticSearchStorage') +except ImportError, errmsg: + pass + +try: + from riak_storage import RiakStorage + __all__.append('RiakStorage') +except ImportError, errmsg: + pass + def list_classes(): - return [ - ElasticSearchStorage - ] + classes = [] + + if 'ElasticSearchStorage' in __all__: + classes.append(ElasticSearchStorage) + + if 'RiakStorage' in __all__: + classes.append(RiakStorage) + + return classes
View file
bonnie-0.3.1.tar.gz/bonnie/worker/storage/riak_storage.py
Added
@@ -0,0 +1,577 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) +# +# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com> +# Thomas Bruederli (Kolab Systems) <bruederli a kolabsys.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import datetime +import hashlib +import json +import re +import urllib +import random +import riak +import time + +from dateutil.tz import tzutc +from bonnie.utils import parse_imap_uri +from bonnie.worker.storage import CachedDict + +import bonnie +conf = bonnie.getConf() +log = bonnie.getLogger('bonnie.worker.RiakStorage') + +class RiakStorage(object): + """ + Storage node writing object data into Riak + """ + default_index = 'objects' + default_doctype = 'object' + folders_index = 'objects' + folders_doctype = 'folder' + users_index = 'objects' + users_doctype = 'user' + + def __init__(self, *args, **kw): + riak_output_address = conf.get( + 'worker', + 'riak_storage_address' + ) + + if riak_output_address == None: + riak_output_address = 'localhost' + + self.es = riak.Elasticsearch( + host=riak_output_address + ) + + # use dicts with automatic expiration for caching user/folder + # lookups + self.user_id_cache = CachedDict(300) + self.folder_id_cache = CachedDict(120) + + def name(self): + return 'riak_storage' + + def register(self, callback, **kw): + if callback is not None: + self.worker = callback( + interests = { + 'uidset': { + 'callback': self.resolve_folder_uri + }, + 'folder_uniqueid': { + 'callback': self.resolve_folder_uri + }, + 'mailboxID': { + 'callback': self.resolve_folder_uri, + 'kw': { 'attrib': 'mailboxID' } + } + } + ) + + def get(self, key, index=None, doctype=None, fields=None, **kw): + """ + Standard API for accessing key/value storage + """ + _index = index or self.default_index + _doctype = doctype or self.default_doctype + try: + res = self.riak.get( + index = _index, + doc_type = _doctype, + id = key, + _source_include = fields or '*' + ) + + log.debug( + "Riak get result for %s/%s/%s: %r" % ( + _index, + _doctype, + key, + res + ), + level = 8 + ) + + if res['found']: + result = self._transform_result(res) + else: + result = None + + except riak.exceptions.NotFoundError, errmsg: + log.debug( + "Riak entry not found for %s/%s/%s: %r" % ( + _index, + _doctype, + key, + errmsg + ) + ) + + result = None + + except Exception, errmsg: + log.warning("Riak get exception: %r" % (errmsg)) + result = None + + return result + + def set(self, key, value, index=None, doctype=None, **kw): + """ + Standard API for writing to key/value storage + """ + _index = index or self.default_index + _doctype = doctype or self.default_doctype + + try: + existing = self.riak.get( + index = _index, + doc_type = _doctype, + id = key, + fields = None + ) + + log.debug( + "Riak get result for %s/%s/%s: %r" % ( + _index, + _doctype, + key, + existing + ), + level = 8 + ) + + except riak.exceptions.NotFoundError, errmsg: + existing = None + + except Exception, errmsg: + log.warning("Riak get exception: %r" % (errmsg)) + existing = None + + if existing is None: + try: + ret = self.riak.create( + index = _index, + doc_type = _doctype, + id = key, + body = value, + consistency = 'one', + replication = 'async' + ) + + log.debug( + "Created ES object for %s/%s/%s: %r" % ( + _index, + _doctype, + key, + ret + ), + level = 8 + ) + + except Exception, errmsg: + log.warning("Riak create exception: %r" % (errmsg)) + ret = None + + else: + try: + ret = self.riak.update( + index = _index, + doc_type = _doctype, + id = key, + body = { 'doc': value }, + consistency = 'one', + replication = 'async' + ) + + log.debug( + "Updated ES object for %s/%s/%s: %r" % ( + _index, + _doctype, + key, + ret + ), + level = 8 + ) + + except Exception, errmsg: + log.warning("Riak update exception: %r" % (errmsg)) + + return ret + + def select(self, query, index=None, doctype=None, fields=None, sortby=None, limit=None, **kw): + """ + Standard API for querying storage + + @param query: List of query parameters, each represented as a triplet of (<field> <op> <value>). + combined to an AND list of search criterias. <value> can either be + - a string for direct comparison + - a list for "in" comparisons + - a tuple with two values for range queries + @param index: Index name (i.e. database name) + @param doctype: Document type (i.e. table name) + @param fields: List of fields to retrieve (string, comma-separated) + @param sortby: Fields to be used fort sorting the results (string, comma-separated) + @param limit: Number of records to return + """ + result = None + args = dict( + index = index or self.default_index, + doc_type = doctype or self.default_doctype, + _source_include = fields or '*' + ) + + if isinstance(query, dict): + args['body'] = query + elif isinstance(query, list): + args['q'] = self._build_query(query) + else: + args['q'] = query + + if sortby is not None: + args['sort'] = sortby + if limit is not None: + args['size'] = int(limit) + + try: + res = self.riak.search(**args) + log.debug( + "Riak select result for %r: %r" % ( + args['q'] or args['body'], + res + ), + level = 8 + ) + + except riak.exceptions.NotFoundError, errmsg: + log.debug( + "Riak entry not found for %r: %r" % ( + args['q'] or args['body'], + errmsg + ), + level = 8 + ) + + res = None + + except Exception, errmsg: + log.warning("Riak get exception: %r" % (errmsg)) + res = None + + if res is not None and res.has_key('hits'): + result = dict(total=res['hits']['total']) + result['hits'] = [self._transform_result(x) for x in res['hits']['hits']] + else: + result = None + + return result + + def _build_query(self, params, boolean='AND'): + """ + Convert the given list of query parameters into a Lucene + query string. + """ + query = [] + for p in params: + if isinstance(p, str): + # direct query string + query.append(p) + + elif isinstance(p, tuple) and len(p) == 3: + # <field> <op> <value> triplet + (field, op, value) = p + op_ = '-' if op == '!=' else '' + + if isinstance(value, list): + value_ = '("' + '","'.join(value) + '")' + elif isinstance(value, tuple): + value_ = '[%s TO %s]' % value + else: + quote = '"' if not '*' in str(value) else '' + value_ = quote + str(value) + quote + + query.append('%s%s:%s' % (op_, field, value_)) + + elif isinstance(p, tuple) and len(p) == 2: + # group/subquery with boolean operator + (op, subquery) = p + query.append('(' + self._build_query(subquery, op) + ')') + + return (' '+boolean+' ').join(query) + + def _transform_result(self, res): + """ + Turn an riak result item into a simple dict + """ + result = res['_source'] if res.has_key('_source') else dict() + result['_id'] = res['_id'] + result['_index'] = res['_index'] + result['_doctype'] = res['_type'] + + if res.has_key('_score'): + result['_score'] = res['_score'] + + return result + + def resolve_username(self, user, user_data=None, force=False): + """ + Resolve the given username to the corresponding nsuniqueid + from LDAP + """ + if not '@' in user: + return user + + # return id cached in memory + if self.user_id_cache.has_key(user): + return self.user_id_cache[user] + + user_id = None + + # find existing entry in our storage backend + result = self.select( + [ ('user', '=', user) ], + index=self.users_index, + doctype=self.users_doctype, + sortby='@timestamp:desc', + limit=1 + ) + + if result and result['total'] > 0: + user_id = result['hits'][0]['_id'] + + elif user_data and user_data.has_key('id'): + # user data (from LDAP) is provided + user_id = user_data['id'] + + # insert a user record into our database + del user_data['id'] + user_data['user'] = user + user_data['@timestamp'] = datetime.datetime.now( + tzutc() + ).strftime("%Y-%m-%dT%H:%M:%S.%fZ") + + self.set( + user_id, + user_data, + index=self.users_index, + doctype=self.users_doctype + ) + + elif force: + user_id = hashlib.md5(user).hexdigest() + + # cache this for 5 minutes + if user_id is not None: + self.user_id_cache[user] = user_id + + return user_id + + + def notificaton2folder(self, notification, attrib='uri'): + """ + Turn the given notification record into a folder document. + including the computation of a unique identifier which is a + checksum of the (relevant) folder properties. + """ + # split the uri parameter into useful parts + uri = parse_imap_uri(notification[attrib]) + + # re-compose folder uri + templ = "imap://%(user)s@%(domain)s@%(host)s/" + if uri['user'] is None: + templ = "imap://%(host)s/" + folder_uri = templ % uri + urllib.quote(uri['path']) + + if not notification.has_key('metadata'): + return False + + if not notification.has_key('folder_uniqueid') and \ + notification['metadata'].has_key( + '/shared/vendor/cmu/cyrus-imapd/uniqueid' + ): + + notification['folder_uniqueid'] = notification['metadata']['/shared/vendor/cmu/cyrus-imapd/uniqueid'] + + if not notification.has_key('folder_uniqueid'): + notification['folder_uniqueid'] = hashlib.md5( + notification[attrib] + ).hexdigest() + + body = { + '@version': bonnie.API_VERSION, + '@timestamp': datetime.datetime.now( + tzutc() + ).strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + + 'uniqueid': notification['folder_uniqueid'], + 'metadata': notification['metadata'], + 'acl': dict( + (self.resolve_username(k, force=True),v) for k,v in notification['acl'].iteritems() + ), + + 'type': notification['metadata']['/shared/vendor/kolab/folder-type'] if notification['metadata'].has_key('/shared/vendor/kolab/folder-type') else 'mail', + + 'owner': uri['user'] + '@' + uri['domain'] if uri['user'] is not None else 'nobody', + 'server': uri['host'], + 'name': re.sub('@.+$', '', uri['path']), + 'uri': folder_uri, + } + + # compute folder object signature and the unique identifier + ignore_metadata = [ + '/shared/vendor/cmu/cyrus-imapd/lastupdate', + '/shared/vendor/cmu/cyrus-imapd/pop3newuidl', + '/shared/vendor/cmu/cyrus-imapd/size' + ] + + signature = { + '@version': bonnie.API_VERSION, + 'owner': body['owner'], + 'server': body['server'], + 'uniqueid': notification['folder_uniqueid'], + 'metadata': [ + (k,v) for k,v in sorted(body['metadata'].iteritems()) if k not in ignore_metadata + ], + + 'acl': [ + (k,v) for k,v in sorted(body['acl'].iteritems()) + ], + } + + serialized = ";".join( + "%s:%s" % (k,v) for k,v in sorted(signature.iteritems()) + ) + + folder_id = hashlib.md5(serialized).hexdigest() + + return dict(id=folder_id, body=body) + + def resolve_folder_uri(self, notification, attrib='uri'): + """ + Resolve the folder uri (or folder_uniqueid) into an + riak object ID. + """ + # no folder resolving required + if not notification.has_key(attrib) or \ + notification.has_key('folder_id'): + + return (notification, []) + + now = int(time.time()) + base_uri = re.sub(';.+$', '', notification[attrib]) + jobs = [] + + log.debug( + "Resolve folder for %r = %r" % (attrib, base_uri), + level = 8 + ) + + # return id cached in memory + if not notification.has_key('metadata') and \ + self.folder_id_cache.has_key(base_uri): + + notification['folder_id'] = self.folder_id_cache[base_uri] + return (notification, []) + + # mailbox resolving requires metadata + if not notification.has_key('metadata'): + log.debug("Adding GETMETADATA job", level=8) + jobs.append(b"GETMETADATA") + + # before creating a folder entry, we should collect folder ACLs + if not notification.has_key('acl'): + log.debug("Adding GETACL", level=8) + jobs.append(b"GETACL") + + # reject notification with additional collector jobs + if len(jobs) > 0: + return (notification, jobs) + + # extract folder properties and a unique identifier from the + # notification + folder = self.notificaton2folder(notification) + + # abort if notificaton2folder() failed + if folder is False: + return (notification, []) + + # lookup existing entry + existing = self.get( + index=self.folders_index, + doctype=self.folders_doctype, + key=folder['id'], + fields='uniqueid,name' + ) + + # create an entry for the referenced imap folder + if existing is None: + log.debug( + "Create folder object for: %r" % ( + folder['body']['uri'] + ), + level = 8 + ) + + ret = self.set( + index = self.folders_index, + doctype = self.folders_doctype, + key = folder['id'], + value = folder['body'] + ) + + if ret is None: + folder = None + + # update entry if name changed + elif folder['body']['uniqueid'] == existing['uniqueid'] and \ + not folder['body']['name'] == existing['name']: + + try: + ret = self.riak.update( + index = self.folders_index, + doc_type = self.folders_doctype, + id = folder['id'], + body = { + 'doc': { + 'name': folder['body']['name'], + 'uri': folder['body']['uri'] + } + }, + consistency = 'one', + replication = 'async' + ) + + log.debug("Updated folder object: %r" % (ret), level=8) + + except Exception, errmsg: + log.warning("Riak update exception: %r" % (errmsg)) + + # add reference to internal folder_id + if folder is not None: + self.folder_id_cache[base_uri] = folder['id'] + notification['folder_id'] = folder['id'] + + return (notification, []) + + def report(self): + """ + Callback from the worker main loop to trigger periodic jobs + """ + # clean-up in-memory caches from time to time + self.user_id_cache.expunge() + self.folder_id_cache.expunge()
View file
bonnie-0.3.0.tar.gz/conf/bonnie.conf -> bonnie-0.3.1.tar.gz/conf/bonnie.conf
Changed
@@ -1,3 +1,6 @@ +[bonnie] +features = archive, audit, backup, dlp + [broker] zmq_dealer_router_bind_address = tcp://*:5570 zmq_collector_router_bind_address = tcp://*:5571 @@ -17,7 +20,7 @@ zmq_broker_address = tcp://localhost:5570 blacklist_users = cyrus-admin blacklist_events = Login,Logout,AclChange -input_exclude_events = +input_exclude_events = [worker] num_childs = 0
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.