Source code for geowatchutil.client.geowatch_client_kinesis

from geowatchutil.client.base import GeoWatchClientTopic

import boto3


[docs]class GeoWatchClientKinesis(GeoWatchClientTopic):
[docs] def check_topic_exists(self, topic, timeout=5, verbose=True): exists = False try: self._client.describe_stream(StreamName=(self.topic_prefix + topic)) exists = True except: exists = False if verbose: if exists: print "Topic "+topic+" exists." else: print "Topic "+topic+" does not exist." return exists
[docs] def create_topic(self, topic, shards=1, timeout=5, verbose=True): if self.check_topic_exists(topic, timeout=timeout, verbose=verbose): return False created = False try: self._client.create_stream(StreamName=(self.topic_prefix + topic), ShardCount=shards) created = True except: created = False if verbose: if created: print "Topic created." else: print "Topic could not be created" return created
[docs] def delete_topic(self, topic, timeout=5, verbose=True): if not self.check_topic_exists(topic, timeout=timeout, verbose=verbose): return False deleted = False try: self._client.delete_stream(StreamName=(self.topic_prefix + topic)) deleted = True except: deleted = False if verbose: if deleted: print "Topic "+topic+" deleted." else: print "Topic "+topic+" could not be deleted" return deleted
[docs] def delete_topics(self, topics, ignore_errors=True, timeout=5, verbose=True): deleted = True for topic in topics: deleted = self.delete_topic(topic, timeout=timeout, verbose=verbose) if (not ignore_errors) and (not deleted): break return deleted
[docs] def list_topics(self, limit=100, verbose=True): streams = self._client.list_streams(Limit=limit) stream_names = streams[u'StreamNames'] return stream_names
[docs] def wait_topic(self, topic, verbose=False): if verbose: print "Waiting for topic ", topic waiter = self._client.get_waiter('stream_exists') waiter.wait(StreamName=(self.topic_prefix + topic))
def __init__(self, topic_prefix="", aws_region=None, aws_access_key_id=None, aws_secret_access_key=None): super(GeoWatchClientKinesis, self).__init__(backend="kinesis", topic_prefix=topic_prefix) if aws_region and aws_access_key_id and aws_secret_access_key: session = boto3.session.Session( region_name=aws_region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key) self._client = session.client('kinesis') else: print "Could not create GeoWatch client for Kinesis Backend. Missing parameters."