Refactor get process to add multiple sources later on
This commit is contained in:
		| @@ -4,14 +4,15 @@ | ||||
|   # become: true | ||||
|  | ||||
|   vars: | ||||
|     code_dest: /home/remy/eink | ||||
|     repo_dest: /home/remy/eink | ||||
|     code_dest: /home/remy/eink/src | ||||
|     service_target_dir: /etc/systemd/system/ | ||||
|  | ||||
|   tasks: | ||||
|     - name: Pull the latest version of the code | ||||
|       git: | ||||
|         repo: https://git.kluster.moll.re/remoll/rpi-eink-picture-frame | ||||
|         dest: "{{ code_dest }}" | ||||
|         dest: "{{ repo_dest }}" | ||||
|         version: main | ||||
|  | ||||
|     - name: Install pillow dependencies | ||||
| @@ -25,7 +26,7 @@ | ||||
|     - name: Install from the pipenv-file | ||||
|       command: "pipenv install --system --deploy --categories=\"packages prod-packages\"" | ||||
|       args: | ||||
|         chdir: "{{ code_dest }}" | ||||
|         chdir: "{{ repo_dest }}" | ||||
|  | ||||
|     - name: Copy keys python file | ||||
|       copy: | ||||
|   | ||||
| @@ -2,7 +2,7 @@ | ||||
| Description=Run photo update regularly | ||||
|  | ||||
| [Timer] | ||||
| OnCalendar=08:00 | ||||
| OnCalendar=7:30 | ||||
| OnCalendar=16:00 | ||||
| Persistent=true | ||||
|  | ||||
|   | ||||
							
								
								
									
										94
									
								
								image_get.py
									
									
									
									
									
								
							
							
						
						
									
										94
									
								
								image_get.py
									
									
									
									
									
								
							| @@ -1,94 +0,0 @@ | ||||
| import httpx as h | ||||
| import json | ||||
| import random | ||||
| from pathlib import Path | ||||
| import uuid | ||||
| import keys | ||||
|  | ||||
|  | ||||
| class ImageGetException(Exception): | ||||
|     pass | ||||
|  | ||||
|  | ||||
| class ImageGetter: | ||||
|     headers = { | ||||
|         "x-api-key": keys.immich_api_key | ||||
|     } | ||||
|     cached_num = 10 | ||||
|     cached_path = Path("./.image-cache/") | ||||
|  | ||||
|  | ||||
|     def get_random_image(self) -> bytearray: | ||||
|         try: | ||||
|             id = self.get_random_image_ids()[0] | ||||
|             bytes = self.get_image_file(id) | ||||
|             self.save_cached_files() | ||||
|         except (h.ConnectError, h.HTTPStatusError, h.NetworkError, h.RequestError, h.DecodingError, h.TransportError, ImageGetException): | ||||
|             print("Loading image from cache") | ||||
|             bytes = self.load_cached_file() | ||||
|  | ||||
|         return bytes | ||||
|  | ||||
|  | ||||
|     def get_random_image_ids(self, num=1) -> str: | ||||
|         url = keys.immich_api_root_url + "album/" + keys.immich_album_id | ||||
|         headers = self.headers | {"Accept": "application/json"} | ||||
|          | ||||
|         response = h.request("GET", url, headers=headers, data={}) | ||||
|         # raises an htppx exception if anything goes wrong | ||||
|         if response.status_code == 200: | ||||
|             response = json.loads(response.text) | ||||
|         else: | ||||
|             raise ImageGetException("Error in step get_random_image_id: " + str(response.status_code)) | ||||
|  | ||||
|         images = response['assets'] | ||||
|  | ||||
|         print(f"Picking {num} random id(s) out of {len(images)} album images") | ||||
|         ids = [] | ||||
|         for i in range(num): | ||||
|             image = random.choice(images) | ||||
|             print(f"Image considered: {image['exifInfo']}") | ||||
|             ids.append(image["id"]) | ||||
|  | ||||
|         return ids | ||||
|  | ||||
|  | ||||
|     def get_image_file(self, image_id: str) -> bytearray: | ||||
|         url = keys.immich_api_root_url + "asset/download/" + image_id | ||||
|  | ||||
|         headers = self.headers | {"Accept": "application/octet-stream"} | ||||
|         response = h.request("POST", url, headers=headers, data={}) | ||||
|         if not response.status_code == 200: | ||||
|             raise ImageGetException("Error in step get_image_file: " + str(response.status_code)) | ||||
|              | ||||
|         return response.content | ||||
|  | ||||
|  | ||||
|     def save_cached_files(self) -> None: | ||||
|         """Ensures self.cached_num files are at self.cached_path at any time""" | ||||
|         if not self.cached_path.exists(): | ||||
|             self.cached_path.mkdir() | ||||
|  | ||||
|         present_count = len(list(self.cached_path.glob("*"))) | ||||
|         missing = self.cached_num - present_count | ||||
|         if missing == 0: | ||||
|             return | ||||
|      | ||||
|         ids = self.get_random_image_ids(missing) | ||||
|         for i, id in enumerate(ids): | ||||
|             print(f"Caching image {i + 1}") | ||||
|             new_cache = self.cached_path / f"{uuid.uuid4()}" | ||||
|             new_cache.write_bytes(self.get_image_file(id)) | ||||
|  | ||||
|  | ||||
|     def load_cached_file(self) -> bytearray: | ||||
|         """Returns a random file from self.cached_path""" | ||||
|         files = list(self.cached_path.glob("*")) | ||||
|  | ||||
|         if len(files) == 0: | ||||
|             raise ImageGetException("Could not load cached file: directory empty") | ||||
|  | ||||
|         file = random.choice(files) | ||||
|         bytes  = file.read_bytes() | ||||
|         file.unlink() | ||||
|         return bytes | ||||
							
								
								
									
										0
									
								
								src/get/combined.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										0
									
								
								src/get/combined.py
									
									
									
									
									
										Normal file
									
								
							
							
								
								
									
										57
									
								
								src/get/immich.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										57
									
								
								src/get/immich.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,57 @@ | ||||
| import keys | ||||
| import httpx as h | ||||
| import random | ||||
| import json | ||||
|  | ||||
| from .template import ImageGet, ImageGetException | ||||
|  | ||||
| class ImageGetImmich(ImageGet): | ||||
|  | ||||
|     def __init__(self) -> None: | ||||
|         headers = { | ||||
|             "x-api-key": keys.immich_api_key | ||||
|         } | ||||
|  | ||||
|         super().__init__( | ||||
|             base_url=keys.immich_api_root_url, | ||||
|             headers=headers | ||||
|             ) | ||||
|  | ||||
|     def get_random_image_ids(self, num=1) -> str: | ||||
|         url = keys.immich_api_root_url + "album/" + keys.immich_album_id | ||||
|         headers = self.headers | {"Accept": "application/json"} | ||||
|          | ||||
|         response = h.request("GET", url, headers=headers, data={}) | ||||
|         # raises an htppx exception if anything goes wrong | ||||
|         if response.status_code == 200: | ||||
|             response = json.loads(response.text) | ||||
|         else: | ||||
|             raise ImageGetException(f"Error in step get_random_image_id: {response.status_code}") | ||||
|  | ||||
|         images = response['assets'] | ||||
|  | ||||
|         print(f"Picking {num} random id(s) out of {len(images)} album images") | ||||
|         ids = [] | ||||
|         for i in range(num): | ||||
|             image = random.choice(images) | ||||
|             print(f"Image considered: {image['exifInfo']}") | ||||
|             ids.append(image["id"]) | ||||
|  | ||||
|         return ids | ||||
|  | ||||
|  | ||||
|     def get_image_file(self, image_id: str) -> bytearray: | ||||
|         url = keys.immich_api_root_url + "asset/download/" + image_id | ||||
|  | ||||
|         headers = self.headers | {"Accept": "application/octet-stream"} | ||||
|         response = h.request("POST", url, headers=headers, data={}) | ||||
|         if not response.status_code == 200: | ||||
|             raise ImageGetException("Error in step get_image_file: " + str(response.status_code)) | ||||
|              | ||||
|         return response.content | ||||
|  | ||||
|  | ||||
|     # def save_cached_files(self) -> None: | ||||
|         # in super | ||||
|         # return | ||||
|  | ||||
							
								
								
									
										24
									
								
								src/get/public.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										24
									
								
								src/get/public.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,24 @@ | ||||
| import keys | ||||
| import httpx as h | ||||
| import random | ||||
| import json | ||||
|  | ||||
| from image_get import ImageGet, ImageGetException | ||||
|  | ||||
| class ImageGetImmich(ImageGet): | ||||
|  | ||||
|     def __init__(self) -> None: | ||||
|         headers = { | ||||
|             "AIC-User-Agent": "Eink Art Display (remoll@ethz.ch)" | ||||
|         } | ||||
|  | ||||
|         super().__init__( | ||||
|             base_url=keys.immich_api_root_url, | ||||
|             headers=self.headers | ||||
|             ) | ||||
|      | ||||
|     def get_random_image_ids(self) -> list[str]: | ||||
|         raise NotImplementedError | ||||
|      | ||||
|     def get_image_file(self, image_id: str) -> bytearray: | ||||
|         raise NotImplementedError | ||||
							
								
								
									
										67
									
								
								src/get/template.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										67
									
								
								src/get/template.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,67 @@ | ||||
| import httpx as h | ||||
| import random | ||||
| from pathlib import Path | ||||
| import uuid | ||||
|  | ||||
|  | ||||
| class ImageGetException(Exception): | ||||
|     pass | ||||
|  | ||||
|  | ||||
| class ImageGet: | ||||
|     def __init__(self, base_url, headers, cache_num = 10) -> None: | ||||
|         self.base_url = base_url | ||||
|         self.headers = headers | ||||
|         self.cache_num = cache_num | ||||
|         self.cache_dir = Path("../.cache/{self.__class__.__name__}/") | ||||
|         self.cache_dir.mkdir(parents=True, exist_ok=True) | ||||
|  | ||||
|  | ||||
|     # Main entrypoint, called externally | ||||
|     def get_random_image(self) -> bytearray: | ||||
|         try: | ||||
|             id = self.get_random_image_ids()[0] | ||||
|             bytes = self.get_image_file(id) | ||||
|             # self.save_cached_files() | ||||
|         except (h.ConnectError, h.HTTPStatusError, h.NetworkError, h.RequestError, h.DecodingError, h.TransportError, ImageGetException): | ||||
|             print("Loading image from cache") | ||||
|             bytes = self.load_cached_file() | ||||
|  | ||||
|         return bytes | ||||
|  | ||||
|  | ||||
|     def save_cached_files(self) -> None: | ||||
|         """Ensures self.cache_num files are at self.cache_dir at any time""" | ||||
|  | ||||
|         present_count = len(list(self.cached_path.glob("*"))) | ||||
|         missing = self.cached_num - present_count | ||||
|         if missing == 0: | ||||
|             return | ||||
|      | ||||
|         ids = self.get_random_image_ids(missing) | ||||
|         for i, id in enumerate(ids): | ||||
|             print(f"Caching image {i + 1}") | ||||
|             new_cache = self.cached_path / f"{uuid.uuid4()}" | ||||
|             new_cache.write_bytes(self.get_image_file(id)) | ||||
|  | ||||
|  | ||||
|     def load_cached_file(self) -> bytearray: | ||||
|         """Returns a random file from self.cached_path""" | ||||
|         files = list(self.cached_path.glob("*")) | ||||
|  | ||||
|         if len(files) == 0: | ||||
|             raise ImageGetException("Could not load cached file: directory empty") | ||||
|  | ||||
|         file = random.choice(files) | ||||
|         bytes  = file.read_bytes() | ||||
|         file.unlink() | ||||
|         return bytes | ||||
|  | ||||
|  | ||||
|  | ||||
|     def get_random_image_ids(self) -> list[str]: | ||||
|         raise NotImplementedError | ||||
|      | ||||
|  | ||||
|     def get_image_file(self, image_id: str) -> bytearray: | ||||
|         raise NotImplementedError | ||||
| @@ -1,12 +1,12 @@ | ||||
| import sys | ||||
| from image_convert import ImageShrink | ||||
| from image_get import ImageGetter | ||||
| from get import immich | ||||
| from image_show import ImageShow | ||||
| 
 | ||||
| if len(sys.argv) == 2 and sys.argv[1] == "test": | ||||
|     print("Running test") | ||||
|     show = ImageShow() | ||||
|     show.draw_sample_image() | ||||
|     shower = ImageShow() | ||||
|     shower.draw_sample_image() | ||||
|     sys.exit() | ||||
| 
 | ||||
| 
 | ||||
| @@ -18,10 +18,10 @@ if "noreduce" in sys.argv: | ||||
|     print("Disabling color reduction") | ||||
|     shrink_kwargs["colors"] = -1 | ||||
| 
 | ||||
| get = ImageGetter() | ||||
| convert = ImageShrink(**shrink_kwargs) | ||||
| show = ImageShow() | ||||
| getter = immich.ImageGetImmich() | ||||
| converter = ImageShrink(**shrink_kwargs) | ||||
| shower = ImageShow() | ||||
| 
 | ||||
| image = get.get_random_image() | ||||
| image = convert.convert(image) | ||||
| show.show_image(image) | ||||
| image = getter.get_random_image() | ||||
| image = converter.convert(image) | ||||
| shower.show_image(image) | ||||
		Reference in New Issue
	
	Block a user