Source code for geowatchutil.client.geowatch_client_kafka

from geowatchutil.client.base import GeoWatchClientTopic

from kafka import KafkaClient


[docs]class GeoWatchClientKafka(GeoWatchClientTopic):
[docs] def check_topic_exists(self, topic, timeout=5, verbose=True): exists = False try: md = self._client.has_metadata_for_topic(self.topic_prefix + topic) exists = (md is not None) except: exists = False if verbose: if exists: print "Topic "+topic+" exists." else: print "Topic "+topic+" does not exist." return exists
[docs] def create_topic(self, topic, shards=1, timeout=5, verbose=True): if self.check_topic_exists(topic, timeout=timeout, verbose=verbose): return False created = False try: self._client.ensure_topic_exists(self.topic_prefix + topic, timeout=timeout) created = True except: created = False if verbose: if created: print "Topic created." else: print "Topic could not be created" return created
[docs] def delete_topic(self, topic, timeout=5, verbose=True): return False # kafka client cannot delete topics. Only server can.
[docs] def delete_topics(self, topics, ignore_errors=True, timeout=5, verbose=True): return False # kafka client cannot delete topics. Only server can.
[docs] def list_topics(self, limit=100, verbose=True): return self._client.topic_partitions.keys()
[docs] def close(self): return self._client.close()
def __init__(self, topic_prefix="", host=None): super(GeoWatchClientKafka, self).__init__(backend="kafka", topic_prefix=topic_prefix) if host: self._client = KafkaClient(host) else: print "Could not create GeoWatch client for kafka backend. Missing parameters."