Working, refactored news_fetch, better documentation for launch

2022-09-08 16:19:15 +02:00
parent 713406dc67
commit afead44d6c
14 changed files with 220 additions and 247 deletions

View File

@@ -2,6 +2,8 @@ FROM python:latest
ENV TZ Europe/Zurich
RUN apt-get update && apt-get install -y ghostscript
# for compression of pdfs
RUN useradd --create-home --shell /bin/bash --uid 1001 autonews
# id mapped to local user

View File

@@ -1,7 +1,8 @@
import os
import shutil
import configparser
import logging
import time
import shutil
from datetime import datetime
from peewee import SqliteDatabase, PostgresqlDatabase
from rich.logging import RichHandler
@@ -41,6 +42,7 @@ if os.getenv("DEBUG", "false") == "true":
else:
logger.warning("Found 'DEBUG=false' and running on production databases, I hope you know what you're doing...")
time.sleep(10) # wait for the vpn to connect (can't use a healthcheck because there is no depends_on)
cred = db_config["DATABASE"]
download_db = PostgresqlDatabase(
cred["db_name"], user=cred["user_name"], password=cred["password"], host="vpn", port=5432

View File

@@ -3,125 +3,91 @@ import configuration
models = configuration.models
from threading import Thread
import logging
import sys
logger = logging.getLogger(__name__)
import sys
from collections import OrderedDict
from utils_mail import runner as mail_runner
from utils_slack import runner as slack_runner
from utils_mail import runner as MailRunner
from utils_slack import runner as SlackRunner
from utils_worker.workers import CompressWorker, DownloadWorker, FetchWorker, UploadWorker
class ArticleWatcher:
"""Wrapper for a newly created article object. Notifies the coordinator upon change/completition"""
def __init__(self, article, **kwargs) -> None:
self.article_id = article.id # in case article becomes None at any point, we can still track the article
def __init__(self, article, workers_in, workers_out) -> None:
self.article = article
self.completition_notifier = kwargs.get("notifier")
self.fetch = kwargs.get("worker_fetch", None)
self.download = kwargs.get("worker_download", None)
self.compress = kwargs.get("worker_compress", None)
self.upload = kwargs.get("worker_upload", None)
self.workers_in = workers_in
self.workers_out = workers_out
self.completition_notified = False
# self._download_called = self._compression_called = False
self._fetch_completed = self._download_completed = self._compression_completed = self._upload_completed = False
# first step: gather metadata
if self.fetch and self.upload:
self.fetch.process(self) # this will call the update_status method
self.upload.process(self) # independent from the rest
else: # the full kwargs were not provided, only do a manual run
# overwrite update_status() because calls from the workers will result in errors
self.update_status = lambda completed: logger.info(f"Completed action {completed}")
for w in kwargs.get("workers_manual"):
w.process(self)
for w_dict in self.workers_in:
worker = self.get_next_worker(w_dict) # gets the first worker of each dict (they get processed independently)
worker.process(self)
def update_status(self, completed_action):
"""Checks and notifies internal completition-status.
Article download is complete iff fetch and download were successfull and compression was run
"""
# if self.completition_notified and self._compression_completed and self._fetch_completed and self._download_completed and self._upload_completed, we are done
if completed_action == "fetch":
self.download.process(self)
elif completed_action == "download":
self.compress.process(self)
elif completed_action == "compress": # last step
self.completition_notifier(self.article)
# triggers action in Coordinator
elif completed_action == "upload":
# this case occurs when upload was faster than compression
pass
else:
logger.warning(f"update_status called with unusual configuration: {completed_action}")
def get_next_worker(self, worker_dict, worker_name=""):
"""Returns the worker coming after the one with key worker_name"""
if worker_name == "": # first one
return worker_dict[list(worker_dict.keys())[0]]
# for i,w_dict in enumerate(workers_list):
keys = list(worker_dict.keys())
next_key_ind = keys.index(worker_name) + 1
try:
key = keys[next_key_ind]
return worker_dict[key]
except IndexError:
return None
# ====== Attributes to be modified by the util workers
@property
def fetch_completed(self):
return self._fetch_completed
def update(self, worker_name):
"""Called by the workers to notify the watcher of a completed step"""
for w_dict in self.workers_in:
if worker_name in w_dict.keys():
next_worker = self.get_next_worker(w_dict, worker_name)
if next_worker:
if next_worker == "out":
self.completion_notifier()
else: # it's just another in-worker
next_worker.process(self)
else: # no next worker, we are done
logger.info(f"No worker after {worker_name}")
@fetch_completed.setter
def fetch_completed(self, value: bool):
self._fetch_completed = value
self.update_status("fetch")
def completion_notifier(self):
"""Triggers the out-workers to process the article, that is to send out a message"""
for w_dict in self.workers_out:
worker = self.get_next_worker(w_dict)
worker.send(self.article)
self.article.sent = True
self.article.save()
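# Illustrative walk-through (comment only, not part of the upstream code): with
# workers_in = [OrderedDict({"FetchWorker": fetch, "DownloadWorker": download, "NotifyRunner": "out"})],
# __init__ calls fetch.process(watcher); when FetchWorker finishes it calls watcher.update("FetchWorker"),
# which resolves the following key and calls download.process(watcher); update("DownloadWorker") eventually
# resolves to the sentinel "out" and triggers completion_notifier().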
@property
def download_completed(self):
return self._download_completed
@download_completed.setter
def download_completed(self, value: bool):
self._download_completed = value
self.update_status("download")
@property
def compression_completed(self):
return self._compression_completed
@compression_completed.setter
def compression_completed(self, value: bool):
self._compression_completed = value
self.update_status("compress")
@property
def upload_completed(self):
return self._upload_completed
@upload_completed.setter
def upload_completed(self, value: bool):
self._upload_completed = value
self.update_status("upload")
def __str__(self) -> str:
return f"Article with id {self.article_id}"
return f"ArticleWatcher with id {self.article_id}"
class Coordinator(Thread):
def __init__(self, **kwargs) -> None:
"""Launcher calls this Coordinator as the main thread to handle connections between the other workers (threaded)."""
super().__init__(target = self.launch, daemon=True)
def add_workers(self, **kwargs):
self.worker_slack = kwargs.pop("worker_slack", None)
self.worker_mail = kwargs.pop("worker_mail", None)
# the two above won't be needed in the Watcher
self.worker_download = kwargs.get("worker_download", None)
self.worker_fetch = kwargs.get("worker_fetch", None)
self.worker_compress = kwargs.get("worker_compress", None)
self.worker_upload = kwargs.get("worker_upload", None)
self.kwargs = kwargs
class Dispatcher(Thread):
def __init__(self) -> None:
"""Thread to handle handle incoming requests and control the workers"""
self.workers_in = []
self.workers_out = []
super().__init__(target = self.launch)
def launch(self) -> None:
for w in [self.worker_download, self.worker_fetch, self.worker_upload, self.worker_compress]:
if not w is None: # for reduced operations such as upload, some workers are set to None
w.start()
# start workers (each worker is a thread)
for w_dict in self.workers_in: # for reduced operations such as upload, some workers are not set
for w in w_dict.values():
if isinstance(w, Thread):
w.start()
# if past messages have not been sent, they must be reevaluated
unsent = models.ArticleDownload.filter(sent = False)
# .objects.filter(sent = False)
# get all articles not fully processed
unsent = models.ArticleDownload.filter(sent = False) # if past messages have not been sent, they must be reevaluated
for a in unsent:
self.incoming_request(article=a)
@@ -136,82 +102,82 @@ class Coordinator(Thread):
return
article, is_new = models.ArticleDownload.get_or_create(article_url=url)
article.slack_ts = message.ts # either update the timestamp (to the last reference to the article) or set it for the first time
article.save()
elif article is not None:
is_new = False
logger.info(f"Received article {article} in incoming_request")
else:
logger.error("Coordinator.incoming_request called with no arguments")
logger.error("Dispatcher.incoming_request called with no arguments")
return
self.kwargs.update({"notifier" : self.article_complete_notifier})
if is_new or (article.file_name == "" and article.verified == 0):
# check for models that were created but were abandoned. This means they have missing information, most importantly no associated file
# this overwrites previously set information, but that should not be too important
ArticleWatcher(
article,
**self.kwargs
workers_in=self.workers_in,
workers_out=self.workers_out,
)
# All workers are implemented as a threaded queue. But the individual model requires a specific processing order:
# fetch -> download -> compress -> complete
# the watcher orchestrates the procedure and notifies upon completion
# the watcher will notify once it is sufficiently populated
else: # manually trigger notification immediately
logger.info(f"Found existing article {article}. Now sending")
self.article_complete_notifier(article)
def manual_processing(self, articles, workers):
for w in workers:
w.start()
# def manual_processing(self, articles, workers):
# for w in workers:
# w.start()
for article in articles:
notifier = lambda article: logger.info(f"Completed manual actions for {article}")
ArticleWatcher(article, workers_manual = workers, notifier = notifier) # Article watcher wants a thread to link article to TODO: handle threads as a kwarg
def article_complete_notifier(self, article):
if self.worker_slack is None:
logger.warning("Skipping slack notification because worker is None")
else:
self.worker_slack.bot_worker.respond_channel_message(article)
if self.worker_mail is None:
logger.warning("Skipping mail notification because worker is None")
else:
self.worker_mail.send(article)
article.sent = True
article.save()
# for article in articles:
# notifier = lambda article: logger.info(f"Completed manual actions for {article}")
# ArticleWatcher(article, workers_manual = workers, notifier = notifier) # Article watcher wants a thread to link article to TODO: handle threads as a kwarg
if __name__ == "__main__":
coordinator = Coordinator()
dispatcher = Dispatcher()
if "upload" in sys.argv:
class PrintWorker:
def send(self, article):
print(f"Uploaded article {article}")
articles = models.ArticleDownload.select().where((models.ArticleDownload.archive_url == "") | (models.ArticleDownload.archive_url == "TODO:UPLOAD")).execute()
logger.info(f"Launching upload to archive for {len(articles)} articles.")
coordinator.manual_processing(articles, [UploadWorker()])
dispatcher.workers_in = [{"UploadWorker": UploadWorker()}]
dispatcher.workers_out = [{"PrintWorker": PrintWorker()}]
dispatcher.start()
else: # launch with full action
slack_runner = slack_runner.BotRunner(coordinator.incoming_request)
kwargs = {
"worker_download" : DownloadWorker(),
"worker_fetch" : FetchWorker(),
"worker_upload" : UploadWorker(),
"worker_compress" : CompressWorker(),
"worker_slack" : slack_runner,
"worker_mail" : mail_runner,
}
try:
coordinator.add_workers(**kwargs)
coordinator.start()
slack_runner = SlackRunner.BotRunner(dispatcher.incoming_request)
# All workers are implemented as a threaded queue. But the individual model requires a specific processing order:
# fetch -> download -> compress -> complete
# This is reflected in the following list of workers:
workers_in = [
OrderedDict({"FetchWorker": FetchWorker(), "DownloadWorker": DownloadWorker(), "CompressWorker": CompressWorker(), "NotifyRunner": "out"}),
OrderedDict({"UploadWorker": UploadWorker()})
]
# The two dicts are processed independently: the first worker of each dict is started at the same time.
# Within a dict, the key order gives the execution order (each worker is only called once the previous one is done, and so on).
workers_out = [{"SlackRunner": slack_runner},{"MailRunner": MailRunner}]
dispatcher.workers_in = workers_in
dispatcher.workers_out = workers_out
dispatcher.start() # starts the thread, (ie. runs launch())
slack_runner.start() # last one to start, inside the main thread
except KeyboardInterrupt:
logger.info("Keyboard interrupt. Stopping Slack and Coordinator")
logger.info("Keyboard interrupt. Stopping Slack and dispatcher")
slack_runner.stop()
logger.info("BYE!")
# coordinator was set as a daemon thread, so it will be stopped automatically
dispatcher.join()
for w_dict in workers_in:
for w in w_dict.values():
if isinstance(w, Thread):
w.stop()
# All threads are launched as daemon threads, meaning that any 'leftovers' should exit along with the sys call
sys.exit(0)

View File

@@ -157,29 +157,34 @@ class BotApp(App):
if say is None:
say = self.say_substitute
answers = article.slack_info
for a in answers:
if a["file_path"]:
try:
self.client.files_upload(
channels = config["archive_id"],
initial_comment = f"{a['reply_text']}",
file = a["file_path"],
thread_ts = article.slack_ts_full
)
status = True
except SlackApiError as e: # upload resulted in an error
if article.slack_ts == 0:
self.logger.error(f"{article} has no slack_ts")
else:
self.logger.info("Skipping slack reply because it is broken")
for a in []:
# for a in answers:
if a["file_path"]:
try:
self.client.files_upload(
channels = config["archive_id"],
initial_comment = f"{a['reply_text']}",
file = a["file_path"],
thread_ts = article.slack_ts_full
)
# status = True
except SlackApiError as e: # upload resulted in an error
say(
"File {} could not be uploaded.".format(a),
thread_ts = article.slack_ts_full
)
# status = False
self.logger.error(f"File upload failed: {e}")
else: # anticipated that there is no file!
say(
"File {} could not be uploaded.".format(a),
f"{a['reply_text']}",
thread_ts = article.slack_ts_full
)
status = False
self.logger.error(f"File upload failed: {e}")
else: # anticipated that there is no file!
say(
f"{a['reply_text']}",
thread_ts = article.slack_ts_full
)
status = True
# status = True
def startup_status(self):
@@ -230,6 +235,9 @@ class BotRunner():
self.logger.info("Closed Slack-Socketmodehandler")
def send(self, article):
"""Proxy function to send a message to the slack channel, Called by ArticleWatcher once the Article is ready"""
self.bot_worker.respond_channel_message(article)

View File

@@ -7,15 +7,13 @@ class TemplateWorker(Thread):
"""Parent class for any subsequent worker of the article-download pipeline. They should all run in parallel, thus the Thread subclassing"""
logger = logging.getLogger(__name__)
def __init__(self, *args, **kwargs) -> None:
def __init__(self, **kwargs) -> None:
target = self._queue_processor # will be executed on Worker.start()
group = kwargs.get("group", None)
name = kwargs.get("name", None)
super().__init__(group=group, target=target, name=name)
self.keep_running = True
super().__init__(target=target, daemon=True)
self._article_queue = []
self.logger.info(f"Worker thread {self.__class__.__name__} initialized successfully")
def process(self, article_watcher):
self._article_queue.append(article_watcher)#.article_model.article_url)
@@ -23,7 +21,7 @@ class TemplateWorker(Thread):
def _queue_processor(self):
"""This method is launched by thread.run() and idles when self._article_queue is empty. When an external caller appends to the queue it jumps into action"""
while True: # PLEASE tell me if I'm missing an obvious better way of doing this!
while self.keep_running: # PLEASE tell me if I'm missing an obvious better way of doing this!
if len(self._article_queue) == 0:
time.sleep(5)
else:
@@ -39,3 +37,10 @@ class TemplateWorker(Thread):
article = article_watcher.article
article = action(article) # action updates the article object but does not save the change
article.save()
article_watcher.update(self.__class__.__name__)
def stop(self):
self.logger.info(f"Stopping worker {self.__class__.__name__} whith {len(self._article_queue)} articles left in queue")
self.keep_running = False
self.join()
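# A minimal alternative sketch (not part of this commit, names are hypothetical): the polling loop in
# _queue_processor could be replaced by a blocking queue.Queue, which sleeps until an item arrives and
# uses a sentinel value instead of the keep_running flag for shutdown.
import logging
import queue
from threading import Thread

class QueueWorkerSketch(Thread):
    """Illustrative only: same process()/stop() interface as TemplateWorker, but backed by queue.Queue"""
    logger = logging.getLogger(__name__)

    def __init__(self) -> None:
        super().__init__(target=self._queue_processor, daemon=True)
        self._article_queue = queue.Queue()

    def process(self, article_watcher):
        self._article_queue.put(article_watcher)

    def _queue_processor(self):
        while True:
            watcher = self._article_queue.get()  # blocks until an item (or the sentinel) arrives
            if watcher is None:  # sentinel -> shut down
                break
            self._handle_article(watcher)

    def _handle_article(self, article_watcher):
        raise NotImplementedError("subclasses provide the actual action")

    def stop(self):
        self.logger.info(f"Stopping worker {self.__class__.__name__} with {self._article_queue.qsize()} articles left in queue")
        self._article_queue.put(None)
        self.join()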

View File

@@ -25,7 +25,7 @@ class DownloadWorker(TemplateWorker):
action = self.dl_runner
super()._handle_article(article_watcher, action)
article_watcher.download_completed = True
# article_watcher.download_completed = True
@@ -36,7 +36,7 @@ class FetchWorker(TemplateWorker):
def _handle_article(self, article_watcher):
action = get_description # function
super()._handle_article(article_watcher, action)
article_watcher.fetch_completed = True
# article_watcher.fetch_completed = True
@@ -52,7 +52,7 @@ class UploadWorker(TemplateWorker):
return run_upload(*args, **kwargs)
super()._handle_article(article_watcher, action)
article_watcher.upload_completed = True
# article_watcher.upload_completed = True
@@ -63,4 +63,4 @@ class CompressWorker(TemplateWorker):
def _handle_article(self, article_watcher):
action = shrink_pdf
super()._handle_article(article_watcher, action)
article_watcher.compression_completed = True
# article_watcher.compression_completed = True