168 lines
5.5 KiB
Python
168 lines
5.5 KiB
Python
import time
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import base64
|
|
import requests
|
|
from selenium import webdriver
|
|
import configuration
|
|
import json
|
|
|
|
# Settings taken from the "DOWNLOADS" section of the parsed configuration.
config = configuration.parsed["DOWNLOADS"]
# The domain blacklist is stored JSON-encoded; it is decoded once at import time.
blacklisted = json.loads(config["blacklisted_href_domains"])
|
|
|
|
|
|
class PDFDownloader:
    """Save the page behind an article's URL as a PDF file.

    A Firefox session is driven through a remote Selenium endpoint. Regular
    pages are rendered with the browser's print function; URLs that already
    end in ``.pdf`` are fetched directly with ``requests``. The article object
    passed to :meth:`download` is filled in and handed back.
    """

    logger = logging.getLogger(__name__)
    # Status variable for restarting: True while a driver session is open.
    running = False
    # Seconds after which an unresponsive direct PDF request is abandoned.
    request_timeout = 30

    def start(self):
        """Open a fresh browser session and clear the download directory.

        Any session that is still open is closed first.
        """
        self.finish()  # clear up

        options = webdriver.FirefoxOptions()
        options.profile = config["browser_profile_path"]
        # should be options.set_preference("profile", config["browser_profile_path"]) as of selenium 4 but that doesn't work

        if os.getenv("HEADLESS", "false") == "true":
            options.add_argument('--headless')
        else:
            # This branch runs when HEADLESS is anything but "true".
            self.logger.warning("Opening browser GUI because HEADLESS is not set to 'true'")

        options.set_preference('print.save_as_pdf.links.enabled', True)
        # Just save if the filetype is pdf already, does not work!

        options.set_preference("print.printer_Mozilla_Save_to_PDF.print_to_file", True)
        options.set_preference("browser.download.folderList", 2)
        # options.set_preference("browser.helperApps.neverAsk.saveToDisk", "application/pdf")
        # options.set_preference("pdfjs.disabled", True)
        options.set_preference("browser.download.dir", config["default_download_path"])

        self.logger.info("Starting gecko driver")
        # A local webdriver.Firefox(...) with a Service(log_path=...) was used
        # previously; the remote driver cannot set a log path.
        self.driver = webdriver.Remote(
            command_executor='http://localhost:4444',
            options=options,
        )
        # Flag the session as open right away: if the cleanup below raises,
        # finish() is still able to quit the driver instead of leaking it.
        self.running = True

        download_dir = config["default_download_path"]
        for res in os.listdir(download_dir):
            residue = os.path.join(download_dir, res)
            if os.path.isdir(residue) and not os.path.islink(residue):
                continue  # os.remove() cannot delete directories
            os.remove(residue)

    def autostart(self):
        """Start the driver unless a session is already running."""
        if not self.running:
            self.start()  # relaunch the dl util

    def finish(self):
        """Close the browser session if one is open.

        ``running`` is reset even when quitting the driver fails, so that a
        dead session does not block a later restart.
        """
        if not self.running:
            self.logger.info("Gecko driver not yet running")
            return

        self.logger.info("Exiting gecko driver")
        try:
            self.driver.quit()
        except Exception as e:
            # A crashed/disconnected driver may refuse to quit; nothing more
            # can be done than to report it and allow a clean restart.
            self.logger.warning("Quitting the gecko driver failed: %s", e)
        finally:
            self.running = False

    def download(self, article_object, sleep_time=1):
        """Load the article's URL and store it as a PDF.

        Args:
            article_object: Object providing ``article_url``, ``is_title_bad``,
                ``fname_template`` and ``save_path``. Its ``title`` (only when
                ``is_title_bad`` is truthy), ``file_name`` and references
                (through ``set_references``) are updated.
            sleep_time: Seconds to wait after loading the page before it is
                saved.

        Returns:
            The same ``article_object``. It is returned unchanged when the
            page could not be loaded; ``file_name`` is set to ``""`` when the
            page loaded but saving failed.
        """
        self.autostart()
        url = article_object.article_url

        try:
            self.driver.get(url)
        except Exception as e:
            self.logger.critical("Selenium .get(url) failed with error %s", e)
            self.finish()
            return article_object  # without changes

        # leave the page time to do any funky business
        time.sleep(sleep_time)

        # in the mean time, get a page title if required
        if article_object.is_title_bad:
            # presumably fname_template is derived from the title, so this
            # propagates to dst as well -- verify against the article class
            article_object.title = self.driver.title.replace(".pdf", "")

        fname = article_object.fname_template
        dst = os.path.join(article_object.save_path, fname)
        if os.path.exists(dst):
            fname = make_path_unique(fname)
            dst = os.path.join(article_object.save_path, fname)

        if url.lower().endswith(".pdf"):
            # according to the browser preferences, calling the url will open pdfjs.
            # If not handled separately, printing would require the ctrl+p route, but setup is janky to say the least
            success = self.get_exisiting_pdf(url, dst)
        else:
            success = self.get_new_pdf(dst)

        if success:
            article_object.file_name = fname
            article_object.set_references(self.get_references())
        else:
            article_object.file_name = ""

        return article_object  # this change is saved later manually

    def get_exisiting_pdf(self, url, dst):
        """Fetch a PDF directly over HTTP and write it to ``dst``.

        Args:
            url: Address of the PDF document.
            dst: Path of the file to write.

        Returns:
            True if the file was written; False if the request failed,
            timed out, answered with an HTTP error status, or the file could
            not be written.
        """
        try:
            # Without a timeout an unresponsive server blocks forever.
            response = requests.get(url, timeout=self.request_timeout)
            # Do not store an HTML error page under a .pdf name.
            response.raise_for_status()
        except requests.RequestException as e:
            self.logger.error("Direct download of %s failed: %s", url, e)
            return False
        return self.get_new_pdf(dst, other_bytes=response.content)

    def get_new_pdf(self, dst, other_bytes=None):
        """Write a PDF to ``dst``, creating parent directories as needed.

        Args:
            dst: Path of the file to write.
            other_bytes: Raw file content to store. When None, the page
                currently loaded in the driver is printed and the base64
                result is decoded.

        Returns:
            True on success, False if printing or any file-system operation
            failed.
        """
        if other_bytes is None:
            try:
                result = self.driver.print_page()
                pdf_bytes = base64.b64decode(result, validate=True)
            except Exception as e:
                self.logger.error("Failed, probably because the driver went extinct. Error: %s", e)
                return False
        else:
            pdf_bytes = other_bytes

        try:
            parent = os.path.dirname(dst)
            if parent:  # os.makedirs("") raises for a bare file name
                os.makedirs(parent, exist_ok=True)
            with open(dst, "wb") as f:
                f.write(pdf_bytes)
        except Exception as e:
            self.logger.error(f"Failed, because of FS-operation: {e}")
            return False
        return True

    def get_references(self):
        """Collect the link targets of the current page.

        Returns:
            The ``href`` values of all ``<a href>`` elements that contain none
            of the blacklisted domains. An empty list if the lookup fails.
        """
        try:
            # "xpath" is the value of selenium's By.XPATH; the
            # find_elements_by_xpath shortcut no longer exists in current
            # selenium 4 releases.
            anchors = self.driver.find_elements("xpath", "//a[@href]")
            hrefs = [a.get_attribute("href") for a in anchors]
        except Exception as e:
            self.logger.warning("Could not read hrefs from page: %s", e)
            hrefs = []

        len_old = len(hrefs)
        # filter a tiny bit at least
        hrefs = [
            h for h in hrefs
            if h is not None and not any(domain in h for domain in blacklisted)
        ]
        self.logger.info(f"Hrefs filtered (before: {len_old}, after: {len(hrefs)})")
        return hrefs
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def make_path_unique(path: str) -> str:
    """Return ``path`` with a day/time stamp inserted before its extension.

    The stamp has the form ``DD-HHMMSS`` (local time) and is appended directly
    to the part of the path in front of the extension. Whether the resulting
    path is free is not checked; two calls within the same second yield the
    same result.

    Args:
        path: File name or path, with or without an extension.

    Returns:
        The stamped path, keeping the original extension.
    """
    stem, extension = os.path.splitext(path)
    stamp = datetime.datetime.now().strftime("%d-%H%M%S")
    return f"{stem}{stamp}{extension}"