Fixed bug where duplicate threads would not get responded to

Remy Moll 2022-04-26 10:32:41 +02:00
parent 024da446e7
commit 246729d376
6 changed files with 56 additions and 41 deletions

View File

@@ -13,8 +13,9 @@ from utils_worker.workers import CompressWorker, DownloadWorker, FetchWorker, Up
 class ArticleWatcher:
     """Wrapper for a newly created article object. Notifies the coordinator upon change/completition"""
-    def __init__(self, article, **kwargs) -> None:
+    def __init__(self, article, thread, **kwargs) -> None:
         self.article = article
+        self.thread = thread
         self.completition_notifier = kwargs.get("notifier")
@@ -48,7 +49,7 @@ class ArticleWatcher:
         elif completed_action == "download":
             self.compress.process(self)
         elif completed_action == "compress": # last step
-            self.completition_notifier(self.article)
+            self.completition_notifier(self.article, self.thread)
             # triggers action in Coordinator
         elif completed_action == "upload":
             # this case occurs when upload was faster than compression
@@ -122,16 +123,18 @@ class Coordinator(Thread):
     def incoming_request(self, message):
         """This method is passed onto the slack worker. It gets triggered when a new message is received."""
         url = message.urls[0] # ignore all the other ones
-        a, is_new = models.ArticleDownload.get_or_create(article_url=url)
-        message.thread.article = a
-        message.thread.save()
+        article, is_new = models.ArticleDownload.get_or_create(article_url=url)
+        thread = message.thread
+        thread.article = article
+        thread.save()
         self.kwargs.update({"notifier" : self.article_complete_notifier})
-        if is_new or (a.file_name == "" and a.verified == 0):
+        if is_new or (article.file_name == "" and article.verified == 0):
             # check for models that were created but were abandonned. This means they have missing information, most importantly no associated file
             # this overwrites previously set information, but that should not be too important
             ArticleWatcher(
-                a,
+                article,
+                thread,
                 **self.kwargs
             )
@@ -140,7 +143,8 @@ class Coordinator(Thread):
         # the watcher orchestrates the procedure and notifies upon completition
         # the watcher will notify once it is sufficiently populated
         else: # manually trigger notification immediatly
-            self.article_complete_notifier(a)
+            logger.info(f"Found existing article {article}. Now sending")
+            self.article_complete_notifier(article, thread)
@@ -152,8 +156,8 @@
         notifier = lambda article: print(f"Completed manual actions for {article}")
         ArticleWatcher(article, workers_manual = workers, notifier = notifier)

-    def article_complete_notifier(self, article):
-        self.worker_slack.bot_worker.respond_channel_message(article)
+    def article_complete_notifier(self, article, thread):
+        self.worker_slack.bot_worker.respond_channel_message(thread)
         self.worker_mail.send(article)
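
Note on the fix in this file: completion used to be reported with the article alone, and the Slack side then re-derived a thread from the article. When the same URL was posted in two threads, that lookup always yielded the first thread, so duplicates never got an answer. The requesting thread now travels with the article through the whole chain. A minimal runnable sketch of the new flow, with illustrative stand-ins rather than the real worker pipeline:

def respond_channel_message(thread):
    # stand-in for the Slack responder; only the destination matters here
    print(f"replying in {thread}")

class ArticleWatcher:
    def __init__(self, article, thread, notifier):
        self.article = article
        self.thread = thread        # the exact thread that requested this article
        self.notifier = notifier

    def finish(self):
        # corresponds to the "compress" (last step) branch in the diff
        self.notifier(self.article, self.thread)

def article_complete_notifier(article, thread):
    respond_channel_message(thread)   # respond where the request was made

ArticleWatcher("article-1", "thread-A", article_complete_notifier).finish()
ArticleWatcher("article-1", "thread-B", article_complete_notifier).finish()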

View File

@@ -3,8 +3,8 @@ import configuration
 import requests
 import os
 import time
-import asyncio
 import sys
+from threading import Thread

 from slack_sdk.errors import SlackApiError

 logger = logging.getLogger(__name__)
@@ -18,26 +18,29 @@ def init(client) -> None:
     global slack_client
     slack_client = client

-    # config["archive_id"] = channel_id
+    global LATEST_RECORDED_REACTION
     try:
         LATEST_RECORDED_REACTION = models.Reaction.select(models.Reaction.id).order_by("id")[-1]
     except IndexError: #query is actually empty, we have never fetched any messages until now
         LATEST_RECORDED_REACTION = 0

     # fetch all te messages we could have possibly missed
     logger.info("Querying missed messages, threads and reactions. This can take some time.")
-    fetch_missed_channel_messages()
-    if "nofetch" in sys.argv:
-        logger.info("Omitted update of reactions and thread messages because of argument 'nofetch'.")
+    fetch_missed_channel_messages() # not threaded
+    t = Thread(target = fetch_missed_channel_reactions) # threaded, runs in background (usually takes a long time)
+    t.start()
+
+    if "reducedfetch" in sys.argv:
+        logger.warning("Only fetching empty threads for bot messages because of argument 'reducedfetch'")
+        fetch_missed_thread_messages(reduced=True)
     else: # perform these two asyncronously
-        async def run_async():
-            await asyncio.gather(fetch_missed_channel_reactions(), fetch_missed_thread_messages())
-        asyncio.run(run_async())
+        fetch_missed_thread_messages()


-def get_past_messages():
+def get_unhandled_messages():
     """Gets all messages that have not yet been handled, be it by mistake or by downtime
-    As the message handler mkaes no distinction between channel messages and thread messages,
+    As the message handler makes no distinction between channel messages and thread messages,
     we don't have to worry about them here.
     """
@@ -51,10 +54,11 @@ def get_past_messages():
     logger.info(f"Set {len(threaded_objects)} thread-messages as not yet handled.")

-    channel_objects = [t.initiator_message for t in models.Thread.select() if t.message_count == 1 and not t.is_fully_processed]
+    channel_objects = [t.initiator_message for t in models.Thread.select() if (t.message_count == 1 and not t.is_fully_processed)]
     logger.info(f"Set {len(channel_objects)} channel-messages as not yet handled.")

     reaction_objects = list(models.Reaction.select().where(models.Reaction.id > LATEST_RECORDED_REACTION))
+    logger.info(f"Set {len(reaction_objects)} reactions as not yet handled.")
     # the ones newer than the last before the fetch

     all_messages = channel_objects + threaded_objects
@@ -108,11 +112,17 @@ def fetch_missed_channel_messages():
     logger.info(f"Fetched {new_fetches} new channel messages.")


-async def fetch_missed_thread_messages():
+def fetch_missed_thread_messages(reduced=False):
     """After having gotten all base-threads, we need to fetch all their replies"""
     # I don't know of a better way: we need to fetch this for each and every thread (except if it is marked as permanently solved)
-    logger.info("Starting async fetch of thread messages...")
-    threads = [t for t in models.Thread.select() if not t.is_fully_processed]
+    logger.info("Starting fetch of thread messages...")
+    if reduced:
+        threads = [t for t in models.Thread.select() if (t.message_count == 1 and not t.is_fully_processed)]
+        # this only fetches completely empty threads, which might be because the bot-message was not yet saved to the db.
+        # once we got all the bot-messages the remaining empty threads will be the ones we need to process.
+    else:
+        threads = [t for t in models.Thread.select() if not t.is_fully_processed]
+    logger.info(f"Fetching history for {len(threads)} empty threads")
     new_messages = []
     for i,t in enumerate(threads):
         try:
@@ -123,7 +133,7 @@ async def fetch_missed_thread_messages():
             )["messages"]
         except SlackApiError:
             logger.error("Hit rate limit while querying threaded messages, retrying in {}s ({}/{} queries elapsed)".format(config["api_wait_time"], i, len(threads)))
-            await asyncio.sleep(int(config["api_wait_time"]))
+            time.sleep(int(config["api_wait_time"]))
             messages = slack_client.conversations_replies(
                 channel = config["archive_id"],
                 ts = t.slack_ts,
@@ -140,8 +150,8 @@ async def fetch_missed_thread_messages():
     logger.info("Fetched {} new threaded messages.".format(len(new_messages)))


-async def fetch_missed_channel_reactions():
-    logger.info("Starting async fetch of channel reactions...")
+def fetch_missed_channel_reactions():
+    logger.info("Starting background fetch of channel reactions...")
     threads = [t for t in models.Thread.select() if not t.is_fully_processed]
     for i,t in enumerate(threads):
         try:
@@ -152,7 +162,7 @@ async def fetch_missed_channel_reactions():
             reactions = query["message"].get("reactions", []) # default = []
         except SlackApiError: # probably a rate_limit:
             logger.error("Hit rate limit while querying reactions. retrying in {}s ({}/{} queries elapsed)".format(config["api_wait_time"], i, len(threads)))
-            await asyncio.sleep(int(config["api_wait_time"]))
+            time.sleep(int(config["api_wait_time"]))
             reactions = query["message"].get("reactions", [])
         for r in reactions:
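
Note on the concurrency change: slack_client here is the synchronous slack_sdk WebClient, so the removed coroutines only ever yielded during asyncio.sleep; the blocking API calls starved the event loop and asyncio.gather bought little real overlap. The commit swaps this for one background Thread around the slow reaction fetch plus plain time.sleep backoff. A self-contained sketch of that layout (dummy fetch bodies, real threading calls):

import time
from threading import Thread

def fetch_missed_channel_reactions():
    time.sleep(2)                    # stands in for many rate-limited API calls
    print("reactions fetched")

def fetch_missed_thread_messages():
    print("thread messages fetched")

t = Thread(target=fetch_missed_channel_reactions)  # slow part, backgrounded
t.start()
fetch_missed_thread_messages()                     # main flow continues at once
t.join()                                           # optional: wait before exiting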

View File

@ -1,9 +1,7 @@
from threading import Thread
from slack_bolt import App from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler from slack_bolt.adapter.socket_mode import SocketModeHandler
import logging import logging
from rich.rule import Rule
import configuration import configuration
from . import message_helpers from . import message_helpers
@ -18,12 +16,11 @@ class BotApp(App):
def __init__(self, callback, *args, **kwargs): def __init__(self, callback, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
# models = models
self.callback = callback self.callback = callback
def start(self): def start(self):
message_helpers.init(self.client) message_helpers.init(self.client)
missed_messages, missed_reactions = message_helpers.get_past_messages() missed_messages, missed_reactions = message_helpers.get_unhandled_messages()
[self.handle_incoming_message(m) for m in missed_messages] [self.handle_incoming_message(m) for m in missed_messages]
[self.handle_incoming_reaction(r) for r in missed_reactions] [self.handle_incoming_reaction(r) for r in missed_reactions]
@ -122,10 +119,8 @@ class BotApp(App):
) )
def respond_channel_message(self, article, say=message_helpers.say_substitute): def respond_channel_message(self, thread, say=message_helpers.say_substitute):
# extra={"markup": True} article = thread.article
# self.logger.info(Rule(url[:min(len(url), 30)]))
thread = article.slack_thread.execute()[0]
answers = article.slack_info answers = article.slack_info
for a in answers: for a in answers:
if a["file_path"]: if a["file_path"]:
@ -149,7 +144,6 @@ class BotApp(App):
thread_ts=thread.slack_ts thread_ts=thread.slack_ts
) )
status = True status = True
# self.logger.info(Rule(f"Fully handled (success={status})"))
def startup_status(self): def startup_status(self):
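
Note: this is the receiving half of the duplicate-thread fix. The old lookup article.slack_thread.execute()[0] walked the one-to-many relation backwards and always returned the first thread attached to the article; passing the thread in and reading thread.article follows the foreign key in its unambiguous direction:

# before (buggy): article -> first of possibly many threads
# thread = article.slack_thread.execute()[0]
# after: thread -> its one article
# article = thread.article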

View File

@@ -211,6 +211,7 @@ class Thread(ChatBaseModel):
     @property
     def message_count(self):
+        # logger.warning("message_count was called")
         return self.messages.count()

     @property
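
The models appear to be peewee-based (select()/where() chains, properties over backrefs). The duplicate-thread bug follows directly from the one-to-many shape sketched below: several Thread rows can point at one ArticleDownload, so any article-to-thread lookup must pick one arbitrarily. Field names and the in-memory database are assumptions for illustration only:

from peewee import CharField, ForeignKeyField, Model, SqliteDatabase

db = SqliteDatabase(":memory:")

class ArticleDownload(Model):
    article_url = CharField()
    class Meta:
        database = db

class Thread(Model):
    slack_ts = CharField()
    article = ForeignKeyField(ArticleDownload, backref="slack_thread", null=True)
    class Meta:
        database = db

db.create_tables([ArticleDownload, Thread])
a = ArticleDownload.create(article_url="https://example.com/x")
Thread.create(slack_ts="1", article=a)
Thread.create(slack_ts="2", article=a)    # duplicate request for the same URL
print(len(list(a.slack_thread)))          # 2 -> indexing [0] silently drops one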

View File

@@ -136,12 +136,11 @@ class PDFDownloader:
             hrefs = [e.get_attribute("href") for e in self.driver.find_elements_by_xpath("//a[@href]")]
         except:
             hrefs = []
-        old = hrefs
+        len_old = len(hrefs)
         hrefs = [h for h in hrefs \
             if not sum([(domain in h) for domain in blacklisted]) # sum([True, False, False, False]) == 1 (esp. not 0)
             ] # filter a tiny bit at least
-        diff = set(old) ^ set(hrefs)
-        self.logger.info(f"Removed {len(diff)} hrefs: {diff} (before:{len(old)}, after: {len(hrefs)})")
+        self.logger.info(f"Hrefs result (before:{len_old}, after: {len(hrefs)})")
         return hrefs
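
Aside on the filter this hunk touches: sum() over the boolean membership tests is used as an "at least one match" check. An equivalent and arguably clearer formulation uses any(); the blacklist contents below are assumed for the example:

blacklisted = ["facebook.com", "twitter.com"]       # assumed domains
hrefs = ["https://example.com/a", "https://facebook.com/share"]

# "not sum([...])" is the same predicate as "not any(...)"
filtered = [h for h in hrefs if not any(domain in h for domain in blacklisted)]
print(filtered)                                     # ['https://example.com/a']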

View File

@@ -15,6 +15,7 @@ class NewspaperDummy():
     title = "Error while running fetch"
     summary = "Error while running fetch"
     text = "Error while running fetch"
+    meta_lang = ""
     authors = []
     keywords = []
@@ -23,6 +24,7 @@ def get_description(article_object):
     url = article_object.article_url
     website = urlparse(url).netloc
     article_object.source_name = website
+
     try:
         pub_date = datetime.datetime.strptime(find_date(url), '%Y-%d-%M')
     except: # other file types
@@ -50,6 +52,11 @@ def get_description(article_object):
     else:
         summary = fallback.summary

+    try:
+        print(f"lang: {news_article.meta_lang}")
+    except:
+        print("could not access meta_lang")
+
     if news_article.meta_lang:
         lang = news_article.meta_lang
     else:
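
Note: giving NewspaperDummy a meta_lang default means the attribute access below can no longer raise when the fetch failed and the dummy is standing in, which also makes the try/except around the debug print belt-and-braces. A reduced sketch of the pattern; detect_language and the "en" fallback are illustrative, not the project's actual else branch:

class NewspaperDummy:
    meta_lang = ""   # safe default, mirrors the diff

def detect_language(news_article, fallback="en"):
    # empty string is falsy, so the dummy falls through to the fallback
    return news_article.meta_lang or fallback

print(detect_language(NewspaperDummy()))   # 'en'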