"""Main coordination of other util classes. Handles inbound and outbound calls"""
from time import sleep
import configuration
models = configuration.models
from threading import Thread
import logging
logger = logging.getLogger(__name__)
import sys
from collections import OrderedDict
from utils_mail import runner as MailRunner
from utils_slack import runner as SlackRunner
from utils_worker.workers import DownloadWorker, FetchWorker, UploadWorker


class ArticleWatcher:
    """Wrapper for a newly created article object. Notifies the coordinator upon change/completion"""
    def __init__(self, article, workers_in, workers_out) -> None:
        self.article = article
        self.workers_in = workers_in
        self.workers_out = workers_out
        self.completion_notified = False

        for w_dict in self.workers_in:
            worker = self.get_next_worker(w_dict) # gets the first worker of each dict (they get processed independently)
            worker.process(self)

    def get_next_worker(self, worker_dict, worker_name=""):
        """Returns the worker coming after the one with key worker_name"""
        if worker_name == "": # first one
            return worker_dict[list(worker_dict.keys())[0]]
        keys = list(worker_dict.keys())
        next_key_ind = keys.index(worker_name) + 1
        try:
            key = keys[next_key_ind]
            return worker_dict[key]
        except IndexError:
            return None

    def update(self, worker_name):
        """Called by the workers to notify the watcher of a completed step"""
        for w_dict in self.workers_in:
            if worker_name in w_dict.keys():
                next_worker = self.get_next_worker(w_dict, worker_name)
                if next_worker:
                    if next_worker == "out":
                        self.completion_notifier()
                    else: # it's just another in-worker
                        next_worker.process(self)
                else: # no next worker, we are done
                    logger.info(f"No worker after {worker_name}")

    def completion_notifier(self):
        """Triggers the out-workers to process the article, that is to send out a message"""
        for w_dict in self.workers_out:
            worker = self.get_next_worker(w_dict)
            worker.send(self.article)
        self.article.sent = True
        self.article.save()

    def __str__(self) -> str:
        return f"ArticleWatcher with id {self.article.id}"
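
# For illustration only (mirrors the configuration in __main__ below): the watcher expects
# workers_in to be a list of (ordered) dicts mapping a worker name to a worker instance,
# with the sentinel string "out" marking the hand-off to the out-workers, e.g.
#
#   workers_in  = [OrderedDict({"FetchWorker": FetchWorker(), "DownloadWorker": DownloadWorker(), "NotifyRunner": "out"})]
#   workers_out = [{"SlackRunner": slack_runner}]
#
# An in-worker exposes process(article_watcher) and is expected to report back via
# article_watcher.update(<its name>); an out-worker only needs a send(article) method
# (see PrintWorker below).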


class Dispatcher(Thread):
    def __init__(self) -> None:
        """Thread to handle incoming requests and control the workers"""
        self.workers_in = []
        self.workers_out = []
        super().__init__(target = self.launch)

    def launch(self) -> None:
        # start workers (each worker is a thread)
        for w_dict in self.workers_in: # for reduced operations such as upload, some workers are not set
            for w in w_dict.values():
                if isinstance(w, Thread):
                    w.start()

        # get all articles not fully processed
        unsent = models.ArticleDownload.filter(sent = False) # if past messages have not been sent, they must be reevaluated
        for a in unsent:
            self.incoming_request(article=a)

    def incoming_request(self, message=None, article=None):
        """This method is passed on to the slack worker. It is then called whenever a new message is received."""
        if message is not None:
            try:
                url = message.urls[0] # ignore all the other ones
            except IndexError:
                return
            article, is_new = models.ArticleDownload.get_or_create(article_url=url)
            article.slack_ts = message.ts # either update the timestamp (to the last reference to the article) or set it for the first time
            article.save()
        elif article is not None:
            is_new = False
            logger.info(f"Received article {article} in incoming_request")
        else:
            logger.error("Dispatcher.incoming_request called with no arguments")
            return

        if is_new or (article.file_name == "" and article.verified == 0) \
                or (not is_new and len(self.workers_in) == 1): # this is for upload
            # check for models that were created but were abandoned. This means they have missing information, most importantly no associated file
            # this overwrites previously set information, but that should not be too important
            ArticleWatcher(
                article,
                workers_in=self.workers_in,
                workers_out=self.workers_out,
            )
        else: # manually trigger notification immediately
            logger.info(f"Found existing article {article}. Now sending")
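
# Minimal sketch of an in-worker that would plug into this pipeline (the real
# implementations live in utils_worker.workers; this only illustrates the assumed
# interface, not the actual code):
#
#   class SketchWorker(Thread):
#       """Processes watchers from a queue and notifies them when done"""
#       def __init__(self):
#           super().__init__(daemon=True)
#           self._queue = []
#
#       def process(self, article_watcher):
#           self._queue.append(article_watcher) # called by ArticleWatcher / the previous worker
#
#       def run(self):
#           while True:
#               if self._queue:
#                   watcher = self._queue.pop(0)
#                   # ... do the actual work on watcher.article here ...
#                   watcher.update("SketchWorker") # hand over to the next worker in the dict
#               else:
#                   sleep(1)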


class PrintWorker:
    def __init__(self, action, sent = False) -> None:
        self.action = action
        self.sent = sent

    def send(self, article):
        print(f"{self.action} article {article}")
        if self.sent:
            article.sent = True
            article.save()

    def keep_alive(self): # keeps the script running, because there is nothing else in the main thread
        while True:
            sleep(1)
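

# Entry point: pass "upload" on the command line to only push pending articles to the
# archive (reduced pipeline with a PrintWorker as the only out-worker); otherwise the
# full fetch -> download -> notify pipeline is started together with the Slack bot.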
if __name__ == "__main__":
dispatcher = Dispatcher()
if "upload" in sys.argv:
articles = models.ArticleDownload.select().where(models.ArticleDownload.archive_url == "" or models.ArticleDownload.archive_url == "TODO:UPLOAD").execute()
logger.info(f"Launching upload to archive for {len(articles)} articles.")
dispatcher.workers_in = [{"UploadWorker": UploadWorker()}]
print_worker = PrintWorker("Uploaded")
dispatcher.workers_out = [{"PrintWorker": print_worker}]
dispatcher.start()
for a in articles:
dispatcher.incoming_request(article=a)
print_worker.keep_alive()
else: # launch with full action
try:
slack_runner = SlackRunner.BotRunner(dispatcher.incoming_request)
# All workers are implemented as a threaded queue. But the individual model requires a specific processing order:
# fetch -> download (-> compress) -> complete
# This is reflected in the following list of workers:
workers_in = [
OrderedDict({"FetchWorker": FetchWorker(), "DownloadWorker": DownloadWorker(), "NotifyRunner": "out"}),
OrderedDict({"UploadWorker": UploadWorker()})
]
# The two dicts are processed independently. First element of first dict is called at the same time as the first element of the second dict
# Inside a dict, the order of the keys gives the order of execution (only when the first element is done, the second is called, etc...)
workers_out = [{"SlackRunner": slack_runner},{"MailRunner": MailRunner}]
dispatcher.workers_in = workers_in
dispatcher.workers_out = workers_out
dispatcher.start() # starts the thread, (ie. runs launch())
slack_runner.start() # last one to start, inside the main thread
except KeyboardInterrupt:
logger.info("Keyboard interrupt. Stopping Slack and dispatcher")
slack_runner.stop()
dispatcher.join()
for w_dict in workers_in:
for w in w_dict.values():
if isinstance(w, Thread):
w.stop()
# All threads are launched as a daemon thread, meaning that any 'leftover' should exit along with the sys call
sys.exit(0)