"""Main coordination of other util classes. Handles inbound and outbound calls"""
import configuration
models = configuration.models
import sys
import logging
logger = logging.getLogger(__name__)

from utils_mail import runner as mail_runner
from utils_slack import runner as slack_runner
from utils_worker.workers import CompressWorker, DownloadWorker, FetchWorker, UploadWorker


class ArticleWatcher:
    """Wrapper for a newly created article object.

    Hands the article from one worker to the next and calls the notifier
    once the compression step reports completion.

    Keyword arguments (all optional):
        notifier: callable invoked with the article after compression.
        worker_fetch, worker_download, worker_compress, worker_upload:
            objects exposing ``process(watcher)``. A missing worker means the
            corresponding step is skipped instead of raising, which allows a
            watcher to be driven manually (see ``Coordinator.manual_processing``).
    """

    def __init__(self, article, **kwargs) -> None:
        self.article = article

        self.completition_notifier = kwargs.get("notifier")
        self.fetch = kwargs.get("worker_fetch", None)
        self.download = kwargs.get("worker_download", None)
        self.compress = kwargs.get("worker_compress", None)
        self.upload = kwargs.get("worker_upload", None)

        # NOTE: initialised here but never updated anywhere in this file.
        self.completition_notified = False
        self._fetch_completed = False
        self._download_completed = False
        self._compression_completed = False
        self._upload_completed = False

        # First step: gather metadata. The worker is expected to set
        # `fetch_completed`, which in turn triggers `update_status`.
        self._submit(self.fetch, "fetch")
        # The upload is independent from the fetch/download/compress chain.
        self._submit(self.upload, "upload")

    def _submit(self, worker, step: str) -> None:
        """Pass this watcher to `worker.process`, or skip `step` if no worker was supplied."""
        if worker is None:
            logger.debug("No worker supplied for step '%s', skipping it", step)
            return
        worker.process(self)

    def update_status(self, completed_action):
        """React to a finished step by launching the next one.

        The processing order is fetch -> download -> compress -> notify.
        `completed_action` is one of "fetch", "download", "compress" or
        "upload"; any other value is logged as a warning.
        """
        if completed_action == "fetch":
            self._submit(self.download, "download")
        elif completed_action == "download":
            self._submit(self.compress, "compress")
        elif completed_action == "compress":
            # Last step: triggers the follow-up action in the Coordinator.
            if self.completition_notifier is not None:
                self.completition_notifier(self.article)
        elif completed_action == "upload":
            # Upload runs independently of the chain; nothing follows it.
            pass
        else:
            logger.warning(f"update_status called with unusual configuration: {completed_action}")

    # ====== Attributes to be modified by the util workers
    # Assigning to any of these properties (whatever the value) triggers
    # `update_status` for the corresponding step.
    @property
    def fetch_completed(self):
        return self._fetch_completed

    @fetch_completed.setter
    def fetch_completed(self, value: bool):
        self._fetch_completed = value
        self.update_status("fetch")

    @property
    def download_completed(self):
        return self._download_completed

    @download_completed.setter
    def download_completed(self, value: bool):
        self._download_completed = value
        self.update_status("download")

    @property
    def compression_completed(self):
        return self._compression_completed

    @compression_completed.setter
    def compression_completed(self, value: bool):
        self._compression_completed = value
        self.update_status("compress")

    @property
    def upload_completed(self):
        return self._upload_completed

    @upload_completed.setter
    def upload_completed(self, value: bool):
        self._upload_completed = value
        self.update_status("upload")


class Coordinator:
    def __init__(self, **kwargs) -> None:
        """Launcher calls this Coordinator as the main thread to handle connections between the other workers (threaded)."""
        # Defaults so that the coordinator is usable (e.g. for manual
        # processing) even when `add_workers` was never called.
        self.worker_slack = None
        self.worker_mail = None
        self.worker_download = None
        self.worker_fetch = None
        self.worker_compress = None
        self.worker_upload = None
        self.kwargs = {}

    def add_workers(self, **kwargs):
        """Register the workers and start those that are present.

        `worker_slack` and `worker_mail` are removed from `kwargs` because the
        watcher does not need them; the remaining entries are stored in
        `self.kwargs` and forwarded to every new `ArticleWatcher`.
        """
        self.worker_slack = kwargs.pop("worker_slack", None)
        self.worker_mail = kwargs.pop("worker_mail", None)
        # the two above won't be needed in the Watcher
        self.worker_download = kwargs.get("worker_download", None)
        self.worker_fetch = kwargs.get("worker_fetch", None)
        self.worker_compress = kwargs.get("worker_compress", None)
        self.worker_upload = kwargs.get("worker_upload", None)

        self.kwargs = kwargs

        # The mail worker is not part of this list and is never started here.
        for w in [self.worker_slack, self.worker_download, self.worker_fetch, self.worker_upload, self.worker_compress]:
            if w is not None:
                w.start()

    def incoming_request(self, message):
        # TODO CHECK ME!
        """This method is passed onto the slack worker. It gets triggered when a new message is received."""
        url = message.urls[0]  # ignore all the other ones
        a, is_new = models.ArticleDownload.get_or_create(article_url=url)
        message.thread.article = a
        message.thread.save()

        if is_new:
            # All workers are implemented as a threaded queue. But the
            # individual model requires a specific processing order:
            # fetch -> download -> compress -> complete
            # The watcher orchestrates the procedure and notifies upon
            # completion. Build a fresh dict so the shared worker mapping is
            # not mutated on every request.
            watcher_kwargs = {**self.kwargs, "notifier": self.article_complete_notifier}
            ArticleWatcher(a, **watcher_kwargs)
        else:
            # Known article: manually trigger the notification immediately.
            self.article_complete_notifier(a)

    def manual_processing(self, url_list, target_calls):
        """Run the given workers on already stored articles.

        `url_list` entries may be URL strings or objects exposing an
        `article_url` attribute (the `__main__` upload branch passes query
        results, presumably model rows -- TODO confirm). Every object in
        `target_calls` gets `process(watcher)` called for each article.
        Entries without a stored article are skipped with a warning.
        """
        for entry in url_list:
            url = getattr(entry, "article_url", entry)
            article = models.ArticleDownload.get_or_none(article_url=url)
            if article is None:
                logger.warning("No stored article found for url %s, skipping", url)
                continue
            # The watcher only accepts keyword arguments besides the article.
            watcher = ArticleWatcher(article, notifier=self.article_complete_notifier)
            for t in target_calls:
                t.process(watcher)

    def article_complete_notifier(self, article):
        """Announce a completed article via the slack and mail workers, where available."""
        if self.worker_slack is not None:
            self.worker_slack.bot_worker.respond_channel_message(article)
        if self.worker_mail is not None:
            self.worker_mail.send(article)


if __name__ == "__main__":
    coordinator = Coordinator()

    if "upload" in sys.argv:
        urls = models.ArticleDownload.select(models.ArticleDownload.article_url).where(models.ArticleDownload.archive_url == "").execute()
        logger.info(f"Launching upload to archive for {len(urls)} urls.")
        # NOTE(review): unlike the workers handed to `add_workers`, this
        # UploadWorker is never started -- confirm `process` works without it.
        coordinator.manual_processing(urls, [UploadWorker()])

    elif "check" in sys.argv:
        from utils_check import runner as check_runner
        check_runner.verify_unchecked()

    else:  # launch with full action
        kwargs = {
            "worker_download": DownloadWorker(),
            "worker_fetch": FetchWorker(),
            "worker_upload": UploadWorker(),
            "worker_compress": CompressWorker(),
            "worker_slack": slack_runner.BotRunner(coordinator.incoming_request),
            "worker_mail": mail_runner,
        }
        coordinator.add_workers(**kwargs)

# TODO
# Resume interrupted article models