Better launch, cleaner shutdown (wip)

This commit is contained in:
Remy Moll
2022-08-11 13:42:45 +02:00
parent bc5eaba519
commit 9ca4985853
14 changed files with 147 additions and 37 deletions

View File

@@ -4,6 +4,7 @@ models = configuration.models
from threading import Thread
import logging
import os
import sys
logger = logging.getLogger(__name__)
from utils_mail import runner as mail_runner
@@ -102,7 +103,7 @@ class ArticleWatcher:
class Coordinator(Thread):
def __init__(self, **kwargs) -> None:
"""Launcher calls this Coordinator as the main thread to handle connections between the other workers (threaded)."""
super().__init__(target = self.launch)
super().__init__(target = self.launch, daemon=True)
def add_workers(self, **kwargs):
self.worker_slack = kwargs.pop("worker_slack", None)
@@ -192,6 +193,13 @@ if __name__ == "__main__":
"worker_slack" : slack_runner,
"worker_mail" : mail_runner,
}
coordinator.add_workers(**kwargs)
coordinator.start()
slack_runner.start()
try:
coordinator.add_workers(**kwargs)
coordinator.start()
slack_runner.start()
except KeyboardInterrupt:
logger.info("Keyboard interrupt. Stopping Slack and Coordinator")
slack_runner.stop()
print("BYE!")
# coordinator was set as a daemon thread, so it will be stopped automatically
sys.exit(0)

View File

@@ -55,7 +55,7 @@ def file_overview(file_url: str, file_attributes: list, options: dict) -> None:
def send_reaction_to_slack_thread(article, reaction):
"""Sends the verification status as a reaction to the associated slack thread. This will significantly decrease load times of the bot"""
"""Sends the verification status as a reaction to the associated slack thread."""
thread = article.slack_thread
messages = models.Message.select().where(models.Message.text.contains(article.article_url))
# TODO rewrite this shit
@@ -63,9 +63,10 @@ def send_reaction_to_slack_thread(article, reaction):
print("Found more than 5 messages. Aborting reactions...")
return
for m in messages:
if not m.has_single_url:
if m.is_processed_override:
print("Message already processed. Aborting reactions...")
elif not m.has_single_url:
print("Found thread but won't send reaction because thread has multiple urls")
pass
else:
ts = m.slack_ts
bot_client.reactions_add(

View File

@@ -37,6 +37,6 @@ def send(article_model):
smtp.sendmail(config["sender"], config["recipient"], mail.as_string())
smtp.quit()
logger.info("Mail successfully sent.")
except Exception as e:
except smtplib.SMTPException as e:
logger.error("Could not send mail for article {}".format(article_model))
logger.info(e)

View File

@@ -14,6 +14,7 @@ LATEST_RECORDED_REACTION = 0
def init(client) -> None:
"""Starts fetching past messages and returns the freshly launched thread"""
global slack_client
slack_client = client
@@ -26,7 +27,7 @@ def init(client) -> None:
# fetch all te messages we could have possibly missed
logger.info("Querying missed messages, threads and reactions. This can take some time.")
fetch_missed_channel_messages() # not threaded
t = Thread(target = fetch_missed_channel_reactions) # threaded, runs in background (usually takes a long time)
t = Thread(target = fetch_missed_channel_reactions, daemon=True) # threaded, runs in background (usually takes a long time)
t.start()
if os.getenv("REDUCEDFETCH", "false") == "true":
@@ -153,16 +154,23 @@ def fetch_missed_channel_reactions():
logger.info("Starting background fetch of channel reactions...")
threads = [t for t in models.Thread.select() if not t.is_fully_processed]
for i,t in enumerate(threads):
reactions = []
try:
query = slack_client.reactions_get(
channel = config["archive_id"],
timestamp = t.slack_ts
)
reactions = query.get("message", []).get("reactions", []) # default = []
except SlackApiError: # probably a rate_limit:
logger.error("Hit rate limit while querying reactions. retrying in {}s ({}/{} queries elapsed)".format(config["api_wait_time"], i, len(threads)))
time.sleep(int(config["api_wait_time"]))
reactions = query.get("message", []).get("reactions", [])
except SlackApiError as e:
if e.response.get("error", "") == "message_not_found":
m = t.initiator_message
logger.warning(f"Message (id={m.id}) not found. Skipping and saving...")
# this usually means the message is past the 1000 message limit imposed by slack. Mark it as processed in the db
m.is_processed_override = True
m.save()
else: # probably a rate_limit:
logger.error("Hit rate limit while querying reactions. retrying in {}s ({}/{} queries elapsed)".format(config["api_wait_time"], i, len(threads)))
time.sleep(int(config["api_wait_time"]))
for r in reactions:
reaction_dict_to_model(r, t)

View File

@@ -1,5 +1,6 @@
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
from slack_sdk.errors import SlackApiError
import logging
import configuration
@@ -18,7 +19,7 @@ class BotApp(App):
super().__init__(*args, **kwargs)
self.callback = callback
def start(self):
def pre_start(self):
message_helpers.init(self.client)
missed_messages, missed_reactions = message_helpers.get_unhandled_messages()
@@ -124,7 +125,7 @@ class BotApp(App):
answers = article.slack_info
for a in answers:
if a["file_path"]:
try: # either, a["file_path"] does not exist, or the upload resulted in an error
try: # upload resulted in an error
self.client.files_upload(
channels = config["archive_id"],
initial_comment = f"<@{config['responsible_id']}> \n {a['reply_text']}",
@@ -132,12 +133,13 @@ class BotApp(App):
thread_ts = thread.slack_ts
)
status = True
except:
except SlackApiError as e:
say(
"File {} could not be uploaded.".format(a),
thread_ts=thread.slack_ts
)
status = False
self.logger.error(f"File upload failed: {e}")
else: # anticipated that there is no file!
say(
f"<@{config['responsible_id']}> \n {a['reply_text']}",
@@ -171,14 +173,17 @@ class BotRunner():
def handle_incoming_reaction(event, say):
return self.bot_worker.handle_incoming_reaction(event)
# target = self.launch
# super().__init__(target=target)
self.handler = SocketModeHandler(self.bot_worker, config["app_token"])
def start(self):
self.bot_worker.start()
SocketModeHandler(self.bot_worker, config["app_token"]).start()
self.bot_worker.pre_start()
self.handler.start()
def stop(self):
self.handler.close()
print("Bye handler!")
# def respond_to_message(self, message):
# self.bot_worker.handle_incoming_message(message)

View File

@@ -31,7 +31,8 @@ class PDFDownloader:
self.logger.warning("Opening browser GUI because of 'HEADLESS=false'")
options.set_preference('print.save_as_pdf.links.enabled', True)
# Just save if the filetype is pdf already, does not work!
# Just save if the filetype is pdf already
# TODO: this is not working right now
options.set_preference("print.printer_Mozilla_Save_to_PDF.print_to_file", True)
options.set_preference("browser.download.folderList", 2)
@@ -40,6 +41,7 @@ class PDFDownloader:
options.set_preference("browser.download.dir", config["default_download_path"])
self.logger.info("Starting gecko driver")
# peviously, in a single docker image:
# self.driver = webdriver.Firefox(
# options = options,
# service = webdriver.firefox.service.Service(

View File

@@ -57,6 +57,6 @@ def get_description(article_object):
try:
article_object.set_keywords(news_article.keywords)
except AttributeError:
pass # list would have been empty anyway
pass # list would have been empty anyway
return article_object