Projects
Kolab:Winterfell
pykolab-python3
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
Expand all
Collapse all
Changes of Revision 2
View file
pykolab-0.9.0.tar.gz/pykolab/itip/__init__.py
Changed
@@ -173,112 +173,105 @@ return itip_objects -def check_event_conflict(kolab_event, itip_event): +def check_event_conflict_impl(kolab_event, itip_event): """ Determine whether the given kolab event conflicts with the given itip event """ conflict = False # don't consider conflict with myself - if kolab_event.uid == itip_event'uid': - return conflict + if kolab_event.uid == itip_event.uid: + return False # don't consider conflict if event has TRANSP:TRANSPARENT - if _is_transparent(kolab_event): - return conflict - - if _is_transparent(itip_event'xml'): - return conflict + if _is_transparent(kolab_event) or _is_transparent(itip_event): + return False _es = to_dt(kolab_event.get_start()) # use iCal style end date: next day for all-day events _ee = to_dt(kolab_event.get_ical_dtend()) - _is = to_dt(itip_event'start') - _ie = to_dt(itip_event'end') + _is = to_dt(itip_event.get_start()) + _ie = to_dt(itip_event.get_ical_dtend()) # Escape looping through anything if neither of the events is recurring. - if not itip_event'xml'.is_recurring() and not kolab_event.is_recurring(): + if not itip_event.is_recurring() and not kolab_event.is_recurring(): return check_date_conflict(_es, _ee, _is, _ie) - loop = 0 + if _ee < _is: + earlierEvent = kolab_event + laterEvent = itip_event + else: + earlierEvent = itip_event + laterEvent = kolab_event - done = False + # We can't move forward, so no conflict + if not earlierEvent.is_recurring(): + return False + + targetStart = to_dt(laterEvent.get_start()) + targetEnd = to_dt(laterEvent.get_ical_dtend()) + + eventStart = to_dt(earlierEvent.get_start()) + eventEnd = to_dt(earlierEvent.get_ical_dtend()) + + loop = 0 # naive loops to check for collisions in (recurring) events # TODO: compare recurrence rules directly (e.g. matching time slot or weekday or monthday) - while not conflict and not done: + while not conflict: loop += 1 + # Safeguard + if loop > 100: + log.warning("Breaking conflict detection loop after %d iterations." % (loop)) + return False + + # Scroll forward the earlier event recurrence until we're at the target date. + while eventEnd < targetStart and eventStart is not None: + log.debug( + "Attempt to move forward kolab event recurrence from {} closer to {}".format( + eventStart, + targetStart + ), + level=8 + ) - # Scroll forward the kolab event recurrence until we're in the prime - # spot. We choose to start with the Kolab event because that is likely - # the older one. - if _ee < _is: - while _ee < _is and _es is not None and kolab_event.is_recurring(): - log.debug( - "Attempt to move forward kolab event recurrence from {} closer to {}".format( - _ee, - _is - ), - level=8 - ) - - __es = to_dt(kolab_event.get_next_occurence(_es)) - - if __es is not None and not __es == _es: - _es = __es - _ee = to_dt(kolab_event.get_occurence_end_date(_es)) - else: - done = True - break - - # Scroll forward the itip event recurrence until we're in the - # prime spot, this time with the iTip event. - if _ie < _es: - while _ie < _es and _is is not None and itip_event'xml'.is_recurring(): - log.debug( - "Attempt to move forward itip event recurrence from {} closer to {}".format( - _ie, - _es - ), - level=8 - ) - - __is = to_dt(itip_event'xml'.get_next_occurence(_is)) - - if __is is not None and not _is == __is: - _is = __is - _ie = to_dt(itip_event'xml'.get_occurence_end_date(_is)) - else: - done = True - break + nextEventStart = to_dt(earlierEvent.get_next_occurence(eventStart)) + + if nextEventStart is not None and nextEventStart != eventStart: + eventStart = nextEventStart + eventEnd = to_dt(earlierEvent.get_occurence_end_date(eventStart)) + else: + # We can't move forward, break and compare + return check_date_conflict(eventStart, eventEnd, targetStart, targetEnd) # Now that we have some events somewhere in the same neighborhood... - conflict = check_date_conflict(_es, _ee, _is, _ie) + conflict = check_date_conflict(eventStart, eventEnd, targetStart, targetEnd) log.debug( - "* Comparing itip at %s/%s with kolab at %s/%s: conflict - %r (occurence - %d)" % ( - _is, _ie, _es, _ee, conflict, loop + "* Comparing earlier at %s/%s with later at %s/%s: conflict - %r (occurence - %d)" % ( + eventStart, eventEnd, targetStart, targetEnd, conflict, loop ), level=8 ) if not conflict: - if kolab_event.is_recurring() and itip_event'xml'.is_recurring(): - if not kolab_event.has_exceptions() and not itip_event'xml'.has_exceptions(): - log.debug("No conflict, both recurring, but neither with exceptions", level=8) - done = True - break - - _is = to_dt(itip_event'xml'.get_next_occurence(_is)) - - if _is is not None: - _ie = to_dt(itip_event'xml'.get_occurence_end_date(_is)) + # Move the later event one forward, and try again + targetStart = to_dt(laterEvent.get_next_occurence(targetStart)) + if targetStart is not None: + targetEnd = to_dt(itip_event.get_occurence_end_date(targetStart)) else: - done = True + return False return conflict +def check_event_conflict(kolab_event, itip_event): + """ + Determine whether the given kolab event conflicts with the given itip event + """ + return check_event_conflict_impl(kolab_event, itip_event'xml') + + def _is_transparent(event): return event.get_transparency() or event.get_status() == kolabformat.StatusCancelled
View file
pykolab-0.9.0.tar.gz/pykolab/wap_client/__init__.py
Changed
@@ -30,7 +30,7 @@ kolab_wap_url = conf.get('kolab_wap', 'api_url') -if not kolab_wap_url == None: +if kolab_wap_url is not None: result = urlparse(kolab_wap_url) else: result = None @@ -52,15 +52,16 @@ conn = None + def authenticate(username=None, password=None, domain=None): global session_id - if username == None: + if username is None: username = conf.get('ldap', 'bind_dn') - if password == None: + if password is None: password = conf.get('ldap', 'bind_pw') - if domain == None: + if domain is None: domain = conf.get('kolab', 'primary_domain') post = json.dumps( @@ -80,10 +81,11 @@ session_id = response'session_token' return True + def connect(uri=None): global conn, API_SSL, API_PORT, API_HOSTNAME, API_BASE - if not uri == None: + if uri is not None: result = urlparse(uri) if hasattr(result, 'scheme') and result.scheme == 'https': @@ -99,7 +101,7 @@ if hasattr(result, 'path'): API_BASE = result.path - if conn == None: + if conn is None: if API_SSL: conn = httplib.HTTPSConnection(API_HOSTNAME, API_PORT) else: @@ -109,6 +111,7 @@ return conn + def disconnect(quit=False): global conn, session_id @@ -120,15 +123,17 @@ conn.close() conn = None + def domain_add(domain, aliases=): dna = conf.get('ldap', 'domain_name_attribute') post = json.dumps({ - dna: domain + aliases + dna: domain + aliases }) return request('POST', 'domain.add', post=post) + def domain_delete(domain, force=False): domain_id, domain_attrs = domain_find(domain).popitem() @@ -141,34 +146,41 @@ return request('POST', 'domain.delete', post=post) + def domain_find(domain): dna = conf.get('ldap', 'domain_name_attribute') - get = { dna: domain } + get = {dna: domain} return request('GET', 'domain.find', get=get) + def domain_info(domain): domain_id, domain_attrs = domain_find(domain) - get = { 'id': domain_id } + get = {'id': domain_id} return request('GET', 'domain.info', get=get) + def domains_capabilities(): return request('GET', 'domains.capabilities') + def domains_list(): return request('GET', 'domains.list') + def form_value_generate(params): post = json.dumps(params) return request('POST', 'form_value.generate', post=post) + def form_value_generate_password(*args, **kw): return request('GET', 'form_value.generate_password') + def form_value_list_options(object_type, object_type_id, attribute): post = json.dumps( { @@ -180,24 +192,26 @@ return request('POST', 'form_value.list_options', post=post) + def form_value_select_options(object_type, object_type_id, attribute): post = json.dumps( { 'object_type': object_type, 'type_id': object_type_id, - 'attributes': attribute + 'attributes': attribute } ) return request('POST', 'form_value.select_options', post=post) + def get_group_input(): group_types = group_types_list() if len(group_types) > 1: for key in group_types: if not key == "status": - print("%s) %s" % (key,group_typeskey'name')) + print("%s) %s" % (key, group_typeskey'name')) group_type_id = utils.ask_question("Please select the group type") @@ -223,11 +237,12 @@ paramsattribute = utils.ask_question(attribute) for attribute in group_type_info'auto_form_fields': - retval = eval("group_form_value_generate_%s(params)" % (attribute)) + retval = eval("group_form_value_generate_%s(params)" % attribute) paramsattribute = retvalattribute return params + def get_user_input(): user_types = user_types_list() @@ -235,7 +250,7 @@ print("") for key in user_types'list': if not key == "status": - print("%s) %s" % (key,user_types'list'key'name')) + print("%s) %s" % (key, user_types'list'key'name')) print("") user_type_id = utils.ask_question("Please select the user type") @@ -264,7 +279,8 @@ for attribute in user_type_info'form_fields': if isinstance(user_type_info'form_fields'attribute, dict): - if 'optional' in user_type_info'form_fields'attribute and user_type_info'form_fields'attribute'optional': + if ('optional' in user_type_info'form_fields'attribute) and \ + user_type_info'form_fields'attribute'optional': may_attrs.append(attribute) else: must_attrs.append(attribute) @@ -284,7 +300,7 @@ default = attribute_valuesattribute'default' paramsattribute = utils.ask_menu( - "Choose the %s value" % (attribute), + "Choose the %s value" % attribute, attribute_valuesattribute'list', default=default ) @@ -295,7 +311,7 @@ default = user_type_info'form_fields'attribute'default' paramsattribute = utils.ask_menu( - "Choose the %s value" % (attribute), + "Choose the %s value" % attribute, user_type_info'form_fields'attribute'values', default=default ) @@ -314,16 +330,18 @@ return params + def group_add(params=None): - if params == None: + if params is None: params = get_group_input() post = json.dumps(params) return request('POST', 'group.add', post=post) + def group_delete(params=None): - if params == None: + if params is None: params = { 'id': utils.ask_question("Name of group to delete", "group") } @@ -332,66 +350,79 @@ return request('POST', 'group.delete', post=post) + def group_form_value_generate_mail(params=None): - if params == None: + if params is None: params = get_user_input() params = json.dumps(params) return request('POST', 'group_form_value.generate_mail', params) + def group_find(params=None): - post = { 'search': { 'params': {} } } + post = {'search': {'params': {}}} - for (k,v) in params.items(): - post'search''params'k = { 'value': v, 'type': 'exact' } + for (k, v) in params.items(): + post'search''params'k = {'value': v, 'type': 'exact'} return request('POST', 'group.find', post=json.dumps(post)) + def group_info(group=None): - if group == None: + if group is None: group = utils.ask_question("group DN") - return request('GET', 'group.info', get={ 'id': group }) + return request('GET', 'group.info', get={'id': group}) + def group_members_list(group=None): - if group == None: + if group is None: group = utils.ask_question("Group email address") - group = request('GET', 'group.members_list?group=%s' % (group)) + group = request('GET', 'group.members_list?group=%s' % group) return group + def group_types_list(): return request('GET', 'group_types.list') + def groups_list(params={}): return request('POST', 'groups.list', post=json.dumps(params)) + def ou_add(params={}): return request('POST', 'ou.add', post=json.dumps(params)) + def ou_delete(params={}): return request('POST', 'ou.delete', post=json.dumps(params)) + def ou_edit(params={}): return request('POST', 'ou.edit', post=json.dumps(params)) + def ou_find(params=None): - post = { 'search': { 'params': {} } } + post = {'search': {'params': {}}} - for (k,v) in params.items(): - post'search''params'k = { 'value': v, 'type': 'exact' } + for (k, v) in params.items(): + post'search''params'k = {'value': v, 'type': 'exact'} return request('POST', 'ou.find', post=json.dumps(post)) + def ou_info(ou): - _params = { 'id': ou } + _params = {'id': ou} ou = request('GET', 'ou.info', get=_params) return ou + def ous_list(params={}): return request('POST', 'ous.list', post=json.dumps(params)) + def request(method, api_uri, get=None, post=None, headers={}): response_data = request_raw(method, api_uri, get, post, headers) @@ -402,10 +433,11 @@ print("%s: %s (code %s)" % (response_data'status', response_data'reason', response_data'code')) return False + def request_raw(method, api_uri, get=None, post=None, headers={}, isretry=False): global session_id - if not session_id == None: + if session_id is not None: headers"X-Session-Token" = session_id reconnect = False @@ -414,12 +446,12 @@ if conf.debuglevel > 8: conn.set_debuglevel(9) - if not get == None: + if get is not None: _get = "?%s" % (urllib.urlencode(get)) else: _get = "" - log.debug(_("Requesting %r with params %r") % ("%s/%s" % (API_BASE,api_uri), (get, post)), level=8) + log.debug(_("Requesting %r with params %r") % ("%s/%s" % (API_BASE, api_uri), (get, post)), level=8) try: conn.request(method.upper(), "%s/%s%s" % (API_BASE, api_uri, _get), post, headers) @@ -427,7 +459,7 @@ response = conn.getresponse() data = response.read() - log.debug(_("Got response: %r") % (data), level=8) + log.debug(_("Got response: %r") % data, level=8) except (httplib.BadStatusLine, httplib.CannotSendRequest) as e: if isretry: @@ -448,41 +480,48 @@ return response_data + def resource_add(params=None): - if params == None: + if params is None: params = get_user_input() return request('POST', 'resource.add', post=json.dumps(params)) + def resource_delete(params=None): - if params == None: + if params is None: params = { 'id': utils.ask_question("Resource DN to delete", "resource") } return request('POST', 'resource.delete', post=json.dumps(params)) + def resource_find(params=None): - post = { 'search': { 'params': {} } } + post = {'search': {'params': {}}} - for (k,v) in params.items(): - post'search''params'k = { 'value': v, 'type': 'exact' } + for (k, v) in params.items(): + post'search''params'k = {'value': v, 'type': 'exact'} return request('POST', 'resource.find', post=json.dumps(post)) + def resource_info(resource=None): - if resource == None: + if resource is None: resource = utils.ask_question("Resource DN") - return request('GET', 'resource.info', get={ 'id': resource }) + return request('GET', 'resource.info', get={'id': resource}) + def resource_types_list(): return request('GET', 'resource_types.list') + def resources_list(params={}): return request('POST', 'resources.list', post=json.dumps(params)) + def role_add(params=None): - if params == None: + if params is None: role_name = utils.ask_question("Role name") params = { 'cn': role_name @@ -492,11 +531,13 @@ return request('POST', 'role.add', params) + def role_capabilities(): return request('GET', 'role.capabilities') + def role_delete(params=None): - if params == None: + if params is None: role_name = utils.ask_question("Role name") role = role_find_by_attribute({'cn': role_name}) params = { @@ -513,70 +554,81 @@ return request('POST', 'role.delete', post=post) + def role_find_by_attribute(params=None): - if params == None: + if params is None: role_name = utils.ask_question("Role name") else: role_name = params'cn' - get = { 'cn': role_name } + get = {'cn': role_name} role = request('GET', 'role.find_by_attribute', get=get) return role + def role_info(role_name): role = role_find_by_attribute({'cn': role_name}) - get = { 'role': role'id' } + get = {'role': role'id'} role = request('GET', 'role.info', get=get) return role + def roles_list(): return request('GET', 'roles.list') + def sharedfolder_add(params=None): - if params == None: + if params is None: params = get_user_input() return request('POST', 'sharedfolder.add', post=json.dumps(params)) + def sharedfolder_delete(params=None): - if params == None: + if params is None: params = { 'id': utils.ask_question("Shared Folder DN to delete", "sharedfolder") } return request('POST', 'sharedfolder.delete', post=json.dumps(params)) + def sharedfolders_list(params={}): return request('POST', 'sharedfolders.list', post=json.dumps(params)) + def system_capabilities(domain=None): - return request('GET', 'system.capabilities', get={'domain':domain}) + return request('GET', 'system.capabilities', get={'domain': domain}) + def system_get_domain(): return request('GET', 'system.get_domain') + def system_select_domain(domain=None): - if domain == None: + if domain is None: domain = utils.ask_question("Domain name") - get = { 'domain': domain } + get = {'domain': domain} return request('GET', 'system.select_domain', get=get) + def user_add(params=None): - if params == None: + if params is None: params = get_user_input() params = json.dumps(params) return request('POST', 'user.add', post=params) + def user_delete(params=None): - if params == None: + if params is None: params = { 'id': utils.ask_question("Username for user to delete", "user") } @@ -585,8 +637,9 @@ return request('POST', 'user.delete', post=post) -def user_edit(user = None, attributes={}): - if user == None: + +def user_edit(user=None, attributes={}): + if user is None: get = { 'id': utils.ask_question("Username for user to edit", "user") } @@ -606,12 +659,13 @@ return user_edit + def user_find(attribs=None): - if attribs == None: + if attribs is None: post = { 'search': { 'params': { - utils.ask_question("Attribute") : { + utils.ask_question("Attribute"): { 'value': utils.ask_question("value"), 'type': 'exact' } @@ -619,10 +673,10 @@ } } else: - post = { 'search': { 'params': {} } } + post = {'search': {'params': {}}} - for (k,v) in attribs.items(): - post'search''params'k = { 'value': v, 'type': 'exact' } + for (k, v) in attribs.items(): + post'search''params'k = {'value': v, 'type': 'exact'} post = json.dumps(post) @@ -630,38 +684,44 @@ return user + def user_form_value_generate(params=None): - if params == None: + if params is None: params = get_user_input() post = json.dumps(params) return request('POST', 'form_value.generate', post=post) + def user_form_value_generate_uid(params=None): - if params == None: + if params is None: params = get_user_input() params = json.dumps(params) return request('POST', 'form_value.generate_uid', params) + def user_form_value_generate_userpassword(*args, **kw): result = form_value_generate_password() - return { 'userpassword': result'password' } + return {'userpassword': result'password'} + def user_info(user=None): - if user == None: + if user is None: user = utils.ask_question("User email address") - _params = { 'id': user } + _params = {'id': user} user = request('GET', 'user.info', get=_params) return user + def users_list(params={}): return request('POST', 'users.list', post=json.dumps(params)) + def user_types_list(): return request('GET', 'user_types.list')
View file
pykolab-0.9.0.tar.gz/tests/unit/test-011-itip.py
Changed
@@ -457,6 +457,7 @@ itip_event = itip.events_from_message(message_from_string(itip_recurring))0 self.assertTrue(itip.check_event_conflict(event3, itip_event), "Conflict in two recurring events") + self.assertTrue(itip.check_event_conflict_impl(itip_event'xml', event3), "Conflict in two recurring events reverse") event4 = Event() event4.set_recurrence(rrule) @@ -530,6 +531,23 @@ self.assertFalse(itip.check_event_conflict(event, itip_event), "Conflicting dates (exception)") + def test_002_check_event_conflict_forever_recurring(self): + # This test is here to make sure performance issue is fixed (T1988) + # make the event recurring forever + itip_recurring_forever = itip_recurring.replace("RRULE:FREQ=DAILY;INTERVAL=1;COUNT=5", "RRULE:FREQ=WEEKLY;BYDAY=MO") + itip_event = itip.events_from_message(message_from_string(itip_recurring_forever))0 + + rrule = kolabformat.RecurrenceRule() + rrule.setFrequency(kolabformat.RecurrenceRule.Weekly) + + event = Event() + event.set_recurrence(rrule) + event.set_start(datetime.datetime(2012, 6, 29, 9, 30, 0, tzinfo=pytz.utc)) + event.set_end(datetime.datetime(2012, 6, 29, 10, 30, 0, tzinfo=pytz.utc)) + + self.assertFalse(itip.check_event_conflict_impl(event, itip_event'xml'), "No conflict") + self.assertFalse(itip.check_event_conflict_impl(itip_event'xml', event), "No conflict, reverse") + def test_003_send_reply(self): itip_events = itip.events_from_message(message_from_string(itip_non_multipart)) itip.send_reply("resource-collection-car@example.org", itip_events, "SUMMARY=%(summary)s; STATUS=%(status)s; NAME=%(name)s;")
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.