import base64
import datetime
import functools
import logging
import os
import shutil
import time
import uuid
from pathlib import Path

import requests
from selenium import webdriver

import configuration

config = configuration.main_config["DOWNLOADS"]


def driver_running(f):
    """Decorate a method so that the driver is started before it runs.

    The wrapped callable must be a method of an object exposing a boolean
    ``_running`` attribute and a ``start()`` method (see ``PDFDownloader``).
    """

    @functools.wraps(f)  # keep the wrapped method's name and docstring
    def wrapper(*args, **kwargs):
        self = args[0]
        if not self._running:
            self.start()
        return f(*args, **kwargs)

    return wrapper


class PDFDownloader:
    """Saves a given url. Fills the object it got as a parameter.

    The page is opened in a remote Firefox session (geckodriver) and printed
    to PDF; urls that already end in ``.pdf`` are downloaded directly.
    """

    logger = logging.getLogger(__name__)
    _running = False
    # Temporary profile directory created by the last start(); removed again
    # once the driver is shut down so restarts do not pile up copies in /tmp.
    _tmp_profile_path = None

    def start(self):
        """Start the remote driver.

        Called externally to start the driver, but after an exception it can
        also be called internally. A driver that is still running is shut
        down first.
        """
        if self._running:
            self.finish()  # clear up
        self._remove_tmp_profile()  # leftover from a start() that failed midway

        self.logger.info("Starting geckodriver")

        reduced_path = self.create_tmp_profile()
        self._tmp_profile_path = reduced_path
        profile = webdriver.FirefoxProfile(reduced_path)
        options = webdriver.FirefoxOptions()

        if os.getenv("DEBUG", "false") == "true":
            self.logger.warning("Opening browser GUI because of 'DEBUG=true'")
        else:
            options.add_argument('--headless')

        self.driver = webdriver.Remote(
            # the host geckodriver points to the geckodriver container
            command_executor='http://geckodriver:4444',
            options=options,
            browser_profile=profile,
        )
        self._running = True

    def finish(self):
        """Shut the driver down and discard the temporary profile.

        Failures while quitting (including a driver that was never started)
        are logged and otherwise ignored; the object is marked as not running
        in every case.
        """
        self.logger.info("Exiting Geckodriver")
        try:
            self.driver.quit()
            time.sleep(10)
        except Exception:
            # Not a bare ``except``: Ctrl-C during the sleep must still abort.
            self.logger.critical("Connection to the driver broke off")
        self._running = False
        self._remove_tmp_profile()

    def _remove_tmp_profile(self):
        """Delete the temporary profile directory of the last start(), if any."""
        profile_path, self._tmp_profile_path = self._tmp_profile_path, None
        if profile_path is not None:
            shutil.rmtree(profile_path, ignore_errors=True)

    @driver_running
    def download(self, article_object):
        """Fetch ``article_object.article_url`` and store it as a PDF.

        Reads ``article_url``, ``is_title_bad``, ``fname_template`` and
        ``save_path`` from ``article_object``. Sets ``file_name`` to the name
        of the written file, or to ``""`` when saving failed, and replaces
        ``title`` with the page title when ``is_title_bad`` is set.

        Returns:
            The same ``article_object``. It is returned unchanged when the
            page could not be loaded; in that case the driver is shut down.
        """
        sleep_time = int(config["browser_print_delay"])
        url = article_object.article_url

        try:
            self.driver.get(url)
        except Exception as e:
            self.logger.critical("Selenium .get(url) failed with error {}".format(e))
            self.finish()
            return article_object  # without changes

        time.sleep(sleep_time)
        # leave the page time to do any funky business

        # in the mean time, get a page title if required
        if article_object.is_title_bad:
            # some titles end with .pdf
            article_object.title = self.driver.title.replace(".pdf", "")
            # will be propagated to the saved file (dst) as well

        fname = article_object.fname_template
        fname = ensure_unique(article_object.save_path, fname)
        dst = os.path.join(article_object.save_path, fname)

        if url.endswith(".pdf"):
            # calling the usual pdf generation would not yield a nice pdf,
            # just download it directly
            success = self.get_exisiting_pdf(url, dst)
        else:
            success = self.get_new_pdf(dst)

        article_object.file_name = fname if success else ""
        return article_object  # this change is saved later by the external caller

    def get_exisiting_pdf(self, url, dst, timeout=30):
        """Download an already existing PDF from ``url`` and write it to ``dst``.

        Args:
            url: Address of the PDF.
            dst: Destination file path.
            timeout: Seconds to wait for the server before giving up.

        Returns:
            True when the file was written. False when the request failed,
            the server answered with an HTTP error status, or writing failed.
        """
        try:
            response = requests.get(url, timeout=timeout)
            # do not store an error page under a .pdf name
            response.raise_for_status()
            content = response.content
        except Exception as e:
            self.logger.error(f"Failed to download existing pdf: {e}")
            return False
        return self.get_new_pdf(dst, other_bytes=content)

    def get_new_pdf(self, dst, other_bytes=None):
        """Write a PDF to ``dst``.

        Args:
            dst: Destination file path; its parent directory is created if
                it is missing.
            other_bytes: Content to write. When None, the page currently
                open in the driver is printed to PDF and written instead.

        Returns:
            True on success, False when printing or writing failed.
        """
        if other_bytes is None:
            try:
                result = self.driver.print_page()
                pdf_bytes = base64.b64decode(result, validate=True)
            except Exception:
                self.logger.error("Failed, probably because the driver went extinct.")
                return False
        else:
            pdf_bytes = other_bytes

        try:
            parent = os.path.dirname(dst)
            if parent:  # a bare file name has no directory to create
                os.makedirs(parent, exist_ok=True)
            with open(dst, "wb") as f:
                f.write(pdf_bytes)
        except Exception as e:
            self.logger.error(f"Failed, because of FS-operation: {e}")
            return False
        return True

    def create_tmp_profile(self, full_profile_path: Path = Path(config["browser_profile_path"])) -> Path:
        """Copy the needed parts of a Firefox profile to a fresh directory.

        Args:
            full_profile_path: Profile to copy from. The default is read from
                the configuration once, when the class is defined.

        Returns:
            Path of the new ``/tmp/firefox_profile_<uuid>`` directory.
        """
        reduced_profile_path = Path(f"/tmp/firefox_profile_{uuid.uuid4()}")
        os.mkdir(reduced_profile_path)

        # copy needed directories
        for dir_name in ("extensions", "storage"):
            shutil.copytree(full_profile_path / dir_name, reduced_profile_path / dir_name)

        # copy needed files
        file_names = (
            "extension-preferences.json",
            "addons.json",
            "addonStartup.json.lz4",
            "prefs.js",
            "extensions.json",
            "cookies.sqlite",
        )
        for file_name in file_names:
            shutil.copy(full_profile_path / file_name, reduced_profile_path)

        total_bytes = sum(p.stat().st_size for p in reduced_profile_path.rglob('*'))
        folder_size = round(total_bytes / 1024 / 1024, 3)
        self.logger.info(f"Generated temporary profile at {reduced_profile_path} with size {folder_size} MB")
        return reduced_profile_path


def ensure_unique(path, fname):
    """Return a file name that does not exist yet inside ``path``.

    ``fname`` is returned as is when it is free. Otherwise
    `` -- fetch <i>`` is inserted before the extension, counting ``i`` up
    from 1 until an unused name is found.
    """
    fbase, ending = os.path.splitext(fname)

    candidate = fname
    i = 1
    while os.path.exists(os.path.join(path, candidate)):
        candidate = fbase + f" -- fetch {i}" + ending
        i += 1
    return candidate