"""Slack helpers: catch up on missed messages/reactions and persist them.

On start-up `init` stores the Slack client and back-fills everything that
was posted in the archive channel while the process was not running. The
remaining functions convert Slack payloads into database rows and offer
small utilities for replying and for attaching files to articles.
"""
import asyncio
import logging
import os
import sys
import time

import requests
from slack_sdk.errors import SlackApiError

import configuration

logger = logging.getLogger(__name__)
config = configuration.parsed["SLACK"]
models = configuration.models
slack_client = "dummy"  # placeholder until init() installs the real client
LATEST_RECORDED_REACTION = 0  # id of the newest reaction stored before the start-up fetch


def init(client) -> None:
    """Install the Slack client and back-fill everything missed while offline.

    Records the id of the newest reaction already stored (0 when there is
    none) so that `get_past_messages` can later tell which reactions were
    added by this start-up fetch. Thread replies and reactions are skipped
    when ``nofetch`` is present in ``sys.argv``.

    Args:
        client: Slack client used for every API call made by this module.
    """
    # Both names are rebound here; without `global` the reaction marker
    # would only be a local and the module-level value would stay at 0.
    global slack_client, LATEST_RECORDED_REACTION
    slack_client = client

    try:
        # Order by the field itself (same style as order_by(models.Message.ts)
        # further down) so that the last row really is the highest id.
        newest = models.Reaction.select(models.Reaction.id).order_by(models.Reaction.id)[-1]
    except IndexError:
        # The query is empty: no reaction has ever been stored.
        LATEST_RECORDED_REACTION = 0
    else:
        # Keep the plain id so it can be compared against Reaction.id later.
        LATEST_RECORDED_REACTION = newest.id

    # Fetch all the messages we could have possibly missed.
    logger.info("Querying missed messages, threads and reactions. This can take some time.")
    fetch_missed_channel_messages()

    if "nofetch" in sys.argv:
        logger.info("Omitted update of reactions and thread messages because of argument 'nofetch'.")
        return

    # Run both catch-up coroutines together. The Slack calls inside them are
    # not awaited, so they only interleave while one is sleeping after an error.
    async def run_async():
        await asyncio.gather(fetch_missed_channel_reactions(), fetch_missed_thread_messages())

    asyncio.run(run_async())


def get_past_messages():
    """Collect all messages and reactions that have not been handled yet.

    The message handler makes no distinction between channel messages and
    thread messages, so both kinds are returned in one list.

    Returns:
        A tuple ``(all_messages, reaction_objects)``: the unhandled channel
        messages followed by the unhandled thread messages, and the reactions
        whose id is greater than ``LATEST_RECORDED_REACTION``.
    """
    threaded_objects = []
    channel_objects = []
    for t in models.Thread.select():
        if t.message_count > 1:
            # More than one message: the last one decides whether work is pending.
            msg = t.last_message
            if msg.is_by_human:
                threaded_objects.append(msg)
            # else: nothing to process
        elif t.message_count == 1 and not t.is_fully_processed:
            # Only one message was written: it is the channel message itself.
            channel_objects.append(t.initiator_message)

    logger.info(f"Set {len(threaded_objects)} thread-messages as not yet handled.")
    logger.info(f"Set {len(channel_objects)} channel-messages as not yet handled.")

    # The reactions newer than the last one recorded before the fetch.
    reaction_objects = list(
        models.Reaction.select().where(models.Reaction.id > LATEST_RECORDED_REACTION)
    )

    all_messages = channel_objects + threaded_objects
    return all_messages, reaction_objects


def _store_channel_messages(result) -> int:
    """Persist every message of one history response and return how many it held."""
    messages = result.get("messages", [])
    for m in messages:
        message_dict_to_model(m)
    return len(messages)


def fetch_missed_channel_messages() -> None:
    """Fetch and store all channel messages newer than the newest stored one.

    Follows the pagination cursor until the API reports no more pages. A
    `SlackApiError` on a follow-up page is logged and the same page is
    requested again after ``config["api_wait_time"]`` seconds; an error on
    the very first request propagates to the caller.
    """
    # Timestamp of the latest processed message (0 when the table is empty).
    presaved = models.Message.select().order_by(models.Message.ts)
    last_ts = presaved[-1].slack_ts if presaved else 0

    result = slack_client.conversations_history(
        channel=config["archive_id"],
        oldest=last_ts,
    )
    # Messages delivered a second time are harmless: message_dict_to_model
    # goes through get_or_create, so no explicit duplicate filter is applied.
    new_fetches = _store_channel_messages(result)

    while result.get("has_more", False):  # we have not actually fetched them all
        try:
            result = slack_client.conversations_history(
                channel=config["archive_id"],
                cursor=result["response_metadata"]["next_cursor"],
                oldest=last_ts,
            )
        except SlackApiError:  # most likely a rate limit
            logger.error(
                "Error while fetching channel messages. (likely rate limit) Retrying in {} seconds...".format(
                    config["api_wait_time"]
                )
            )
            time.sleep(config["api_wait_time"])
            # `result` is unchanged, so the same cursor is requested again.
            continue
        new_fetches += _store_channel_messages(result)

    logger.info(f"Fetched {new_fetches} new channel messages.")


def _query_thread_replies(thread):
    """Request the replies of `thread` newer than its last stored message."""
    return slack_client.conversations_replies(
        channel=config["archive_id"],
        ts=thread.slack_ts,
        oldest=thread.messages[-1].slack_ts,
    )["messages"]


async def fetch_missed_thread_messages() -> None:
    """After having gotten all base threads, fetch all of their replies.

    Every thread that is not fully processed is queried individually. When a
    query raises `SlackApiError` it is retried once after
    ``config["api_wait_time"]`` seconds; a second failure propagates.
    """
    logger.info("Starting async fetch of thread messages...")
    threads = [t for t in models.Thread.select() if not t.is_fully_processed]
    new_messages = []
    for i, t in enumerate(threads):
        try:
            messages = _query_thread_replies(t)
        except SlackApiError:
            logger.error(
                "Hit rate limit while querying threaded messages, retrying in {}s ({}/{} queries elapsed)".format(
                    config["api_wait_time"], i, len(threads)
                )
            )
            await asyncio.sleep(config["api_wait_time"])
            messages = _query_thread_replies(t)

        # The first message is the one posted in the channel, which was already
        # processed. Slicing (instead of pop(0)) also copes with an empty list.
        for m in messages[1:]:
            res = message_dict_to_model(m)
            if res:  # only keep messages that were actually new
                new_messages.append(res)

    logger.info("Fetched {} new threaded messages.".format(len(new_messages)))


def _query_thread_reactions(thread):
    """Request the reactions on the channel message that started `thread`."""
    query = slack_client.reactions_get(
        channel=config["archive_id"],
        timestamp=thread.slack_ts,
    )
    return query["message"].get("reactions", [])  # default: no reactions


async def fetch_missed_channel_reactions() -> None:
    """Fetch and store the reactions of every thread that is not fully processed.

    When a query raises `SlackApiError` it is retried once after
    ``config["api_wait_time"]`` seconds; a second failure propagates.
    """
    logger.info("Starting async fetch of channel reactions...")
    threads = [t for t in models.Thread.select() if not t.is_fully_processed]
    for i, t in enumerate(threads):
        try:
            reactions = _query_thread_reactions(t)
        except SlackApiError:  # probably a rate limit
            logger.error(
                "Hit rate limit while querying reactions. retrying in {}s ({}/{} queries elapsed)".format(
                    config["api_wait_time"], i, len(threads)
                )
            )
            await asyncio.sleep(config["api_wait_time"])
            # Re-issue the request: the failed call produced no response, so
            # there is nothing from this thread to read reactions from yet.
            reactions = _query_thread_reactions(t)

        for r in reactions:
            reaction_dict_to_model(r, t)


# Helpers for message conversion to db-objects
def reaction_dict_to_model(reaction: dict, thread=None):
    """Store a Slack reaction payload as a `Reaction` row.

    Args:
        reaction: Payload carrying the reaction under ``"name"`` (manual API
            query) or ``"reaction"`` (event).
        thread: Thread the reaction belongs to. When omitted it is looked up
            through the message whose timestamp is ``reaction["item"]["ts"]``.

    Returns:
        The stored (or already existing) reaction, or ``None`` when the
        payload carries neither key.
    """
    if thread is None:
        m_ts = reaction["item"]["ts"]
        message = models.Message.get(ts=float(m_ts))
        thread = message.thread

    if "name" in reaction:  # fetched through manual api query
        content = reaction["name"]
    elif "reaction" in reaction:  # fetched through events
        content = reaction["reaction"]
    else:
        logger.error(f"Weird reaction received: {reaction}")
        return None

    # Reactions are always attached to the message that started the thread.
    r, _ = models.Reaction.get_or_create(
        type=content,
        message=thread.initiator_message,
    )
    logger.info("Saved reaction [{}]".format(content))
    return r


def message_dict_to_model(message: dict):
    """Store a Slack message payload as a `Message` row.

    The user and thread rows are created on demand. For each attached file
    the message's file type and permalink are overwritten and saved, so the
    last file wins.

    Args:
        message: Slack message payload.

    Returns:
        The message row when it was newly created; ``None`` when it already
        existed, when the payload is not of type ``"message"``, or when it
        has no user.
    """
    if message["type"] != "message":
        logger.warning("What should I do of {}".format(message))
        return None

    # A message without thread_ts starts its own thread.
    thread_ts = message["thread_ts"] if "thread_ts" in message else message["ts"]

    uid = message.get("user")
    if uid is None:
        logger.critical("Message has no user?? {}".format(message))
        return None

    user, _ = models.User.get_or_create(user_id=uid)
    thread, _ = models.Thread.get_or_create(thread_ts=thread_ts)
    m, new = models.Message.get_or_create(
        user=user,
        thread=thread,
        ts=message["ts"],
        channel_id=config["archive_id"],
        text=message["text"],
    )
    logger.info("Saved (text) {} (new={})".format(m, new))

    for f in message.get("files", []):  # default: no files
        m.file_type = f["filetype"]
        m.perma_link = f["url_private_download"]
        m.save()
        logger.info("Saved permalink {} to {} (possibly overwriting)".format(f["name"], m))

    return m if new else None


def say_substitute(*args, **kwargs) -> None:
    """Post a message to the archive channel.

    Args:
        *args: Text fragments, joined with ``" - "`` to form the message.
        **kwargs: Passed through to ``chat_postMessage``.
    """
    text = " - ".join(args)
    logger.info("Now sending message through say-substitute: {}".format(text))
    slack_client.chat_postMessage(
        channel=config["archive_id"],
        text=text,
        **kwargs
    )


def save_as_related_file(url: str, article_object) -> str:
    """Download `url` and register the file as related to `article_object`.

    The file is written into the article's save path and named after the
    article's file name plus a running number.

    Args:
        url: Download location, requested with the Slack client's token.
        article_object: Article the file is attached to.

    Returns:
        The file name under which the download was saved.
    """
    # NOTE(review): the response status is not checked, so an error page
    # would be written to disk like a regular file - confirm this is intended.
    r = requests.get(url, headers={"Authorization": "Bearer {}".format(slack_client.token)})
    saveto = article_object.save_path
    ftype = url.rsplit(".", 1)[-1]  # everything after the last dot
    fname = "{} - related no {}.{}".format(
        article_object.file_name.replace(".pdf", ""),
        len(article_object.related) + 1,
        ftype,
    )
    with open(os.path.join(saveto, fname), "wb") as f:
        f.write(r.content)
    article_object.set_related([fname])
    logger.info("Added {} to model {}".format(fname, article_object))
    return fname


def react_file_path_message(fname: str, article_object) -> bool:
    """Register an already existing file as related to `article_object`.

    Args:
        fname: File name, relative to the article's save path.
        article_object: Article the file is attached to.

    Returns:
        ``True`` when the file exists and was registered, ``False`` otherwise.
    """
    file_path = os.path.join(article_object.save_path, fname)
    if not os.path.exists(file_path):
        return False

    article_object.set_related([fname])
    logger.info("Added {} to model {}".format(fname, article_object))
    return True


def is_message_in_archiving(message) -> bool:
    """Tell whether `message` (payload dict or model row) is in the archive channel."""
    if isinstance(message, dict):
        return message["channel"] == config["archive_id"]
    return message.channel_id == config["archive_id"]


def is_reaction_in_archiving(event) -> bool:
    """Tell whether a reaction (payload dict or model row) targets the archive channel."""
    if isinstance(event, dict):
        return event["item"]["channel"] == config["archive_id"]
    return event.message.channel_id == config["archive_id"]