Source code for geowatchutil.channel.geowatch_channel_kafka

from geowatchutil.channel.base import GeoWatchChannelTopic

from kafka import SimpleProducer, MultiProcessConsumer


[docs]class GeoWatchChannelKafka(GeoWatchChannelTopic): # Public group = None # Private _kafka_consumer = None _kafka_producer = None @classmethod
[docs] def encode(cls, message): return message.encode('utf-8')
@classmethod
[docs] def decode(cls, message): return message.decode('utf-8')
[docs] def send_message(self, message): self._kafka_producer.send_messages(self.topic, message)
[docs] def send_messages(self, messages): self._kafka_producer.send_messages(self.topic, *messages)
[docs] def get_messages_raw(self, count, block=True, timeout=5): return self._kafka_consumer.get_messages(count=count, block=True, timeout=timeout)
[docs] def close(self): self._client.close()
def __init__(self, client, topic, mode, num_procs=1, group=None): super(GeoWatchChannelKafka, self).__init__(client, topic, mode, num_procs=num_procs) self.group = group if mode == "duplex" or mode == "consumer": self._kafka_consumer = MultiProcessConsumer(self._client._client, self.group, self._client.topic_prefix + self.topic, num_procs=self.num_procs) if mode == "duplex" or mode == "producer": self._kafka_producer = SimpleProducer(self._client._client)