Source code for geowatchutil.store.geowatch_store_s3

import boto3

from geowatchutil.store.base import GeoWatchStore


[docs]class GeoWatchStoreS3(GeoWatchStore): # Public bucket = None # Private _client = None
[docs] def read(self): return self._codec.unpack(self._get())
def _get(self): return self._client.get_object(Bucket=self.bucket, Key=self.key).decode('utf-8') def _put(self, package): return self._client.put_object( Bucket=self.bucket, Key=self.key, Body=package.encode('utf-8'), ContentType=self._codec.content_type)
[docs] def flush(self): messages = self._buffer.get_messages() if self.which == "first": self._put(self._codec.pack(messages, which=self.which)) # _codec.pack returns text representation elif self.which == "index": self._put(self._codec.pack(messages, which=self.which, which_index=self.which_index)) # _codec.pack returns text representation else: self._put(self._codec.pack(messages)) # _codec.pack returns text representation self._buffer.clear()
[docs] def create_bucket(self, bucket): return self._client.create_bucket(Bucket=self.bucket)
[docs] def delete_bucket(self, bucket): return self._client.delete_bucket(Bucket=self.bucket)
[docs] def close(self): pass
def __init__(self, key, codec, aws_region=None, aws_access_key_id=None, aws_secret_access_key=None, aws_bucket=None, which="all", which_index=0): super(GeoWatchStoreS3, self).__init__("s3", key, codec, which=which, which_index=which_index) if aws_region and aws_access_key_id and aws_secret_access_key and aws_bucket: session = boto3.session.Session( region_name=aws_region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key) self._client = session.client('s3') self.bucket = aws_bucket else: print "Could not create GeoWatch client for S3 Backend. Missing parameters."