amazing cache #55
										
											
												File diff suppressed because one or more lines are too long
											
										
									
								
							| @@ -2,6 +2,7 @@ | ||||
|  | ||||
| import os | ||||
| from pathlib import Path | ||||
| from typing import List, Literal | ||||
|  | ||||
|  | ||||
| LOCATION_PREFIX = Path('src') | ||||
| @@ -14,6 +15,7 @@ OPTIMIZER_PARAMETERS_PATH = PARAMETERS_DIR / 'optimizer_parameters.yaml' | ||||
| cache_dir_string = os.getenv('OSM_CACHE_DIR', './cache') | ||||
| OSM_CACHE_DIR = Path(cache_dir_string) | ||||
|  | ||||
| OSM_TYPES = List[Literal['way', 'node', 'relation']] | ||||
|  | ||||
| MEMCACHED_HOST_PATH = os.getenv('MEMCACHED_HOST_PATH', None) | ||||
| if MEMCACHED_HOST_PATH == "none": | ||||
|   | ||||
| @@ -3,7 +3,7 @@ | ||||
| import logging | ||||
| import time | ||||
| from contextlib import asynccontextmanager | ||||
| from fastapi import FastAPI, HTTPException, Query | ||||
| from fastapi import FastAPI, HTTPException, BackgroundTasks, Query | ||||
|  | ||||
| from .logging_config import configure_logging | ||||
| from .structs.landmark import Landmark, Toilets | ||||
| @@ -14,8 +14,10 @@ from .utils.landmarks_manager import LandmarkManager | ||||
| from .utils.toilets_manager import ToiletsManager | ||||
| from .optimization.optimizer import Optimizer | ||||
| from .optimization.refiner import Refiner | ||||
| from .overpass.overpass import fill_cache | ||||
| from .cache import client as cache_client | ||||
|  | ||||
|  | ||||
| logger = logging.getLogger(__name__) | ||||
|  | ||||
| manager = LandmarkManager() | ||||
| @@ -35,7 +37,6 @@ async def lifespan(app: FastAPI): | ||||
| app = FastAPI(lifespan=lifespan) | ||||
|  | ||||
|  | ||||
|  | ||||
| @app.post("/trip/new") | ||||
| def new_trip(preferences: Preferences, | ||||
|              start: tuple[float, float], | ||||
| @@ -127,6 +128,9 @@ def new_trip(preferences: Preferences, | ||||
|     # upon creation of the trip, persistence of both the trip and its landmarks is ensured. | ||||
|     trip = Trip.from_linked_landmarks(linked_tour, cache_client) | ||||
|     logger.info(f'Generated a trip of {trip.total_time} minutes with {len(refined_tour)} landmarks in {round(t_generate_landmarks + t_first_stage + t_second_stage,3)} seconds.') | ||||
|      | ||||
|     background_tasks = BackgroundTasks(fill_cache()) | ||||
|  | ||||
|     return trip | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -1,9 +1,9 @@ | ||||
| """Module defining the caching strategy for overpass requests.""" | ||||
| import os | ||||
| import xml.etree.ElementTree as ET | ||||
| import hashlib | ||||
| import time | ||||
|  | ||||
| from ..constants import OSM_CACHE_DIR | ||||
| from ..constants import OSM_CACHE_DIR, OSM_TYPES | ||||
|  | ||||
|  | ||||
| def get_cache_key(query: str) -> str: | ||||
| @@ -13,14 +13,9 @@ def get_cache_key(query: str) -> str: | ||||
|     """ | ||||
|     return hashlib.md5(query.encode('utf-8')).hexdigest() | ||||
|  | ||||
|  | ||||
| class CachingStrategyBase: | ||||
|     """ | ||||
|     Base class for implementing caching strategies. | ||||
|  | ||||
|     This class defines the structure for a caching strategy with basic methods | ||||
|     that must be implemented by subclasses. Subclasses should define how to | ||||
|     retrieve, store, and close the cache. | ||||
|     """ | ||||
|     def get(self, key): | ||||
|         """Retrieve the cached data associated with the provided key.""" | ||||
| @@ -30,6 +25,15 @@ class CachingStrategyBase: | ||||
|         """Store data in the cache with the specified key.""" | ||||
|         raise NotImplementedError('Subclass should implement set') | ||||
|  | ||||
|     def set_hollow(self, key, cell: tuple, osm_types: OSM_TYPES, | ||||
|                     selector: str, conditions=[], out='center'): | ||||
|         """Create a hollow (empty) cache entry with a specific key.""" | ||||
|         raise NotImplementedError('Subclass should implement set_hollow') | ||||
|  | ||||
|     def fill_hollow(self, key, value): | ||||
|         """Fill in the cache for an existing hollow entry.""" | ||||
|         raise NotImplementedError('Subclass should implement fill_hollow') | ||||
|  | ||||
|     def close(self): | ||||
|         """Clean up or close any resources used by the caching strategy.""" | ||||
|  | ||||
| @@ -37,22 +41,10 @@ class CachingStrategyBase: | ||||
| class XMLCache(CachingStrategyBase): | ||||
|     """ | ||||
|     A caching strategy that stores and retrieves data in XML format. | ||||
|  | ||||
|     This class provides methods to cache data as XML files in a specified directory. | ||||
|     The directory is automatically suffixed with '_XML' to distinguish it from other | ||||
|     caching strategies. The data is stored and retrieved using XML serialization. | ||||
|  | ||||
|     Args: | ||||
|         cache_dir (str): The base directory where XML cache files will be stored. | ||||
|                          Defaults to 'OSM_CACHE_DIR' with a '_XML' suffix. | ||||
|  | ||||
|     Methods: | ||||
|         get(key): Retrieve cached data from a XML file associated with the given key. | ||||
|         set(key, value): Store data in a XML file with the specified key. | ||||
|     """ | ||||
|     def __init__(self, cache_dir=OSM_CACHE_DIR): | ||||
|         # Add the class name as a suffix to the directory | ||||
|         self._cache_dir = f'{cache_dir}_XML' | ||||
|         self._cache_dir = f'{cache_dir}' | ||||
|         if not os.path.exists(self._cache_dir): | ||||
|             os.makedirs(self._cache_dir) | ||||
|  | ||||
| @@ -68,7 +60,6 @@ class XMLCache(CachingStrategyBase): | ||||
|                 tree = ET.parse(filename) | ||||
|                 return tree.getroot()  # Return the root element of the parsed XML | ||||
|             except ET.ParseError: | ||||
|                 # print(f"Error parsing cached XML file: {filename}") | ||||
|                 return None | ||||
|         return None | ||||
|  | ||||
| @@ -77,25 +68,41 @@ class XMLCache(CachingStrategyBase): | ||||
|         filename = self._filename(key) | ||||
|         tree = ET.ElementTree(value)  # value is expected to be an ElementTree root element | ||||
|         try: | ||||
|             # Write the XML data to a file | ||||
|             with open(filename, 'wb') as file: | ||||
|                 tree.write(file, encoding='utf-8', xml_declaration=True) | ||||
|         except IOError as e: | ||||
|             raise IOError(f"Error writing to cache file: {filename} - {e}") from e | ||||
|  | ||||
|     def set_hollow(self, key, cell: tuple, osm_types: OSM_TYPES, | ||||
|                     selector: str, conditions=[], out='center'): | ||||
|         """Create an empty placeholder cache entry for a future fill.""" | ||||
|         hollow_key = f'hollow_{key}' | ||||
|         filename = self._filename(hollow_key) | ||||
|      | ||||
|         # Create the root element <cache> | ||||
|         root = ET.Element("params") | ||||
|         # Add sub-elements with provided values | ||||
|         ET.SubElement(root, "key").text = key | ||||
|         ET.SubElement(root, "cell").text = f"({cell[0]}, {cell[1]})" | ||||
|         ET.SubElement(root, "osm_types").text = ','.join(osm_types) | ||||
|         ET.SubElement(root, "selector").text = selector | ||||
|         ET.SubElement(root, "conditions").text = ','.join(conditions) if conditions else "none" | ||||
|         ET.SubElement(root, "out").text = out | ||||
|  | ||||
|         # Create an ElementTree object from the root | ||||
|         tree = ET.ElementTree(root) | ||||
|  | ||||
|         # Write the XML to the file | ||||
|         with open(filename, 'wb') as file: | ||||
|             tree.write(file, encoding='utf-8', xml_declaration=True) | ||||
|  | ||||
|     def close(self): | ||||
|         """Cleanup method, if needed.""" | ||||
|         pass | ||||
|  | ||||
| class CachingStrategy: | ||||
|     """ | ||||
|     A class to manage different caching strategies. | ||||
|  | ||||
|     This class provides an interface to switch between different caching strategies  | ||||
|     (e.g., XMLCache, JSONCache) dynamically. It allows caching data in different formats,  | ||||
|     depending on the strategy being used. By default, it uses the XMLCache strategy. | ||||
|  | ||||
|     Attributes: | ||||
|     __strategy (CachingStrategyBase): The currently active caching strategy. | ||||
|     __strategies (dict): A mapping between strategy names (as strings) and their corresponding | ||||
|                          classes, allowing dynamic selection of caching strategies. | ||||
|     """ | ||||
|     __strategy = XMLCache()  # Default caching strategy | ||||
|     __strategies = { | ||||
| @@ -104,37 +111,31 @@ class CachingStrategy: | ||||
|  | ||||
|     @classmethod | ||||
|     def use(cls, strategy_name='XML', **kwargs): | ||||
|         """ | ||||
|         Set the caching strategy based on the strategy_name provided. | ||||
|  | ||||
|         Args: | ||||
|             strategy_name (str): The name of the caching strategy (e.g., 'XML'). | ||||
|             **kwargs: Additional keyword arguments to pass when initializing the strategy. | ||||
|         """ | ||||
|         # If a previous strategy exists, close it | ||||
|         if cls.__strategy: | ||||
|             cls.__strategy.close() | ||||
|  | ||||
|         # Retrieve the strategy class based on the strategy name | ||||
|         strategy_class = cls.__strategies.get(strategy_name) | ||||
|  | ||||
|         if not strategy_class: | ||||
|             raise ValueError(f"Unknown caching strategy: {strategy_name}") | ||||
|  | ||||
|         # Instantiate the new strategy with the provided arguments | ||||
|         cls.__strategy = strategy_class(**kwargs) | ||||
|         return cls.__strategy | ||||
|  | ||||
|     @classmethod | ||||
|     def get(cls, key): | ||||
|         """Get data from the current strategy's cache.""" | ||||
|         if not cls.__strategy: | ||||
|             raise RuntimeError("Caching strategy has not been set.") | ||||
|         return cls.__strategy.get(key) | ||||
|  | ||||
|     @classmethod | ||||
|     def set(cls, key, value): | ||||
|         """Set data in the current strategy's cache.""" | ||||
|         if not cls.__strategy: | ||||
|             raise RuntimeError("Caching strategy has not been set.") | ||||
|         cls.__strategy.set(key, value) | ||||
|  | ||||
|     @classmethod | ||||
|     def set_hollow(cls, key, cell: tuple, osm_types: OSM_TYPES, | ||||
|                     selector: str, conditions=[], out='center'): | ||||
|         """Create a hollow cache entry.""" | ||||
|         cls.__strategy.set_hollow(key, cell, osm_types, selector, conditions, out) | ||||
|  | ||||
|     @classmethod | ||||
|     def fill_hollow(cls, key, value): | ||||
|         """Fill in the hollow cache entry with actual data.""" | ||||
|         cls.__strategy.fill_hollow(key, value) | ||||
|   | ||||
| @@ -1,14 +1,15 @@ | ||||
| """Module allowing connexion to overpass api and fectch data from OSM.""" | ||||
| from typing import Literal, List | ||||
| import os | ||||
| import urllib | ||||
| import math | ||||
| import logging | ||||
| import xml.etree.ElementTree as ET | ||||
|  | ||||
| from .caching_strategy import get_cache_key, CachingStrategy | ||||
| from ..constants import OSM_CACHE_DIR | ||||
| from ..constants import OSM_CACHE_DIR, OSM_TYPES | ||||
|  | ||||
| logger = logging.getLogger('Overpass') | ||||
| osm_types = List[Literal['way', 'node', 'relation']] | ||||
|  | ||||
| RESOLUTION = 0.05 | ||||
|  | ||||
|  | ||||
| class Overpass : | ||||
| @@ -16,6 +17,9 @@ class Overpass : | ||||
|     Overpass class to manage the query building and sending to overpass api. | ||||
|     The caching strategy is a part of this class and initialized upon creation of the Overpass object. | ||||
|     """ | ||||
|     logger = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
|     def __init__(self, caching_strategy: str = 'XML', cache_dir: str = OSM_CACHE_DIR) : | ||||
|         """ | ||||
|         Initialize the Overpass instance with the url, headers and caching strategy. | ||||
| @@ -26,16 +30,14 @@ class Overpass : | ||||
|  | ||||
|  | ||||
|     @classmethod | ||||
|     def build_query(self, area: tuple, osm_types: osm_types, | ||||
|     def build_query(self, bbox: tuple, osm_types: OSM_TYPES, | ||||
|                     selector: str, conditions=[], out='center') -> str: | ||||
|         """ | ||||
|         Constructs a query string for the Overpass API to retrieve OpenStreetMap (OSM) data. | ||||
|  | ||||
|         Args: | ||||
|             area (tuple): A tuple representing the geographical search area, typically in the format  | ||||
|                         (radius, latitude, longitude). The first element is a string like "around:2000"  | ||||
|                         specifying the search radius, and the second and third elements represent  | ||||
|                         the latitude and longitude as floats or strings. | ||||
|             bbox (tuple): A tuple representing the geographical search area, typically in the format  | ||||
|                         (lat_min, lon_min, lat_max, lon_max). | ||||
|             osm_types (list[str]): A list of OSM element types to search for. Must be one or more of  | ||||
|                                     'Way', 'Node', or 'Relation'. | ||||
|             selector (str): The key or tag to filter the OSM elements (e.g., 'amenity', 'highway', etc.). | ||||
| @@ -52,7 +54,6 @@ class Overpass : | ||||
|         Notes: | ||||
|             - If no conditions are provided, the query will just use the `selector` to filter the OSM  | ||||
|             elements without additional constraints. | ||||
|             - The search area must always formatted as "(radius, lat, lon)". | ||||
|         """ | ||||
|         if not isinstance(conditions, list) : | ||||
|             conditions = [conditions] | ||||
| @@ -61,15 +62,8 @@ class Overpass : | ||||
|  | ||||
|         query = '(' | ||||
|  | ||||
|         # Round the radius to nearest 50 and coordinates to generate less queries | ||||
|         if area[0] > 500 : | ||||
|             search_radius = round(area[0] / 50) * 50 | ||||
|             loc = tuple((round(area[1], 2), round(area[2], 2))) | ||||
|         else : | ||||
|             search_radius = round(area[0] / 25) * 25 | ||||
|             loc = tuple((round(area[1], 3), round(area[2], 3))) | ||||
|  | ||||
|         search_area = f"(around:{search_radius}, {str(loc[0])}, {str(loc[1])})" | ||||
|         # convert the bbox to string. | ||||
|         bbox_str = f"({','.join(map(str, bbox))})" | ||||
|  | ||||
|         if conditions : | ||||
|             conditions = '(if: ' + ' && '.join(conditions) + ')' | ||||
| @@ -77,14 +71,15 @@ class Overpass : | ||||
|             conditions = '' | ||||
|  | ||||
|         for elem in osm_types : | ||||
|             query += elem + '[' + selector + ']' + conditions + search_area + ';' | ||||
|             query += elem + '[' + selector + ']' + conditions + bbox_str + ';' | ||||
|  | ||||
|         query += ');' + f'out {out};' | ||||
|  | ||||
|         return query | ||||
|  | ||||
|  | ||||
|     def send_query(self, query: str) -> ET: | ||||
|     def send_query(self, bbox: tuple, osm_types: OSM_TYPES, | ||||
|                     selector: str, conditions=[], out='center') -> ET: | ||||
|         """ | ||||
|         Sends the Overpass QL query to the Overpass API and returns the parsed JSON response. | ||||
|  | ||||
| @@ -94,18 +89,42 @@ class Overpass : | ||||
|         Returns: | ||||
|             dict: The parsed JSON response from the Overpass API, or None if the request fails. | ||||
|         """ | ||||
|         # Determine which grid cells overlap with this bounding box. | ||||
|         overlapping_cells = self.get_overlapping_cells(bbox) | ||||
|  | ||||
|         # Generate a cache key for the current query | ||||
|         cache_key = get_cache_key(query) | ||||
|         # Check the cache for any data that overlaps with these cells | ||||
|         cell_key_dict = {} | ||||
|         for cell in overlapping_cells : | ||||
|             for elem in osm_types : | ||||
|                 key_str = f"{elem}[{selector}]{conditions}({','.join(map(str, cell))})" | ||||
|              | ||||
|         # Try to fetch the result from the cache | ||||
|         cached_response = self.caching_strategy.get(cache_key) | ||||
|         if cached_response is not None : | ||||
|             logger.debug("Cache hit.") | ||||
|             return cached_response | ||||
|             cell_key_dict[cell] = get_cache_key(key_str) | ||||
|  | ||||
|         cached_responses = [] | ||||
|         hollow_cache_keys = [] | ||||
|  | ||||
|         # Retrieve the cached data and mark the missing entries as hollow | ||||
|         for cell, key in cell_key_dict.items(): | ||||
|             cached_data = self.caching_strategy.get(key) | ||||
|             if cached_data is not None : | ||||
|                 cached_responses.append(cached_data) | ||||
|             else: | ||||
|                 # Cache miss: Mark the cache key as hollow | ||||
|                 self.caching_strategy.set_hollow(key, cell, osm_types, selector, conditions, out) | ||||
|                 hollow_cache_keys.append(key) | ||||
|  | ||||
|         # If there is no missing data, return the cached responses | ||||
|         if not hollow_cache_keys : | ||||
|             self.logger.debug(f'Cache hit.') | ||||
|             return self.combine_cached_data(cached_responses) | ||||
|          | ||||
|         # TODO If there is SOME missing data : hybrid stuff with partial cache | ||||
|          | ||||
|         # Build the query string in case of needed overpass query | ||||
|         query_str = self.build_query(bbox, osm_types, selector, conditions, out) | ||||
|  | ||||
|         # Prepare the data to be sent as POST request, encoded as bytes | ||||
|         data = urllib.parse.urlencode({'data': query}).encode('utf-8') | ||||
|         data = urllib.parse.urlencode({'data': query_str}).encode('utf-8') | ||||
|  | ||||
|         try: | ||||
|             # Create a Request object with the specified URL, data, and headers | ||||
| @@ -117,9 +136,7 @@ class Overpass : | ||||
|                 response_data = response.read().decode('utf-8') | ||||
|                 root = ET.fromstring(response_data) | ||||
|  | ||||
|                 # Cache the response data as an ElementTree root | ||||
|                 self.caching_strategy.set(cache_key, root) | ||||
|                 logger.debug("Response data added to cache.") | ||||
|                 self.logger.debug(f'Cache miss. Fetching data through Overpass\nQuery = {query_str}') | ||||
|  | ||||
|                 return root | ||||
|  | ||||
| @@ -127,7 +144,108 @@ class Overpass : | ||||
|             raise ConnectionError(f"Error connecting to Overpass API: {e}") from e | ||||
|  | ||||
|  | ||||
| def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) : | ||||
|     def build_query_from_hollow(self, xml_string): | ||||
|         """Extract variables from an XML string.""" | ||||
|          | ||||
|         # Parse the XML string into an ElementTree object | ||||
|         root = ET.fromstring(xml_string) | ||||
|          | ||||
|         # Extract values from the XML tree | ||||
|         key = root.find('key').text | ||||
|         cell = tuple(map(float, root.find('cell').text.strip('()').split(','))) | ||||
|         bbox = self.get_bbox_from_grid_cell(cell[0], cell[1]) | ||||
|         osm_types = root.find('osm_types').text.split(',') | ||||
|         selector = root.find('selector').text | ||||
|         conditions = root.find('conditions').text.split(',') if root.find('conditions').text != "none" else [] | ||||
|         out = root.find('out').text | ||||
|  | ||||
|         query_str = self.build_query(bbox, osm_types, selector, conditions, out) | ||||
|          | ||||
|         return query_str, key | ||||
|  | ||||
|     def get_grid_cell(self, lat: float, lon: float): | ||||
|         """ | ||||
|         Returns the grid cell coordinates for a given latitude and longitude. | ||||
|         Each grid cell is 0.05°lat x 0.05°lon resolution in size. | ||||
|         """ | ||||
|         lat_index = math.floor(lat / RESOLUTION) | ||||
|         lon_index = math.floor(lon / RESOLUTION) | ||||
|         return (lat_index, lon_index) | ||||
|  | ||||
|  | ||||
|     def get_bbox_from_grid_cell(self, lat_index: int, lon_index: int): | ||||
|         """ | ||||
|         Returns the bounding box for a given grid cell index. | ||||
|         Each grid cell is resolution x resolution in size. | ||||
|  | ||||
|         The bounding box is returned as (min_lat, min_lon, max_lat, max_lon). | ||||
|         """ | ||||
|         # Calculate the southwest (min_lat, min_lon) corner of the bounding box | ||||
|         min_lat = round(lat_index * RESOLUTION, 2) | ||||
|         min_lon = round(lon_index * RESOLUTION, 2) | ||||
|  | ||||
|         # Calculate the northeast (max_lat, max_lon) corner of the bounding box | ||||
|         max_lat = round((lat_index + 1) * RESOLUTION, 2) | ||||
|         max_lon = round((lon_index + 1) * RESOLUTION, 2) | ||||
|  | ||||
|         return (min_lat, min_lon, max_lat, max_lon) | ||||
|  | ||||
|  | ||||
|  | ||||
|     def get_overlapping_cells(self, query_bbox: tuple): | ||||
|         """ | ||||
|         Returns a set of all grid cells that overlap with the given bounding box. | ||||
|         """ | ||||
|         # Extract location from the query bbox | ||||
|         lat_min, lon_min, lat_max, lon_max = query_bbox | ||||
|  | ||||
|         min_lat_cell, min_lon_cell = self.get_grid_cell(lat_min, lon_min) | ||||
|         max_lat_cell, max_lon_cell = self.get_grid_cell(lat_max, lon_max) | ||||
|  | ||||
|         overlapping_cells = set() | ||||
|         for lat_idx in range(min_lat_cell, max_lat_cell + 1): | ||||
|             for lon_idx in range(min_lon_cell, max_lon_cell + 1): | ||||
|                 overlapping_cells.add((lat_idx, lon_idx)) | ||||
|          | ||||
|         return overlapping_cells | ||||
|  | ||||
|  | ||||
|     def combine_cached_data(self, cached_data_list): | ||||
|         """ | ||||
|         Combines data from multiple cached responses into a single result. | ||||
|         """ | ||||
|         combined_data = ET.Element("osm") | ||||
|         for cached_data in cached_data_list: | ||||
|             for element in cached_data: | ||||
|                 combined_data.append(element) | ||||
|         return combined_data | ||||
|      | ||||
|  | ||||
|     def fill_cache(self, query_str: str, cache_key) : | ||||
|          | ||||
|         # Prepare the data to be sent as POST request, encoded as bytes | ||||
|         data = urllib.parse.urlencode({'data': query_str}).encode('utf-8') | ||||
|  | ||||
|         try: | ||||
|             # Create a Request object with the specified URL, data, and headers | ||||
|             request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers) | ||||
|  | ||||
|             # Send the request and read the response | ||||
|             with urllib.request.urlopen(request) as response: | ||||
|                 # Read and decode the response | ||||
|                 response_data = response.read().decode('utf-8') | ||||
|                 root = ET.fromstring(response_data) | ||||
|  | ||||
|                 self.caching_strategy.set(cache_key, root) | ||||
|                 self.logger.debug(f'Cache set') | ||||
|  | ||||
|         except urllib.error.URLError as e: | ||||
|             raise ConnectionError(f"Error connecting to Overpass API: {e}") from e | ||||
|  | ||||
|  | ||||
|  | ||||
|  | ||||
| def get_base_info(elem: ET.Element, osm_type: OSM_TYPES, with_name=False) : | ||||
|     """ | ||||
|     Extracts base information (coordinates, OSM ID, and optionally a name) from an OSM element. | ||||
|  | ||||
| @@ -169,3 +287,26 @@ def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) : | ||||
|         return osm_id, coords, name | ||||
|     else : | ||||
|         return osm_id, coords | ||||
|  | ||||
|  | ||||
| def fill_cache(): | ||||
|  | ||||
|     overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR) | ||||
|  | ||||
|     with os.scandir(OSM_CACHE_DIR) as it: | ||||
|         for entry in it: | ||||
|             if entry.is_file() and entry.name.startswith('hollow_'): | ||||
|                  | ||||
|                 # Read the whole file content as a string | ||||
|                 with open(entry.path, 'r') as f: | ||||
|                     xml_string = f.read()   | ||||
|  | ||||
|                 # Build the query and cache key from the hollow XML string | ||||
|                 query_str, key = overpass.build_query_from_hollow(xml_string) | ||||
|                  | ||||
|                 # Fill the cache with the query and key | ||||
|                 overpass.fill_cache(query_str, key) | ||||
|  | ||||
|                 # Now delete the file as the cache is filled | ||||
|                 os.remove(entry.path) | ||||
|                  | ||||
|   | ||||
| @@ -1,4 +1,4 @@ | ||||
| city_bbox_side: 7500 #m | ||||
| max_bbox_side: 4000   #m | ||||
| radius_close_to: 50 | ||||
| church_coeff: 0.55 | ||||
| nature_coeff: 1.4 | ||||
|   | ||||
| @@ -2,7 +2,7 @@ | ||||
|  | ||||
| from typing import Optional, Literal | ||||
| from uuid import uuid4, UUID | ||||
| from pydantic import BaseModel, Field | ||||
| from pydantic import BaseModel, ConfigDict, Field | ||||
|  | ||||
|  | ||||
| # Output to frontend | ||||
| @@ -144,8 +144,4 @@ class Toilets(BaseModel) : | ||||
|         """ | ||||
|         return f'Toilets @{self.location}' | ||||
|  | ||||
|     class Config: | ||||
|         """ | ||||
|         This allows us to easily convert the model to and from dictionaries | ||||
|         """ | ||||
|         from_attributes = True | ||||
|     model_config = ConfigDict(from_attributes=True) | ||||
|   | ||||
| @@ -27,8 +27,8 @@ def test_turckheim(client, request):    # pylint: disable=redefined-outer-name | ||||
|         "/trip/new", | ||||
|         json={ | ||||
|             "preferences": {"sightseeing": {"type": "sightseeing", "score": 5}, | ||||
|             "nature": {"type": "nature", "score": 5}, | ||||
|             "shopping": {"type": "shopping", "score": 5}, | ||||
|             "nature": {"type": "nature", "score": 0}, | ||||
|             "shopping": {"type": "shopping", "score": 0}, | ||||
|             "max_time_minute": duration_minutes, | ||||
|             "detour_tolerance_minute": 0}, | ||||
|             "start": [48.084588, 7.280405] | ||||
| @@ -51,11 +51,11 @@ def test_turckheim(client, request):    # pylint: disable=redefined-outer-name | ||||
|     assert response.status_code == 200  # check for successful planning | ||||
|     assert isinstance(landmarks, list)  # check that the return type is a list | ||||
|     assert len(landmarks) > 2           # check that there is something to visit | ||||
|     assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds" | ||||
|     assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}" | ||||
|     assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}" | ||||
|     # assert 2!= 3 | ||||
|  | ||||
|  | ||||
| def test_bellecour(client, request) :   # pylint: disable=redefined-outer-name | ||||
|     """ | ||||
|     Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area. | ||||
|   | ||||
| @@ -10,6 +10,8 @@ from ..overpass.overpass import Overpass, get_base_info | ||||
| from ..structs.landmark import Landmark | ||||
| from .get_time_distance import get_distance | ||||
| from ..constants import OSM_CACHE_DIR | ||||
| from .utils import create_bbox | ||||
|  | ||||
|  | ||||
|  | ||||
| # silence the overpass logger | ||||
| @@ -81,7 +83,6 @@ class ClusterManager: | ||||
|         # Setup the caching in the Overpass class. | ||||
|         self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR) | ||||
|  | ||||
|  | ||||
|         self.cluster_type = cluster_type | ||||
|         if cluster_type == 'shopping' : | ||||
|             osm_types = ['node'] | ||||
| @@ -95,16 +96,13 @@ class ClusterManager: | ||||
|             raise NotImplementedError("Please choose only an available option for cluster detection") | ||||
|  | ||||
|         # Initialize the points for cluster detection | ||||
|         query = self.overpass.build_query( | ||||
|             area = bbox, | ||||
|         try: | ||||
|             result = self.overpass.send_query( | ||||
|             bbox = bbox, | ||||
|             osm_types = osm_types, | ||||
|             selector = sel, | ||||
|             out = out | ||||
|         ) | ||||
|         self.logger.debug(f"Cluster query: {query}") | ||||
|  | ||||
|         try: | ||||
|             result = self.overpass.send_query(query) | ||||
|         except Exception as e: | ||||
|             self.logger.error(f"Error fetching landmarks: {e}") | ||||
|  | ||||
| @@ -218,8 +216,7 @@ class ClusterManager: | ||||
|         """ | ||||
|  | ||||
|         # Define the bounding box for a given radius around the coordinates | ||||
|         lat, lon = cluster.centroid | ||||
|         bbox = (1000, lat, lon) | ||||
|         bbox = create_bbox(cluster.centroid, 1000) | ||||
|          | ||||
|         # Query neighborhoods and shopping malls | ||||
|         selectors = ['"place"~"^(suburb|neighborhood|neighbourhood|quarter|city_block)$"'] | ||||
| @@ -238,15 +235,12 @@ class ClusterManager: | ||||
|         osm_types = ['node', 'way', 'relation'] | ||||
|  | ||||
|         for sel in selectors : | ||||
|             query = self.overpass.build_query( | ||||
|                 area = bbox, | ||||
|             try: | ||||
|                 result = self.overpass.send_query(bbox = bbox, | ||||
|                                                   osm_types = osm_types, | ||||
|                                                   selector = sel, | ||||
|                                                   out = 'ids center' | ||||
|                                                   ) | ||||
|  | ||||
|             try: | ||||
|                 result = self.overpass.send_query(query) | ||||
|             except Exception as e: | ||||
|                 self.logger.error(f"Error fetching landmarks: {e}") | ||||
|                 continue | ||||
|   | ||||
| @@ -1,5 +1,6 @@ | ||||
| """Module used to import data from OSM and arrange them in categories.""" | ||||
| import logging | ||||
| import math as m | ||||
| import xml.etree.ElementTree as ET | ||||
| import yaml | ||||
|  | ||||
| @@ -9,12 +10,10 @@ from ..structs.landmark import Landmark | ||||
| from .take_most_important import take_most_important | ||||
| from .cluster_manager import ClusterManager | ||||
| from ..overpass.overpass import Overpass, get_base_info | ||||
| from .utils import create_bbox | ||||
|  | ||||
| from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH, OSM_CACHE_DIR | ||||
|  | ||||
| # silence the overpass logger | ||||
| logging.getLogger('Overpass').setLevel(level=logging.CRITICAL) | ||||
|  | ||||
|  | ||||
| class LandmarkManager: | ||||
|     """ | ||||
| @@ -37,8 +36,7 @@ class LandmarkManager: | ||||
|  | ||||
|         with LANDMARK_PARAMETERS_PATH.open('r') as f: | ||||
|             parameters = yaml.safe_load(f) | ||||
|             self.max_bbox_side = parameters['city_bbox_side'] | ||||
|             self.radius_close_to = parameters['radius_close_to'] | ||||
|             self.max_bbox_side = parameters['max_bbox_side'] | ||||
|             self.church_coeff = parameters['church_coeff'] | ||||
|             self.nature_coeff = parameters['nature_coeff'] | ||||
|             self.overall_coeff = parameters['overall_coeff'] | ||||
| @@ -80,13 +78,13 @@ class LandmarkManager: | ||||
|         """ | ||||
|         self.logger.debug('Starting to fetch landmarks...') | ||||
|         max_walk_dist = int((preferences.max_time_minute/2)/60*self.walking_speed*1000/self.detour_factor) | ||||
|         reachable_bbox_side = min(max_walk_dist, self.max_bbox_side) | ||||
|         radius = min(max_walk_dist, int(self.max_bbox_side/2)) | ||||
|  | ||||
|         # use set to avoid duplicates, this requires some __methods__ to be set in Landmark | ||||
|         all_landmarks = set() | ||||
|  | ||||
|         # Create a bbox using the around technique, tuple of strings | ||||
|         bbox = tuple((min(2000, reachable_bbox_side/2), center_coordinates[0], center_coordinates[1])) | ||||
|         bbox = create_bbox(center_coordinates, radius) | ||||
|  | ||||
|         # list for sightseeing | ||||
|         if preferences.sightseeing.score != 0: | ||||
| @@ -193,17 +191,15 @@ class LandmarkManager: | ||||
|                 query_conditions = [] | ||||
|                 osm_types.append('node') | ||||
|  | ||||
|             query = self.overpass.build_query( | ||||
|                 area = bbox, | ||||
|             # Send the overpass query | ||||
|             try: | ||||
|                 result = self.overpass.send_query( | ||||
|                 bbox = bbox, | ||||
|                 osm_types = osm_types, | ||||
|                 selector = sel, | ||||
|                 conditions = query_conditions,        # except for nature.... | ||||
|                 out = 'center' | ||||
|                 ) | ||||
|             self.logger.debug(f"Query: {query}") | ||||
|  | ||||
|             try: | ||||
|                 result = self.overpass.send_query(query) | ||||
|             except Exception as e: | ||||
|                 self.logger.error(f"Error fetching landmarks: {e}") | ||||
|                 continue | ||||
|   | ||||
							
								
								
									
										27
									
								
								backend/src/utils/utils.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										27
									
								
								backend/src/utils/utils.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,27 @@ | ||||
| """Various helper functions""" | ||||
| import math as m | ||||
|  | ||||
| def create_bbox(coords: tuple[float, float], radius: int): | ||||
|     """ | ||||
|     Create a bounding box around the given coordinates. | ||||
|  | ||||
|     Args: | ||||
|         coords (tuple[float, float]): The latitude and longitude of the center of the bounding box. | ||||
|         radius (int): The half-side length of the bounding box in meters. | ||||
|  | ||||
|     Returns: | ||||
|         tuple[float, float, float, float]: The minimum latitude, minimum longitude, maximum latitude, and maximum longitude | ||||
|                                             defining the bounding box. | ||||
|     """ | ||||
|     # Earth's radius in meters | ||||
|     R = 6378137 | ||||
|     lat, lon = coords | ||||
|     d_lat = radius / R | ||||
|     d_lon = radius / (R * m.cos(m.pi * lat / 180)) | ||||
|  | ||||
|     lat_min = lat - d_lat * 180 / m.pi | ||||
|     lat_max = lat + d_lat * 180 / m.pi | ||||
|     lon_min = lon - d_lon * 180 / m.pi | ||||
|     lon_max = lon + d_lon * 180 / m.pi | ||||
|  | ||||
|     return (lat_min, lon_min, lat_max, lon_max) | ||||
		Reference in New Issue
	
	Block a user