Projects
Kolab:3.4
bonnie
Log In
Username
Password
We truncated the diff of some files because they were too big. If you want to see the full diff for every file,
click here
.
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
Expand all
Collapse all
Changes of Revision 40
View file
bonnie.spec
Changed
@@ -17,7 +17,7 @@ %global bonnie_group_id 415 Name: bonnie -Version: 0.3.0 +Version: 0.3.1 Release: 1%{?dist} Summary: Bonnie for Kolab Groupware @@ -56,6 +56,28 @@ %description Bonnie for Kolab Groupware +%package elasticsearch +Summary: Elasticsearch meta-package for Bonnie +Group: Applications/System +Provides: %{name}(output) = %{?epoch:%{epoch}:}%{version}-%{release} +Provides: %{name}(storage) = %{?epoch:%{epoch}:}%{version}-%{release} +Requires: python-elasticsearch >= 1.0.0 + +%description +This meta-package pulls in the Elasticsearch output and storage channels +for Bonnie workers. + +%package riak +Summary: Riak meta-package for Bonnie +Group: Applications/System +Provides: %{name}(output) = %{?epoch:%{epoch}:}%{version}-%{release} +Provides: %{name}(storage) = %{?epoch:%{epoch}:}%{version}-%{release} +Requires: python-riak >= 2.1.0 + +%description +This meta-package pulls in the Riak output and storage channels +for Bonnie workers. + %package broker Summary: The Bonnie broker Group: Applications/System @@ -90,7 +112,8 @@ Summary: The Bonnie worker Group: Applications/System Requires: %{name} = %{?epoch:%{epoch}:}%{version}-%{release} -Requires: python-elasticsearch >= 1.0 +Requires: %{name}(output) = %{?epoch:%{epoch}:}%{version}-%{release} +Requires: %{name}(storage) = %{?epoch:%{epoch}:}%{version}-%{release} Requires: python-zmq %description worker @@ -99,6 +122,7 @@ %package wui Summary: The Bonnie Web UI Group: Applications/System +Requires: bonnie-flask Requires: python-flask Requires: python-flask-bootstrap Requires: python-flask-httpauth @@ -164,7 +188,7 @@ getent group %{bonnie_group} &>/dev/null || groupadd -r %{bonnie_group} -g %{bonnie_group_id} &>/dev/null getent passwd %{bonnie_user} &>/dev/null || \ useradd -r -u %{bonnie_user_id} -g %{bonnie_group} -d %{_localstatedir}/lib/%{bonnie_user} -s /sbin/nologin \ - -c "Kolab System Account" %{bonnie_user} &>/dev/null || : + -c "Bonnie Account" %{bonnie_user} &>/dev/null || : # Allow the bonnie user access to mail gpasswd -a bonnie mail >/dev/null 2>&1 @@ -284,6 +308,12 @@ %{python_sitelib}/bonnie/utils.py* %{python_sitelib}/bonnie/plugins/ +%files elasticsearch +%defattr(-,root,root,-) + +%files riak +%defattr(-,root,root,-) + %files broker %defattr(-,root,root,-) %{_sbindir}/bonnie-broker @@ -315,4 +345,6 @@ %defattr(-,root,root,-) %changelog +* Thu Dec 11 2014 Jeroen van Meeuwen <vanmeeuwen@kolabsys.com> - 0.3.1-1 +- New upstream release
View file
bonnie-0.3.0.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py -> bonnie-0.3.1.tar.gz/bonnie/broker/brokers/zmq_broker/__init__.py
Changed
@@ -180,9 +180,6 @@ # Once every 30 seconds, expire stale collectors and # workers, unlock jobs and expire those done. if last_expire < (time.time() - 30): - for _collector in collector.select_by_state(b'READY'): - self._request_collector_state(_collector.identity) - collector.expire() worker.expire() job.unlock() @@ -476,11 +473,9 @@ _job = job.select_for_collector(identity) if _job == None: - #log.info("No jobs for collector %s" % (identity)) return - log.info("Job %s ALLOC to %s" % (_job.uuid, identity)) - #log.debug("Sending %s to %s" % (_job.uuid, identity), level=7) + log.debug("Sending %s to %s" % (_job.uuid, identity), level=7) self.routers['collector']['router'].send_multipart( [ @@ -495,11 +490,9 @@ _job = job.select_for_worker(identity) if _job == None: - #log.info("No jobs for worker %s" % (identity)) return - log.info("Job %s ALLOC to %s" % (_job.uuid, identity)) - #log.debug("Sending %s to %s" % (_job.uuid, identity), level=7) + log.debug("Sending %s to %s" % (_job.uuid, identity), level=7) self.routers['worker_controller']['router'].send_multipart( [ @@ -524,26 +517,41 @@ jwa = job.count_by_type_and_state('worker', b'ALLOC') _job = job.first() + _job_notification = False if _job == None: jt = 0 else: - _job_notification = json.loads(_job.notification) - _job_timestamp = parse(_job_notification['timestamp']).astimezone(tzutc()) - now = parse( - datetime.datetime.strftime( - datetime.datetime.utcnow(), - "%Y-%m-%dT%H:%M:%S.%fZ" - ) - ).astimezone(tzutc()) - - delta = now - _job_timestamp - if hasattr(delta, 'total_seconds'): - seconds = delta.total_seconds() + while _job_notification == False: + try: + _job_notification = json.loads(_job.notification) + except Exception, errmsg: + job.set_state(_job.uuid, b'FAILED') + _job = job.first() + if _job == None: + jt = 0 + break + + _job_notification == False + + if _job_notification == False: + jt = 0 else: - seconds = (delta.days * 24 * 3600) + delta.seconds - - jt = round(seconds, 0) + _job_timestamp = parse(_job_notification['timestamp']).astimezone(tzutc()) + now = parse( + datetime.datetime.strftime( + datetime.datetime.utcnow(), + "%Y-%m-%dT%H:%M:%S.%fZ" + ) + ).astimezone(tzutc()) + + delta = now - _job_timestamp + if hasattr(delta, 'total_seconds'): + seconds = delta.total_seconds() + else: + seconds = (delta.days * 24 * 3600) + delta.seconds + + jt = round(seconds, 0) stats = { 'cb': collector.count_by_state(b'BUSY'),
View file
bonnie-0.3.0.tar.gz/bonnie/broker/brokers/zmq_broker/collector.py -> bonnie-0.3.1.tar.gz/bonnie/broker/brokers/zmq_broker/collector.py
Changed
@@ -113,22 +113,18 @@ for collector in db.query(Collector).filter(Collector.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 90)), Collector.state == b'STALE').all(): log.debug("Purging collector %s as very stale" % (collector.identity), level=7) - if not collector.job == None: - _job = db.query(Job).filter_by(id=collector.job).first() - if not _job == None: - _job.state = b'PENDING' - _job.timestamp = datetime.datetime.utcnow() + for _job in db.query(Job).filter_by(collector=collector.identity).all(): + _job.state = b'PENDING' + _job.timestamp = datetime.datetime.utcnow() db.delete(collector) db.commit() for collector in db.query(Collector).filter(Collector.timestamp <= (datetime.datetime.utcnow() - datetime.timedelta(0, 90)), Collector.state != b'STALE').all(): log.debug("Marking collector %s as stale" % (collector.identity), level=7) - if not collector.job == None: - _job = db.query(Job).filter_by(id=collector.job).first() - if not _job == None: - _job.state = b'PENDING' - _job.timestamp = datetime.datetime.utcnow() + for _job in db.query(Job).filter_by(collector=collector.identity).all(): + _job.state = b'PENDING' + _job.timestamp = datetime.datetime.utcnow() collector.state = b'STALE' collector.timestamp = datetime.datetime.utcnow()
View file
bonnie-0.3.0.tar.gz/bonnie/broker/brokers/zmq_broker/job.py -> bonnie-0.3.1.tar.gz/bonnie/broker/brokers/zmq_broker/job.py
Changed
@@ -80,7 +80,7 @@ def first(): db = init_db('jobs') - result = db.query(Job).filter(Job.state != b'DONE').order_by(Job.id).first() + result = db.query(Job).filter(Job.state != b'DONE', Job.state != b'FAILED').order_by(Job.id).first() return result def select(job_uuid): @@ -131,8 +131,6 @@ job.state = b'ALLOC' job.timestamp = datetime.datetime.utcnow() - collector.job = job.id - collector.state = b'BUSY' db.commit() return job @@ -228,9 +226,6 @@ log.debug("Unlocking %s job %s" % (job.job_type, job.uuid), level=7) job.state = b'PENDING' - for collector in db.query(Collector).filter_by(job=job.id).all(): - collector.job = None - for worker in db.query(Worker).filter_by(job=job.id).all(): worker.job = None
View file
bonnie-0.3.0.tar.gz/bonnie/collector/__init__.py -> bonnie-0.3.1.tar.gz/bonnie/collector/__init__.py
Changed
@@ -43,41 +43,30 @@ self.handler_interests = {} - num_threads = int(conf.get('collector', 'num_threads', 5)) + self.num_threads = int(conf.get('collector', 'num_threads', 5)) + self.num_threads_busy = 0 if version.StrictVersion(sys.version[:3]) >= version.StrictVersion("2.7"): - self.pool = multiprocessing.Pool(num_threads, self._worker_process_start, (), 1) + self.pool = multiprocessing.Pool(self.num_threads, self._worker_process_start, (), 1) else: - self.pool = multiprocessing.Pool(num_threads, self._worker_process_start, ()) - - def _worker_process_start(self, *args, **kw): - log.debug("Worker process %s initializing" % (multiprocessing.current_process().name), level=1) + self.pool = multiprocessing.Pool(self.num_threads, self._worker_process_start, ()) def execute(self, commands, job_uuid, notification): """ Dispatch collector job to the according handler(s) """ - log.debug("Executing collection for %s" % (commands), level=8) + self.num_threads_busy += 1 # execute this asynchronously in a child process self.pool.apply_async( - async_execute_handlers, - ( - commands.split(), - notification, - job_uuid - ), - callback = self._execute_callback - ) - - def _execute_callback(self, result): - (notification, job_uuid) = result - - # pass result back to input module(s) - input_modules = conf.get('collector', 'input_modules').split(',') - for _input in self.input_modules.values(): - if _input.name() in input_modules: - _input.callback_done(job_uuid, notification) + async_execute_handlers, + ( + commands.split(), + notification, + job_uuid + ), + callback = self._execute_callback + ) def register_input(self, interests): self.input_interests = interests @@ -99,7 +88,13 @@ handler = _class() handler.register(callback=self.register_handler) - input_modules = conf.get('collector', 'input_modules').split(',') + input_modules = conf.get('collector', 'input_modules') + + if input_modules == None: + input_modules = "" + + input_modules = [x.strip() for x in input_modules.split(',')] + for _input in self.input_modules.values(): if _input.name() in input_modules: _input.run(callback=self.execute, interests=self.handler_interests.keys()) @@ -113,6 +108,22 @@ self.pool.close() + def _execute_callback(self, result): + (notification, job_uuid) = result + + self.num_threads_busy -= 1 + + # pass result back to input module(s) + input_modules = conf.get('collector', 'input_modules').split(',') + for _input in self.input_modules.values(): + if _input.name() in input_modules: + _input.callback_done(job_uuid, notification, threads=self._threads_available()) + + def _threads_available(self): + return (self.num_threads - self.num_threads_busy) + + def _worker_process_start(self, *args, **kw): + log.info("Worker process %s initializing" % (multiprocessing.current_process().name)) def async_execute_handlers(commands, notification, job_uuid): """ @@ -120,6 +131,9 @@ To be run an an asynchronous child process. """ + + log.info("COLLECT %r for %s by %s" % (commands, job_uuid, multiprocessing.current_process().name)) + # register handlers with the interrests again in this subprocess handler_interests = {}
View file
bonnie-0.3.0.tar.gz/bonnie/collector/inputs/zmq_input.py -> bonnie-0.3.1.tar.gz/bonnie/collector/inputs/zmq_input.py
Changed
@@ -24,6 +24,7 @@ """ +import datetime import json import os import socket @@ -36,6 +37,7 @@ log = bonnie.getLogger('bonnie.collector.ZMQInput') class ZMQInput(object): + report_timestamp = 0 state = b"READY" def __init__(self, *args, **kw): @@ -53,6 +55,7 @@ self.collector.connect(zmq_broker_address) self.stream = zmqstream.ZMQStream(self.collector) + self.ioloop = ioloop.IOLoop.instance() def name(self): return 'zmq_input' @@ -60,10 +63,18 @@ def register(self, *args, **kw): pass - def report_state(self, interests=[]): + def report_state(self, new_timeout=False): log.debug("[%s] Reporting state %s, %r" % (self.identity, self.state, self.interests), level=9) - self.collector.send_multipart([b"STATE", self.state, " ".join(self.interests)]) - self.report_timestamp = time.time() + + if self.report_timestamp < (time.time() - 10): + self.collector.send_multipart([b"STATE", self.state, " ".join(self.interests)]) + self.report_timestamp = time.time() + + if new_timeout: + self.ioloop.add_timeout(datetime.timedelta(seconds=10), self.report_state) + + def report_state_with_timeout(self): + self.report_state(new_timeout=True) def run(self, callback=None, interests=[]): log.info("[%s] starting", self.identity) @@ -71,10 +82,10 @@ # report READY state with interests self.interests = interests self.notify_callback = callback - self.report_state() self.stream.on_recv(self._cb_on_recv_multipart) - ioloop.IOLoop.instance().start() + self.ioloop.add_timeout(datetime.timedelta(seconds=10), self.report_state_with_timeout) + self.ioloop.start() def _cb_on_recv_multipart(self, message): """ @@ -85,6 +96,8 @@ if cmd == b"STATE": self.report_state() + elif self.state == b'BUSY': + self.report_state() else: job_uuid = message[1] notification = message[2] @@ -92,9 +105,15 @@ if not self.notify_callback == None: self.notify_callback(cmd, job_uuid, notification) - def callback_done(self, job_uuid, result): + def callback_done(self, job_uuid, result, threads = 0): log.debug("Handler callback done for job %s: %r" % (job_uuid, result), level=8) - self.report_timestamp = time.time() + #log.info("Threads available: %d" % (threads)) + + if threads > 0: + self.state = b'READY' + else: + self.state = b'BUSY' + self.collector.send_multipart([b"DONE", job_uuid, result]) log.info("Job %s DONE by %s" % (job_uuid, self.identity))
View file
bonnie-0.3.0.tar.gz/bonnie/worker/__init__.py -> bonnie-0.3.1.tar.gz/bonnie/worker/__init__.py
Changed
@@ -152,14 +152,15 @@ __class.register(callback=self.register_input) self.input_modules[_class] = __class - output_modules = conf.get('worker', 'output_modules').split(',') + output_modules = [x.strip() for x in conf.get('worker', 'output_modules', '').split(',')] + for _class in outputs.list_classes(): _output = _class() if _output.name() in output_modules: _output.register(callback=self.register_output) self.output_modules[_class] = _output - storage_modules = conf.get('worker', 'storage_modules').split(',') + storage_modules = [x.strip() for x in conf.get('worker', 'storage_modules', '').split(',')] for _class in storage.list_classes(): _storage = _class() if _storage.name() in storage_modules: @@ -167,9 +168,9 @@ self.storage_modules[_class] = _storage self.storage = _storage - output_exclude_events = conf.get('worker', 'output_exclude_events', None) - if not output_exclude_events == None: - self.output_exclude_events = output_exclude_events.split(',') + output_exclude_events = conf.get('worker', 'output_exclude_events', '') + + self.output_exclude_events = [x.strip() for x in output_exclude_events.split(',')] def event_notification(self, notification): """
View file
bonnie-0.3.0.tar.gz/bonnie/worker/handlers/base.py -> bonnie-0.3.1.tar.gz/bonnie/worker/handlers/base.py
Changed
@@ -18,9 +18,19 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. # +import bonnie +conf = bonnie.getConf() + class HandlerBase(object): + features = [] + def __init__(self, *args, **kw): - pass + self.features = conf.get('bonnie', 'features') + + if self.features == None: + self.features = "" + + self.features = [x.strip() for x in self.features.split(',')] def register(self, callback): interests = {
View file
bonnie-0.3.0.tar.gz/bonnie/worker/handlers/messagebase.py -> bonnie-0.3.1.tar.gz/bonnie/worker/handlers/messagebase.py
Changed
@@ -23,6 +23,8 @@ from bonnie.utils import parse_imap_uri from bonnie.worker.handlers import HandlerBase +conf = bonnie.getConf() + class MessageHandlerBase(HandlerBase): event = None @@ -34,12 +36,23 @@ # call super for some basic notification processing (notification, jobs) = super(MessageHandlerBase, self).run(notification) + relevant = False + + if 'archive' in self.features: + relevant = True + + if 'backup' in self.features: + relevant = True + + if not relevant: + return (notification, jobs) + # Insert the URI UID (if it exists) in to uidset for further handlers if notification.has_key('uri') and notification.has_key('uidset'): uri = parse_imap_uri(notification['uri']) if uri.has_key('UID'): - notification['uidset'] = uri['UID'] + notification['uidset'] = [ uri['UID'] ] # message notifications require message headers if not notification.has_key('messageHeaders'):
View file
bonnie-0.3.0.tar.gz/bonnie/worker/handlers/messagenew.py -> bonnie-0.3.1.tar.gz/bonnie/worker/handlers/messagenew.py
Changed
@@ -22,10 +22,35 @@ Base handler for an event notification of type 'MessageNew' """ +import bonnie from bonnie.worker.handlers import MessageAppendHandler +conf = bonnie.getConf() + class MessageNewHandler(MessageAppendHandler): event = 'MessageNew' def __init__(self, *args, **kw): super(MessageNewHandler, self).__init__(*args, **kw) + + def run(self, notification): + # call super for some basic notification processing + (notification, jobs) = super(MessageAppendHandler, self).run(notification) + + relevant = False + + if 'archive' in self.features: + relevant = True + + if 'backup' in self.features: + relevant = True + + if not relevant: + return (notification, jobs) + + if not notification.has_key('messageContent') or notification['messageContent'] in [None, ""]: + self.log.debug("Adding FETCH job for " + self.event, level=8) + return (notification, [ b"FETCH" ]) + + return (notification, jobs) +
View file
bonnie-0.3.0.tar.gz/bonnie/worker/inputs/zmq_input.py -> bonnie-0.3.1.tar.gz/bonnie/worker/inputs/zmq_input.py
Changed
@@ -156,7 +156,7 @@ self.set_state_ready() - if report is not None and self.lastping < (now - random.randint(55,65)): + if report is not None and self.lastping < (now - random.randint(300,600)): report() self.lastping = now
View file
bonnie-0.3.0.tar.gz/bonnie/worker/outputs/__init__.py -> bonnie-0.3.1.tar.gz/bonnie/worker/outputs/__init__.py
Changed
@@ -18,13 +18,27 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. # -from elasticsearch_output import ElasticSearchOutput +__all__ = [] -__all__ = [ - 'ElasticSearchOutput' - ] +try: + from elasticsearch_output import ElasticSearchOutput + __all__.append('ElasticSearchOutput') +except ImportError, errmsg: + pass + +try: + from riak_output import RiakOutput + __all__.append('RiakOutput') +except ImportError, errmsg: + pass def list_classes(): - return [ - ElasticSearchOutput - ] + classes = [] + + if 'ElasticSearchOutput' in __all__: + classes.append(ElasticSearchOutput) + + if 'RiakOutput' in __all__: + classes.append(RiakOutput) + + return classes
View file
bonnie-0.3.1.tar.gz/bonnie/worker/outputs/riak_output.py
Added
@@ -0,0 +1,142 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) +# +# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com> +# Thomas Bruederli (Kolab Systems) <bruederli a kolabsys.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import datetime + +from dateutil.parser import parse +from dateutil.tz import tzutc + +import riak +import json + +import bonnie +conf = bonnie.getConf() +log = bonnie.getLogger('bonnie.worker.RiakOutput') + +class RiakOutput(object): + def __init__(self, *args, **kw): + riak_output_address = conf.get( + 'worker', + 'riak_output_address' + ) + + if riak_output_address == None: + riak_output_address = 'localhost' + + self.riak = riak.Riak( + host=riak_output_address + ) + + def name(self): + return 'riak_output' + + def register(self, callback): + self.worker = callback({'_all': { 'callback': self.run }}) + + def notification2log(self, notification): + """ + Convert the given event notification record into a valid log entry + """ + keymap = { + 'timestamp': None, + 'clientIP': 'client_ip', + 'clientPort': None, + 'serverPort': None, + 'serverDomain': 'domain', + 'aclRights': 'acl_rights', + 'aclSubject': 'acl_subject', + 'mailboxID': 'mailbox_id', + 'messageSize': 'message_size', + 'messageHeaders': None, + 'messageContent': None, + 'bodyStructure': None, + 'metadata': None, + 'acl': None, + 'flagNames': 'flag_names', + 'diskUsed': 'disk_used', + 'vnd.cmu.oldUidset': 'olduidset', + 'vnd.cmu.sessionId': 'session_id', + 'vnd.cmu.midset': 'message_id', + 'vnd.cmu.unseenMessages': 'unseen_messages', + } + + log = { '@version': bonnie.API_VERSION } + + for key,val in notification.iteritems(): + newkey = keymap[key] if keymap.has_key(key) else key + if newkey is not None: + # convert NIL values into None which is more appropriate + if isinstance(val, list): + val = [x for x in val if not x == "NIL"] + elif val == "NIL": + val = None + + log[newkey] = val + + return log + + def run(self, notification): + # The output should have UTC timestamps, but gets "2014-05-16T12:55:53.870+02:00" + try: + timestamp = parse(notification['timestamp']).astimezone(tzutc()) + except: + timestamp = datetime.datetime.now(tzutc()) + + notification['@timestamp'] = datetime.datetime.strftime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ") + index = 'logstash-%s' % (datetime.datetime.strftime(timestamp, "%Y.%m.%d")) + jobs = [] + + # for notifications concerning multiple messages, create entries for each message + if notification.has_key('messageHeaders') and isinstance(notification['messageHeaders'], dict) and len(notification['messageHeaders']) > 0: + for uid,headers in notification['messageHeaders'].iteritems(): + notification['uidset'] = uid + notification['headers'] = headers + notification['message'] = None + if notification.has_key('messageContent') and notification['messageContent'].has_key(uid): + notification['message'] = notification['messageContent'][uid] + # no need for bodystructure if we have the real message content + notification.pop('bodyStructure', None) + + # remove vnd.cmu.envelope if we have headers + notification.pop('vnd.cmu.envelope', None) + + try: + self.riak.create( + index=index, + doc_type='logs', + body=self.notification2log(notification) + ) + except Exception, errmsg: + log.warning("Riak create exception: %r", e) + jobs.append(b'POSTPONE') + break + + else: + try: + self.riak.create( + index=index, + doc_type='logs', + body=self.notification2log(notification) + ) + except Exception, errmsg: + log.warning("Riak create exception: %r", e) + jobs.append(b'POSTPONE') + + return (notification, jobs)
View file
bonnie-0.3.0.tar.gz/bonnie/worker/storage/__init__.py -> bonnie-0.3.1.tar.gz/bonnie/worker/storage/__init__.py
Changed
@@ -19,14 +19,30 @@ # from caching import CachedDict -from elasticsearch_storage import ElasticSearchStorage __all__ = [ - 'CachedDict', - 'ElasticSearchStorage' + 'CachedDict' ] +try: + from elasticsearch_storage import ElasticSearchStorage + __all__.append('ElasticSearchStorage') +except ImportError, errmsg: + pass + +try: + from riak_storage import RiakStorage + __all__.append('RiakStorage') +except ImportError, errmsg: + pass + def list_classes(): - return [ - ElasticSearchStorage - ] + classes = [] + + if 'ElasticSearchStorage' in __all__: + classes.append(ElasticSearchStorage) + + if 'RiakStorage' in __all__: + classes.append(RiakStorage) + + return classes
View file
bonnie-0.3.1.tar.gz/bonnie/worker/storage/riak_storage.py
Added
@@ -0,0 +1,577 @@ +# -*- coding: utf-8 -*- +# Copyright 2010-2014 Kolab Systems AG (http://www.kolabsys.com) +# +# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com> +# Thomas Bruederli (Kolab Systems) <bruederli a kolabsys.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import datetime +import hashlib +import json +import re +import urllib +import random +import riak +import time + +from dateutil.tz import tzutc +from bonnie.utils import parse_imap_uri +from bonnie.worker.storage import CachedDict + +import bonnie +conf = bonnie.getConf() +log = bonnie.getLogger('bonnie.worker.RiakStorage') + +class RiakStorage(object): + """ + Storage node writing object data into Riak + """ + default_index = 'objects' + default_doctype = 'object' + folders_index = 'objects' + folders_doctype = 'folder' + users_index = 'objects' + users_doctype = 'user' + + def __init__(self, *args, **kw): + riak_output_address = conf.get( + 'worker', + 'riak_storage_address' + ) + + if riak_output_address == None: + riak_output_address = 'localhost' + + self.es = riak.Elasticsearch( + host=riak_output_address + ) + + # use dicts with automatic expiration for caching user/folder + # lookups + self.user_id_cache = CachedDict(300) + self.folder_id_cache = CachedDict(120) + + def name(self): + return 'riak_storage' + + def register(self, callback, **kw): + if callback is not None: + self.worker = callback( + interests = { + 'uidset': { + 'callback': self.resolve_folder_uri + }, + 'folder_uniqueid': { + 'callback': self.resolve_folder_uri + }, + 'mailboxID': { + 'callback': self.resolve_folder_uri, + 'kw': { 'attrib': 'mailboxID' } + } + } + ) + + def get(self, key, index=None, doctype=None, fields=None, **kw): + """ + Standard API for accessing key/value storage + """ + _index = index or self.default_index + _doctype = doctype or self.default_doctype + try: + res = self.riak.get( + index = _index, + doc_type = _doctype, + id = key, + _source_include = fields or '*' + ) + + log.debug( + "Riak get result for %s/%s/%s: %r" % ( + _index, + _doctype, + key, + res + ), + level = 8 + ) + + if res['found']: + result = self._transform_result(res) + else: + result = None + + except riak.exceptions.NotFoundError, errmsg: + log.debug( + "Riak entry not found for %s/%s/%s: %r" % ( + _index, + _doctype, + key, + errmsg + ) + ) + + result = None + + except Exception, errmsg: + log.warning("Riak get exception: %r" % (errmsg)) + result = None + + return result + + def set(self, key, value, index=None, doctype=None, **kw): + """ + Standard API for writing to key/value storage + """ + _index = index or self.default_index + _doctype = doctype or self.default_doctype + + try: + existing = self.riak.get( + index = _index, + doc_type = _doctype, + id = key, + fields = None + ) + + log.debug( + "Riak get result for %s/%s/%s: %r" % ( + _index, + _doctype, + key, + existing + ), + level = 8 + ) + + except riak.exceptions.NotFoundError, errmsg: + existing = None + + except Exception, errmsg: + log.warning("Riak get exception: %r" % (errmsg)) + existing = None + + if existing is None: + try: + ret = self.riak.create( + index = _index, + doc_type = _doctype, + id = key, + body = value, + consistency = 'one', + replication = 'async' + ) + + log.debug( + "Created ES object for %s/%s/%s: %r" % ( + _index, + _doctype, + key, + ret + ), + level = 8 + ) + + except Exception, errmsg: + log.warning("Riak create exception: %r" % (errmsg)) + ret = None + + else: + try: + ret = self.riak.update( + index = _index, + doc_type = _doctype, + id = key, + body = { 'doc': value }, + consistency = 'one', + replication = 'async'
View file
bonnie-0.3.0.tar.gz/conf/bonnie.conf -> bonnie-0.3.1.tar.gz/conf/bonnie.conf
Changed
@@ -1,3 +1,6 @@ +[bonnie] +features = archive, audit, backup, dlp + [broker] zmq_dealer_router_bind_address = tcp://*:5570 zmq_collector_router_bind_address = tcp://*:5571 @@ -17,7 +20,7 @@ zmq_broker_address = tcp://localhost:5570 blacklist_users = cyrus-admin blacklist_events = Login,Logout,AclChange -input_exclude_events = +input_exclude_events = [worker] num_childs = 0
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.