[docs]class GeoWatchBuffer(object):
# Public
limit = None
check_limit = False
# Private
_messages = []
[docs] def add_message(self, message):
if self.check_limit:
if not self.full():
self._messages.append(message)
else:
raise GeoWatchBufferError("The new message would have exceeded the buffer's limit of "+self.limit+".")
else:
self._messages.append(message)
[docs] def add_messages(self, messages):
if self.check_limit:
if self.size() + len(messages) <= self.limit:
self._messages.extend(messages)
else:
raise GeoWatchBufferError("The new messages would have exceeded the buffer's limit of "+self.limit+".")
else:
self._messages.extend(messages)
[docs] def get_messages(self):
return self._messages
[docs] def pop_messages(self, count=-1):
if count == -1 or count >= self.size():
r = self._messages
self.clear()
return r
else:
r = self._messages[:count]
self._messages = self._messages[count:]
return r
[docs] def full(self):
if self.limit > 0:
return len(self._messages) >= self.limit
else:
return False
[docs] def empty(self):
return len(self._messages) == 0
[docs] def size(self):
return len(self._messages)
[docs] def clear(self):
self._messages = []
def __init__(self, limit=0, check_limit=False):
self.limit = limit
self.check_limit = check_limit
self._messages = []
[docs]class GeoWatchBufferError(Exception):
def __init__(self, * args, ** kwargs):
super(GeoWatchBufferError, self).__init__(self, * args, ** kwargs)