"""Main coordination of other util classes. Handles inbound and outbound calls."""
import configuration
models = configuration.models
from threading import Thread
import logging
import os
logger = logging.getLogger(__name__)

from utils_mail import runner as mail_runner
from utils_slack import runner as slack_runner
from utils_worker.workers import CompressWorker, DownloadWorker, FetchWorker, UploadWorker


class ArticleWatcher:
    """Wrapper for a newly created article object. Notifies the coordinator upon change/completion.

    Full mode (both ``worker_fetch`` and ``worker_upload`` given): the watcher
    chains fetch -> download -> compress and finally calls ``notifier(article, thread)``;
    upload runs independently of that chain.

    Manual mode (otherwise): every worker in ``workers_manual`` processes the
    article and completions are only logged.
    """

    def __init__(self, article, thread, **kwargs) -> None:
        """Create the watcher and immediately hand the article to the first worker(s).

        Args:
            article: article model instance; only ``article.id`` is read here.
            thread: message thread the article belongs to (``None`` for manual runs).
            **kwargs: ``notifier``, ``worker_fetch``, ``worker_download``,
                ``worker_compress``, ``worker_upload`` and/or ``workers_manual``.
        """
        self.article_id = article.id  # in case article becomes None at any point, we can still track the article
        self.article = article
        self.thread = thread

        self.completition_notifier = kwargs.get("notifier")
        self.fetch = kwargs.get("worker_fetch", None)
        self.download = kwargs.get("worker_download", None)
        self.compress = kwargs.get("worker_compress", None)
        self.upload = kwargs.get("worker_upload", None)

        self.completition_notified = False
        self._fetch_completed = self._download_completed = self._compression_completed = self._upload_completed = False

        if self.fetch and self.upload:
            # first step: gather metadata. Presumably the fetch worker sets
            # fetch_completed, which in turn calls update_status.
            self.fetch.process(self)
            self.upload.process(self)  # independent from the rest
        else:
            # The full kwargs were not provided, only do a manual run.
            # Shadow update_status() with a logger so completion callbacks from
            # the workers do not try to chain into workers that do not exist.
            self.update_status = lambda completed: logger.info("Completed action %s", completed)
            # `or []`: tolerate a missing/None workers_manual instead of raising TypeError
            for w in kwargs.get("workers_manual") or []:
                w.process(self)

    def update_status(self, completed_action):
        """Advance the processing chain after ``completed_action`` finished.

        The article download is complete iff fetch and download were successful
        and compression was run; at that point the notifier is called exactly once.
        """
        if completed_action == "fetch":
            self.download.process(self)
        elif completed_action == "download":
            self.compress.process(self)
        elif completed_action == "compress":  # last step
            if self.completition_notified:
                # guard against a second compress completion notifying twice
                logger.warning("%s already notified, skipping duplicate notification", self)
            else:
                self.completition_notified = True
                self.completition_notifier(self.article, self.thread)  # triggers action in Coordinator
        elif completed_action == "upload":
            # this case occurs when upload was faster than compression
            pass
        else:
            logger.warning("update_status called with unusual configuration: %s", completed_action)

    # ====== Attributes to be modified by the util workers
    # Each setter stores the flag and then reports the finished step to update_status.

    @property
    def fetch_completed(self):
        return self._fetch_completed

    @fetch_completed.setter
    def fetch_completed(self, value: bool):
        self._fetch_completed = value
        self.update_status("fetch")

    @property
    def download_completed(self):
        return self._download_completed

    @download_completed.setter
    def download_completed(self, value: bool):
        self._download_completed = value
        self.update_status("download")

    @property
    def compression_completed(self):
        return self._compression_completed

    @compression_completed.setter
    def compression_completed(self, value: bool):
        self._compression_completed = value
        self.update_status("compress")

    @property
    def upload_completed(self):
        return self._upload_completed

    @upload_completed.setter
    def upload_completed(self, value: bool):
        self._upload_completed = value
        self.update_status("upload")

    def __str__(self) -> str:
        return f"Article with id {self.article_id}"


class Coordinator(Thread):
    """Main thread that wires the slack/mail front-ends to the threaded workers."""

    def __init__(self, **kwargs) -> None:
        """Launcher calls this Coordinator as the main thread to handle connections
        between the other workers (threaded).

        Any worker kwargs accepted by ``add_workers`` may be passed here as well.
        Every worker slot is initialised (to ``None`` when not given) so that
        ``launch`` and ``incoming_request`` never hit a missing attribute.
        """
        super().__init__(target=self.launch)
        self.add_workers(**kwargs)

    def add_workers(self, **kwargs):
        """Register workers; may be called again later to replace them.

        ``worker_slack`` and ``worker_mail`` are kept on the coordinator only; the
        remaining kwargs (plus the completion notifier) are forwarded to every
        ``ArticleWatcher``.
        """
        self.worker_slack = kwargs.pop("worker_slack", None)
        self.worker_mail = kwargs.pop("worker_mail", None)
        # the two above won't be needed in the Watcher
        self.worker_download = kwargs.get("worker_download", None)
        self.worker_fetch = kwargs.get("worker_fetch", None)
        self.worker_compress = kwargs.get("worker_compress", None)
        self.worker_upload = kwargs.get("worker_upload", None)

        kwargs["notifier"] = self.article_complete_notifier
        self.kwargs = kwargs

    def launch(self) -> None:
        """Thread target: start every registered processing worker."""
        for w in (self.worker_download, self.worker_fetch, self.worker_upload, self.worker_compress):
            if w is not None:
                w.start()

    def incoming_request(self, message):
        """This method is passed onto the slack worker. It gets triggered when a new message is received."""
        if not message.urls:
            logger.warning("Received message without urls, ignoring it")
            return
        url = message.urls[0]  # ignore all the other ones

        article, is_new = models.ArticleDownload.get_or_create(article_url=url)
        thread = message.thread
        thread.article = article
        thread.save()

        if is_new or (article.file_name == "" and article.verified == 0):
            # Also re-process models that were created but abandoned: they have
            # missing information, most importantly no associated file.
            # This overwrites previously set information, but that should not be too important.
            # All workers are implemented as a threaded queue, but an individual model
            # requires a specific processing order: fetch -> download -> compress -> complete.
            # The watcher orchestrates the procedure and notifies upon completion.
            ArticleWatcher(article, thread, **self.kwargs)
        else:
            # manually trigger notification immediately
            logger.info("Found existing article %s. Now sending", article)
            self.article_complete_notifier(article, thread)

    def manual_processing(self, articles, workers):
        """Start ``workers`` and let each of them process every article in ``articles``."""
        for w in workers:
            w.start()

        def notifier(article, thread=None):
            # same (article, thread) signature that ArticleWatcher uses for its notifier
            print(f"Completed manual actions for {article}")

        for article in articles:
            # TODO: ArticleWatcher wants a thread to link the article to; handle threads as a kwarg
            ArticleWatcher(article, None, workers_manual=workers, notifier=notifier)

    def article_complete_notifier(self, article, thread):
        """Send the slack reply and the mail for a finished article (each only if configured)."""
        if self.worker_slack is None:
            logger.warning("Not sending slack notifier")
        else:
            self.worker_slack.bot_worker.respond_channel_message(thread)
        if self.worker_mail is None:
            logger.warning("Not sending mail notifier")
        else:
            self.worker_mail.send(article)


if __name__ == "__main__":
    coordinator = Coordinator()

    if os.getenv("UPLOAD", "false") == "true":
        articles = models.ArticleDownload.select().where(models.ArticleDownload.archive_url == "").execute()
        logger.info("Launching upload to archive for %s articles.", len(articles))
        coordinator.manual_processing(articles, [UploadWorker()])

    elif os.getenv("CHECK", "false") == "true":
        from utils_check import runner as check_runner
        check_runner.verify_unchecked()

    else:  # launch with full action
        # distinct name so the imported slack_runner module is not rebound
        slack_bot = slack_runner.BotRunner(coordinator.incoming_request)
        kwargs = {
            "worker_download": DownloadWorker(),
            "worker_fetch": FetchWorker(),
            "worker_upload": UploadWorker(),
            "worker_compress": CompressWorker(),
            "worker_slack": slack_bot,
            "worker_mail": mail_runner,
        }
        coordinator.add_workers(**kwargs)
        coordinator.start()
        slack_bot.start()