197 lines
7.6 KiB
Python
197 lines
7.6 KiB
Python
"""Main coordination of other util classes. Handles inbound and outbound calls"""
|
|
import configuration
|
|
models = configuration.models
|
|
import sys
|
|
from threading import Thread
|
|
import logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
from utils_mail import runner as mail_runner
|
|
from utils_slack import runner as slack_runner
|
|
from utils_worker.workers import CompressWorker, DownloadWorker, FetchWorker, UploadWorker
|
|
|
|
|
|
class ArticleWatcher:
    """Wrapper for a newly created article object. Notifies the coordinator upon change/completion.

    Orchestrates the per-article processing order (fetch -> download -> compress,
    with upload running independently) by reacting to the ``*_completed`` setters,
    which the individual workers assign when they finish.
    """

    def __init__(self, article, thread, **kwargs) -> None:
        """
        Args:
            article: model instance being processed.
            thread: message thread the article belongs to (forwarded to the notifier).
            **kwargs:
                notifier: callable(article, thread) invoked once processing finished.
                worker_fetch / worker_download / worker_compress / worker_upload:
                    threaded workers; when both fetch and upload are provided the
                    automatic pipeline is started immediately.
                workers_manual: iterable of workers for a manual (non-pipelined) run;
                    used when the full worker set is not provided.
        """
        self.article = article
        self.thread = thread

        self.completition_notifier = kwargs.get("notifier")

        self.fetch = kwargs.get("worker_fetch", None)
        self.download = kwargs.get("worker_download", None)
        self.compress = kwargs.get("worker_compress", None)
        self.upload = kwargs.get("worker_upload", None)

        self.completition_notified = False
        self._fetch_completed = self._download_completed = self._compression_completed = self._upload_completed = False

        # first step: gather metadata
        if self.fetch and self.upload:
            self.fetch.process(self)  # this will call the update_status method
            self.upload.process(self)  # independent from the rest
        else:  # the full kwargs were not provided, only do a manual run
            # overwrite update_status() because calls from the workers would result in errors
            self.update_status = lambda completed: logger.info(f"Completed action {completed}")
            # bug fix: kwargs.get(...) returns None when the kwarg is absent,
            # which previously raised TypeError here; tolerate a missing/empty list
            for w in kwargs.get("workers_manual") or []:
                w.process(self)

    def update_status(self, completed_action):
        """Checks and notifies internal completion-status.

        Article download is complete iff fetch and download were successful and
        compression was run. Each completed step triggers the next one in the
        chain; upload runs independently and needs no follow-up action.
        """
        if completed_action == "fetch":
            self.download.process(self)
        elif completed_action == "download":
            self.compress.process(self)
        elif completed_action == "compress":  # last step
            # triggers action in Coordinator
            self.completition_notifier(self.article, self.thread)
        elif completed_action == "upload":
            # this case occurs when upload was faster than compression
            pass
        else:
            logger.warning(f"update_status called with unusual configuration: {completed_action}")

    # ====== Attributes to be modified by the util workers
    @property
    def fetch_completed(self):
        return self._fetch_completed

    @fetch_completed.setter
    def fetch_completed(self, value: bool):
        # worker assigns True when done; kicks off the download step
        self._fetch_completed = value
        self.update_status("fetch")

    @property
    def download_completed(self):
        return self._download_completed

    @download_completed.setter
    def download_completed(self, value: bool):
        # worker assigns True when done; kicks off the compression step
        self._download_completed = value
        self.update_status("download")

    @property
    def compression_completed(self):
        return self._compression_completed

    @compression_completed.setter
    def compression_completed(self, value: bool):
        # worker assigns True when done; triggers the completion notifier
        self._compression_completed = value
        self.update_status("compress")

    @property
    def upload_completed(self):
        return self._upload_completed

    @upload_completed.setter
    def upload_completed(self, value: bool):
        # worker assigns True when done; no follow-up action required
        self._upload_completed = value
        self.update_status("upload")
|
|
|
|
|
|
|
|
|
|
class Coordinator(Thread):
    """Main thread tying the workers together and reacting to incoming messages."""

    def __init__(self, **kwargs) -> None:
        """Launcher calls this Coordinator as the main thread to handle connections between the other workers (threaded)."""
        super().__init__(target = self.launch)

    def add_workers(self, **kwargs):
        """Register the worker instances.

        Slack and mail workers are popped out so the remaining kwargs can be
        forwarded unchanged to each ArticleWatcher.
        """
        self.worker_slack = kwargs.pop("worker_slack", None)
        self.worker_mail = kwargs.pop("worker_mail", None)
        # the two above won't be needed in the Watcher
        self.worker_download = kwargs.get("worker_download", None)
        self.worker_fetch = kwargs.get("worker_fetch", None)
        self.worker_compress = kwargs.get("worker_compress", None)
        self.worker_upload = kwargs.get("worker_upload", None)

        self.kwargs = kwargs

    def launch(self) -> None:
        """Start every threaded worker that was provided."""
        for w in [self.worker_download, self.worker_fetch, self.worker_upload, self.worker_compress]:
            if w is not None:
                w.start()

    def incoming_request(self, message):
        """This method is passed onto the slack worker. It gets triggered when a new message is received."""
        # robustness: a message without any URL would previously raise IndexError
        if not message.urls:
            logger.warning("Ignoring incoming message without any URL")
            return
        url = message.urls[0]  # ignore all the other ones
        article, is_new = models.ArticleDownload.get_or_create(article_url=url)
        thread = message.thread
        thread.article = article
        thread.save()
        self.kwargs.update({"notifier" : self.article_complete_notifier})

        if is_new or (article.file_name == "" and article.verified == 0):
            # check for models that were created but were abandoned. This means
            # they have missing information, most importantly no associated file
            # this overwrites previously set information, but that should not be too important
            ArticleWatcher(
                article,
                thread,
                **self.kwargs
            )
            # All workers are implemented as a threaded queue. But the individual model requires a specific processing order:
            # fetch -> download -> compress -> complete
            # the watcher orchestrates the procedure and notifies upon completion
            # the watcher will notify once it is sufficiently populated
        else:  # manually trigger notification immediately
            logger.info(f"Found existing article {article}. Now sending")
            self.article_complete_notifier(article, thread)

    def manual_processing(self, articles, workers):
        """Run the given workers over articles outside the full pipeline (e.g. batch re-upload)."""
        for w in workers:
            w.start()

        notifier = lambda article: print(f"Completed manual actions for {article}")
        for article in articles:
            # bug fix: ArticleWatcher.__init__ requires a positional `thread`
            # argument; previously omitted, making every manual run raise TypeError.
            # No thread exists in manual mode, so pass None.
            ArticleWatcher(article, None, workers_manual = workers, notifier = notifier)

    def article_complete_notifier(self, article, thread):
        """Fan the finished article out to the slack and mail workers (best effort)."""
        if self.worker_slack is None:
            logger.warning("Not sending slack notifier")
        else:
            self.worker_slack.bot_worker.respond_channel_message(thread)
        if self.worker_mail is None:
            logger.warning("Not sending mail notifier")
        else:
            self.worker_mail.send(article)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Entry point. Mode is selected by a bare token on the command line:
    #   "upload" -> batch re-upload of articles missing an archive link
    #   "check"  -> verification run only
    #   (none)   -> full pipeline with slack + mail notification
    coordinator = Coordinator()

    if "upload" in sys.argv:
        # select every article that has no archive_url yet and push it
        articles = models.ArticleDownload.select().where(models.ArticleDownload.archive_url == "").execute()
        logger.info(f"Launching upload to archive for {len(articles)} articles.")
        coordinator.manual_processing(articles, [UploadWorker()])

    elif "check" in sys.argv:
        # verification-only mode; imported lazily since it is unused otherwise
        from utils_check import runner as check_runner
        check_runner.verify_unchecked()

    else: # launch with full action
        # NOTE(review): this rebinds the imported module name `slack_runner`
        # to a BotRunner instance — confusing shadowing, but all later uses
        # in this branch refer to the instance, so behavior is consistent
        slack_runner = slack_runner.BotRunner(coordinator.incoming_request)
        kwargs = {
            "worker_download" : DownloadWorker(),
            "worker_fetch" : FetchWorker(),
            "worker_upload" : UploadWorker(),
            "worker_compress" : CompressWorker(),
            "worker_slack" : slack_runner,
            "worker_mail" : mail_runner,
        }
        coordinator.add_workers(**kwargs)
        # start the coordinator thread (starts the workers), then the slack bot
        coordinator.start()
        slack_runner.start()
|