Kolab:3.4:Updates / bonnie
Note: the diff of some files is truncated because they were too big.
Changes of Revision 40
bonnie.spec
Changed
@@ -17,7 +17,7 @@
 %global bonnie_group_id 415

 Name: bonnie
-Version: 0.3.0
+Version: 0.3.1
 Release: 1%{?dist}
 Summary: Bonnie for Kolab Groupware
@@ -56,6 +56,28 @@
 %description
 Bonnie for Kolab Groupware

+%package elasticsearch
+Summary: Elasticsearch meta-package for Bonnie
+Group: Applications/System
+Provides: %{name}(output) = %{?epoch:%{epoch}:}%{version}-%{release}
+Provides: %{name}(storage) = %{?epoch:%{epoch}:}%{version}-%{release}
+Requires: python-elasticsearch >= 1.0.0
+
+%description
+This meta-package pulls in the Elasticsearch output and storage channels
+for Bonnie workers.
+
+%package riak
+Summary: Riak meta-package for Bonnie
+Group: Applications/System
+Provides: %{name}(output) = %{?epoch:%{epoch}:}%{version}-%{release}
+Provides: %{name}(storage) = %{?epoch:%{epoch}:}%{version}-%{release}
+Requires: python-riak >= 2.1.0
+
+%description
+This meta-package pulls in the Riak output and storage channels
+for Bonnie workers.
+
 %package broker
 Summary: The Bonnie broker
 Group: Applications/System
@@ -90,7 +112,8 @@
 Summary: The Bonnie worker
 Group: Applications/System
 Requires: %{name} = %{?epoch:%{epoch}:}%{version}-%{release}
-Requires: python-elasticsearch >= 1.0
+Requires: %{name}(output) = %{?epoch:%{epoch}:}%{version}-%{release}
+Requires: %{name}(storage) = %{?epoch:%{epoch}:}%{version}-%{release}
 Requires: python-zmq

 %description worker
@@ -99,6 +122,7 @@
 %package wui
 Summary: The Bonnie Web UI
 Group: Applications/System
+Requires: bonnie-flask
 Requires: python-flask
 Requires: python-flask-bootstrap
 Requires: python-flask-httpauth
@@ -164,7 +188,7 @@
 getent group %{bonnie_group} &>/dev/null || groupadd -r %{bonnie_group} -g %{bonnie_group_id} &>/dev/null
 getent passwd %{bonnie_user} &>/dev/null || \
     useradd -r -u %{bonnie_user_id} -g %{bonnie_group} -d %{_localstatedir}/lib/%{bonnie_user} -s /sbin/nologin \
-    -c "Kolab System Account" %{bonnie_user} &>/dev/null || :
+    -c "Bonnie Account" %{bonnie_user} &>/dev/null || :

 # Allow the bonnie user access to mail
 gpasswd -a bonnie mail >/dev/null 2>&1
@@ -284,6 +308,12 @@
 %{python_sitelib}/bonnie/utils.py*
 %{python_sitelib}/bonnie/plugins/

+%files elasticsearch
+%defattr(-,root,root,-)
+
+%files riak
+%defattr(-,root,root,-)
+
 %files broker
 %defattr(-,root,root,-)
 %{_sbindir}/bonnie-broker
@@ -315,4 +345,6 @@
 %defattr(-,root,root,-)

 %changelog
+* Thu Dec 11 2014 Jeroen van Meeuwen <vanmeeuwen@kolabsys.com> - 0.3.1-1
+- New upstream release
bonnie-0.3.0.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py -> bonnie-0.3.1.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py
Changed
@@ -180,9 +180,6 @@
             # Once every 30 seconds, expire stale collectors and
             # workers, unlock jobs and expire those done.
             if last_expire < (time.time() - 30):
-                for _collector in collector.select_by_state(b'READY'):
-                    self._request_collector_state(_collector.identity)
-
                 collector.expire()
                 worker.expire()
                 job.unlock()
@@ -476,11 +473,9 @@
         _job = job.select_for_collector(identity)

         if _job == None:
-            #log.info("No jobs for collector %s" % (identity))
             return

-        log.info("Job %s ALLOC to %s" % (_job.uuid, identity))
-        #log.debug("Sending %s to %s" % (_job.uuid, identity), level=7)
+        log.debug("Sending %s to %s" % (_job.uuid, identity), level=7)

         self.routers['collector']['router'].send_multipart(
             [
@@ -495,11 +490,9 @@
         _job = job.select_for_worker(identity)

         if _job == None:
-            #log.info("No jobs for worker %s" % (identity))
             return

-        log.info("Job %s ALLOC to %s" % (_job.uuid, identity))
-        #log.debug("Sending %s to %s" % (_job.uuid, identity), level=7)
+        log.debug("Sending %s to %s" % (_job.uuid, identity), level=7)

         self.routers['worker_controller']['router'].send_multipart(
             [
@@ -524,26 +517,41 @@
         jwa = job.count_by_type_and_state('worker', b'ALLOC')

         _job = job.first()
+        _job_notification = False

         if _job == None:
             jt = 0
         else:
-            _job_notification = json.loads(_job.notification)
-            _job_timestamp = parse(_job_notification['timestamp']).astimezone(tzutc())
-            now = parse(
-                datetime.datetime.strftime(
-                    datetime.datetime.utcnow(),
-                    "%Y-%m-%dT%H:%M:%S.%fZ"
-                )
-            ).astimezone(tzutc())
-
-            delta = now - _job_timestamp
-            if hasattr(delta, 'total_seconds'):
-                seconds = delta.total_seconds()
+            while _job_notification == False:
+                try:
+                    _job_notification = json.loads(_job.notification)
+                except Exception, errmsg:
+                    job.set_state(_job.uuid, b'FAILED')
+                    _job = job.first()
+                    if _job == None:
+                        jt = 0
+                        break
+
+                    _job_notification == False
+
+            if _job_notification == False:
+                jt = 0
             else:
-                seconds = (delta.days * 24 * 3600) + delta.seconds
-
-            jt = round(seconds, 0)
+                _job_timestamp = parse(_job_notification['timestamp']).astimezone(tzutc())
+                now = parse(
+                    datetime.datetime.strftime(
+                        datetime.datetime.utcnow(),
+                        "%Y-%m-%dT%H:%M:%S.%fZ"
+                    )
+                ).astimezone(tzutc())
+
+                delta = now - _job_timestamp
+                if hasattr(delta, 'total_seconds'):
+                    seconds = delta.total_seconds()
+                else:
+                    seconds = (delta.days * 24 * 3600) + delta.seconds
+
+                jt = round(seconds, 0)

         stats = {
             'cb': collector.count_by_state(b'BUSY'),
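A note on the timing hunk above: the `hasattr(delta, 'total_seconds')` branch is a Python-version shim, since `timedelta.total_seconds()` only exists as of Python 2.7. A minimal standalone sketch of that computation, using only the standard library:

import datetime

def delta_seconds(delta):
    if hasattr(delta, 'total_seconds'):
        # Python 2.7+ has a native helper
        return delta.total_seconds()

    # Python 2.6 fallback: derive the value from days and seconds
    # (microseconds are ignored, just as in the broker code above)
    return (delta.days * 24 * 3600) + delta.seconds

age = datetime.datetime.utcnow() - datetime.datetime(2014, 12, 11)
print(round(delta_seconds(age), 0))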
bonnie-0.3.0.tar.gz/bonnie/broker/brokers/zmq_broker/collector.py -> bonnie-0.3.1.tar.gz/bonnie/broker/brokers/zmq_broker/collector.py
Changed
@@ -113,22 +113,18 @@
     for collector in db.query(Collector).filter(Collector.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 90)), Collector.state == b'STALE').all():
         log.debug("Purging collector %s as very stale" % (collector.identity), level=7)

-        if not collector.job == None:
-            _job = db.query(Job).filter_by(id=collector.job).first()
-            if not _job == None:
-                _job.state = b'PENDING'
-                _job.timestamp = datetime.datetime.utcnow()
+        for _job in db.query(Job).filter_by(collector=collector.identity).all():
+            _job.state = b'PENDING'
+            _job.timestamp = datetime.datetime.utcnow()

         db.delete(collector)
         db.commit()

     for collector in db.query(Collector).filter(Collector.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 90)), Collector.state != b'STALE').all():
         log.debug("Marking collector %s as stale" % (collector.identity), level=7)

-        if not collector.job == None:
-            _job = db.query(Job).filter_by(id=collector.job).first()
-            if not _job == None:
-                _job.state = b'PENDING'
-                _job.timestamp = datetime.datetime.utcnow()
+        for _job in db.query(Job).filter_by(collector=collector.identity).all():
+            _job.state = b'PENDING'
+            _job.timestamp = datetime.datetime.utcnow()

         collector.state = b'STALE'
         collector.timestamp = datetime.datetime.utcnow()
bonnie-0.3.0.tar.gz/bonnie/broker/brokers/zmq_broker/job.py -> bonnie-0.3.1.tar.gz/bonnie/broker/brokers/zmq_broker/job.py
Changed
@@ -80,7 +80,7 @@
 def first():
     db = init_db('jobs')
-    result = db.query(Job).filter(Job.state != b'DONE').order_by(Job.id).first()
+    result = db.query(Job).filter(Job.state != b'DONE', Job.state != b'FAILED').order_by(Job.id).first()
     return result

 def select(job_uuid):
@@ -131,8 +131,6 @@
     job.state = b'ALLOC'
     job.timestamp = datetime.datetime.utcnow()

-    collector.job = job.id
-
     collector.state = b'BUSY'
     db.commit()

     return job
@@ -228,9 +226,6 @@
         log.debug("Unlocking %s job %s" % (job.job_type, job.uuid), level=7)
         job.state = b'PENDING'

-        for collector in db.query(Collector).filter_by(job=job.id).all():
-            collector.job = None
-
         for worker in db.query(Worker).filter_by(job=job.id).all():
             worker.job = None
bonnie-0.3.0.tar.gz/bonnie/collector/__init__.py -> bonnie-0.3.1.tar.gz/bonnie/collector/__init__.py
Changed
@@ -43,41 +43,30 @@
         self.handler_interests = {}

-        num_threads = int(conf.get('collector', 'num_threads', 5))
+        self.num_threads = int(conf.get('collector', 'num_threads', 5))
+        self.num_threads_busy = 0

         if version.StrictVersion(sys.version[:3]) >= version.StrictVersion("2.7"):
-            self.pool = multiprocessing.Pool(num_threads, self._worker_process_start, (), 1)
+            self.pool = multiprocessing.Pool(self.num_threads, self._worker_process_start, (), 1)
         else:
-            self.pool = multiprocessing.Pool(num_threads, self._worker_process_start, ())
-
-    def _worker_process_start(self, *args, **kw):
-        log.debug("Worker process %s initializing" % (multiprocessing.current_process().name), level=1)
+            self.pool = multiprocessing.Pool(self.num_threads, self._worker_process_start, ())

     def execute(self, commands, job_uuid, notification):
         """
             Dispatch collector job to the according handler(s)
         """
-        log.debug("Executing collection for %s" % (commands), level=8)
+        self.num_threads_busy += 1

         # execute this asynchronously in a child process
         self.pool.apply_async(
-            async_execute_handlers,
-            (
-                commands.split(),
-                notification,
-                job_uuid
-            ),
-            callback = self._execute_callback
-        )
-
-    def _execute_callback(self, result):
-        (notification, job_uuid) = result
-
-        # pass result back to input module(s)
-        input_modules = conf.get('collector', 'input_modules').split(',')
-        for _input in self.input_modules.values():
-            if _input.name() in input_modules:
-                _input.callback_done(job_uuid, notification)
+                async_execute_handlers,
+                (
+                    commands.split(),
+                    notification,
+                    job_uuid
+                ),
+                callback = self._execute_callback
+            )

     def register_input(self, interests):
         self.input_interests = interests
@@ -99,7 +88,13 @@
             handler = _class()
             handler.register(callback=self.register_handler)

-        input_modules = conf.get('collector', 'input_modules').split(',')
+        input_modules = conf.get('collector', 'input_modules')
+
+        if input_modules == None:
+            input_modules = ""
+
+        input_modules = [x.strip() for x in input_modules.split(',')]
+
         for _input in self.input_modules.values():
             if _input.name() in input_modules:
                 _input.run(callback=self.execute, interests=self.handler_interests.keys())
@@ -113,6 +108,22 @@
         self.pool.close()

+    def _execute_callback(self, result):
+        (notification, job_uuid) = result
+
+        self.num_threads_busy -= 1
+
+        # pass result back to input module(s)
+        input_modules = conf.get('collector', 'input_modules').split(',')
+        for _input in self.input_modules.values():
+            if _input.name() in input_modules:
+                _input.callback_done(job_uuid, notification, threads=self._threads_available())
+
+    def _threads_available(self):
+        return (self.num_threads - self.num_threads_busy)
+
+    def _worker_process_start(self, *args, **kw):
+        log.info("Worker process %s initializing" % (multiprocessing.current_process().name))

 def async_execute_handlers(commands, notification, job_uuid):
     """
@@ -120,6 +131,9 @@
         To be run an an asynchronous child process.
     """
+
+    log.info("COLLECT %r for %s by %s" % (commands, job_uuid, multiprocessing.current_process().name))
+
     # register handlers with the interrests again in this subprocess
     handler_interests = {}
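The collector changes above introduce simple busy-slot accounting around `multiprocessing.Pool`. A self-contained sketch of the pattern, with illustrative names rather than the actual bonnie classes: every dispatched job bumps a counter, the completion callback decrements it, and the difference is the free capacity reported back to the input module.

import multiprocessing

def work(item):
    return item * 2

class Dispatcher(object):
    def __init__(self, num_threads=5):
        self.num_threads = num_threads
        self.num_threads_busy = 0
        self.pool = multiprocessing.Pool(num_threads)

    def execute(self, item):
        # one more slot in use until the callback fires
        self.num_threads_busy += 1
        self.pool.apply_async(work, (item,), callback=self._done)

    def _done(self, result):
        # multiprocessing invokes this in the parent's result-handler thread
        self.num_threads_busy -= 1

    def threads_available(self):
        return self.num_threads - self.num_threads_busy

# usage: d = Dispatcher(); d.execute(21); print(d.threads_available())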
bonnie-0.3.0.tar.gz/bonnie/collector/inputs/zmq_input.py -> bonnie-0.3.1.tar.gz/bonnie/collector/inputs/zmq_input.py
Changed
@@ -24,6 +24,7 @@
 """

+import datetime
 import json
 import os
 import socket
@@ -36,6 +37,7 @@
 log = bonnie.getLogger('bonnie.collector.ZMQInput')

 class ZMQInput(object):
+    report_timestamp = 0
     state = b"READY"

     def __init__(self, *args, **kw):
@@ -53,6 +55,7 @@
         self.collector.connect(zmq_broker_address)

         self.stream = zmqstream.ZMQStream(self.collector)
+        self.ioloop = ioloop.IOLoop.instance()

     def name(self):
         return 'zmq_input'
@@ -60,10 +63,18 @@
     def register(self, *args, **kw):
         pass

-    def report_state(self, interests=[]):
+    def report_state(self, new_timeout=False):
         log.debug("[%s] Reporting state %s, %r" % (self.identity, self.state, self.interests), level=9)
-        self.collector.send_multipart([b"STATE", self.state, " ".join(self.interests)])
-        self.report_timestamp = time.time()
+
+        if self.report_timestamp < (time.time() - 10):
+            self.collector.send_multipart([b"STATE", self.state, " ".join(self.interests)])
+            self.report_timestamp = time.time()
+
+        if new_timeout:
+            self.ioloop.add_timeout(datetime.timedelta(seconds=10), self.report_state)
+
+    def report_state_with_timeout(self):
+        self.report_state(new_timeout=True)

     def run(self, callback=None, interests=[]):
         log.info("[%s] starting", self.identity)
@@ -71,10 +82,10 @@
         # report READY state with interests
         self.interests = interests
         self.notify_callback = callback
-        self.report_state()

         self.stream.on_recv(self._cb_on_recv_multipart)
-        ioloop.IOLoop.instance().start()
+        self.ioloop.add_timeout(datetime.timedelta(seconds=10), self.report_state_with_timeout)
+        self.ioloop.start()

     def _cb_on_recv_multipart(self, message):
         """
@@ -85,6 +96,8 @@

         if cmd == b"STATE":
             self.report_state()
+        elif self.state == b'BUSY':
+            self.report_state()
         else:
             job_uuid = message[1]
             notification = message[2]
@@ -92,9 +105,15 @@
             if not self.notify_callback == None:
                 self.notify_callback(cmd, job_uuid, notification)

-    def callback_done(self, job_uuid, result):
+    def callback_done(self, job_uuid, result, threads = 0):
         log.debug("Handler callback done for job %s: %r" % (job_uuid, result), level=8)
-        self.report_timestamp = time.time()
+        #log.info("Threads available: %d" % (threads))
+
+        if threads > 0:
+            self.state = b'READY'
+        else:
+            self.state = b'BUSY'
+
         self.collector.send_multipart([b"DONE", job_uuid, result])
         log.info("Job %s DONE by %s" % (job_uuid, self.identity))
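The `zmq_input` changes replace the one-shot state report with a heartbeat driven by pyzmq's tornado-style ioloop, rate-limited through `report_timestamp`. A simplified sketch of the underlying pattern, assuming pyzmq's `zmq.eventloop.ioloop`; the actual code above threads a `new_timeout` flag through `report_state()` rather than re-arming unconditionally, and sends a real multipart message where this stub prints:

import datetime
import time

from zmq.eventloop import ioloop

class Heartbeat(object):
    report_timestamp = 0

    def __init__(self):
        self.ioloop = ioloop.IOLoop.instance()

    def report_state(self):
        # send at most one STATE report per 10-second window
        if self.report_timestamp < (time.time() - 10):
            print("STATE READY")  # stands in for send_multipart()
            self.report_timestamp = time.time()

        # re-arm so the heartbeat keeps firing even when no jobs arrive
        self.ioloop.add_timeout(datetime.timedelta(seconds=10), self.report_state)

hb = Heartbeat()
hb.ioloop.add_timeout(datetime.timedelta(seconds=10), hb.report_state)
# hb.ioloop.start() would then keep the heartbeat alive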
bonnie-0.3.0.tar.gz/bonnie/worker/__init__.py -> bonnie-0.3.1.tar.gz/bonnie/worker/__init__.py
Changed
@@ -152,14 +152,15 @@
             __class.register(callback=self.register_input)
             self.input_modules[_class] = __class

-        output_modules = conf.get('worker', 'output_modules').split(',')
+        output_modules = [x.strip() for x in conf.get('worker', 'output_modules', '').split(',')]
+
         for _class in outputs.list_classes():
             _output = _class()
             if _output.name() in output_modules:
                 _output.register(callback=self.register_output)
                 self.output_modules[_class] = _output

-        storage_modules = conf.get('worker', 'storage_modules').split(',')
+        storage_modules = [x.strip() for x in conf.get('worker', 'storage_modules', '').split(',')]
         for _class in storage.list_classes():
             _storage = _class()
             if _storage.name() in storage_modules:
@@ -167,9 +168,9 @@
                 self.storage_modules[_class] = _storage
                 self.storage = _storage

-        output_exclude_events = conf.get('worker', 'output_exclude_events', None)
-        if not output_exclude_events == None:
-            self.output_exclude_events = output_exclude_events.split(',')
+        output_exclude_events = conf.get('worker', 'output_exclude_events', '')
+
+        self.output_exclude_events = [x.strip() for x in output_exclude_events.split(',')]

     def event_notification(self, notification):
         """
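The repeated `[x.strip() for x in ...split(',')]` idiom above makes list-valued options tolerant of a missing value and of whitespace around commas. In isolation:

def parse_list_option(value):
    # tolerate an unset option
    if value is None:
        value = ""
    # tolerate whitespace around the commas
    return [x.strip() for x in value.split(',')]

print(parse_list_option("elasticsearch_output, riak_output"))
# ['elasticsearch_output', 'riak_output']
# note: an empty string still yields [''], matching the code above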
bonnie-0.3.0.tar.gz/bonnie/worker/handlers/base.py -> bonnie-0.3.1.tar.gz/bonnie/worker/handlers/base.py
Changed
@@ -18,9 +18,19 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #

+import bonnie
+conf = bonnie.getConf()
+
 class HandlerBase(object):
+    features = []
+
     def __init__(self, *args, **kw):
-        pass
+        self.features = conf.get('bonnie', 'features')
+
+        if self.features == None:
+            self.features = ""
+
+        self.features = [x.strip() for x in self.features.split(',')]

     def register(self, callback):
         interests = {
bonnie-0.3.0.tar.gz/bonnie/worker/handlers/messagebase.py -> bonnie-0.3.1.tar.gz/bonnie/worker/handlers/messagebase.py
Changed
@@ -23,6 +23,8 @@
 from bonnie.utils import parse_imap_uri
 from bonnie.worker.handlers import HandlerBase

+conf = bonnie.getConf()
+
 class MessageHandlerBase(HandlerBase):
     event = None

@@ -34,12 +36,23 @@
         # call super for some basic notification processing
         (notification, jobs) = super(MessageHandlerBase, self).run(notification)

+        relevant = False
+
+        if 'archive' in self.features:
+            relevant = True
+
+        if 'backup' in self.features:
+            relevant = True
+
+        if not relevant:
+            return (notification, jobs)
+
         # Insert the URI UID (if it exists) in to uidset for further handlers
         if notification.has_key('uri') and notification.has_key('uidset'):
             uri = parse_imap_uri(notification['uri'])

             if uri.has_key('UID'):
-                notification['uidset'] = uri['UID']
+                notification['uidset'] = [ uri['UID'] ]

         # message notifications require message headers
         if not notification.has_key('messageHeaders'):
bonnie-0.3.0.tar.gz/bonnie/worker/handlers/messagenew.py -> bonnie-0.3.1.tar.gz/bonnie/worker/handlers/messagenew.py
Changed
@@ -22,10 +22,35 @@
     Base handler for an event notification of type 'MessageNew'
 """

+import bonnie
 from bonnie.worker.handlers import MessageAppendHandler

+conf = bonnie.getConf()
+
 class MessageNewHandler(MessageAppendHandler):
     event = 'MessageNew'

     def __init__(self, *args, **kw):
         super(MessageNewHandler, self).__init__(*args, **kw)
+
+    def run(self, notification):
+        # call super for some basic notification processing
+        (notification, jobs) = super(MessageAppendHandler, self).run(notification)
+
+        relevant = False
+
+        if 'archive' in self.features:
+            relevant = True
+
+        if 'backup' in self.features:
+            relevant = True
+
+        if not relevant:
+            return (notification, jobs)
+
+        if not notification.has_key('messageContent') or notification['messageContent'] in [None, ""]:
+            self.log.debug("Adding FETCH job for " + self.event, level=8)
+            return (notification, [ b"FETCH" ])
+
+        return (notification, jobs)
bonnie-0.3.0.tar.gz/bonnie/worker/inputs/zmq_input.py -> bonnie-0.3.1.tar.gz/bonnie/worker/inputs/zmq_input.py
Changed
@@ -156,7 +156,7 @@

         self.set_state_ready()

-        if report is not None and self.lastping < (now - random.randint(55,65)):
+        if report is not None and self.lastping < (now - random.randint(300,600)):
             report()
             self.lastping = now
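The widened, randomized ping interval acts as jitter: each worker draws a fresh 300-600 second threshold on every check, so reports spread out instead of all workers pinging the broker in lock-step. Sketched standalone, with illustrative names:

import random
import time

class Pinger(object):
    def __init__(self):
        self.lastping = 0

    def maybe_report(self, report):
        now = time.time()
        # a fresh threshold is drawn on every check, so workers drift
        # apart rather than reporting at a fixed cadence
        if report is not None and self.lastping < (now - random.randint(300, 600)):
            report()
            self.lastping = now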
bonnie-0.3.0.tar.gz/bonnie/worker/outputs/__init__.py -> bonnie-0.3.1.tar.gz/bonnie/worker/outputs/__init__.py
Changed
@@ -18,13 +18,27 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #

-from elasticsearch_output import ElasticSearchOutput
+__all__ = []

-__all__ = [
-        'ElasticSearchOutput'
-    ]
+try:
+    from elasticsearch_output import ElasticSearchOutput
+    __all__.append('ElasticSearchOutput')
+except ImportError, errmsg:
+    pass
+
+try:
+    from riak_output import RiakOutput
+    __all__.append('RiakOutput')
+except ImportError, errmsg:
+    pass

 def list_classes():
-    return [
-            ElasticSearchOutput
-        ]
+    classes = []
+
+    if 'ElasticSearchOutput' in __all__:
+        classes.append(ElasticSearchOutput)
+
+    if 'RiakOutput' in __all__:
+        classes.append(RiakOutput)
+
+    return classes
bonnie-0.3.1.tar.gz/bonnie/worker/outputs/riak_output.py
Added
@@ -0,0 +1,142 @@
+# -*- coding: utf-8 -*-
+# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com)
+#
+# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com>
+# Thomas Bruederli (Kolab Systems) <bruederli a kolabsys.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import datetime
+
+from dateutil.parser import parse
+from dateutil.tz import tzutc
+
+import riak
+import json
+
+import bonnie
+conf = bonnie.getConf()
+log = bonnie.getLogger('bonnie.worker.RiakOutput')
+
+class RiakOutput(object):
+    def __init__(self, *args, **kw):
+        riak_output_address = conf.get(
+                'worker',
+                'riak_output_address'
+            )
+
+        if riak_output_address == None:
+            riak_output_address = 'localhost'
+
+        self.riak = riak.Riak(
+                host=riak_output_address
+            )
+
+    def name(self):
+        return 'riak_output'
+
+    def register(self, callback):
+        self.worker = callback({'_all': { 'callback': self.run }})
+
+    def notification2log(self, notification):
+        """
+            Convert the given event notification record into a valid log entry
+        """
+        keymap = {
+                'timestamp': None,
+                'clientIP': 'client_ip',
+                'clientPort': None,
+                'serverPort': None,
+                'serverDomain': 'domain',
+                'aclRights': 'acl_rights',
+                'aclSubject': 'acl_subject',
+                'mailboxID': 'mailbox_id',
+                'messageSize': 'message_size',
+                'messageHeaders': None,
+                'messageContent': None,
+                'bodyStructure': None,
+                'metadata': None,
+                'acl': None,
+                'flagNames': 'flag_names',
+                'diskUsed': 'disk_used',
+                'vnd.cmu.oldUidset': 'olduidset',
+                'vnd.cmu.sessionId': 'session_id',
+                'vnd.cmu.midset': 'message_id',
+                'vnd.cmu.unseenMessages': 'unseen_messages',
+            }
+
+        log = { '@version': bonnie.API_VERSION }
+
+        for key,val in notification.iteritems():
+            newkey = keymap[key] if keymap.has_key(key) else key
+            if newkey is not None:
+                # convert NIL values into None which is more appropriate
+                if isinstance(val, list):
+                    val = [x for x in val if not x == "NIL"]
+                elif val == "NIL":
+                    val = None
+
+                log[newkey] = val
+
+        return log
+
+    def run(self, notification):
+        # The output should have UTC timestamps, but gets "2014-05-16T12:55:53.870+02:00"
+        try:
+            timestamp = parse(notification['timestamp']).astimezone(tzutc())
+        except:
+            timestamp = datetime.datetime.now(tzutc())
+
+        notification['@timestamp'] = datetime.datetime.strftime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
+        index = 'logstash-%s' % (datetime.datetime.strftime(timestamp, "%Y.%m.%d"))
+        jobs = []
+
+        # for notifications concerning multiple messages, create entries for each message
+        if notification.has_key('messageHeaders') and isinstance(notification['messageHeaders'], dict) and len(notification['messageHeaders']) > 0:
+            for uid,headers in notification['messageHeaders'].iteritems():
+                notification['uidset'] = uid
+                notification['headers'] = headers
+                notification['message'] = None
+                if notification.has_key('messageContent') and notification['messageContent'].has_key(uid):
+                    notification['message'] = notification['messageContent'][uid]
+                    # no need for bodystructure if we have the real message content
+                    notification.pop('bodyStructure', None)
+
+                # remove vnd.cmu.envelope if we have headers
+                notification.pop('vnd.cmu.envelope', None)
+
+                try:
+                    self.riak.create(
+                        index=index,
+                        doc_type='logs',
+                        body=self.notification2log(notification)
+                    )
+                except Exception, errmsg:
+                    log.warning("Riak create exception: %r", e)
+                    jobs.append(b'POSTPONE')
+                    break
+
+        else:
+            try:
+                self.riak.create(
+                    index=index,
+                    doc_type='logs',
+                    body=self.notification2log(notification)
+                )
+            except Exception, errmsg:
+                log.warning("Riak create exception: %r", e)
+                jobs.append(b'POSTPONE')
+
+        return (notification, jobs)
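A reviewer's note on the new module: it closely mirrors the Elasticsearch output, down to calls like `riak.Riak(host=...)` and `self.riak.create(index=..., doc_type=..., body=...)`, which follow the Elasticsearch client's calling convention rather than python-riak's (the except blocks also log an undefined `e` instead of `errmsg`). For comparison only, a hedged sketch of a minimal store and fetch against the python-riak 2.x client that the new `Requires: python-riak >= 2.1.0` dependency actually provides; bucket and key names here are illustrative:

import riak

# connect over protocol buffers; host/port are assumptions for a local node
client = riak.RiakClient(protocol='pbc', host='localhost', pb_port=8087)
bucket = client.bucket('logstash-2014.12.11')

# store a JSON document under a key
obj = bucket.new('job-uuid-1234', data={'event': 'MessageNew'})
obj.store()

# fetch it back
fetched = bucket.get('job-uuid-1234')
print(fetched.data)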
bonnie-0.3.0.tar.gz/bonnie/worker/storage/__init__.py -> bonnie-0.3.1.tar.gz/bonnie/worker/storage/__init__.py
Changed
@@ -19,14 +19,30 @@
 #

 from caching import CachedDict
-from elasticsearch_storage import ElasticSearchStorage

 __all__ = [
-        'CachedDict',
-        'ElasticSearchStorage'
+        'CachedDict'
     ]

+try:
+    from elasticsearch_storage import ElasticSearchStorage
+    __all__.append('ElasticSearchStorage')
+except ImportError, errmsg:
+    pass
+
+try:
+    from riak_storage import RiakStorage
+    __all__.append('RiakStorage')
+except ImportError, errmsg:
+    pass
+
 def list_classes():
-    return [
-            ElasticSearchStorage
-        ]
+    classes = []
+
+    if 'ElasticSearchStorage' in __all__:
+        classes.append(ElasticSearchStorage)
+
+    if 'RiakStorage' in __all__:
+        classes.append(RiakStorage)
+
+    return classes
bonnie-0.3.1.tar.gz/bonnie/worker/storage/riak_storage.py
Added
@@ -0,0 +1,577 @@
+# -*- coding: utf-8 -*-
+# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com)
+#
+# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com>
+# Thomas Bruederli (Kolab Systems) <bruederli a kolabsys.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import datetime
+import hashlib
+import json
+import re
+import urllib
+import random
+import riak
+import time
+
+from dateutil.tz import tzutc
+from bonnie.utils import parse_imap_uri
+from bonnie.worker.storage import CachedDict
+
+import bonnie
+conf = bonnie.getConf()
+log = bonnie.getLogger('bonnie.worker.RiakStorage')
+
+class RiakStorage(object):
+    """
+        Storage node writing object data into Riak
+    """
+    default_index = 'objects'
+    default_doctype = 'object'
+    folders_index = 'objects'
+    folders_doctype = 'folder'
+    users_index = 'objects'
+    users_doctype = 'user'
+
+    def __init__(self, *args, **kw):
+        riak_output_address = conf.get(
+                'worker',
+                'riak_storage_address'
+            )
+
+        if riak_output_address == None:
+            riak_output_address = 'localhost'
+
+        self.es = riak.Elasticsearch(
+                host=riak_output_address
+            )
+
+        # use dicts with automatic expiration for caching user/folder
+        # lookups
+        self.user_id_cache = CachedDict(300)
+        self.folder_id_cache = CachedDict(120)
+
+    def name(self):
+        return 'riak_storage'
+
+    def register(self, callback, **kw):
+        if callback is not None:
+            self.worker = callback(
+                    interests = {
+                            'uidset': {
+                                    'callback': self.resolve_folder_uri
+                                },
+                            'folder_uniqueid': {
+                                    'callback': self.resolve_folder_uri
+                                },
+                            'mailboxID': {
+                                    'callback': self.resolve_folder_uri,
+                                    'kw': { 'attrib': 'mailboxID' }
+                                }
+                        }
+                )
+
+    def get(self, key, index=None, doctype=None, fields=None, **kw):
+        """
+            Standard API for accessing key/value storage
+        """
+        _index = index or self.default_index
+        _doctype = doctype or self.default_doctype
+        try:
+            res = self.riak.get(
+                    index = _index,
+                    doc_type = _doctype,
+                    id = key,
+                    _source_include = fields or '*'
+                )
+
+            log.debug(
+                    "Riak get result for %s/%s/%s: %r" % (
+                            _index,
+                            _doctype,
+                            key,
+                            res
+                        ),
+                    level = 8
+                )
+
+            if res['found']:
+                result = self._transform_result(res)
+            else:
+                result = None
+
+        except riak.exceptions.NotFoundError, errmsg:
+            log.debug(
+                    "Riak entry not found for %s/%s/%s: %r" % (
+                            _index,
+                            _doctype,
+                            key,
+                            errmsg
+                        )
+                )
+
+            result = None
+
+        except Exception, errmsg:
+            log.warning("Riak get exception: %r" % (errmsg))
+            result = None
+
+        return result
+
+    def set(self, key, value, index=None, doctype=None, **kw):
+        """
+            Standard API for writing to key/value storage
+        """
+        _index = index or self.default_index
+        _doctype = doctype or self.default_doctype
+
+        try:
+            existing = self.riak.get(
+                    index = _index,
+                    doc_type = _doctype,
+                    id = key,
+                    fields = None
+                )
+
+            log.debug(
+                    "Riak get result for %s/%s/%s: %r" % (
+                            _index,
+                            _doctype,
+                            key,
+                            existing
+                        ),
+                    level = 8
+                )
+
+        except riak.exceptions.NotFoundError, errmsg:
+            existing = None
+
+        except Exception, errmsg:
+            log.warning("Riak get exception: %r" % (errmsg))
+            existing = None
+
+        if existing is None:
+            try:
+                ret = self.riak.create(
+                        index = _index,
+                        doc_type = _doctype,
+                        id = key,
+                        body = value,
+                        consistency = 'one',
+                        replication = 'async'
+                    )
+
+                log.debug(
+                        "Created ES object for %s/%s/%s: %r" % (
+                                _index,
+                                _doctype,
+                                key,
+                                ret
+                            ),
+                        level = 8
+                    )
+
+            except Exception, errmsg:
+                log.warning("Riak create exception: %r" % (errmsg))
+                ret = None
+
+        else:
+            try:
+                ret = self.riak.update(
+                        index = _index,
+                        doc_type = _doctype,
+                        id = key,
+                        body = { 'doc': value },
+                        consistency = 'one',
+                        replication = 'async'

(diff truncated here by the build service)
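The storage module caches user and folder lookups in `CachedDict(300)` and `CachedDict(120)` from `bonnie.worker.storage.caching`. A standalone illustration of that TTL-dictionary idea (illustrative only, not the actual CachedDict implementation):

import time

class TTLDict(object):
    def __init__(self, ttl):
        self.ttl = ttl
        self._data = {}

    def __setitem__(self, key, value):
        # remember when this entry stops being valid
        self._data[key] = (time.time() + self.ttl, value)

    def __getitem__(self, key):
        expires, value = self._data[key]
        if expires < time.time():
            # entry outlived its TTL: drop it and behave as a miss
            del self._data[key]
            raise KeyError(key)
        return value

    def __contains__(self, key):
        try:
            self[key]
            return True
        except KeyError:
            return False

# usage: cache = TTLDict(300); cache['user@example.org'] = 'user-id-42'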
bonnie-0.3.0.tar.gz/conf/bonnie.conf -> bonnie-0.3.1.tar.gz/conf/bonnie.conf
Changed
@@ -1,3 +1,6 @@
+[bonnie]
+features = archive, audit, backup, dlp
+
 [broker]
 zmq_dealer_router_bind_address = tcp://*:5570
 zmq_collector_router_bind_address = tcp://*:5571
@@ -17,7 +20,7 @@
 zmq_broker_address = tcp://localhost:5570
 blacklist_users = cyrus-admin
 blacklist_events = Login,Logout,AclChange
-input_exclude_events =
+input_exclude_events =

 [worker]
 num_childs = 0
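The new `[bonnie]` section is what `HandlerBase` and the message handlers above consume: the comma-separated `features` list is parsed once, and payload-heavy collection is only scheduled when a feature that needs message content ('archive' or 'backup') is enabled. In miniature, with a hypothetical helper mirroring that logic:

def payload_features_enabled(features_value):
    # parse the comma-separated option as HandlerBase does
    features = [x.strip() for x in (features_value or "").split(',')]
    return 'archive' in features or 'backup' in features

print(payload_features_enabled("archive, audit, backup, dlp"))  # True
print(payload_features_enabled("audit, dlp"))                   # False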