2022-10-06 15:55:30 +02:00

167 lines
5.1 KiB
Python

import logging
import time
import datetime
import os, shutil, uuid
from pathlib import Path
import base64
import requests
from selenium import webdriver
import configuration
# "DOWNLOADS" section of the project-wide configuration. This module reads the
# keys "browser_print_delay" and "browser_profile_path" from it.
config = configuration.main_config["DOWNLOADS"]
def driver_running(f):
    """Decorator that makes sure the driver is started before a method runs.

    The object the decorated method is bound to (its first positional
    argument) must expose a boolean ``_running`` attribute and a ``start()``
    method. When ``_running`` is falsy, ``start()`` is called before the
    wrapped method is invoked.

    Args:
        f: The method to wrap.

    Returns:
        A wrapper with the same call signature and metadata as ``f``.
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(f)  # preserve __name__ / __doc__ of the wrapped method
    def wrapper(*args, **kwargs):
        self = args[0]
        if not self._running:
            self.start()
        return f(*args, **kwargs)

    return wrapper
class PDFDownloader:
    """Saves a given url as a PDF file. Fills the object it got as a parameter.

    The page is opened through a remote Firefox/geckodriver session and
    printed to PDF. URLs that already end in ".pdf" are fetched directly over
    HTTP instead.
    """

    logger = logging.getLogger(__name__)
    _running = False
    # Timeout in seconds handed to requests.get for direct PDF downloads, so a
    # stalled server cannot block the downloader forever.
    _REQUEST_TIMEOUT = 60
    # Temporary profile directory created by the last start(); removed again
    # in finish(). None when there is nothing to clean up.
    _tmp_profile_path = None

    def start(self):
        """Called externally to start the driver, but after an exception can also be called internally"""
        if self._running:
            self.finish()  # clear up
        # A previous start() may have failed after creating its profile copy.
        self._remove_tmp_profile()

        self.logger.info("Starting geckodriver")

        reduced_path = self.create_tmp_profile()
        self._tmp_profile_path = reduced_path
        profile = webdriver.FirefoxProfile(reduced_path)
        options = webdriver.FirefoxOptions()

        if os.getenv("DEBUG", "false") == "true":
            self.logger.warning("Opening browser GUI because of 'DEBUG=true'")
        else:
            options.add_argument('--headless')

        self.driver = webdriver.Remote(
            command_executor='http://geckodriver:4444',  # the host geckodriver points to the geckodriver container
            options=options,
            browser_profile=profile,
        )
        self._running = True

    def finish(self):
        """Quit the driver session, mark it as stopped and remove the temporary profile.

        Errors while quitting are logged and swallowed, because this is also
        used to clean up after the driver already failed.
        """
        self.logger.info("Exiting Geckodriver")
        try:
            self.driver.quit()
            time.sleep(10)
        except Exception:
            # Also covers the case where start() never managed to set self.driver.
            self.logger.critical("Connection to the driver broke off")
        finally:
            # Reset state even if the wait above is interrupted.
            self._running = False
            self._remove_tmp_profile()

    def _remove_tmp_profile(self):
        """Delete the temporary profile directory recorded by start(), if any."""
        if self._tmp_profile_path is not None:
            shutil.rmtree(self._tmp_profile_path, ignore_errors=True)
            self._tmp_profile_path = None

    @driver_running
    def download(self, article_object):
        """Open the article url and store it as a PDF file.

        Reads ``article_url``, ``is_title_bad``, ``fname_template`` and
        ``save_path`` from ``article_object``; may overwrite ``title`` and
        always sets ``file_name`` (empty string on failure) unless loading the
        page itself failed.

        Args:
            article_object: The article record to fill.

        Returns:
            The same ``article_object``; the caller is responsible for saving it.
        """
        sleep_time = int(config["browser_print_delay"])
        url = article_object.article_url

        try:
            self.driver.get(url)
        except Exception as e:
            self.logger.critical("Selenium .get(url) failed with error %s", e)
            self.finish()
            return article_object  # without changes

        time.sleep(sleep_time)
        # leave the page time to do any funky business

        # in the mean time, get a page title if required
        if article_object.is_title_bad:
            article_object.title = self.driver.title.replace(".pdf", "")  # some titles end with .pdf
            # will be propagated to the saved file (dst) as well

        fname = article_object.fname_template
        fname = ensure_unique(article_object.save_path, fname)
        dst = os.path.join(article_object.save_path, fname)

        if url.endswith(".pdf"):
            # calling the usual pdf generation would not yield a nice pdf, just download it directly
            success = self.get_exisiting_pdf(url, dst)
        else:
            success = self.get_new_pdf(dst)

        article_object.file_name = fname if success else ""
        return article_object  # this change is saved later by the external caller

    def get_exisiting_pdf(self, url, dst):
        """Download an already existing PDF from ``url`` and write it to ``dst``.

        Returns:
            True when the file was written, False when the request failed,
            timed out, returned an HTTP error status, or writing failed.
        """
        try:
            response = requests.get(url, timeout=self._REQUEST_TIMEOUT)
            # Without this check an HTTP error page would be saved as a ".pdf".
            response.raise_for_status()
            pdf_bytes = response.content
        except requests.RequestException as e:
            self.logger.error("Failed to download existing pdf from %s: %s", url, e)
            return False
        return self.get_new_pdf(dst, other_bytes=pdf_bytes)

    def get_new_pdf(self, dst, other_bytes=None):
        """Write a PDF to ``dst``.

        Args:
            dst: Target file path; its parent directory is created if missing.
            other_bytes: Raw file content to write. When None, the page that is
                currently open in the driver is printed to PDF instead.

        Returns:
            True on success, False when printing or writing failed.
        """
        if other_bytes is None:
            try:
                result = self.driver.print_page()
                pdf_bytes = base64.b64decode(result, validate=True)
            except Exception:
                self.logger.error("Failed, probably because the driver went extinct.")
                return False
        else:
            pdf_bytes = other_bytes

        try:
            os.makedirs(os.path.dirname(dst), exist_ok=True)
            with open(dst, "wb") as f:
                f.write(pdf_bytes)
        except (OSError, ValueError) as e:
            self.logger.error(f"Failed, because of FS-operation: {e}")
            return False
        return True

    def create_tmp_profile(self, full_profile_path: Path = Path(config["browser_profile_path"])) -> Path:
        """Copy the needed parts of a Firefox profile into a fresh directory under /tmp.

        Args:
            full_profile_path: Directory of the full profile to copy from.

        Returns:
            Path of the newly created, reduced profile directory.

        Raises:
            OSError: If the directory cannot be created or an expected profile
                entry cannot be copied. A partially built copy is removed first.
        """
        source = Path(full_profile_path)
        reduced_profile_path = Path(f"/tmp/firefox_profile_{uuid.uuid4()}")
        reduced_profile_path.mkdir()

        needed_dirs = ("extensions", "storage")
        needed_files = (
            "extension-preferences.json",
            "addons.json",
            "addonStartup.json.lz4",
            "prefs.js",
            "extensions.json",
            "cookies.sqlite",
        )
        try:
            for dir_name in needed_dirs:
                shutil.copytree(source / dir_name, reduced_profile_path / dir_name)
            for file_name in needed_files:
                shutil.copy(source / file_name, reduced_profile_path)
        except Exception:
            # Do not leave a half-copied profile behind.
            shutil.rmtree(reduced_profile_path, ignore_errors=True)
            raise

        folder_size = round(sum(p.stat().st_size for p in reduced_profile_path.rglob('*')) / 1024 / 1024, 3)
        self.logger.info(f"Generated temporary profile at {reduced_profile_path} with size {folder_size} MB")
        return reduced_profile_path
def ensure_unique(path, fname):
    """Return a file name that does not exist yet inside ``path``.

    ``fname`` is returned unchanged when it is free. Otherwise " -- fetch N"
    is inserted before the extension, with N counting up from 1 until an
    unused name is found.

    Args:
        path: Directory the file is going to be stored in.
        fname: Desired file name, including its extension.

    Returns:
        The first candidate name for which no file exists in ``path``.
    """
    stem, suffix = os.path.splitext(fname)
    candidate = fname
    attempt = 0
    while os.path.exists(os.path.join(path, candidate)):
        attempt += 1
        candidate = f"{stem} -- fetch {attempt}{suffix}"
    return candidate