"""Main coordination of other util classes. Handles inbound and outbound calls""" from time import sleep import configuration models = configuration.models from threading import Thread import logging logger = logging.getLogger(__name__) import sys from collections import OrderedDict from utils_mail import runner as MailRunner from utils_slack import runner as SlackRunner from utils_worker.workers import DownloadWorker, FetchWorker, UploadWorker class ArticleWatcher: """Wrapper for a newly created article object. Notifies the coordinator upon change/completition""" def __init__(self, article, workers_in, workers_out) -> None: self.article = article self.workers_in = workers_in self.workers_out = workers_out self.completition_notified = False for w_dict in self.workers_in: worker = self.get_next_worker(w_dict) # gets the first worker of each dict (they get processed independently) worker.process(self) def get_next_worker(self, worker_dict, worker_name=""): """Returns the worker coming after the one with key worker_name""" if worker_name == "": # first one return worker_dict[list(worker_dict.keys())[0]] # for i,w_dict in enumerate(workers_list): keys = list(worker_dict.keys()) next_key_ind = keys.index(worker_name) + 1 try: key = keys[next_key_ind] return worker_dict[key] except IndexError: return None def update(self, worker_name): """Called by the workers to notify the watcher of a completed step""" for w_dict in self.workers_in: if worker_name in w_dict.keys(): next_worker = self.get_next_worker(w_dict, worker_name) if next_worker: if next_worker == "out": self.completion_notifier() else: # it's just another in-worker next_worker.process(self) else: # no next worker, we are done logger.info(f"No worker after {worker_name}") def completion_notifier(self): """Triggers the out-workers to process the article, that is to send out a message""" for w_dict in self.workers_out: worker = self.get_next_worker(w_dict) worker.send(self.article) self.article.sent = True self.article.save() def __str__(self) -> str: return f"ArticleWatcher with id {self.article_id}" class Dispatcher(Thread): def __init__(self) -> None: """Thread to handle handle incoming requests and control the workers""" self.workers_in = [] self.workers_out = [] super().__init__(target = self.launch) def launch(self) -> None: # start workers (each worker is a thread) for w_dict in self.workers_in: # for reduced operations such as upload, some workers are not set for w in w_dict.values(): if isinstance(w, Thread): w.start() # get all articles not fully processed unsent = models.ArticleDownload.filter(sent = False) # if past messages have not been sent, they must be reevaluated for a in unsent: self.incoming_request(article=a) def incoming_request(self, message=None, article=None): """This method is passed onto the slack worker. 

class Dispatcher(Thread):
    def __init__(self) -> None:
        """Thread to handle incoming requests and control the workers"""
        self.workers_in = []
        self.workers_out = []
        super().__init__(target=self.launch)

    def launch(self) -> None:
        # start workers (each worker is a thread)
        for w_dict in self.workers_in: # for reduced operations such as upload, some workers are not set
            for w in w_dict.values():
                if isinstance(w, Thread):
                    w.start()

        # get all articles that are not yet fully processed
        unsent = models.ArticleDownload.filter(sent=False) # if past messages have not been sent, they must be reevaluated
        for a in unsent:
            self.incoming_request(article=a)

    def incoming_request(self, message=None, article=None):
        """This method is passed on to the slack worker. It is then called whenever a new message is received."""
        if message is not None:
            try:
                url = message.urls[0] # ignore all the other urls
            except IndexError:
                return
            article, is_new = models.ArticleDownload.get_or_create(article_url=url)
            article.slack_ts = message.ts # either update the timestamp (to the last reference to the article) or set it for the first time
            article.save()
        elif article is not None:
            is_new = False
            logger.info(f"Received article {article} in incoming_request")
        else:
            logger.error("Dispatcher.incoming_request called with no arguments")
            return

        if is_new or (article.file_name == "" and article.verified == 0) \
                or (not is_new and len(self.workers_in) == 1): # the last condition is for the upload-only mode
            # catch articles that were created but then abandoned, i.e. that are missing information, most importantly an associated file
            # this overwrites previously set information, but that should not matter much
            ArticleWatcher(
                article,
                workers_in=self.workers_in,
                workers_out=self.workers_out,
            )
        else: # manually trigger the notification immediately
            logger.info(f"Found existing article {article}. Now sending")


class PrintWorker:
    def send(self, article):
        print(f"Uploaded article {article}")

    def keep_alive(self): # keeps the script running, because there is nothing else in the main thread
        while True:
            sleep(1)


if __name__ == "__main__":
    dispatcher = Dispatcher()

    if "upload" in sys.argv:
        articles = models.ArticleDownload.select().where(
            (models.ArticleDownload.archive_url == "") | (models.ArticleDownload.archive_url == "TODO:UPLOAD")
        ).execute()
        logger.info(f"Launching upload to archive for {len(articles)} articles.")

        dispatcher.workers_in = [{"UploadWorker": UploadWorker()}]
        dispatcher.workers_out = [{"PrintWorker": PrintWorker()}]
        dispatcher.start()
        for a in articles:
            dispatcher.incoming_request(article=a)
        PrintWorker().keep_alive()

    else: # launch the full pipeline
        try:
            slack_runner = SlackRunner.BotRunner(dispatcher.incoming_request)
            # All workers are implemented as a threaded queue, but an individual article requires a specific processing order:
            # fetch -> download (-> compress) -> complete
            # This is reflected in the following list of workers:
            workers_in = [
                OrderedDict({"FetchWorker": FetchWorker(), "DownloadWorker": DownloadWorker(), "NotifyRunner": "out"}),
                OrderedDict({"UploadWorker": UploadWorker()})
            ]
            # The two dicts are processed independently: the first element of the first dict is called at the same time as the first element of the second dict.
            # Inside a dict, the order of the keys gives the order of execution (only when the first element is done is the second called, etc.)
            workers_out = [{"SlackRunner": slack_runner}, {"MailRunner": MailRunner}]

            dispatcher.workers_in = workers_in
            dispatcher.workers_out = workers_out
            dispatcher.start() # starts the thread, i.e. runs launch()
            slack_runner.start() # last one to start, inside the main thread
        except KeyboardInterrupt:
            logger.info("Keyboard interrupt. Stopping Slack and dispatcher")
            slack_runner.stop()
            dispatcher.join()
            for w_dict in workers_in:
                for w in w_dict.values():
                    if isinstance(w, Thread):
                        w.stop()
            # All threads are launched as daemon threads, meaning that any 'leftovers' should exit along with the sys call
            sys.exit(0)
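
# Usage sketch (the file name "runner.py" below is an assumption, not taken from the source):
#
#   python runner.py            # full pipeline: Slack bot + fetch -> download workers, notifications via Slack and mail
#   python runner.py upload     # reduced mode: only re-upload articles whose archive_url is empty or "TODO:UPLOAD"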