import logging
import time
import os, shutil, uuid
from pathlib import Path
import base64
import requests
from selenium import webdriver

import configuration

download_config = configuration.config["downloads"]


def driver_running(f):
    """Decorator: (re)start the webdriver before calling the wrapped method if it is not running."""
    def wrapper(*args, **kwargs):
        self = args[0]
        if not self._running:
            self.start()
        return f(*args, **kwargs)
    return wrapper


class PDFDownloader:
    """Saves a given url. Fills the object it got as a parameter"""

    logger = logging.getLogger(__name__)
    _running = False

    def start(self):
        """Called externally to start the driver, but after an exception can also be called internally"""
        if self._running:
            self.finish()  # clean up first

        self.logger.info("Starting geckodriver")
        reduced_path = self.create_tmp_profile()
        profile = webdriver.FirefoxProfile(reduced_path)
        options = webdriver.FirefoxOptions()

        if os.getenv("DEBUG", "false") == "true":
            self.logger.warning("Opening browser GUI because of 'DEBUG=true'")
        else:
            options.add_argument("--headless")

        self.driver = webdriver.Remote(
            command_executor="http://geckodriver:4444",  # the host 'geckodriver' points to the geckodriver container
            options=options,
            browser_profile=profile,
        )

        self._running = True

    def finish(self):
        """Shut down the webdriver session."""
        self.logger.info("Exiting Geckodriver")
        try:
            self.driver.quit()
            time.sleep(10)
        except Exception:
            self.logger.critical("Connection to the driver broke off")
        self._running = False

    @driver_running
    def download(self, article_object):
        """Download article_object.article_url as a pdf and record the result on the object."""
        url = article_object.article_url

        if url.endswith(".pdf"):
            # the usual pdf generation would not yield a nice pdf here, so download the file directly
            self.logger.info("Downloading existing pdf")
            success = self.get_existing_pdf(article_object)
            # get a page title if required
            if article_object.is_title_bad:
                article_object.title = self.driver.title.replace(".pdf", "")  # some titles end with .pdf
                # will be propagated to the saved file (dst) as well
        else:
            success = self.get_new_pdf(article_object)

        if not success:
            self.logger.error("Download failed")
            # TODO: need to reset the file name to empty?

        return article_object  # changes to this are saved later by the external caller

    def get_existing_pdf(self, article_object):
        """Fetch a pdf that already exists at the url via requests."""
        # get a better page title if required
        if article_object.is_title_bad:
            article_object.title = article_object.article_url.split("/")[-1].split(".pdf")[0]

        try:
            r = requests.get(article_object.article_url)
            content = r.content
        except Exception:
            return False
        return self.write_pdf(content, article_object)

    def get_new_pdf(self, article_object):
        """Render the page in the browser and print it to pdf."""
        sleep_time = int(download_config["browser_print_delay"])
        try:
            self.driver.get(article_object.article_url)
        except Exception as e:
            self.logger.critical("Selenium .get(url) failed with error {}".format(e))
            self.finish()
            return False

        time.sleep(sleep_time)  # leave the page time to do any funky business

        if article_object.is_title_bad:
            article_object.title = self.driver.title

        try:
            result = self.driver.print_page()
            content = base64.b64decode(result, validate=True)
        except Exception:
            self.logger.error("Printing failed, probably because the driver went extinct.")
            return False

        return self.write_pdf(content, article_object)

    def get_file_destination(self, article_object):
        """Build a unique destination path from the article's file-name template."""
        fname = article_object.fname_template
        fname = ensure_unique(article_object.save_path, fname)
        dst = os.path.join(article_object.save_path, fname)
        return dst, fname

    def write_pdf(self, content, article_object):
        """Write the pdf bytes to disk and store the resulting file name on the article object."""
        dst, fname = self.get_file_destination(article_object)
        os.makedirs(os.path.dirname(dst), exist_ok=True)

        try:
            with open(dst, "wb+") as f:
                f.write(content)
            article_object.file_name = fname
            return True
        except Exception as e:
            self.logger.error(f"Failed because of FS-operation: {e}")
            return False

    def create_tmp_profile(self, full_profile_path: Path = Path(download_config["browser_profile_path"])) -> Path:
        """Copy only the needed parts of the full Firefox profile into a small temporary profile."""
        reduced_profile_path = Path(f"/tmp/firefox_profile_{uuid.uuid4()}")
        os.mkdir(reduced_profile_path)

        # copy needed directories
        dirs = ["extensions", "storage"]
        for d in dirs:
            shutil.copytree(full_profile_path / d, reduced_profile_path / d)

        # copy needed files
        files = ["extension-preferences.json", "addons.json", "addonStartup.json.lz4", "prefs.js", "extensions.json", "cookies.sqlite"]
        for f in files:
            shutil.copy(full_profile_path / f, reduced_profile_path)

        folder_size = round(sum(p.stat().st_size for p in reduced_profile_path.rglob("*")) / 1024 / 1024, 3)
        self.logger.info(f"Generated temporary profile at {reduced_profile_path} with size {folder_size} MB")
        return reduced_profile_path


def ensure_unique(path, fname):
    """Append ' -- fetch N' to fname until it no longer collides with an existing file in path."""
    fbase, ending = os.path.splitext(fname)

    exists = os.path.exists(os.path.join(path, fname))
    i = 1
    while exists:
        fname = fbase + f" -- fetch {i}" + ending
        i += 1
        exists = os.path.exists(os.path.join(path, fname))

    return fname
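# --- Usage sketch (not part of the original module) -------------------------
# A minimal, hedged illustration of how PDFDownloader is meant to be driven.
# "SimpleArticle" is a hypothetical stand-in for the caller's article object;
# only the attributes the class above actually touches are assumed
# (article_url, title, is_title_bad, fname_template, save_path, file_name).
# Running this also assumes the geckodriver container and the "downloads"
# section of configuration.config are in place, as used above.
if __name__ == "__main__":
    class SimpleArticle:
        article_url = "https://example.com/some-article"
        title = ""
        is_title_bad = True  # ask the downloader to fill in the page title
        fname_template = "some-article.pdf"
        save_path = "/tmp/article_downloads"
        file_name = ""

    downloader = PDFDownloader()
    article = downloader.download(SimpleArticle())  # @driver_running starts geckodriver on demand
    print(article.file_name or "download failed")
    downloader.finish()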