From 246729d37623dfb46edb05750ef85f547a90417e Mon Sep 17 00:00:00 2001 From: Remy Moll Date: Tue, 26 Apr 2022 10:32:41 +0200 Subject: [PATCH] Fixed bug where duplicate threads would not get responded --- app/runner.py | 24 ++++++++------ app/utils_slack/message_helpers.py | 48 +++++++++++++++++----------- app/utils_slack/runner.py | 12 ++----- app/utils_storage/models.py | 1 + app/utils_worker/download/browser.py | 5 ++- app/utils_worker/fetch/runner.py | 7 ++++ 6 files changed, 56 insertions(+), 41 deletions(-) diff --git a/app/runner.py b/app/runner.py index a51bbd2..19ab559 100644 --- a/app/runner.py +++ b/app/runner.py @@ -13,8 +13,9 @@ from utils_worker.workers import CompressWorker, DownloadWorker, FetchWorker, Up class ArticleWatcher: """Wrapper for a newly created article object. Notifies the coordinator upon change/completition""" - def __init__(self, article, **kwargs) -> None: + def __init__(self, article, thread, **kwargs) -> None: self.article = article + self.thread = thread self.completition_notifier = kwargs.get("notifier") @@ -48,7 +49,7 @@ class ArticleWatcher: elif completed_action == "download": self.compress.process(self) elif completed_action == "compress": # last step - self.completition_notifier(self.article) + self.completition_notifier(self.article, self.thread) # triggers action in Coordinator elif completed_action == "upload": # this case occurs when upload was faster than compression @@ -122,16 +123,18 @@ class Coordinator(Thread): def incoming_request(self, message): """This method is passed onto the slack worker. It gets triggered when a new message is received.""" url = message.urls[0] # ignore all the other ones - a, is_new = models.ArticleDownload.get_or_create(article_url=url) - message.thread.article = a - message.thread.save() + article, is_new = models.ArticleDownload.get_or_create(article_url=url) + thread = message.thread + thread.article = article + thread.save() self.kwargs.update({"notifier" : self.article_complete_notifier}) - if is_new or (a.file_name == "" and a.verified == 0): + if is_new or (article.file_name == "" and article.verified == 0): # check for models that were created but were abandonned. This means they have missing information, most importantly no associated file # this overwrites previously set information, but that should not be too important ArticleWatcher( - a, + article, + thread, **self.kwargs ) @@ -140,7 +143,8 @@ class Coordinator(Thread): # the watcher orchestrates the procedure and notifies upon completition # the watcher will notify once it is sufficiently populated else: # manually trigger notification immediatly - self.article_complete_notifier(a) + logger.info(f"Found existing article {article}. Now sending") + self.article_complete_notifier(article, thread) @@ -152,8 +156,8 @@ class Coordinator(Thread): notifier = lambda article: print(f"Completed manual actions for {article}") ArticleWatcher(article, workers_manual = workers, notifier = notifier) - def article_complete_notifier(self, article): - self.worker_slack.bot_worker.respond_channel_message(article) + def article_complete_notifier(self, article, thread): + self.worker_slack.bot_worker.respond_channel_message(thread) self.worker_mail.send(article) diff --git a/app/utils_slack/message_helpers.py b/app/utils_slack/message_helpers.py index 782be69..5a67ab0 100644 --- a/app/utils_slack/message_helpers.py +++ b/app/utils_slack/message_helpers.py @@ -3,8 +3,8 @@ import configuration import requests import os import time -import asyncio import sys +from threading import Thread from slack_sdk.errors import SlackApiError logger = logging.getLogger(__name__) @@ -18,26 +18,29 @@ def init(client) -> None: global slack_client slack_client = client - # config["archive_id"] = channel_id + global LATEST_RECORDED_REACTION try: LATEST_RECORDED_REACTION = models.Reaction.select(models.Reaction.id).order_by("id")[-1] except IndexError: #query is actually empty, we have never fetched any messages until now LATEST_RECORDED_REACTION = 0 - # fetch all te messages we could have possibly missed + # fetch all te messages we could have possibly missed logger.info("Querying missed messages, threads and reactions. This can take some time.") - fetch_missed_channel_messages() - if "nofetch" in sys.argv: - logger.info("Omitted update of reactions and thread messages because of argument 'nofetch'.") + fetch_missed_channel_messages() # not threaded + t = Thread(target = fetch_missed_channel_reactions) # threaded, runs in background (usually takes a long time) + t.start() + + if "reducedfetch" in sys.argv: + logger.warning("Only fetching empty threads for bot messages because of argument 'reducedfetch'") + fetch_missed_thread_messages(reduced=True) else: # perform these two asyncronously - async def run_async(): - await asyncio.gather(fetch_missed_channel_reactions(), fetch_missed_thread_messages()) - asyncio.run(run_async()) + fetch_missed_thread_messages() + -def get_past_messages(): +def get_unhandled_messages(): """Gets all messages that have not yet been handled, be it by mistake or by downtime - As the message handler mkaes no distinction between channel messages and thread messages, + As the message handler makes no distinction between channel messages and thread messages, we don't have to worry about them here. """ @@ -51,10 +54,11 @@ def get_past_messages(): logger.info(f"Set {len(threaded_objects)} thread-messages as not yet handled.") - channel_objects = [t.initiator_message for t in models.Thread.select() if t.message_count == 1 and not t.is_fully_processed] + channel_objects = [t.initiator_message for t in models.Thread.select() if (t.message_count == 1 and not t.is_fully_processed)] logger.info(f"Set {len(channel_objects)} channel-messages as not yet handled.") reaction_objects = list(models.Reaction.select().where(models.Reaction.id > LATEST_RECORDED_REACTION)) + logger.info(f"Set {len(reaction_objects)} reactions as not yet handled.") # the ones newer than the last before the fetch all_messages = channel_objects + threaded_objects @@ -108,11 +112,17 @@ def fetch_missed_channel_messages(): logger.info(f"Fetched {new_fetches} new channel messages.") -async def fetch_missed_thread_messages(): +def fetch_missed_thread_messages(reduced=False): """After having gotten all base-threads, we need to fetch all their replies""" # I don't know of a better way: we need to fetch this for each and every thread (except if it is marked as permanently solved) - logger.info("Starting async fetch of thread messages...") - threads = [t for t in models.Thread.select() if not t.is_fully_processed] + logger.info("Starting fetch of thread messages...") + if reduced: + threads = [t for t in models.Thread.select() if (t.message_count == 1 and not t.is_fully_processed)] + # this only fetches completely empty threads, which might be because the bot-message was not yet saved to the db. + # once we got all the bot-messages the remaining empty threads will be the ones we need to process. + else: + threads = [t for t in models.Thread.select() if not t.is_fully_processed] + logger.info(f"Fetching history for {len(threads)} empty threads") new_messages = [] for i,t in enumerate(threads): try: @@ -123,7 +133,7 @@ async def fetch_missed_thread_messages(): )["messages"] except SlackApiError: logger.error("Hit rate limit while querying threaded messages, retrying in {}s ({}/{} queries elapsed)".format(config["api_wait_time"], i, len(threads))) - await asyncio.sleep(int(config["api_wait_time"])) + time.sleep(int(config["api_wait_time"])) messages = slack_client.conversations_replies( channel = config["archive_id"], ts = t.slack_ts, @@ -140,8 +150,8 @@ async def fetch_missed_thread_messages(): logger.info("Fetched {} new threaded messages.".format(len(new_messages))) -async def fetch_missed_channel_reactions(): - logger.info("Starting async fetch of channel reactions...") +def fetch_missed_channel_reactions(): + logger.info("Starting background fetch of channel reactions...") threads = [t for t in models.Thread.select() if not t.is_fully_processed] for i,t in enumerate(threads): try: @@ -152,7 +162,7 @@ async def fetch_missed_channel_reactions(): reactions = query["message"].get("reactions", []) # default = [] except SlackApiError: # probably a rate_limit: logger.error("Hit rate limit while querying reactions. retrying in {}s ({}/{} queries elapsed)".format(config["api_wait_time"], i, len(threads))) - await asyncio.sleep(int(config["api_wait_time"])) + time.sleep(int(config["api_wait_time"])) reactions = query["message"].get("reactions", []) for r in reactions: diff --git a/app/utils_slack/runner.py b/app/utils_slack/runner.py index 1b146fa..450d460 100644 --- a/app/utils_slack/runner.py +++ b/app/utils_slack/runner.py @@ -1,9 +1,7 @@ -from threading import Thread from slack_bolt import App from slack_bolt.adapter.socket_mode import SocketModeHandler import logging -from rich.rule import Rule import configuration from . import message_helpers @@ -18,12 +16,11 @@ class BotApp(App): def __init__(self, callback, *args, **kwargs): super().__init__(*args, **kwargs) - # models = models self.callback = callback def start(self): message_helpers.init(self.client) - missed_messages, missed_reactions = message_helpers.get_past_messages() + missed_messages, missed_reactions = message_helpers.get_unhandled_messages() [self.handle_incoming_message(m) for m in missed_messages] [self.handle_incoming_reaction(r) for r in missed_reactions] @@ -122,10 +119,8 @@ class BotApp(App): ) - def respond_channel_message(self, article, say=message_helpers.say_substitute): - # extra={"markup": True} - # self.logger.info(Rule(url[:min(len(url), 30)])) - thread = article.slack_thread.execute()[0] + def respond_channel_message(self, thread, say=message_helpers.say_substitute): + article = thread.article answers = article.slack_info for a in answers: if a["file_path"]: @@ -149,7 +144,6 @@ class BotApp(App): thread_ts=thread.slack_ts ) status = True - # self.logger.info(Rule(f"Fully handled (success={status})")) def startup_status(self): diff --git a/app/utils_storage/models.py b/app/utils_storage/models.py index 7d2efd4..6ea604b 100644 --- a/app/utils_storage/models.py +++ b/app/utils_storage/models.py @@ -211,6 +211,7 @@ class Thread(ChatBaseModel): @property def message_count(self): + # logger.warning("message_count was called") return self.messages.count() @property diff --git a/app/utils_worker/download/browser.py b/app/utils_worker/download/browser.py index f1767b2..a747335 100644 --- a/app/utils_worker/download/browser.py +++ b/app/utils_worker/download/browser.py @@ -136,12 +136,11 @@ class PDFDownloader: hrefs = [e.get_attribute("href") for e in self.driver.find_elements_by_xpath("//a[@href]")] except: hrefs = [] - old = hrefs + len_old = len(hrefs) hrefs = [h for h in hrefs \ if not sum([(domain in h) for domain in blacklisted]) # sum([True, False, False, False]) == 1 (esp. not 0) ] # filter a tiny bit at least - diff = set(old) ^ set(hrefs) - self.logger.info(f"Removed {len(diff)} hrefs: {diff} (before:{len(old)}, after: {len(hrefs)})") + self.logger.info(f"Hrefs result (before:{len_old}, after: {len(hrefs)})") return hrefs diff --git a/app/utils_worker/fetch/runner.py b/app/utils_worker/fetch/runner.py index 960a0f2..70a7241 100644 --- a/app/utils_worker/fetch/runner.py +++ b/app/utils_worker/fetch/runner.py @@ -15,6 +15,7 @@ class NewspaperDummy(): title = "Error while running fetch" summary = "Error while running fetch" text = "Error while running fetch" + meta_lang = "" authors = [] keywords = [] @@ -23,6 +24,7 @@ def get_description(article_object): url = article_object.article_url website = urlparse(url).netloc article_object.source_name = website + try: pub_date = datetime.datetime.strptime(find_date(url), '%Y-%d-%M') except: # other file types @@ -50,6 +52,11 @@ def get_description(article_object): else: summary = fallback.summary + try: + print(f"lang: {news_article.meta_lang}") + except: + print("could not access meta_lang") + if news_article.meta_lang: lang = news_article.meta_lang else: