42 lines
1.8 KiB
Python
42 lines
1.8 KiB
Python
import logging
import queue
import time
from threading import Thread
|
|
|
|
|
|
class TemplateWorker(Thread):
    """Parent class for every worker of the article-download pipeline.

    Each worker runs in its own thread (hence the ``Thread`` subclassing) so
    that all pipeline stages can run in parallel. Other parts of the pipeline
    hand article watchers to a worker via :meth:`process`; the worker thread
    consumes them in FIFO order and passes each one to :meth:`_handle_article`.

    Subclasses are expected to override :meth:`_handle_article`; the base
    version only does work when it is given an ``action`` callable.
    """

    logger = logging.getLogger(__name__)

    # Unique marker put on the queue by stop(); never a real article watcher.
    _SHUTDOWN_SENTINEL = object()

    def __init__(self, *args, **kwargs) -> None:
        """Create the worker thread; it starts consuming once ``start()`` is called.

        Only the ``group``, ``name`` and ``daemon`` keyword arguments are
        forwarded to :class:`threading.Thread`. Positional ``args`` and any
        other keyword arguments are accepted but ignored, because the thread
        target is always :meth:`_queue_processor`.
        """
        super().__init__(
            group=kwargs.get("group"),
            target=self._queue_processor,  # executed on Worker.start()
            name=kwargs.get("name"),
            # None keeps Thread's default (inherit the creating thread's flag).
            daemon=kwargs.get("daemon"),
        )
        # Thread-safe FIFO. get() blocks while it is empty, so the worker idles
        # without polling and wakes as soon as process() enqueues something.
        self._article_queue = queue.Queue()
        self.logger.info("Worker thread %s initialized successfully", self.__class__.__name__)

    def process(self, article_watcher):
        """Enqueue *article_watcher* for this worker and return immediately.

        Safe to call from any thread.
        """
        self._article_queue.put(article_watcher)

    def stop(self):
        """Ask the worker loop to exit after it has handled every item queued before this call."""
        self._article_queue.put(self._SHUTDOWN_SENTINEL)

    def _queue_processor(self):
        """Thread target: handle queued article watchers in FIFO order until stopped.

        Blocks on the queue while it is empty. If handling one article raises,
        the error is logged and the worker moves on to the next item, so one
        bad article cannot stop the rest of the queue from being processed.
        """
        while True:
            article_watcher = self._article_queue.get()
            if article_watcher is self._SHUTDOWN_SENTINEL:
                self._article_queue.task_done()
                return
            try:
                self.logger.info(
                    "%s now processing from queue (length: %d) - %s",
                    self.__class__.__name__,
                    self._article_queue.qsize(),
                    article_watcher.article,
                )
                self._handle_article(article_watcher)
            except Exception:
                # This is the thread's top-level boundary: log and keep going
                # rather than letting the exception end the thread.
                self.logger.exception(
                    "%s failed to process %r", self.__class__.__name__, article_watcher
                )
            finally:
                # Lets callers use self._article_queue.join() to wait for a drain.
                self._article_queue.task_done()

    def _handle_article(self, article_watcher, action=None):
        """Apply *action* to the watcher's article and save the result.

        Meant to be overridden by subclasses. Called without an ``action``, the
        base implementation only logs an error.

        :param article_watcher: object exposing the article as ``.article``.
        :param action: callable that takes the article and updates it without
            saving. It may return the updated article, or ``None`` if it
            modified the article in place.
        """
        if action is None:
            self.logger.error("Unoverloaded call of _handle_article(). This should not occur in prod")
            return

        article = article_watcher.article
        updated = action(article)  # action updates the article object but does not save the change
        if updated is not None:
            article = updated
        article.save()
|