Source code for geowatchutil.runtime

"""
Provides runtime functions.  These functions wrap factory functions with exception handling and timeouts.
"""
import copy
import time

from geowatchutil.base import GeoWatchError, GeoWatchModeError


def provision_consumer(backend, **kwargs):
    """
    Provision a GeoWatch node in ``consumer`` mode.

    Forwards ``backend`` and every keyword argument to :meth:`provision_node`
    and returns its result unchanged.
    """
    mode = "consumer"
    return provision_node(backend, mode, **kwargs)
def provision_producer(backend, **kwargs):
    """
    Provision a GeoWatch node in ``producer`` mode.

    Forwards ``backend`` and every keyword argument to :meth:`provision_node`
    and returns its result unchanged.
    """
    mode = "producer"
    return provision_node(backend, mode, **kwargs)
def provision_duplex(backend, **kwargs):
    """
    Provision a GeoWatch node in ``duplex`` mode.

    Forwards ``backend`` and every keyword argument to :meth:`provision_node`
    and returns its result unchanged.
    """
    mode = "duplex"
    return provision_node(backend, mode, **kwargs)
def provision_node(backend, mode, **kwargs):
    """
    Provision a new GeoWatch Node.

    ``backend`` is either: 'kafka', 'kinesis', 'slack', 'sns', or 'sqs'.

    ``mode`` is either: 'consumer', 'producer', or 'duplex'.  Any other value
    raises :class:`GeoWatchModeError`.

    The following keyword arguments are consumed here and are **not**
    forwarded to the factories:

    * ``client`` -- existing client to reuse.  If missing or ``None``, a
      client is built with ``build_client(backend, **kwargs)``.
    * ``topic`` -- topic the node is bound to.
    * ``codec`` -- codec handed to ``build_node``.

    The following keyword arguments are read here and are also forwarded to
    ``build_client`` / ``build_node`` with the remaining ``kwargs``:

    * ``verbose`` (default ``False``)
    * ``max_tries`` (default ``3``) -- number of provisioning attempts.
    * ``sleep_period`` (default ``5``) -- seconds to sleep between attempts.
    * ``topic_check`` (default ``True``) -- when truthy and ``topic`` is set,
      the topic is checked, created if missing, and the node is only built
      once the topic exists.

    :meth:`provision_node` returns a tuple ``(client, node)``.  ``node`` is
    ``None`` if no attempt succeeded.

    **Examples**

    .. code-block:: python

        from geowatchutil.runtime import provision_node

        client, consumer = provision_node('kafka', 'consumer', host="localhost")

        client, producer = provision_node(
            'kinesis',
            'producer',
            aws_region=aws_region,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            topic_prefix=topic_prefix)
    """
    if mode not in ("consumer", "producer", "duplex"):
        raise GeoWatchModeError("GeoWatch mode error in provision_node.")

    client = kwargs.pop('client', None)
    topic = kwargs.pop('topic', None)
    codec = kwargs.pop('codec', None)
    verbose = kwargs.get('verbose', False)
    max_tries = kwargs.get('max_tries', 3)
    sleep_period = kwargs.get('sleep_period', 5)
    topic_check = kwargs.get('topic_check', True)  # Defaults to True

    # Trace output only when asked for, like the other provision_* helpers.
    if verbose:
        print("provision_node")

    node = None
    tries = 0
    while tries < max_tries:
        if not client:
            # Imported lazily so the client factory is only loaded when a
            # client actually has to be built.
            from geowatchutil.client.factory import build_client
            client = build_client(backend, **kwargs)

        if client:
            topic_ready = True
            if topic and topic_check:
                if not client.check_topic_exists(topic, verbose=verbose):
                    client.create_topic(topic)
                    try:
                        client.wait_topic(topic, verbose=verbose)
                    except Exception:
                        # Best effort: the existence check below decides
                        # whether this attempt can proceed.
                        print("Waited for topic. Topic was never created.")
                topic_ready = client.check_topic_exists(topic, verbose=verbose)

            if topic_ready:
                from geowatchutil.node.factory import build_node
                node = build_node(client, mode, codec, topic, **kwargs)

        if node:
            break

        tries += 1
        # Joined with single spaces to reproduce the output of the original
        # comma-separated Python 2 print statement.
        print(" ".join(["Tried to initialize node. Try ", str(tries), ". Trying again.."]))
        # No point in sleeping when there is no attempt left.
        if tries < max_tries:
            time.sleep(sleep_period)

    return (client, node)
def provision_store(backend, key, codec, **kwargs):
    """
    Provision a new GeoWatch store.

    ``backend`` is either: 'file', 'memcached', 's3', or 'wfs'.

    ``codec`` is either: 'plain', 'json', 'tilerequest', or 'wfs'.

    The following keyword arguments are read here and are also forwarded to
    ``build_store`` with the remaining ``kwargs``:

    * ``verbose`` (default ``False``) -- print a message for each failed try.
    * ``max_tries`` (default ``12``) -- number of provisioning attempts.
    * ``sleep_period`` (default ``5``) -- seconds to sleep between attempts.

    :meth:`provision_store` returns a ``store``, or ``None`` if every attempt
    failed.

    **Examples**

    .. code-block:: python

        from geowatchutil.runtime import provision_store

        store = provision_store(
            's3',
            'results.json',
            'json',
            aws_region=aws_region,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            aws_bucket=aws_bucket)
    """
    verbose = kwargs.get('verbose', False)
    max_tries = kwargs.get('max_tries', 12)
    sleep_period = kwargs.get('sleep_period', 5)

    store = None
    tries = 0
    while tries < max_tries:
        try:
            from geowatchutil.store.factory import build_store
            store = build_store(backend, key, codec, **kwargs)
        except Exception:
            # Any failure counts as a failed attempt and is retried;
            # KeyboardInterrupt/SystemExit are deliberately not swallowed.
            if verbose:
                print("Error in provision_store. Could not get lock on GeoWatch server. Try "+str(tries)+".")
            store = None

        if store:
            break

        tries += 1
        # No point in sleeping when there is no attempt left.
        if tries < max_tries:
            time.sleep(sleep_period)

    return store
def provision_brokers(watchlist, globalconfig=None, templates=None, brokerfilter=None, verbose=False):
    """
    Provision new GeoWatch brokers.

    ``watchlist`` is a list of broker dict configurations.  Entries whose
    ``enabled`` value is falsy are skipped; a missing ``enabled`` key counts
    as enabled.

    ``globalconfig`` is a dict configuration for shared variables, such as
    AWS Region, AWS Credentials, etc.  It is passed through to
    :meth:`provision_broker` together with ``templates`` and ``verbose``.

    If ``brokerfilter`` is non-empty, a broker that declares
    ``filter_metadata`` is only provisioned when every key of
    ``brokerfilter`` is present in its ``filter_metadata`` and the filter
    value is contained in the corresponding metadata value.  Brokers without
    ``filter_metadata`` are always provisioned.  This avoids initializing
    brokers that will never be used.

    :meth:`provision_brokers` returns a list with the result of
    :meth:`provision_broker` for each selected entry.

    **Examples**

    .. code-block:: python

        from geowatchutil.runtime import provision_brokers

        globalconfig = {'aws_access_key_id':'', 'aws_secret_access_key':'', 'aws_region':''}
        brokers = provision_brokers(settings.GEOWATCH_BROKERS_CRON, globalconfig=globalconfig)
        for broker in brokers:
            broker.run(maxcycle=1)
    """
    provisioned = []
    for entry in watchlist:
        if not entry.get('enabled', True):
            continue

        if brokerfilter and ('filter_metadata' in entry):
            metadata = entry['filter_metadata']
            matches = all(
                (name in metadata) and (brokerfilter[name] in metadata[name])
                for name in brokerfilter)
            if not matches:
                continue

        provisioned.append(
            provision_broker(
                entry,
                globalconfig=globalconfig,
                templates=templates,
                verbose=verbose))
    return provisioned
def provision_broker(brokerconfig, globalconfig, templates=None, verbose=True):
    """
    Provision a new GeoWatch broker.

    ``brokerconfig`` is a broker dict configuration.  Its ``name`` and
    ``description`` values (``None`` when missing) are passed positionally to
    ``build_broker``.

    ``globalconfig`` is a dict configuration for shared variables, such as
    AWS Region, AWS Credentials, etc.

    The remaining broker arguments are assembled by
    :meth:`build_broker_kwargs` from ``brokerconfig``, ``globalconfig``,
    ``templates`` and ``verbose``.

    :meth:`provision_broker` returns whatever ``build_broker`` returns.

    **Examples**

    .. code-block:: python

        from geowatchutil.runtime import provision_broker

        globalconfig = {'aws_access_key_id':'', 'aws_secret_access_key':'', 'aws_region':''}
        broker = provision_broker(brokerconfig, globalconfig)
        broker.run(maxcycle=1)
    """
    from geowatchutil.broker.factory import build_broker

    extra = build_broker_kwargs(
        brokerconfig,
        globalconfig,
        templates=templates,
        verbose=verbose)
    name = brokerconfig.get('name')
    description = brokerconfig.get('description')
    return build_broker(name, description, **extra)
def _enabled_configs(brokerconfig, section, globalconfig):
    """
    Return the effective configs of the enabled entries in ``brokerconfig[section]``.

    Each result is a new dict: ``globalconfig`` supplies the fallback values
    and a deep copy of the entry is laid on top, so entry-specific values win.
    """
    effective = []
    for entry in brokerconfig.get(section) or []:
        if not entry.get('enabled', True):
            continue
        config = dict(globalconfig) if globalconfig else {}
        config.update(copy.deepcopy(entry))
        effective.append(config)
    return effective


def build_broker_kwargs(brokerconfig, globalconfig, templates=None, verbose=False):
    """
    Build the keyword arguments used to construct a GeoWatch broker.

    ``brokerconfig`` is a broker dict configuration.  Its optional
    ``consumers``, ``producers``, ``duplex`` and ``stores_out`` lists are
    walked; entries whose ``enabled`` value is falsy are skipped.

    ``globalconfig`` is a dict configuration for shared variables.  It is a
    fallback only: a value set on an individual entry takes precedence over
    the same key in ``globalconfig``.

    ``templates`` is forwarded when provisioning producer and duplex nodes.

    Store entries must provide ``backend``, ``key`` and ``codec``; their
    optional ``options`` dict is expanded into keyword arguments for
    :meth:`provision_store`.

    Returns a dict with the keys ``filter_metadata``, ``count``,
    ``consumers``, ``producers``, ``duplex``, ``stores_out``, ``deduplicate``
    and ``verbose``.
    """
    consumers = []
    for config in _enabled_configs(brokerconfig, 'consumers', globalconfig):
        consumers.extend(
            _provision_geowatch_nodes("consumer", config, verbose=verbose))

    producers = []
    for config in _enabled_configs(brokerconfig, 'producers', globalconfig):
        producers.extend(
            _provision_geowatch_nodes("producer", config, templates=templates, verbose=verbose))

    duplex = []
    for config in _enabled_configs(brokerconfig, 'duplex', globalconfig):
        duplex.extend(
            _provision_geowatch_nodes("duplex", config, templates=templates, verbose=verbose))

    stores_out = []
    for config in _enabled_configs(brokerconfig, 'stores_out', globalconfig):
        # A store entry without 'options' is provisioned with no extra
        # keyword arguments instead of failing with a KeyError.
        options = config.get('options') or {}
        stores_out.append(
            provision_store(
                config['backend'],
                config['key'],
                config['codec'],
                **options))

    return {
        "filter_metadata": brokerconfig.get('filter_metadata', None),
        "count": brokerconfig.get('count', 1),
        "consumers": consumers,
        "producers": producers,
        "duplex": duplex,
        "stores_out": stores_out,
        "deduplicate": False,
        "verbose": verbose
    }
def _provision_geowatch_nodes(mode, c, templates=None, verbose=False): """ Builds and returns list of nodes based on config `c`. """ nodes = [] topics = c['topics'] if ('topics' in c) else ([c['topic']] if ('topic' in c) else []) client = None kwargs = { "topic_check": c.get('topic_check', None), "client": client, "codec": c['codec'], "authtoken": c.get('authtoken', None), "topic_prefix": c.get('topic_prefix', None), "aws_region": c.get('aws_region', None), "aws_access_key_id": c.get('aws_access_key_id', None), "aws_secret_access_key": c.get('aws_secret_access_key', None), "verbose": verbose } if mode == "producer" or mode =="duplex": kwargs.update({ "templates": (templates.get(c['templates'], None) if ('templates' in c) else None), "url_webhook": c.get('url_webhook', None) }) for topic in topics: kwargs.update({'topic': topic}) client, node = provision_node(c['backend'], mode, **kwargs) nodes.append(node) return nodes