Source code for geowatchutil.buffer.base

class GeoWatchBuffer(object):
    """In-memory, list-backed buffer of messages with an optional size limit.

    Messages are appended at the end and popped from the front.  The limit
    is only enforced on insertion when ``check_limit`` is true; a ``limit``
    of ``None``, zero or a negative number means "no limit".
    """

    # Public
    limit = None
    check_limit = False

    # Private
    # Class-level fallback only: __init__ rebinds a fresh list per instance,
    # so instances created through __init__ never share this list.
    _messages = []

    def __init__(self, limit=0, check_limit=False):
        """Create an empty buffer.

        :param limit: maximum number of messages; values <= 0 (or ``None``)
            disable the limit.
        :param check_limit: when true, insertions that would exceed
            ``limit`` raise :class:`GeoWatchBufferError`.
        """
        self.limit = limit
        self.check_limit = check_limit
        self._messages = []

    def _has_limit(self):
        """Return True when ``limit`` describes a real (positive) bound."""
        return self.limit is not None and self.limit > 0

    def add_message(self, message):
        """Append a single message.

        :raises GeoWatchBufferError: if ``check_limit`` is set and the
            buffer is already full.
        """
        if self.check_limit and self.full():
            # Format the limit instead of concatenating it: ``limit`` is a
            # number, and ``str + int`` would raise TypeError here.
            raise GeoWatchBufferError(
                "The new message would have exceeded the buffer's limit of {}.".format(self.limit)
            )
        self._messages.append(message)

    def add_messages(self, messages):
        """Append every message from the iterable ``messages``.

        The batch is added atomically: if it does not fit, nothing is added.

        :raises GeoWatchBufferError: if ``check_limit`` is set and the batch
            would push the buffer past ``limit``.
        """
        # Materialise once so that generators can be measured and then added.
        incoming = list(messages)
        # Use the same "limit <= 0 means unlimited" rule as full(), so that
        # add_message() and add_messages() agree on an unlimited buffer.
        if (
            self.check_limit
            and self._has_limit()
            and self.size() + len(incoming) > self.limit
        ):
            raise GeoWatchBufferError(
                "The new messages would have exceeded the buffer's limit of {}.".format(self.limit)
            )
        self._messages.extend(incoming)

    def get_messages(self):
        """Return the internal message list (not a copy)."""
        return self._messages

    def pop_messages(self, count=-1):
        """Remove and return messages from the front of the buffer.

        :param count: number of messages to pop; ``-1`` (the default) or any
            value >= ``size()`` pops everything.
        :returns: list of the removed messages, oldest first.
        """
        if count == -1 or count >= self.size():
            popped = self._messages
            # clear() rebinds to a new list, so ``popped`` stays intact.
            self.clear()
            return popped
        popped = self._messages[:count]
        self._messages = self._messages[count:]
        return popped

    def full(self):
        """Return True when a positive limit is set and has been reached."""
        return self._has_limit() and len(self._messages) >= self.limit

    def empty(self):
        """Return True when the buffer holds no messages."""
        return len(self._messages) == 0

    def size(self):
        """Return the number of buffered messages."""
        return len(self._messages)

    def clear(self):
        """Drop all buffered messages by rebinding to a fresh list."""
        self._messages = []
class GeoWatchBufferError(Exception):
    """Exception type for GeoWatch buffer failures."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to :class:`Exception`."""
        # super().__init__ is already bound to this instance; passing ``self``
        # again would insert the exception into its own ``args`` tuple and
        # garble ``str(exc)``.
        super(GeoWatchBufferError, self).__init__(*args, **kwargs)