import json
import random
import uuid
from pathlib import Path

import httpx as h

import keys


class ImageGetException(Exception):
    """Raised when an image cannot be obtained from the server or the cache."""


class ImageGetter:
    """Fetch random images from a remote album, with a local on-disk fallback.

    The server location, album id and API key are read from the ``keys``
    module. Downloaded images are additionally stored under ``cached_path``
    so that an image can still be returned when the server is unreachable.
    """

    # Sent with every request; per-request "Accept" headers are merged in.
    headers = {
        "x-api-key": keys.immich_api_key
    }
    # Number of files save_cached_files() tries to keep in cached_path.
    cached_num = 10
    # Directory holding the cached image files (relative to the CWD).
    cached_path = Path("./.image-cache/")

    # Errors that mean "the server could not deliver". h.RequestError already
    # covers ConnectError, NetworkError, TransportError and DecodingError.
    _fetch_errors = (h.RequestError, h.HTTPStatusError, ImageGetException)

    def get_random_image(self) -> bytes:
        """Return the raw bytes of a random album image.

        The image is downloaded from the server; if that fails, a random
        file from the local cache is returned (and removed from the cache).
        After a successful download the cache is topped up on a best-effort
        basis.

        Raises:
            ImageGetException: the download failed and the cache is empty.
        """
        try:
            image_id = self.get_random_image_ids()[0]
            image = self.get_image_file(image_id)
        except self._fetch_errors:
            print("Loading image from cache")
            return self.load_cached_file()

        # Refill the cache separately: a failure here must not discard the
        # image that was already downloaded successfully.
        try:
            self.save_cached_files()
        except self._fetch_errors as err:
            print(f"Could not refill image cache: {err}")
        return image

    def get_random_image_ids(self, num=1) -> list[str]:
        """Return ``num`` randomly chosen asset ids from the configured album.

        Ids are drawn independently, so the same id may appear more than once.

        Raises:
            ImageGetException: the server answered with a non-200 status, the
                response body could not be interpreted, or the album is empty.
            httpx.RequestError: the request itself failed.
        """
        url = keys.immich_api_root_url + "album/" + keys.immich_album_id
        headers = self.headers | {"Accept": "application/json"}
        # Raises an httpx exception if the request itself goes wrong.
        response = h.request("GET", url, headers=headers, data={})
        if response.status_code != 200:
            raise ImageGetException("Error in step get_random_image_id: " + str(response.status_code))

        # Translate parsing problems into ImageGetException so callers that
        # fall back to the cache on that exception also handle a bad body.
        try:
            images = json.loads(response.text)['assets']
        except (ValueError, KeyError, TypeError) as err:
            raise ImageGetException("Error in step get_random_image_id: unexpected response body") from err

        if num > 0 and not images:
            # random.choice() would raise IndexError on an empty sequence.
            raise ImageGetException("Error in step get_random_image_id: album contains no images")

        print(f"Picking {num} random id(s) out of {len(images)} album images")
        ids = []
        for _ in range(num):
            image = random.choice(images)
            # The metadata is only printed for information; tolerate its absence.
            print(f"Image considered: {image.get('exifInfo')}")
            ids.append(image["id"])
        return ids

    def get_image_file(self, image_id: str) -> bytes:
        """Download the asset ``image_id`` and return its content as bytes.

        Raises:
            ImageGetException: the server answered with a non-200 status.
            httpx.RequestError: the request itself failed.
        """
        url = keys.immich_api_root_url + "asset/download/" + image_id
        headers = self.headers | {"Accept": "application/octet-stream"}
        response = h.request("POST", url, headers=headers, data={})
        if response.status_code != 200:
            raise ImageGetException("Error in step get_image_file: " + str(response.status_code))
        return response.content

    def save_cached_files(self) -> None:
        """Top up self.cached_path so that it holds self.cached_num files.

        Does nothing (and makes no request) when the directory already holds
        at least that many entries.
        """
        self.cached_path.mkdir(parents=True, exist_ok=True)
        present_count = sum(1 for _ in self.cached_path.glob("*"))
        missing = self.cached_num - present_count
        if missing <= 0:
            return
        for i, image_id in enumerate(self.get_random_image_ids(missing)):
            print(f"Caching image {i + 1}")
            # Random file names avoid clashes between cache entries.
            new_cache = self.cached_path / f"{uuid.uuid4()}"
            new_cache.write_bytes(self.get_image_file(image_id))

    def load_cached_file(self) -> bytes:
        """Return the content of a random file from self.cached_path.

        The chosen file is deleted after it has been read, so each cached
        image is handed out once.

        Raises:
            ImageGetException: the cache directory contains no entries.
        """
        files = list(self.cached_path.glob("*"))
        if not files:
            raise ImageGetException("Could not load cached file: directory empty")
        file = random.choice(files)
        data = file.read_bytes()
        file.unlink()
        return data