Source code for geowatchutil.channel.base

from geowatchutil.base import GeoWatchError


[docs]class GeoWatchChannel(object): # Public backend = None mode = None num_procs = None # Private _client = None
[docs] def close(self): pass
def __init__(self, client, mode, num_procs=1): self._client = client self.backend = self._client.backend self.mode = mode self.num_procs = num_procs def __enter__(self): return self def __exit__(self, *args, **kwargs): self.close()
[docs]class GeoWatchChannelTopic(GeoWatchChannel): # Public topic = None def __init__(self, client, topic, mode, num_procs=1): super(GeoWatchChannelTopic, self).__init__(client, mode, num_procs=1) self.topic = topic
[docs]class GeoWatchChannelError(GeoWatchError): def __init__(self, * args, ** kwargs): super(GeoWatchChannelError, self).__init__(self, * args, ** kwargs)