coss_archiving/app/runner.py


"""Main coordination of other util classes. Handles inbound and outbound calls"""
import configuration
models = configuration.models
import sys
import logging
logger = logging.getLogger(__name__)
from utils_mail import runner as mail_runner
from utils_slack import runner as slack_runner
from utils.workers import CompressWorker, DownloadWorker, FetchWorker, UploadWorker

class ArticleWatcher:
    """Wrapper around a newly created article object. Notifies the coordinator upon completion of each step."""

    def __init__(self, article, **kwargs) -> None:
        self.article = article
        self.completion_notifier = kwargs.get("notifier")
        self.fetch = kwargs.get("worker_fetch", None)
        self.download = kwargs.get("worker_download", None)
        self.compress = kwargs.get("worker_compress", None)
        self.upload = kwargs.get("worker_upload", None)
        self.completion_notified = False
        self._fetch_completed = self._download_completed = False
        self._compression_completed = self._upload_completed = False

        # First step: gather metadata. The fetch worker reports back through
        # update_status; the upload runs independently of the other steps.
        if self.fetch is not None:  # guard: manual_processing may not pass every worker
            self.fetch.process(self)
        if self.upload is not None:
            self.upload.process(self)

    def update_status(self, completed_action):
        """Checks the internal completion status and triggers the next step.

        An article is fully processed iff fetch and download were successful
        and compression was run. Once every step has completed and the
        notification has been sent, this watcher is simply garbage-collected;
        it never needs to be deleted explicitly.
        """
        if completed_action == "fetch":
            self.download.process(self)
        elif completed_action == "download":
            self.compress.process(self)
        elif completed_action == "compress":  # last step
            self.completion_notifier(self.article)  # triggers action in the Coordinator
        elif completed_action == "upload":
            # this case occurs when the upload finished before the compression
            pass
        else:
            logger.warning(f"update_status called with unexpected action: {completed_action}")

    # ====== Attributes to be modified by the util workers
    @property
    def fetch_completed(self):
        return self._fetch_completed

    @fetch_completed.setter
    def fetch_completed(self, value: bool):
        self._fetch_completed = value
        self.update_status("fetch")

    @property
    def download_completed(self):
        return self._download_completed

    @download_completed.setter
    def download_completed(self, value: bool):
        self._download_completed = value
        self.update_status("download")

    @property
    def compression_completed(self):
        return self._compression_completed

    @compression_completed.setter
    def compression_completed(self, value: bool):
        self._compression_completed = value
        self.update_status("compress")

    @property
    def upload_completed(self):
        return self._upload_completed

    @upload_completed.setter
    def upload_completed(self, value: bool):
        self._upload_completed = value
        self.update_status("upload")

class Coordinator:
    def __init__(self, **kwargs) -> None:
        """The launcher runs this Coordinator in the main thread; it wires up the other (threaded) workers."""
        pass

    def add_workers(self, **kwargs):
        self.worker_slack = kwargs.pop("worker_slack", None)
        self.worker_mail = kwargs.pop("worker_mail", None)
        # the two above are popped because the ArticleWatcher does not need them
        self.worker_download = kwargs.get("worker_download", None)
        self.worker_fetch = kwargs.get("worker_fetch", None)
        self.worker_compress = kwargs.get("worker_compress", None)
        self.worker_upload = kwargs.get("worker_upload", None)
        self.kwargs = kwargs

        for w in [self.worker_slack, self.worker_download, self.worker_fetch,
                  self.worker_upload, self.worker_compress]:
            if w is not None:
                w.start()

    def incoming_request(self, message):
        # TODO CHECK ME!
        """Passed on to the slack worker; triggered whenever a new message is received."""
        url = message.urls[0]  # ignore all the other ones
        article, is_new = models.ArticleDownload.get_or_create(article_url=url)
        message.thread.article = article
        message.thread.save()

        if is_new:
            # Each worker is implemented as a threaded queue, but an individual
            # article requires a specific processing order:
            #   fetch -> download -> compress -> complete
            # The watcher orchestrates this procedure and notifies the
            # Coordinator once the article is sufficiently populated.
            self.kwargs.update({"notifier": self.article_complete_notifier})
            ArticleWatcher(article, **self.kwargs)
        else:  # article already known: trigger the notification immediately
            self.article_complete_notifier(article)

    def manual_processing(self, url_list, target_calls):
        """Runs only the given workers (target_calls) on the articles behind url_list."""
        for url in url_list:
            article = models.ArticleDownload.get_or_none(article_url=url)
            # note: the notifier must be passed as a keyword argument
            watcher = ArticleWatcher(article, notifier=self.article_complete_notifier)
            for t in target_calls:
                t.process(watcher)
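    # Example, mirroring the "upload" branch in __main__:
    #   coordinator.manual_processing(urls, [UploadWorker()])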

    def article_complete_notifier(self, article):
        """Reports a fully processed article to slack and via mail."""
        self.worker_slack.bot_worker.respond_channel_message(article)
        self.worker_mail.send(article)

if __name__ == "__main__":
    coordinator = Coordinator()

    if "upload" in sys.argv:
        rows = models.ArticleDownload.select(models.ArticleDownload.article_url).where(models.ArticleDownload.archive_url == "")
        urls = [r.article_url for r in rows]  # unwrap the model rows into plain urls
        logger.info(f"Launching upload to archive for {len(urls)} urls.")
        coordinator.manual_processing(urls, [UploadWorker()])
    elif "check" in sys.argv:
        logger.info("Not implemented yet.")
    else:  # launch with the full set of workers
        kwargs = {
            "worker_download": DownloadWorker(),
            "worker_fetch": FetchWorker(),
            "worker_upload": UploadWorker(),
            "worker_compress": CompressWorker(),
            "worker_slack": slack_runner.BotRunner(coordinator.incoming_request),
            "worker_mail": mail_runner,
        }
        coordinator.add_workers(**kwargs)

    # TODO
    # Resume interrupted article models
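    # A resume pass might look like the following sketch; the `complete` field
    # is hypothetical (the model's schema is not visible from this file):
    #   pending = models.ArticleDownload.select().where(models.ArticleDownload.complete == False)
    #   coordinator.manual_processing(
    #       [a.article_url for a in pending],
    #       [FetchWorker(), DownloadWorker(), CompressWorker()],
    #   )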