Source code for geowatchutil.client.geowatch_client_slack

import json

from slackclient import SlackClient

from geowatchutil.client.base import GeoWatchClientWebHook


[docs]class GeoWatchClientSlack(GeoWatchClientWebHook): # Private url_api = "https://slack.com/api"
[docs] def check_topic_exists(self, channel, timeout=5, verbose=True): exists = False self.join_channel(channel, verbose=verbose) try: url = "{base}/channels.info?token={authtoken}&channel={channel}".format( base=self.url_api, authtoken=self.authtoken, channel=channel) self._get(url) exists = True except: exists = False if verbose: if exists: print "Channel "+channel+" exists." else: print "Channel "+channel+" does not exist." return exists
[docs] def create_channel(self, channel, shards=1, timeout=5, verbose=True): if self.check_channel_exists(channel, timeout=timeout, verbose=verbose): return False created = False try: url = "{base}/channels.create?token={authtoken}&name={channel}".format( base=self.url_api, authtoken=self.authtoken, channel=channel) self._get(url) created = True except: created = False if verbose: if created: print "Channel "+channel+" created." else: print "Channel "+channel+" could not be created" return created
[docs] def join_channel(self, channel, verbose=True): if verbose: print "Joining channel "+channel print "Bots currently can't join channels. You need to invite manually with /invite @botname"
# https://github.com/slackhq/node-slack-client/issues/26 # self._client.api_call("channels.join", channel="channel")
[docs] def archive_channel(self, channel, timeout=5, verbose=True): if not self.check_channel_exists(channel, timeout=timeout, verbose=verbose): return False archived = False try: url = "{base}/channels.archive?token={authtoken}&channel={channel}".format( base=self.url_api, authtoken=self.authtoken, channel=channel) self._get(url) archived = True except: archived = False if verbose: if archived: print "Channel "+channel+" archived." else: print "Channel "+channel+" could not be archived." return archived
[docs] def archive_channels(self, channels, ignore_errors=True, timeout=5, verbose=False): archived = True for channel in channels: archived = self.archive_channel(channel, timeout=timeout, verbose=verbose) if (not ignore_errors) and (not archived): break return archived
[docs] def list_channels(self, exclude_archived=True, verbose=False): if self.authtoken: url = "{base}/channels.list?token={authtoken}&exclude_archived={exclude_archived}".format( base=self.url_api, authtoken=self.authtoken, exclude_archived=exclude_archived) response = self._get(url) data = json.loads(response) if verbose: print response channels = [] for channel in data['channels']: channels.append(channel['name']) return channels else: print "No authtoken present." return []
def __init__(self, url_webhook="", authtoken=None, templates=None): super(GeoWatchClientSlack, self).__init__( backend="slack", url_webhook=url_webhook, authtoken=authtoken, templates=templates) if authtoken: self._client = SlackClient(authtoken) d = None try: r = json.loads(self._client.api_call("auth.test")) self._user_id = r[u'user_id'] self._user_name = r[u'user'] except: print "Could not initialize Slack Client user"