Cache a fixed amount of image in case of connectivity loss
This commit is contained in:
		
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							@@ -1,2 +1,3 @@
 | 
				
			|||||||
keys.py
 | 
					keys.py
 | 
				
			||||||
*.pyc
 | 
					*.pyc
 | 
				
			||||||
 | 
					.image-cache/
 | 
				
			||||||
@@ -42,6 +42,7 @@ class ImageShrink:
 | 
				
			|||||||
        image = self.shrink(image)
 | 
					        image = self.shrink(image)
 | 
				
			||||||
        image = self.convert_to_reduced_colors(image)
 | 
					        image = self.convert_to_reduced_colors(image)
 | 
				
			||||||
        # image.save("test.png")
 | 
					        # image.save("test.png")
 | 
				
			||||||
 | 
					        return image
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def shrink(self, image: Image) -> Image:
 | 
					    def shrink(self, image: Image) -> Image:
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										54
									
								
								image_get.py
									
									
									
									
									
								
							
							
						
						
									
										54
									
								
								image_get.py
									
									
									
									
									
								
							@@ -1,7 +1,8 @@
 | 
				
			|||||||
import httpx as h
 | 
					import httpx as h
 | 
				
			||||||
import json
 | 
					import json
 | 
				
			||||||
import random
 | 
					import random
 | 
				
			||||||
 | 
					from pathlib import Path
 | 
				
			||||||
 | 
					import uuid
 | 
				
			||||||
import keys
 | 
					import keys
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -15,29 +16,43 @@ class ImageGetter:
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def __init__(self):
 | 
					    def __init__(self):
 | 
				
			||||||
 | 
					        self.cached_num = 10
 | 
				
			||||||
 | 
					        self.cached_path = Path("./.image-cache/")
 | 
				
			||||||
        pass
 | 
					        pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def get_random_image(self) -> bytearray:
 | 
					    def get_random_image(self) -> bytearray:
 | 
				
			||||||
        id = self.get_random_image_id()
 | 
					        try:
 | 
				
			||||||
        bytes = self.get_image_file(id)
 | 
					            ids = self.get_random_image_ids(num=self.cached_num + 1)
 | 
				
			||||||
 | 
					            bytes = self.get_image_file(ids[0])
 | 
				
			||||||
 | 
					            self.save_cached_files(ids[1:])
 | 
				
			||||||
 | 
					        except (h.ConnectError, h.HTTPStatusError, h.NetworkError, h.RequestError, h.DecodingError, h.TransportError, ImageGetException):
 | 
				
			||||||
 | 
					            print("Loading image from cache")
 | 
				
			||||||
 | 
					            bytes = self.load_cached_file()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        return bytes
 | 
					        return bytes
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def get_random_image_id(self) -> str:
 | 
					    def get_random_image_ids(self, num=1) -> str:
 | 
				
			||||||
        url = keys.immich_api_root_url + "album/" + keys.immich_album_id
 | 
					        url = keys.immich_api_root_url + "album/" + keys.immich_album_id
 | 
				
			||||||
        headers = self.headers | {"Accept": "application/json"}
 | 
					        headers = self.headers | {"Accept": "application/json"}
 | 
				
			||||||
        
 | 
					        
 | 
				
			||||||
        response = h.request("GET", url, headers=headers, data={})
 | 
					        response = h.request("GET", url, headers=headers, data={})
 | 
				
			||||||
 | 
					        # raises an htppx exception if anything goes wrong
 | 
				
			||||||
        if response.status_code == 200:
 | 
					        if response.status_code == 200:
 | 
				
			||||||
            response = json.loads(response.text)
 | 
					            response = json.loads(response.text)
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            raise ImageGetException("Error in step get_random_image_id: " + str(response.status_code))
 | 
					            raise ImageGetException("Error in step get_random_image_id: " + str(response.status_code))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        images = response['assets']
 | 
					        images = response['assets']
 | 
				
			||||||
        print(f"Picking random image out of {len(images)} album images")
 | 
					
 | 
				
			||||||
        image = random.choice(images)
 | 
					        print(f"Picking {num} random id(s) out of {len(images)} album images")
 | 
				
			||||||
        return image["id"]
 | 
					        ids = []
 | 
				
			||||||
 | 
					        for i in range(num):
 | 
				
			||||||
 | 
					            image = random.choice(images)
 | 
				
			||||||
 | 
					            ids.append(image["id"])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return ids
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def get_image_file(self, image_id: str) -> bytearray:
 | 
					    def get_image_file(self, image_id: str) -> bytearray:
 | 
				
			||||||
@@ -49,3 +64,28 @@ class ImageGetter:
 | 
				
			|||||||
            raise ImageGetException("Error in step get_image_file: " + str(response.status_code))
 | 
					            raise ImageGetException("Error in step get_image_file: " + str(response.status_code))
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
        return response.content
 | 
					        return response.content
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def save_cached_files(self, image_ids: list[str]) -> None:
 | 
				
			||||||
 | 
					        """Ensures self.cached_num files are at self.cached_path at any time"""
 | 
				
			||||||
 | 
					        if not self.cached_path.exists():
 | 
				
			||||||
 | 
					            self.cached_path.mkdir()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        present_count = len(list(self.cached_path.glob("*")))
 | 
				
			||||||
 | 
					        for i in range(self.cached_num -  present_count):
 | 
				
			||||||
 | 
					            print(f"Caching image {i + 1}")
 | 
				
			||||||
 | 
					            new_cache = self.cached_path / f"{uuid.uuid4()}"
 | 
				
			||||||
 | 
					            new_cache.write_bytes(self.get_image_file(image_ids[i]))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def load_cached_file(self) -> bytearray:
 | 
				
			||||||
 | 
					        """Returns a random file from self.cached_path"""
 | 
				
			||||||
 | 
					        files = list(self.cached_path.glob("*"))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if len(files) == 0:
 | 
				
			||||||
 | 
					            raise ImageGetException("Could not load cached file: directory empty")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        file = random.choice(files)
 | 
				
			||||||
 | 
					        bytes  = file.read_bytes()
 | 
				
			||||||
 | 
					        file.unlink()
 | 
				
			||||||
 | 
					        return bytes
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -27,7 +27,7 @@ class ImageShow:
 | 
				
			|||||||
        if image.size != (self.epd.width, self.epd.height):
 | 
					        if image.size != (self.epd.width, self.epd.height):
 | 
				
			||||||
            raise ImageShowError("Image does not match screen size")
 | 
					            raise ImageShowError("Image does not match screen size")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # self.__init__()
 | 
					        self.__init__()
 | 
				
			||||||
        # possibly include a blank image to clear screen
 | 
					        # possibly include a blank image to clear screen
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.epd.display(self.epd.getbuffer(image))
 | 
					        self.epd.display(self.epd.getbuffer(image))
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										6
									
								
								main.py
									
									
									
									
									
								
							
							
						
						
									
										6
									
								
								main.py
									
									
									
									
									
								
							@@ -1,8 +1,12 @@
 | 
				
			|||||||
from image_convert import ImageShrink
 | 
					from image_convert import ImageShrink
 | 
				
			||||||
from image_get import ImageGetter
 | 
					from image_get import ImageGetter
 | 
				
			||||||
 | 
					# from image_show import ImageShow
 | 
				
			||||||
 | 
					
 | 
				
			||||||
get = ImageGetter()
 | 
					get = ImageGetter()
 | 
				
			||||||
convert = ImageShrink()
 | 
					convert = ImageShrink()
 | 
				
			||||||
 | 
					# show = ImageShow()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
image = get.get_random_image()
 | 
					image = get.get_random_image()
 | 
				
			||||||
convert.convert(image)
 | 
					image = convert.convert(image)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# show.show_image(image)
 | 
				
			||||||
		Reference in New Issue
	
	Block a user