Source code for geowatchutil.channel.geowatch_channel_kinesis

from geowatchutil.channel.base import GeoWatchChannelTopic


class GeoWatchChannelKinesis(GeoWatchChannelTopic):
    """Topic channel backed by a single shard of an AWS Kinesis stream.

    The stream name is ``client.topic_prefix + topic``.  All Kinesis calls go
    through ``self._client._client``, which is used here with the boto3
    Kinesis client API (``put_record``, ``put_records``, ``get_records`` and
    ``get_shard_iterator``).
    """

    # Public
    shard_it_type = 'LATEST'
    # AT_SEQUENCE_NUMBER - Start reading exactly from the position denoted by
    #   a specific sequence number.
    # AFTER_SEQUENCE_NUMBER - Start reading right after the position denoted
    #   by a specific sequence number.
    # TRIM_HORIZON - Start reading at the last untrimmed record in the shard
    #   in the system, which is the oldest data record in the shard.
    # LATEST - Start reading just after the most recent record in the shard,
    #   so that you always read the most recent data in the shard.
    shard_id = None

    # Private
    _shard_it = None
    # Kinesis PutRecords accepts at most this many records per request.
    _MAX_PUT_RECORDS = 500

    def __init__(self, client, topic, mode, num_procs=1,
                 shard_id=u'shardId-000000000000', shard_it_type='LATEST'):
        """Create the channel and, for reading modes, open a shard iterator.

        :param client: client wrapper; must expose ``_client`` (the object
            the Kinesis calls are made on) and ``topic_prefix``.
        :param topic: topic name; appended to ``client.topic_prefix`` to form
            the stream name.
        :param mode: a shard iterator is requested only when this is
            ``"duplex"`` or ``"consumer"``.
        :param num_procs: forwarded unchanged to the base class.
        :param shard_id: id of the shard to read from.
        :param shard_it_type: shard iterator type (see the class-level
            comments for the accepted values).
        """
        super(GeoWatchChannelKinesis, self).__init__(
            client, topic, mode, num_procs=num_procs)

        self.shard_it_type = shard_it_type
        self.shard_id = shard_id

        if mode in ("duplex", "consumer"):
            # Special Note:
            # GetShardIterator has a limit of 5 transactions per second per
            # account per open shard.
            # http://boto3.readthedocs.org/en/latest/reference/services/kinesis.html#Kinesis.Client.get_shard_iterator
            response = self._client._client.get_shard_iterator(
                StreamName=self._stream_name(),
                ShardId=self.shard_id,
                ShardIteratorType=self.shard_it_type)
            self._shard_it = response['ShardIterator']
        else:
            self._shard_it = None

    def _stream_name(self):
        """Return the Kinesis stream name: topic prefix followed by topic."""
        return self._client.topic_prefix + self.topic

    @classmethod
    def encode(cls, message):
        """Return ``message`` unchanged; no encoding is applied."""
        return message

    @classmethod
    def decode(cls, message):
        """Return ``message`` unchanged; no decoding is applied."""
        return message

    def send_message(self, message):
        """Put a single record on the stream.

        The partition key is the first 256 items of ``message``.

        :param message: payload passed as the record's ``Data``.
        """
        # NOTE(review): an empty message yields an empty partition key, which
        # Kinesis presumably rejects -- confirm against callers.
        partition_key = message[:256]
        self._client._client.put_record(
            StreamName=self._stream_name(),
            Data=message,
            PartitionKey=partition_key)

    def send_messages(self, messages):
        """Put several records on the stream, batching the requests.

        Each record's partition key is the first 256 items of its message.
        Nothing is sent when ``messages`` is empty, and larger inputs are
        split into requests of at most ``_MAX_PUT_RECORDS`` records, because
        a single PutRecords request must carry between 1 and 500 records.

        :param messages: iterable of payloads.
        """
        records = [
            {'Data': message, 'PartitionKey': message[:256]}
            for message in messages
        ]
        if not records:
            return

        stream_name = self._stream_name()
        batch_size = self._MAX_PUT_RECORDS
        for start in range(0, len(records), batch_size):
            self._client._client.put_records(
                Records=records[start:start + batch_size],
                StreamName=stream_name)

    def get_messages_raw(self, count, block=True, timeout=5):
        """
        get_messages_raw will return raw Kinesis objects

        Reads up to ``count`` records from the current shard iterator and
        then advances the stored iterator, so the next call continues after
        the records just returned instead of re-reading the same position.

        :param count: passed as ``Limit`` to ``get_records``.
        :param block: accepted for interface compatibility; not used here.
        :param timeout: accepted for interface compatibility; not used here.
        :returns: the unmodified ``get_records`` response.
        """
        response = self._client._client.get_records(
            ShardIterator=self._shard_it,
            Limit=count)

        # Keep the previous iterator when the response carries no successor.
        next_shard_it = response.get('NextShardIterator')
        if next_shard_it:
            self._shard_it = next_shard_it

        return response