# (file-viewer metadata: Python, 95 lines, 3.0 KiB)
import httpx as h
|
|
import json
|
|
import random
|
|
from pathlib import Path
|
|
import uuid
|
|
import keys
|
|
|
|
|
|
class ImageGetException(Exception):
    """Raised when an image cannot be obtained from the server or the cache."""
|
|
|
|
|
|
class ImageGetter:
    """Fetch random images from an Immich album, with an on-disk fallback cache.

    Images are picked at random from the album configured in ``keys``. After a
    successful download the local cache is topped up, so that a cached image
    can be served when a later download fails.
    """

    # Authentication header sent with every API request.
    headers = {
        "x-api-key": keys.immich_api_key
    }
    # Number of image files save_cached_files() tries to keep on disk.
    cached_num = 10
    # Directory holding the cached image files (relative to the working dir).
    cached_path = Path("./.image-cache/")
    # Failures that trigger the cache fallback. h.HTTPError is the common base
    # class of httpx's RequestError (transport/decoding) and HTTPStatusError.
    _fetch_errors = (h.HTTPError, ImageGetException)

    def get_random_image(self) -> bytes:
        """Return the bytes of a random album image, falling back to the cache.

        Returns:
            The raw image file content.

        Raises:
            ImageGetException: If the download failed and the cache is empty.
        """
        try:
            image_id = self.get_random_image_ids()[0]
            image = self.get_image_file(image_id)
        except self._fetch_errors:
            print("Loading image from cache")
            return self.load_cached_file()

        # Refilling the cache is best-effort: a failure here must not throw
        # away the image that was just downloaded successfully.
        try:
            self.save_cached_files()
        except self._fetch_errors as err:
            print(f"Could not refill image cache: {err}")

        return image

    def get_random_image_ids(self, num: int = 1) -> list[str]:
        """Return ``num`` asset ids picked at random from the configured album.

        Ids are drawn independently, so the same id may appear more than once.

        Args:
            num: How many ids to pick.

        Returns:
            A list of ``num`` asset ids.

        Raises:
            ImageGetException: On a non-200 response, a response that is not
                the expected JSON object, or an album without images.
            httpx.HTTPError: If the request itself fails.
        """
        url = keys.immich_api_root_url + "album/" + keys.immich_album_id
        headers = self.headers | {"Accept": "application/json"}

        # h.request raises an httpx exception if the transfer itself fails.
        response = h.request("GET", url, headers=headers, data={})
        if response.status_code != 200:
            raise ImageGetException(
                f"Error in step get_random_image_id: {response.status_code}"
            )

        # Convert parsing problems into ImageGetException so that callers'
        # cache fallback also covers malformed server responses.
        try:
            images = json.loads(response.text)["assets"]
        except (ValueError, KeyError, TypeError) as err:
            raise ImageGetException(
                "Error in step get_random_image_id: malformed album response"
            ) from err

        print(f"Picking {num} random id(s) out of {len(images)} album images")
        if num > 0 and not images:
            # random.choice would raise IndexError on an empty sequence.
            raise ImageGetException(
                "Error in step get_random_image_id: album contains no images"
            )

        ids = []
        for _ in range(num):
            image = random.choice(images)
            # .get(): a missing 'exifInfo' must not abort over a debug print.
            print(f"Image considered: {image.get('exifInfo')}")
            try:
                ids.append(image["id"])
            except KeyError as err:
                raise ImageGetException(
                    "Error in step get_random_image_id: asset without id"
                ) from err

        return ids

    def get_image_file(self, image_id: str) -> bytes:
        """Download the original file of the asset ``image_id``.

        Args:
            image_id: Asset id as returned by get_random_image_ids().

        Returns:
            The raw response body.

        Raises:
            ImageGetException: If the server answers with a non-200 status.
            httpx.HTTPError: If the request itself fails.
        """
        url = keys.immich_api_root_url + "asset/download/" + image_id
        headers = self.headers | {"Accept": "application/octet-stream"}

        response = h.request("POST", url, headers=headers, data={})
        if response.status_code != 200:
            raise ImageGetException(
                f"Error in step get_image_file: {response.status_code}"
            )

        return response.content

    def save_cached_files(self) -> None:
        """Top up the cache directory to ``self.cached_num`` image files.

        Does nothing (and makes no request) when the cache is already full
        or over-full.

        Raises:
            ImageGetException: If the album listing or a download fails.
            httpx.HTTPError: If a request itself fails.
        """
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        self.cached_path.mkdir(parents=True, exist_ok=True)

        missing = self.cached_num - len(self._cached_files())
        if missing <= 0:
            return

        for index, image_id in enumerate(self.get_random_image_ids(missing)):
            print(f"Caching image {index + 1}")
            # Download first; the file is only created once the bytes exist.
            content = self.get_image_file(image_id)
            new_cache = self.cached_path / f"{uuid.uuid4()}"
            new_cache.write_bytes(content)

    def load_cached_file(self) -> bytes:
        """Return the content of a random cached file and remove it from disk.

        Raises:
            ImageGetException: If there is no cached file to load.
        """
        files = self._cached_files()
        if not files:
            raise ImageGetException("Could not load cached file: directory empty")

        chosen = random.choice(files)
        content = chosen.read_bytes()
        # Each cached image is served once; save_cached_files() refills later.
        chosen.unlink()
        return content

    def _cached_files(self) -> list[Path]:
        """Return the regular files currently present in the cache directory."""
        return [entry for entry in self.cached_path.glob("*") if entry.is_file()]
|