amazing cache #55
| @@ -28,7 +28,7 @@ jobs: | ||||
|       working-directory: backend | ||||
|  | ||||
|     - name: Run Tests | ||||
|       run: pipenv run pytest src --html=report.html --self-contained-html --log-cli-level=INFO | ||||
|       run: pipenv run pytest src --html=report.html --self-contained-html --log-cli-level=DEBUG | ||||
|       working-directory: backend | ||||
|  | ||||
|     - name: Upload HTML report | ||||
|   | ||||
| @@ -445,7 +445,9 @@ disable=raw-checker-failed, | ||||
|         logging-fstring-interpolation, | ||||
|         duplicate-code, | ||||
|         relative-beyond-top-level,  | ||||
|         invalid-name | ||||
|         invalid-name, | ||||
|         too-many-arguments, | ||||
|         too-many-positional-arguments | ||||
|  | ||||
| # Enable the message, report, category or checker with the given id(s). You can | ||||
| # either give multiple identifier separated by comma (,) or put this option | ||||
|   | ||||
										
											
												File diff suppressed because one or more lines are too long
											
										
									
								
							| @@ -2,6 +2,7 @@ | ||||
|  | ||||
| import os | ||||
| from pathlib import Path | ||||
| from typing import List, Literal, Tuple | ||||
|  | ||||
|  | ||||
| LOCATION_PREFIX = Path('src') | ||||
| @@ -14,6 +15,8 @@ OPTIMIZER_PARAMETERS_PATH = PARAMETERS_DIR / 'optimizer_parameters.yaml' | ||||
| cache_dir_string = os.getenv('OSM_CACHE_DIR', './cache') | ||||
| OSM_CACHE_DIR = Path(cache_dir_string) | ||||
|  | ||||
| OSM_TYPES = List[Literal['way', 'node', 'relation']] | ||||
| BBOX = Tuple[float, float, float, float] | ||||
|  | ||||
| MEMCACHED_HOST_PATH = os.getenv('MEMCACHED_HOST_PATH', None) | ||||
| if MEMCACHED_HOST_PATH == "none": | ||||
|   | ||||
| @@ -3,7 +3,7 @@ | ||||
| import logging | ||||
| import time | ||||
| from contextlib import asynccontextmanager | ||||
| from fastapi import FastAPI, HTTPException, Query | ||||
| from fastapi import FastAPI, HTTPException, BackgroundTasks, Query | ||||
|  | ||||
| from .logging_config import configure_logging | ||||
| from .structs.landmark import Landmark, Toilets | ||||
| @@ -14,8 +14,10 @@ from .utils.landmarks_manager import LandmarkManager | ||||
| from .utils.toilets_manager import ToiletsManager | ||||
| from .optimization.optimizer import Optimizer | ||||
| from .optimization.refiner import Refiner | ||||
| from .overpass.overpass import fill_cache | ||||
| from .cache import client as cache_client | ||||
|  | ||||
|  | ||||
| logger = logging.getLogger(__name__) | ||||
|  | ||||
| manager = LandmarkManager() | ||||
| @@ -35,11 +37,11 @@ async def lifespan(app: FastAPI): | ||||
| app = FastAPI(lifespan=lifespan) | ||||
|  | ||||
|  | ||||
|  | ||||
| @app.post("/trip/new") | ||||
| def new_trip(preferences: Preferences, | ||||
|              start: tuple[float, float], | ||||
|              end: tuple[float, float] | None = None) -> Trip: | ||||
|              end: tuple[float, float] | None = None,  | ||||
|              background_tasks: BackgroundTasks = None) -> Trip: | ||||
|     """ | ||||
|     Main function to call the optimizer. | ||||
|  | ||||
| @@ -91,6 +93,9 @@ def new_trip(preferences: Preferences, | ||||
|         preferences = preferences | ||||
|     ) | ||||
|  | ||||
|     if len(landmarks) == 0 : | ||||
|         raise HTTPException(status_code=500, detail="No landmarks were found.") | ||||
|  | ||||
|     # insert start and finish to the landmarks list | ||||
|     landmarks_short.insert(0, start_landmark) | ||||
|     landmarks_short.append(end_landmark) | ||||
| @@ -114,6 +119,9 @@ def new_trip(preferences: Preferences, | ||||
|         refined_tour = refiner.refine_optimization(landmarks, base_tour, | ||||
|                                                preferences.max_time_minute, | ||||
|                                                preferences.detour_tolerance_minute) | ||||
|     except TimeoutError as te : | ||||
|         logger.error(f'Refiner failed : {str(te)} Using base tour.') | ||||
|         refined_tour = base_tour | ||||
|     except Exception as exc : | ||||
|         raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(exc)}") from exc | ||||
|  | ||||
| @@ -127,6 +135,9 @@ def new_trip(preferences: Preferences, | ||||
|     # upon creation of the trip, persistence of both the trip and its landmarks is ensured. | ||||
|     trip = Trip.from_linked_landmarks(linked_tour, cache_client) | ||||
|     logger.info(f'Generated a trip of {trip.total_time} minutes with {len(refined_tour)} landmarks in {round(t_generate_landmarks + t_first_stage + t_second_stage,3)} seconds.') | ||||
|  | ||||
|     background_tasks.add_task(fill_cache) | ||||
|  | ||||
|     return trip | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -55,6 +55,9 @@ class Optimizer: | ||||
|             self.average_walking_speed = parameters['average_walking_speed'] | ||||
|             self.max_landmarks = parameters['max_landmarks'] | ||||
|             self.overshoot = parameters['overshoot'] | ||||
|             self.time_limit = parameters['time_limit'] | ||||
|             self.gap_rel = parameters['gap_rel'] | ||||
|             self.max_iter = parameters['max_iter'] | ||||
|  | ||||
|  | ||||
|     def init_ub_time(self, prob: pl.LpProblem, x: pl.LpVariable, L: int, landmarks: list[Landmark], max_time: int): | ||||
| @@ -490,10 +493,21 @@ class Optimizer: | ||||
|  | ||||
|  | ||||
|     def warm_start(self, x: list[pl.LpVariable], L: int) : | ||||
|         """ | ||||
|         This function sets the initial values of the decision variables to a feasible solution. | ||||
|         This can help the solver start with a feasible or heuristic solution, | ||||
|         potentially speeding up convergence. | ||||
|  | ||||
|         Args: | ||||
|         x (list[pl.LpVariable]): A list of PuLP decision variables (binary variables). | ||||
|         L (int): The size parameter, representing a dimension (likely related to a grid or matrix). | ||||
|  | ||||
|         Returns: | ||||
|         list[pl.LpVariable]: The modified list of PuLP decision variables with initial values set. | ||||
|         """ | ||||
|         for i in range(L*L) : | ||||
|             x[i].setInitialValue(0) | ||||
|              | ||||
|  | ||||
|         x[1].setInitialValue(1) | ||||
|         x[2*L-1].setInitialValue(1) | ||||
|  | ||||
| @@ -573,7 +587,10 @@ class Optimizer: | ||||
|         prob, x = self.pre_processing(L, landmarks, max_time, max_landmarks) | ||||
|  | ||||
|         # Solve the problem and extract results. | ||||
|         prob.solve(pl.PULP_CBC_CMD(msg=False, gapRel=0.1, timeLimit=10, warmStart=False)) | ||||
|         try : | ||||
|             prob.solve(pl.PULP_CBC_CMD(msg=False, timeLimit=self.time_limit+1, gapRel=self.gap_rel)) | ||||
|         except Exception as exc : | ||||
|             raise Exception(f"No solution found: {exc}") from exc | ||||
|         status = pl.LpStatus[prob.status] | ||||
|         solution = [pl.value(var) for var in x]  # The values of the decision variables (will be 0 or 1) | ||||
|  | ||||
| @@ -588,18 +605,21 @@ class Optimizer: | ||||
|         circles = self.is_connected(solution) | ||||
|  | ||||
|         i = 0 | ||||
|         timeout = 40 | ||||
|         while circles is not None : | ||||
|             i += 1 | ||||
|             if i == timeout : | ||||
|                 self.logger.error(f'Timeout: No solution found after {timeout} iterations.') | ||||
|                 raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.") | ||||
|             if i == self.max_iter : | ||||
|                 self.logger.error(f'Timeout: No solution found after {self.max_iter} iterations.') | ||||
|                 raise TimeoutError(f"Optimization took too long. No solution found after {self.max_iter} iterations.") | ||||
|  | ||||
|             for circle in circles : | ||||
|                 self.prevent_circle(prob, x, circle, L) | ||||
|  | ||||
|             # Solve the problem again | ||||
|             prob.solve(pl.PULP_CBC_CMD(msg=False)) | ||||
|             try : | ||||
|                 prob.solve(pl.PULP_CBC_CMD(msg=False, timeLimit=self.time_limit, gapRel=self.gap_rel)) | ||||
|             except Exception as exc : | ||||
|                 raise Exception(f"No solution found: {exc}") from exc | ||||
|  | ||||
|             solution = [pl.value(var) for var in x] | ||||
|  | ||||
|             if pl.LpStatus[prob.status] != 'Optimal' : | ||||
| @@ -614,5 +634,5 @@ class Optimizer: | ||||
|         order = self.get_order(solution) | ||||
|         tour =  [landmarks[i] for i in order] | ||||
|  | ||||
|         self.logger.debug(f"Re-optimized {i} times, objective value : {int(pl.value(prob.objective))}") | ||||
|         self.logger.info(f"Re-optimized {i} times, objective value : {int(pl.value(prob.objective))}") | ||||
|         return tour | ||||
|   | ||||
| @@ -1,9 +1,8 @@ | ||||
| """Module defining the caching strategy for overpass requests.""" | ||||
| import os | ||||
| import xml.etree.ElementTree as ET | ||||
| import json | ||||
| import hashlib | ||||
|  | ||||
| from ..constants import OSM_CACHE_DIR | ||||
| from ..constants import OSM_CACHE_DIR, OSM_TYPES | ||||
|  | ||||
|  | ||||
| def get_cache_key(query: str) -> str: | ||||
| @@ -17,10 +16,6 @@ def get_cache_key(query: str) -> str: | ||||
| class CachingStrategyBase: | ||||
|     """ | ||||
|     Base class for implementing caching strategies. | ||||
|  | ||||
|     This class defines the structure for a caching strategy with basic methods | ||||
|     that must be implemented by subclasses. Subclasses should define how to | ||||
|     retrieve, store, and close the cache. | ||||
|     """ | ||||
|     def get(self, key): | ||||
|         """Retrieve the cached data associated with the provided key.""" | ||||
| @@ -30,111 +25,108 @@ class CachingStrategyBase: | ||||
|         """Store data in the cache with the specified key.""" | ||||
|         raise NotImplementedError('Subclass should implement set') | ||||
|  | ||||
|     def set_hollow(self, key, **kwargs): | ||||
|         """Create a hollow (empty) cache entry with a specific key.""" | ||||
|         raise NotImplementedError('Subclass should implement set_hollow') | ||||
|  | ||||
|     def close(self): | ||||
|         """Clean up or close any resources used by the caching strategy.""" | ||||
|  | ||||
|  | ||||
| class XMLCache(CachingStrategyBase): | ||||
| class JSONCache(CachingStrategyBase): | ||||
|     """ | ||||
|     A caching strategy that stores and retrieves data in XML format. | ||||
|  | ||||
|     This class provides methods to cache data as XML files in a specified directory. | ||||
|     The directory is automatically suffixed with '_XML' to distinguish it from other | ||||
|     caching strategies. The data is stored and retrieved using XML serialization. | ||||
|  | ||||
|     Args: | ||||
|         cache_dir (str): The base directory where XML cache files will be stored. | ||||
|                          Defaults to 'OSM_CACHE_DIR' with a '_XML' suffix. | ||||
|  | ||||
|     Methods: | ||||
|         get(key): Retrieve cached data from a XML file associated with the given key. | ||||
|         set(key, value): Store data in a XML file with the specified key. | ||||
|     A caching strategy that stores and retrieves data in JSON format. | ||||
|     """ | ||||
|     def __init__(self, cache_dir=OSM_CACHE_DIR): | ||||
|         # Add the class name as a suffix to the directory | ||||
|         self._cache_dir = f'{cache_dir}_XML' | ||||
|         self._cache_dir = f'{cache_dir}' | ||||
|         if not os.path.exists(self._cache_dir): | ||||
|             os.makedirs(self._cache_dir) | ||||
|  | ||||
|     def _filename(self, key): | ||||
|         return os.path.join(self._cache_dir, f'{key}.xml') | ||||
|         return os.path.join(self._cache_dir, f'{key}.json') | ||||
|  | ||||
|     def get(self, key): | ||||
|         """Retrieve XML data from the cache and parse it as an ElementTree.""" | ||||
|         """Retrieve JSON data from the cache and parse it as an ElementTree.""" | ||||
|         filename = self._filename(key) | ||||
|         if os.path.exists(filename): | ||||
|             try: | ||||
|                 # Parse and return the cached XML data | ||||
|                 tree = ET.parse(filename) | ||||
|                 return tree.getroot()  # Return the root element of the parsed XML | ||||
|             except ET.ParseError: | ||||
|                 # print(f"Error parsing cached XML file: {filename}") | ||||
|                 return None | ||||
|                 # Open and parse the cached JSON data | ||||
|                 with open(filename, 'r', encoding='utf-8') as file: | ||||
|                     data = json.load(file) | ||||
|                 # Return the data as a list of dicts. | ||||
|                 return data | ||||
|             except json.JSONDecodeError: | ||||
|                 return None  # Return None if parsing fails | ||||
|         return None | ||||
|  | ||||
|     def set(self, key, value): | ||||
|         """Save the XML data as an ElementTree to the cache.""" | ||||
|         """Save the JSON data as an ElementTree to the cache.""" | ||||
|         filename = self._filename(key) | ||||
|         tree = ET.ElementTree(value)  # value is expected to be an ElementTree root element | ||||
|         try: | ||||
|             # Write the XML data to a file | ||||
|             with open(filename, 'wb') as file: | ||||
|                 tree.write(file, encoding='utf-8', xml_declaration=True) | ||||
|             # Write the JSON data to the cache file | ||||
|             with open(filename, 'w', encoding='utf-8') as file: | ||||
|                 json.dump(value, file, ensure_ascii=False, indent=4) | ||||
|         except IOError as e: | ||||
|             raise IOError(f"Error writing to cache file: {filename} - {e}") from e | ||||
|  | ||||
|     def set_hollow(self, key, cell: tuple, osm_types: list, | ||||
|                     selector: str, conditions: list=None, out='center'): | ||||
|         """Create an empty placeholder cache entry for a future fill.""" | ||||
|         hollow_key = f'hollow_{key}' | ||||
|         filename = self._filename(hollow_key) | ||||
|  | ||||
|         # Create the hollow JSON structure | ||||
|         hollow_data = { | ||||
|             "key": key, | ||||
|             "cell": list(cell), | ||||
|             "osm_types": list(osm_types), | ||||
|             "selector": selector, | ||||
|             "conditions": conditions, | ||||
|             "out": out | ||||
|         } | ||||
|         # Write the hollow data to the cache file | ||||
|         try: | ||||
|             with open(filename, 'w', encoding='utf-8') as file: | ||||
|                 json.dump(hollow_data, file, ensure_ascii=False, indent=4) | ||||
|         except IOError as e: | ||||
|             raise IOError(f"Error writing hollow cache to file: {filename} - {e}") from e | ||||
|  | ||||
|     def close(self): | ||||
|         """Cleanup method, if needed.""" | ||||
|         pass | ||||
|  | ||||
| class CachingStrategy: | ||||
|     """ | ||||
|     A class to manage different caching strategies. | ||||
|  | ||||
|     This class provides an interface to switch between different caching strategies  | ||||
|     (e.g., XMLCache, JSONCache) dynamically. It allows caching data in different formats,  | ||||
|     depending on the strategy being used. By default, it uses the XMLCache strategy. | ||||
|  | ||||
|     Attributes: | ||||
|     __strategy (CachingStrategyBase): The currently active caching strategy. | ||||
|     __strategies (dict): A mapping between strategy names (as strings) and their corresponding | ||||
|                          classes, allowing dynamic selection of caching strategies. | ||||
|     """ | ||||
|     __strategy = XMLCache()  # Default caching strategy | ||||
|     __strategy = JSONCache()  # Default caching strategy | ||||
|     __strategies = { | ||||
|         'XML': XMLCache, | ||||
|         'JSON': JSONCache, | ||||
|     } | ||||
|  | ||||
|     @classmethod | ||||
|     def use(cls, strategy_name='XML', **kwargs): | ||||
|         """ | ||||
|         Set the caching strategy based on the strategy_name provided. | ||||
|  | ||||
|         Args: | ||||
|             strategy_name (str): The name of the caching strategy (e.g., 'XML'). | ||||
|             **kwargs: Additional keyword arguments to pass when initializing the strategy. | ||||
|         """ | ||||
|         # If a previous strategy exists, close it | ||||
|     def use(cls, strategy_name='JSON', **kwargs): | ||||
|         if cls.__strategy: | ||||
|             cls.__strategy.close() | ||||
|  | ||||
|         # Retrieve the strategy class based on the strategy name | ||||
|         strategy_class = cls.__strategies.get(strategy_name) | ||||
|  | ||||
|         if not strategy_class: | ||||
|             raise ValueError(f"Unknown caching strategy: {strategy_name}") | ||||
|  | ||||
|         # Instantiate the new strategy with the provided arguments | ||||
|         cls.__strategy = strategy_class(**kwargs) | ||||
|         return cls.__strategy | ||||
|  | ||||
|     @classmethod | ||||
|     def get(cls, key): | ||||
|         """Get data from the current strategy's cache.""" | ||||
|         if not cls.__strategy: | ||||
|             raise RuntimeError("Caching strategy has not been set.") | ||||
|         return cls.__strategy.get(key) | ||||
|  | ||||
|     @classmethod | ||||
|     def set(cls, key, value): | ||||
|         """Set data in the current strategy's cache.""" | ||||
|         if not cls.__strategy: | ||||
|             raise RuntimeError("Caching strategy has not been set.") | ||||
|         cls.__strategy.set(key, value) | ||||
|  | ||||
|     @classmethod | ||||
|     def set_hollow(cls, key, cell: tuple, osm_types: OSM_TYPES, | ||||
|                     selector: str, conditions: list=None, out='center'): | ||||
|         """Create a hollow cache entry.""" | ||||
|         cls.__strategy.set_hollow(key, cell, osm_types, selector, conditions, out) | ||||
|   | ||||
| @@ -1,14 +1,17 @@ | ||||
| """Module allowing connexion to overpass api and fectch data from OSM.""" | ||||
| from typing import Literal, List | ||||
| import os | ||||
| import urllib | ||||
| import math | ||||
| import logging | ||||
| import xml.etree.ElementTree as ET | ||||
| import json | ||||
| from typing import List, Tuple | ||||
|  | ||||
| from .caching_strategy import get_cache_key, CachingStrategy | ||||
| from ..constants import OSM_CACHE_DIR | ||||
| from ..constants import OSM_CACHE_DIR, OSM_TYPES, BBOX | ||||
|  | ||||
| logger = logging.getLogger('Overpass') | ||||
| osm_types = List[Literal['way', 'node', 'relation']] | ||||
|  | ||||
| RESOLUTION = 0.05 | ||||
| CELL = Tuple[int, int] | ||||
|  | ||||
|  | ||||
| class Overpass : | ||||
| @@ -16,7 +19,10 @@ class Overpass : | ||||
|     Overpass class to manage the query building and sending to overpass api. | ||||
|     The caching strategy is a part of this class and initialized upon creation of the Overpass object. | ||||
|     """ | ||||
|     def __init__(self, caching_strategy: str = 'XML', cache_dir: str = OSM_CACHE_DIR) : | ||||
|     logger = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
|     def __init__(self, caching_strategy: str = 'JSON', cache_dir: str = OSM_CACHE_DIR) : | ||||
|         """ | ||||
|         Initialize the Overpass instance with the url, headers and caching strategy. | ||||
|         """ | ||||
| @@ -25,17 +31,109 @@ class Overpass : | ||||
|         self.caching_strategy = CachingStrategy.use(caching_strategy, cache_dir=cache_dir) | ||||
|  | ||||
|  | ||||
|     @classmethod | ||||
|     def build_query(self, area: tuple, osm_types: osm_types, | ||||
|                     selector: str, conditions=[], out='center') -> str: | ||||
|     def send_query(self, bbox: BBOX, osm_types: OSM_TYPES, | ||||
|                     selector: str, conditions: list=None, out='center') -> List[dict]: | ||||
|         """ | ||||
|         Sends the Overpass QL query to the Overpass API and returns the parsed json response. | ||||
|  | ||||
|         Args: | ||||
|             bbox (tuple): Bounding box for the query. | ||||
|             osm_types (list[str]): List of OSM element types (e.g., 'node', 'way'). | ||||
|             selector (str): Key or tag to filter OSM elements (e.g., 'highway'). | ||||
|             conditions (list): Optional list of additional filter conditions in Overpass QL format. | ||||
|             out (str): Output format ('center', 'body', etc.). Defaults to 'center'. | ||||
|  | ||||
|         Returns: | ||||
|             list:   Parsed json response from the Overpass API, or cached data if available. | ||||
|         """ | ||||
|         # Determine which grid cells overlap with this bounding box. | ||||
|         overlapping_cells = Overpass._get_overlapping_cells(bbox) | ||||
|  | ||||
|         # Retrieve cached data and identify missing cache entries | ||||
|         cached_responses, non_cached_cells = self._retrieve_cached_data(overlapping_cells, osm_types, selector, conditions, out) | ||||
|  | ||||
|         self.logger.info(f'Cache hit for {len(overlapping_cells)-len(non_cached_cells)}/{len(overlapping_cells)} quadrants.') | ||||
|  | ||||
|         # If there is no missing data, return the cached responses after filtering. | ||||
|         if not non_cached_cells : | ||||
|             return Overpass._filter_landmarks(cached_responses, bbox) | ||||
|  | ||||
|         # If there is no cached data, fetch all from Overpass. | ||||
|         elif not cached_responses : | ||||
|             query_str = Overpass.build_query(bbox, osm_types, selector, conditions, out) | ||||
|             return self.fetch_data_from_api(query_str) | ||||
|  | ||||
|         # Hybrid cache: some data from Overpass, some data from cache. | ||||
|         else : | ||||
|             # Resize the bbox for smaller search area and build new query string. | ||||
|             non_cached_bbox = Overpass._get_non_cached_bbox(non_cached_cells, bbox) | ||||
|             query_str = Overpass.build_query(non_cached_bbox, osm_types, selector, conditions, out) | ||||
|             non_cached_responses = self.fetch_data_from_api(query_str) | ||||
|             return Overpass._filter_landmarks(cached_responses, bbox) + non_cached_responses | ||||
|  | ||||
|  | ||||
|     def fetch_data_from_api(self, query_str: str) -> List[dict]: | ||||
|         """ | ||||
|         Fetch data from the Overpass API and return the json data. | ||||
|  | ||||
|         Args: | ||||
|             query_str (str): The Overpass query string. | ||||
|  | ||||
|         Returns: | ||||
|             dict: Combined cached and fetched data. | ||||
|         """ | ||||
|         try: | ||||
|             data = urllib.parse.urlencode({'data': query_str}).encode('utf-8') | ||||
|             request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers) | ||||
|  | ||||
|             with urllib.request.urlopen(request) as response: | ||||
|                 response_data = response.read().decode('utf-8')  # Convert the HTTPResponse to a string | ||||
|                 data = json.loads(response_data)  # Load the JSON from the string | ||||
|                 elements = data.get('elements', []) | ||||
|                 # self.logger.debug(f'Query = {query_str}') | ||||
|                 return elements | ||||
|  | ||||
|         except urllib.error.URLError as e: | ||||
|             self.logger.error(f"Error connecting to Overpass API: {e}") | ||||
|             raise ConnectionError(f"Error connecting to Overpass API: {e}") from e | ||||
|         except Exception as exc : | ||||
|             raise Exception(f'An unexpected error occured: {str(exc)}') from exc | ||||
|  | ||||
|  | ||||
|     def fill_cache(self, json_data: dict) : | ||||
|         """ | ||||
|         Fill cache with data by using a hollow cache entry's information. | ||||
|         """ | ||||
|         query_str, cache_key = Overpass._build_query_from_hollow(json_data) | ||||
|         try: | ||||
|             data = urllib.parse.urlencode({'data': query_str}).encode('utf-8') | ||||
|             request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers) | ||||
|  | ||||
|             with urllib.request.urlopen(request) as response: | ||||
|  | ||||
|                 # Convert the HTTPResponse to a string and load data | ||||
|                 response_data = response.read().decode('utf-8')   | ||||
|                 data = json.loads(response_data) | ||||
|  | ||||
|                 # Get elements and set cache | ||||
|                 elements = data.get('elements', []) | ||||
|                 self.caching_strategy.set(cache_key, elements) | ||||
|                 self.logger.debug(f'Cache set for {cache_key}') | ||||
|         except urllib.error.URLError as e: | ||||
|             raise ConnectionError(f"Error connecting to Overpass API: {e}") from e | ||||
|         except Exception as exc : | ||||
|             raise Exception(f'An unexpected error occured: {str(exc)}') from exc | ||||
|  | ||||
|  | ||||
|     @staticmethod | ||||
|     def build_query(bbox: BBOX, osm_types: OSM_TYPES, | ||||
|                     selector: str, conditions: list=None, out='center') -> str: | ||||
|         """ | ||||
|         Constructs a query string for the Overpass API to retrieve OpenStreetMap (OSM) data. | ||||
|  | ||||
|         Args: | ||||
|             area (tuple): A tuple representing the geographical search area, typically in the format  | ||||
|                         (radius, latitude, longitude). The first element is a string like "around:2000"  | ||||
|                         specifying the search radius, and the second and third elements represent  | ||||
|                         the latitude and longitude as floats or strings. | ||||
|             bbox (tuple): A tuple representing the geographical search area, typically in the format  | ||||
|                         (lat_min, lon_min, lat_max, lon_max). | ||||
|             osm_types (list[str]): A list of OSM element types to search for. Must be one or more of  | ||||
|                                     'Way', 'Node', or 'Relation'. | ||||
|             selector (str): The key or tag to filter the OSM elements (e.g., 'amenity', 'highway', etc.). | ||||
| @@ -52,82 +150,203 @@ class Overpass : | ||||
|         Notes: | ||||
|             - If no conditions are provided, the query will just use the `selector` to filter the OSM  | ||||
|             elements without additional constraints. | ||||
|             - The search area must always formatted as "(radius, lat, lon)". | ||||
|         """ | ||||
|         if not isinstance(conditions, list) : | ||||
|             conditions = [conditions] | ||||
|         if not isinstance(osm_types, list) : | ||||
|             osm_types = [osm_types] | ||||
|         query = '[out:json];(' | ||||
|  | ||||
|         query = '(' | ||||
|         # convert the bbox to string. | ||||
|         bbox_str = f"({','.join(map(str, bbox))})" | ||||
|  | ||||
|         # Round the radius to nearest 50 and coordinates to generate less queries | ||||
|         if area[0] > 500 : | ||||
|             search_radius = round(area[0] / 50) * 50 | ||||
|             loc = tuple((round(area[1], 2), round(area[2], 2))) | ||||
|         else : | ||||
|             search_radius = round(area[0] / 25) * 25 | ||||
|             loc = tuple((round(area[1], 3), round(area[2], 3))) | ||||
|  | ||||
|         search_area = f"(around:{search_radius}, {str(loc[0])}, {str(loc[1])})" | ||||
|  | ||||
|         if conditions : | ||||
|         if conditions is not None and len(conditions) > 0: | ||||
|             conditions = '(if: ' + ' && '.join(conditions) + ')' | ||||
|         else : | ||||
|             conditions = '' | ||||
|  | ||||
|         for elem in osm_types : | ||||
|             query += elem + '[' + selector + ']' + conditions + search_area + ';' | ||||
|             query += elem + '[' + selector + ']' + conditions + bbox_str + ';' | ||||
|  | ||||
|         query += ');' + f'out {out};' | ||||
|  | ||||
|         return query | ||||
|  | ||||
|  | ||||
|     def send_query(self, query: str) -> ET: | ||||
|     def _retrieve_cached_data(self, overlapping_cells: CELL, osm_types: OSM_TYPES, | ||||
|                               selector: str, conditions: list, out: str) -> Tuple[List[dict], list[CELL]]: | ||||
|         """ | ||||
|         Sends the Overpass QL query to the Overpass API and returns the parsed JSON response. | ||||
|         Retrieve cached data and identify missing cache quadrants. | ||||
|  | ||||
|         Args: | ||||
|             query (str): The Overpass QL query to be sent to the Overpass API. | ||||
|             overlapping_cells (list): Cells to check for cached data. | ||||
|             osm_types (list): OSM types (e.g., 'node', 'way'). | ||||
|             selector (str): Key or tag to filter OSM elements. | ||||
|             conditions (list): Additional conditions to apply. | ||||
|             out (str): Output format. | ||||
|  | ||||
|         Returns: | ||||
|             dict: The parsed JSON response from the Overpass API, or None if the request fails. | ||||
|             tuple: A tuple containing: | ||||
|                 - cached_responses (list): List of cached data found. | ||||
|                 - non_cached_cells (list(tuple)): List of cells with missing data. | ||||
|         """ | ||||
|         cell_key_dict = {} | ||||
|         for cell in overlapping_cells : | ||||
|             for elem in osm_types : | ||||
|                 key_str = f"{elem}[{selector}]{conditions}({','.join(map(str, cell))})" | ||||
|  | ||||
|             cell_key_dict[cell] = get_cache_key(key_str) | ||||
|  | ||||
|         cached_responses = [] | ||||
|         non_cached_cells = [] | ||||
|  | ||||
|         # Retrieve the cached data and mark the missing entries as hollow | ||||
|         for cell, key in cell_key_dict.items(): | ||||
|             cached_data = self.caching_strategy.get(key) | ||||
|             if cached_data is not None : | ||||
|                 cached_responses += cached_data | ||||
|             else: | ||||
|                 self.caching_strategy.set_hollow(key, cell, osm_types, selector, conditions, out) | ||||
|                 non_cached_cells.append(cell) | ||||
|  | ||||
|         return cached_responses, non_cached_cells | ||||
|  | ||||
|  | ||||
|     @staticmethod | ||||
|     def _build_query_from_hollow(json_data: dict) -> Tuple[str, str]: | ||||
|         """ | ||||
|         Build query string using information from a hollow cache entry. | ||||
|         """ | ||||
|         # Extract values from the JSON object | ||||
|         key = json_data.get('key') | ||||
|         cell = tuple(json_data.get('cell')) | ||||
|         bbox = Overpass._get_bbox_from_grid_cell(cell) | ||||
|         osm_types = json_data.get('osm_types') | ||||
|         selector = json_data.get('selector') | ||||
|         conditions = json_data.get('conditions') | ||||
|         out = json_data.get('out') | ||||
|  | ||||
|  | ||||
|         query_str = Overpass.build_query(bbox, osm_types, selector, conditions, out) | ||||
|         return query_str, key | ||||
|  | ||||
|  | ||||
|     @staticmethod | ||||
|     def _get_overlapping_cells(query_bbox: tuple) -> List[CELL]: | ||||
|         """ | ||||
|         Returns a set of all grid cells that overlap with the given bounding box. | ||||
|         """ | ||||
|         # Extract location from the query bbox | ||||
|         lat_min, lon_min, lat_max, lon_max = query_bbox | ||||
|  | ||||
|         min_lat_cell, min_lon_cell = Overpass._get_grid_cell(lat_min, lon_min) | ||||
|         max_lat_cell, max_lon_cell = Overpass._get_grid_cell(lat_max, lon_max) | ||||
|  | ||||
|         overlapping_cells = set() | ||||
|         for lat_idx in range(min_lat_cell, max_lat_cell + 1): | ||||
|             for lon_idx in range(min_lon_cell, max_lon_cell + 1): | ||||
|                 overlapping_cells.add((lat_idx, lon_idx)) | ||||
|  | ||||
|         return overlapping_cells | ||||
|  | ||||
|  | ||||
|     @staticmethod | ||||
|     def _get_grid_cell(lat: float, lon: float) -> CELL: | ||||
|         """ | ||||
|         Returns the grid cell coordinates for a given latitude and longitude. | ||||
|         Each grid cell is 0.05°lat x 0.05°lon resolution in size. | ||||
|         """ | ||||
|         lat_index = math.floor(lat / RESOLUTION) | ||||
|         lon_index = math.floor(lon / RESOLUTION) | ||||
|         return (lat_index, lon_index) | ||||
|  | ||||
|  | ||||
|     @staticmethod | ||||
|     def _get_bbox_from_grid_cell(cell: CELL) -> BBOX: | ||||
|         """ | ||||
|         Returns the bounding box for a given grid cell index. | ||||
|         Each grid cell is resolution x resolution in size. | ||||
|  | ||||
|         The bounding box is returned as (min_lat, min_lon, max_lat, max_lon). | ||||
|         """ | ||||
|         # Calculate the southwest (min_lat, min_lon) corner of the bounding box | ||||
|         min_lat = round(cell[0] * RESOLUTION, 2) | ||||
|         min_lon = round(cell[1] * RESOLUTION, 2) | ||||
|  | ||||
|         # Calculate the northeast (max_lat, max_lon) corner of the bounding box | ||||
|         max_lat = round((cell[0] + 1) * RESOLUTION, 2) | ||||
|         max_lon = round((cell[1] + 1) * RESOLUTION, 2) | ||||
|  | ||||
|         return (min_lat, min_lon, max_lat, max_lon) | ||||
|  | ||||
|  | ||||
|     @staticmethod | ||||
|     def _get_non_cached_bbox(non_cached_cells: List[CELL], original_bbox: BBOX): | ||||
|         """ | ||||
|         Calculate the non-cached bounding box by excluding cached cells. | ||||
|  | ||||
|         Args: | ||||
|             non_cached_cells (list): The list of cells that were not found in the cache. | ||||
|             original_bbox (tuple): The original bounding box (min_lat, min_lon, max_lat, max_lon). | ||||
|  | ||||
|         Returns: | ||||
|             tuple: The new bounding box that excludes cached cells, or None if all cells are cached. | ||||
|         """ | ||||
|         if not non_cached_cells: | ||||
|             return None  # All cells were cached | ||||
|  | ||||
|         # Initialize the non-cached bounding box with extreme values | ||||
|         min_lat, min_lon, max_lat, max_lon = float('inf'), float('inf'), float('-inf'), float('-inf') | ||||
|  | ||||
|         # Iterate over non-cached cells to find the new bounding box | ||||
|         for cell in non_cached_cells: | ||||
|             cell_min_lat, cell_min_lon, cell_max_lat, cell_max_lon = Overpass._get_bbox_from_grid_cell(cell) | ||||
|  | ||||
|             min_lat = min(min_lat, cell_min_lat) | ||||
|             min_lon = min(min_lon, cell_min_lon) | ||||
|             max_lat = max(max_lat, cell_max_lat) | ||||
|             max_lon = max(max_lon, cell_max_lon) | ||||
|  | ||||
|         # If no update to bounding box, return the original | ||||
|         if min_lat == float('inf') or min_lon == float('inf'): | ||||
|             return None | ||||
|  | ||||
|         return (max(min_lat, original_bbox[0]),  | ||||
|                 max(min_lon, original_bbox[1]),  | ||||
|                 min(max_lat, original_bbox[2]),  | ||||
|                 min(max_lon, original_bbox[3])) | ||||
|  | ||||
|  | ||||
|     @staticmethod | ||||
|     def _filter_landmarks(elements: List[dict], bbox: BBOX) -> List[dict]: | ||||
|         """ | ||||
|         Filters elements based on whether their coordinates are inside the given bbox. | ||||
|  | ||||
|         Args: | ||||
|         - elements (list of dict): List of elements containing coordinates. | ||||
|         - bbox (tuple): A bounding box defined as (min_lat, min_lon, max_lat, max_lon). | ||||
|  | ||||
|         Returns: | ||||
|         - list: A list of elements whose coordinates are inside the bounding box. | ||||
|         """ | ||||
|  | ||||
|         # Generate a cache key for the current query | ||||
|         cache_key = get_cache_key(query) | ||||
|         filtered_elements = [] | ||||
|         min_lat, min_lon, max_lat, max_lon = bbox | ||||
|  | ||||
|         # Try to fetch the result from the cache | ||||
|         cached_response = self.caching_strategy.get(cache_key) | ||||
|         if cached_response is not None : | ||||
|             logger.debug("Cache hit.") | ||||
|             return cached_response | ||||
|         for elem in elements: | ||||
|             # Extract coordinates based on the 'type' of element | ||||
|             if elem.get('type') != 'node': | ||||
|                 center = elem.get('center', {}) | ||||
|                 lat = float(center.get('lat', 0)) | ||||
|                 lon = float(center.get('lon', 0)) | ||||
|             else: | ||||
|                 lat = float(elem.get('lat', 0)) | ||||
|                 lon = float(elem.get('lon', 0)) | ||||
|  | ||||
|         # Prepare the data to be sent as POST request, encoded as bytes | ||||
|         data = urllib.parse.urlencode({'data': query}).encode('utf-8') | ||||
|             # Check if the coordinates fall within the given bounding box | ||||
|             if min_lat <= lat <= max_lat and min_lon <= lon <= max_lon: | ||||
|                 filtered_elements.append(elem) | ||||
|  | ||||
|         try: | ||||
|             # Create a Request object with the specified URL, data, and headers | ||||
|             request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers) | ||||
|  | ||||
|             # Send the request and read the response | ||||
|             with urllib.request.urlopen(request) as response: | ||||
|                 # Read and decode the response | ||||
|                 response_data = response.read().decode('utf-8') | ||||
|                 root = ET.fromstring(response_data) | ||||
|  | ||||
|                 # Cache the response data as an ElementTree root | ||||
|                 self.caching_strategy.set(cache_key, root) | ||||
|                 logger.debug("Response data added to cache.") | ||||
|  | ||||
|                 return root | ||||
|  | ||||
|         except urllib.error.URLError as e: | ||||
|             raise ConnectionError(f"Error connecting to Overpass API: {e}") from e | ||||
|         return filtered_elements | ||||
|  | ||||
|  | ||||
| def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) : | ||||
| def get_base_info(elem: dict, osm_type: OSM_TYPES, with_name=False) : | ||||
|     """ | ||||
|     Extracts base information (coordinates, OSM ID, and optionally a name) from an OSM element. | ||||
|  | ||||
| @@ -136,7 +355,7 @@ def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) : | ||||
|     extracting coordinates either directly or from a center tag, depending on the element type. | ||||
|  | ||||
|     Args: | ||||
|         elem (ET.Element): The XML element representing the OSM entity. | ||||
|         elem (dict): The JSON element representing the OSM entity. | ||||
|         osm_type (str): The type of the OSM entity (e.g., 'node', 'way'). If 'node', the coordinates | ||||
|                         are extracted directly from the element; otherwise, from the 'center' tag. | ||||
|         with_name (bool): Whether to extract and return the name of the element. If True, it attempts | ||||
| @@ -150,7 +369,7 @@ def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) : | ||||
|     """ | ||||
|     # 1. extract coordinates | ||||
|     if osm_type != 'node' : | ||||
|         center = elem.find('center') | ||||
|         center = elem.get('center') | ||||
|         lat = float(center.get('lat')) | ||||
|         lon = float(center.get('lon')) | ||||
|  | ||||
| @@ -165,7 +384,31 @@ def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) : | ||||
|  | ||||
|     # 3. Extract name if specified and return | ||||
|     if with_name : | ||||
|         name = elem.find("tag[@k='name']").get('v') if elem.find("tag[@k='name']") is not None else None | ||||
|         name = elem.get('tags', {}).get('name') | ||||
|         return osm_id, coords, name | ||||
|     else : | ||||
|         return osm_id, coords | ||||
|  | ||||
|  | ||||
| def fill_cache(): | ||||
|     """ | ||||
|     Scans the specified cache directory for files starting with 'hollow_' and attempts to load | ||||
|     their contents as JSON to fill the cache of the Overpass system. | ||||
|     """ | ||||
|     overpass = Overpass() | ||||
|  | ||||
|     with os.scandir(OSM_CACHE_DIR) as it: | ||||
|         for entry in it: | ||||
|             if entry.is_file() and entry.name.startswith('hollow_'): | ||||
|  | ||||
|                 try : | ||||
|                     # Read the whole file content as a string | ||||
|                     with open(entry.path, 'r') as f: | ||||
|                         # load data and fill the cache with the query and key | ||||
|                         json_data = json.load(f) | ||||
|                         overpass.fill_cache(json_data) | ||||
|                     # Now delete the file as the cache is filled | ||||
|                     os.remove(entry.path) | ||||
|  | ||||
|                 except Exception as exc : | ||||
|                     overpass.logger.error(f'An error occured while parsing file {entry.path} as .json file') | ||||
|   | ||||
| @@ -1,4 +1,4 @@ | ||||
| city_bbox_side: 7500 #m | ||||
| max_bbox_side: 4000   #m | ||||
| radius_close_to: 50 | ||||
| church_coeff: 0.55 | ||||
| nature_coeff: 1.4 | ||||
| @@ -8,5 +8,5 @@ image_bonus: 1.1 | ||||
| viewpoint_bonus: 5 | ||||
| wikipedia_bonus: 1.25 | ||||
| name_bonus: 3 | ||||
| N_important: 40 | ||||
| N_important: 60 | ||||
| pay_bonus: -1 | ||||
|   | ||||
| @@ -4,3 +4,6 @@ average_walking_speed: 4.8 | ||||
| max_landmarks: 10 | ||||
| max_landmarks_refiner: 20 | ||||
| overshoot: 0.0016 | ||||
| time_limit: 1 | ||||
| gap_rel: 0.05 | ||||
| max_iter: 40 | ||||
| @@ -2,7 +2,7 @@ | ||||
|  | ||||
| from typing import Optional, Literal | ||||
| from uuid import uuid4, UUID | ||||
| from pydantic import BaseModel, Field | ||||
| from pydantic import BaseModel, ConfigDict, Field | ||||
|  | ||||
|  | ||||
| # Output to frontend | ||||
| @@ -144,8 +144,4 @@ class Toilets(BaseModel) : | ||||
|         """ | ||||
|         return f'Toilets @{self.location}' | ||||
|  | ||||
|     class Config: | ||||
|         """ | ||||
|         This allows us to easily convert the model to and from dictionaries | ||||
|         """ | ||||
|         from_attributes = True | ||||
|     model_config = ConfigDict(from_attributes=True) | ||||
|   | ||||
| @@ -27,11 +27,13 @@ def test_turckheim(client, request):    # pylint: disable=redefined-outer-name | ||||
|         "/trip/new", | ||||
|         json={ | ||||
|             "preferences": {"sightseeing": {"type": "sightseeing", "score": 5}, | ||||
|             "nature": {"type": "nature", "score": 5}, | ||||
|             "shopping": {"type": "shopping", "score": 5}, | ||||
|             "nature": {"type": "nature", "score": 0}, | ||||
|             "shopping": {"type": "shopping", "score": 0}, | ||||
|             "max_time_minute": duration_minutes, | ||||
|             "detour_tolerance_minute": 0}, | ||||
|             "start": [48.084588, 7.280405] | ||||
|             # "start": [48.084588, 7.280405] | ||||
|             # "start": [45.74445023349939, 4.8222687890538865] | ||||
|             "start": [45.75156398104873, 4.827154464827647] | ||||
|             } | ||||
|         ) | ||||
|     result = response.json() | ||||
| @@ -51,11 +53,11 @@ def test_turckheim(client, request):    # pylint: disable=redefined-outer-name | ||||
|     assert response.status_code == 200  # check for successful planning | ||||
|     assert isinstance(landmarks, list)  # check that the return type is a list | ||||
|     assert len(landmarks) > 2           # check that there is something to visit | ||||
|     assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds" | ||||
|     assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}" | ||||
|     assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}" | ||||
|     # assert 2!= 3 | ||||
|  | ||||
|  | ||||
| def test_bellecour(client, request) :   # pylint: disable=redefined-outer-name | ||||
|     """ | ||||
|     Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area. | ||||
| @@ -97,10 +99,9 @@ def test_bellecour(client, request) :   # pylint: disable=redefined-outer-name | ||||
|     assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}" | ||||
|     assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}" | ||||
|  | ||||
|  | ||||
| def test_cologne(client, request) :   # pylint: disable=redefined-outer-name | ||||
|     """ | ||||
|     Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area. | ||||
|     Test n°3 : Custom test in Cologne to ensure proper decision making in crowded area. | ||||
|      | ||||
|     Args: | ||||
|         client: | ||||
| @@ -141,7 +142,7 @@ def test_cologne(client, request) :   # pylint: disable=redefined-outer-name | ||||
|  | ||||
| def test_strasbourg(client, request) :   # pylint: disable=redefined-outer-name | ||||
|     """ | ||||
|     Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area. | ||||
|     Test n°4 : Custom test in Strasbourg to ensure proper decision making in crowded area. | ||||
|      | ||||
|     Args: | ||||
|         client: | ||||
| @@ -182,7 +183,7 @@ def test_strasbourg(client, request) :   # pylint: disable=redefined-outer-name | ||||
|  | ||||
| def test_zurich(client, request) :   # pylint: disable=redefined-outer-name | ||||
|     """ | ||||
|     Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area. | ||||
|     Test n°5 : Custom test in Zurich to ensure proper decision making in crowded area. | ||||
|      | ||||
|     Args: | ||||
|         client: | ||||
| @@ -223,24 +224,24 @@ def test_zurich(client, request) :   # pylint: disable=redefined-outer-name | ||||
|  | ||||
| def test_paris(client, request) :   # pylint: disable=redefined-outer-name | ||||
|     """ | ||||
|     Test n°2 : Custom test in Paris (les Halles) centre to ensure proper decision making in crowded area. | ||||
|     Test n°6 : Custom test in Paris (les Halles) centre to ensure proper decision making in crowded area. | ||||
|      | ||||
|     Args: | ||||
|         client: | ||||
|         request: | ||||
|     """ | ||||
|     start_time = time.time()  # Start timer | ||||
|     duration_minutes = 300 | ||||
|     duration_minutes = 200 | ||||
|  | ||||
|     response = client.post( | ||||
|         "/trip/new", | ||||
|         json={ | ||||
|             "preferences": {"sightseeing": {"type": "sightseeing", "score": 5}, | ||||
|                             "nature": {"type": "nature", "score": 5}, | ||||
|                             "nature": {"type": "nature", "score": 0}, | ||||
|                             "shopping": {"type": "shopping", "score": 5}, | ||||
|                             "max_time_minute": duration_minutes, | ||||
|                             "detour_tolerance_minute": 0}, | ||||
|             "start": [48.86248803298562, 2.346451131285925] | ||||
|             "start": [48.85468881798671, 2.3423925755998374] | ||||
|             } | ||||
|         ) | ||||
|     result = response.json() | ||||
| @@ -264,7 +265,7 @@ def test_paris(client, request) :   # pylint: disable=redefined-outer-name | ||||
|  | ||||
| def test_new_york(client, request) :   # pylint: disable=redefined-outer-name | ||||
|     """ | ||||
|     Test n°2 : Custom test in New York (les Halles) centre to ensure proper decision making in crowded area. | ||||
|     Test n°7 : Custom test in New York to ensure proper decision making in crowded area. | ||||
|      | ||||
|     Args: | ||||
|         client: | ||||
| @@ -305,7 +306,7 @@ def test_new_york(client, request) :   # pylint: disable=redefined-outer-name | ||||
|  | ||||
| def test_shopping(client, request) :   # pylint: disable=redefined-outer-name | ||||
|     """ | ||||
|     Test n°3 : Custom test in Lyon centre to ensure shopping clusters are found. | ||||
|     Test n°8 : Custom test in Lyon centre to ensure shopping clusters are found. | ||||
|      | ||||
|     Args: | ||||
|         client: | ||||
| @@ -334,11 +335,11 @@ def test_shopping(client, request) :   # pylint: disable=redefined-outer-name | ||||
|     # Add details to report | ||||
|     log_trip_details(request, landmarks, result['total_time'], duration_minutes) | ||||
|  | ||||
|     for elem in landmarks : | ||||
|         print(elem) | ||||
|     # for elem in landmarks : | ||||
|     #     print(elem) | ||||
|  | ||||
|     # checks : | ||||
|     assert response.status_code == 200  # check for successful planning | ||||
|     assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds" | ||||
|     assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}" | ||||
|     assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}" | ||||
|     assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}" | ||||
| @@ -9,7 +9,8 @@ from pydantic import BaseModel | ||||
| from ..overpass.overpass import Overpass, get_base_info | ||||
| from ..structs.landmark import Landmark | ||||
| from .get_time_distance import get_distance | ||||
| from ..constants import OSM_CACHE_DIR | ||||
| from .utils import create_bbox | ||||
|  | ||||
|  | ||||
|  | ||||
| # silence the overpass logger | ||||
| @@ -79,8 +80,7 @@ class ClusterManager: | ||||
|             bbox: The bounding box coordinates (around:radius, center_lat, center_lon). | ||||
|         """ | ||||
|         # Setup the caching in the Overpass class. | ||||
|         self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR) | ||||
|  | ||||
|         self.overpass = Overpass() | ||||
|  | ||||
|         self.cluster_type = cluster_type | ||||
|         if cluster_type == 'shopping' : | ||||
| @@ -95,32 +95,29 @@ class ClusterManager: | ||||
|             raise NotImplementedError("Please choose only an available option for cluster detection") | ||||
|  | ||||
|         # Initialize the points for cluster detection | ||||
|         query = self.overpass.build_query( | ||||
|             area = bbox, | ||||
|         try: | ||||
|             result = self.overpass.send_query( | ||||
|             bbox = bbox, | ||||
|             osm_types = osm_types, | ||||
|             selector = sel, | ||||
|             out = out | ||||
|         ) | ||||
|         self.logger.debug(f"Cluster query: {query}") | ||||
|  | ||||
|         try: | ||||
|             result = self.overpass.send_query(query) | ||||
|         except Exception as e: | ||||
|             self.logger.error(f"Error fetching landmarks: {e}") | ||||
|             self.logger.error(f"Error fetching clusters: {e}") | ||||
|  | ||||
|         if result is None : | ||||
|             self.logger.error(f"Error fetching {cluster_type} clusters, overpass query returned None.") | ||||
|             self.logger.debug(f"Found no {cluster_type} clusters, overpass query returned no datapoints.") | ||||
|             self.valid = False | ||||
|  | ||||
|         else : | ||||
|             points = [] | ||||
|             for osm_type in osm_types : | ||||
|                 for elem in result.findall(osm_type): | ||||
|                      | ||||
|                     # Get coordinates and append them to the points list | ||||
|                     _, coords = get_base_info(elem, osm_type) | ||||
|                     if coords is not None : | ||||
|                         points.append(coords) | ||||
|             for elem in result: | ||||
|                 osm_type = elem.get('type') | ||||
|                  | ||||
|                 # Get coordinates and append them to the points list | ||||
|                 _, coords = get_base_info(elem, osm_type) | ||||
|                 if coords is not None : | ||||
|                     points.append(coords) | ||||
|  | ||||
|             if points : | ||||
|                 self.all_points = np.array(points) | ||||
| @@ -137,7 +134,7 @@ class ClusterManager: | ||||
|  | ||||
|                 # Check that there are is least 1 cluster | ||||
|                 if len(set(labels)) > 1 : | ||||
|                     self.logger.debug(f"Found {len(set(labels))} different clusters.") | ||||
|                     self.logger.info(f"Found {len(set(labels))} different {cluster_type} clusters.") | ||||
|                     # Separate clustered points and noise points | ||||
|                     self.cluster_points = self.all_points[labels != -1] | ||||
|                     self.cluster_labels = labels[labels != -1] | ||||
| @@ -145,7 +142,7 @@ class ClusterManager: | ||||
|                     self.valid = True | ||||
|  | ||||
|                 else : | ||||
|                     self.logger.debug(f"Detected 0 {cluster_type} clusters.") | ||||
|                     self.logger.info(f"Found 0 {cluster_type} clusters.") | ||||
|                     self.valid = False | ||||
|  | ||||
|             else : | ||||
| @@ -218,9 +215,8 @@ class ClusterManager: | ||||
|         """ | ||||
|  | ||||
|         # Define the bounding box for a given radius around the coordinates | ||||
|         lat, lon = cluster.centroid | ||||
|         bbox = (1000, lat, lon) | ||||
|  | ||||
|         bbox = create_bbox(cluster.centroid, 1000) | ||||
|          | ||||
|         # Query neighborhoods and shopping malls | ||||
|         selectors = ['"place"~"^(suburb|neighborhood|neighbourhood|quarter|city_block)$"'] | ||||
|  | ||||
| @@ -238,37 +234,34 @@ class ClusterManager: | ||||
|         osm_types = ['node', 'way', 'relation'] | ||||
|  | ||||
|         for sel in selectors : | ||||
|             query = self.overpass.build_query( | ||||
|                 area = bbox, | ||||
|                 osm_types = osm_types, | ||||
|                 selector = sel, | ||||
|                 out = 'ids center' | ||||
|             ) | ||||
|  | ||||
|             try: | ||||
|                 result = self.overpass.send_query(query) | ||||
|                 result = self.overpass.send_query(bbox = bbox, | ||||
|                                                   osm_types = osm_types, | ||||
|                                                   selector = sel, | ||||
|                                                   out = 'ids center' | ||||
|                                                   ) | ||||
|             except Exception as e: | ||||
|                 self.logger.error(f"Error fetching landmarks: {e}") | ||||
|                 self.logger.error(f"Error fetching clusters: {e}") | ||||
|                 continue | ||||
|  | ||||
|             if result is None : | ||||
|                 self.logger.error(f"Error fetching landmarks: {e}") | ||||
|                 self.logger.error(f"Error fetching clusters: {e}") | ||||
|                 continue | ||||
|  | ||||
|             for osm_type in osm_types : | ||||
|                 for elem in result.findall(osm_type): | ||||
|             for elem in result: | ||||
|                 osm_type = elem.get('type') | ||||
|  | ||||
|                     id, coords, name = get_base_info(elem, osm_type, with_name=True) | ||||
|                 id, coords, name = get_base_info(elem, osm_type, with_name=True) | ||||
|  | ||||
|                     if name is None or coords is None : | ||||
|                         continue | ||||
|                 if name is None or coords is None : | ||||
|                     continue | ||||
|  | ||||
|                     d = get_distance(cluster.centroid, coords) | ||||
|                     if  d < min_dist : | ||||
|                         min_dist = d | ||||
|                         new_name = name | ||||
|                         osm_type = osm_type     # Add type: 'way' or 'relation' | ||||
|                         osm_id = id             # Add OSM id | ||||
|                 d = get_distance(cluster.centroid, coords) | ||||
|                 if  d < min_dist : | ||||
|                     min_dist = d | ||||
|                     new_name = name | ||||
|                     osm_type = osm_type     # Add type: 'way' or 'relation' | ||||
|                     osm_id = id             # Add OSM id | ||||
|  | ||||
|         return Landmark( | ||||
|             name=new_name, | ||||
|   | ||||
| @@ -1,19 +1,15 @@ | ||||
| """Module used to import data from OSM and arrange them in categories.""" | ||||
| import logging | ||||
| import xml.etree.ElementTree as ET | ||||
| import yaml | ||||
|  | ||||
|  | ||||
| from ..structs.preferences import Preferences | ||||
| from ..structs.landmark import Landmark | ||||
| from .take_most_important import take_most_important | ||||
| from .cluster_manager import ClusterManager | ||||
| from ..overpass.overpass import Overpass, get_base_info | ||||
| from .utils import create_bbox | ||||
|  | ||||
| from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH, OSM_CACHE_DIR | ||||
|  | ||||
| # silence the overpass logger | ||||
| logging.getLogger('Overpass').setLevel(level=logging.CRITICAL) | ||||
| from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH | ||||
|  | ||||
|  | ||||
| class LandmarkManager: | ||||
| @@ -37,8 +33,7 @@ class LandmarkManager: | ||||
|  | ||||
|         with LANDMARK_PARAMETERS_PATH.open('r') as f: | ||||
|             parameters = yaml.safe_load(f) | ||||
|             self.max_bbox_side = parameters['city_bbox_side'] | ||||
|             self.radius_close_to = parameters['radius_close_to'] | ||||
|             self.max_bbox_side = parameters['max_bbox_side'] | ||||
|             self.church_coeff = parameters['church_coeff'] | ||||
|             self.nature_coeff = parameters['nature_coeff'] | ||||
|             self.overall_coeff = parameters['overall_coeff'] | ||||
| @@ -56,7 +51,7 @@ class LandmarkManager: | ||||
|             self.detour_factor = parameters['detour_factor'] | ||||
|  | ||||
|         # Setup the caching in the Overpass class. | ||||
|         self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR) | ||||
|         self.overpass = Overpass() | ||||
|  | ||||
|         self.logger.info('LandmakManager successfully initialized.') | ||||
|  | ||||
| @@ -80,39 +75,39 @@ class LandmarkManager: | ||||
|         """ | ||||
|         self.logger.debug('Starting to fetch landmarks...') | ||||
|         max_walk_dist = int((preferences.max_time_minute/2)/60*self.walking_speed*1000/self.detour_factor) | ||||
|         reachable_bbox_side = min(max_walk_dist, self.max_bbox_side) | ||||
|         radius = min(max_walk_dist, int(self.max_bbox_side/2)) | ||||
|  | ||||
|         # use set to avoid duplicates, this requires some __methods__ to be set in Landmark | ||||
|         all_landmarks = set() | ||||
|  | ||||
|         # Create a bbox using the around technique, tuple of strings | ||||
|         bbox = tuple((min(2000, reachable_bbox_side/2), center_coordinates[0], center_coordinates[1])) | ||||
|         bbox = create_bbox(center_coordinates, radius) | ||||
|  | ||||
|         # list for sightseeing | ||||
|         if preferences.sightseeing.score != 0: | ||||
|             self.logger.debug('Fetching sightseeing landmarks...') | ||||
|             current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['sightseeing'], preferences.sightseeing.type, preferences.sightseeing.score) | ||||
|             all_landmarks.update(current_landmarks) | ||||
|             self.logger.debug('Fetching sightseeing clusters...') | ||||
|             self.logger.info(f'Found {len(current_landmarks)} sightseeing landmarks') | ||||
|  | ||||
|             # special pipeline for historic neighborhoods | ||||
|             neighborhood_manager = ClusterManager(bbox, 'sightseeing') | ||||
|             historic_clusters = neighborhood_manager.generate_clusters() | ||||
|             all_landmarks.update(historic_clusters) | ||||
|             self.logger.debug('Sightseeing clusters done') | ||||
|  | ||||
|         # list for nature | ||||
|         if preferences.nature.score != 0: | ||||
|             self.logger.debug('Fetching nature landmarks...') | ||||
|             current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['nature'], preferences.nature.type, preferences.nature.score) | ||||
|             all_landmarks.update(current_landmarks) | ||||
|             self.logger.info(f'Found {len(current_landmarks)} nature landmarks') | ||||
|  | ||||
|  | ||||
|         # list for shopping | ||||
|         if preferences.shopping.score != 0: | ||||
|             self.logger.debug('Fetching shopping landmarks...') | ||||
|             current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['shopping'], preferences.shopping.type, preferences.shopping.score) | ||||
|             self.logger.debug('Fetching shopping clusters...') | ||||
|             self.logger.info(f'Found {len(current_landmarks)} shopping landmarks') | ||||
|  | ||||
|             # set time for all shopping activites : | ||||
|             for landmark in current_landmarks : | ||||
| @@ -123,8 +118,6 @@ class LandmarkManager: | ||||
|             shopping_manager = ClusterManager(bbox, 'shopping') | ||||
|             shopping_clusters = shopping_manager.generate_clusters() | ||||
|             all_landmarks.update(shopping_clusters) | ||||
|             self.logger.debug('Shopping clusters done') | ||||
|  | ||||
|  | ||||
|  | ||||
|         landmarks_constrained = take_most_important(all_landmarks, self.n_important) | ||||
| @@ -179,7 +172,7 @@ class LandmarkManager: | ||||
|         """ | ||||
|         return_list = [] | ||||
|  | ||||
|         if landmarktype == 'nature' : query_conditions = [] | ||||
|         if landmarktype == 'nature' : query_conditions = None | ||||
|         else : query_conditions = ['count_tags()>5'] | ||||
|  | ||||
|         # caution, when applying a list of selectors, overpass will search for elements that match ALL selectors simultaneously | ||||
| @@ -190,117 +183,115 @@ class LandmarkManager: | ||||
|             osm_types = ['way', 'relation'] | ||||
|  | ||||
|             if 'viewpoint' in sel : | ||||
|                 query_conditions = [] | ||||
|                 query_conditions = None | ||||
|                 osm_types.append('node') | ||||
|  | ||||
|             query = self.overpass.build_query( | ||||
|                 area = bbox, | ||||
|                 osm_types = osm_types, | ||||
|                 selector = sel, | ||||
|                 conditions = query_conditions,        # except for nature.... | ||||
|                 out = 'center' | ||||
|                 ) | ||||
|             self.logger.debug(f"Query: {query}") | ||||
|  | ||||
|             # Send the overpass query | ||||
|             try: | ||||
|                 result = self.overpass.send_query(query) | ||||
|                 result = self.overpass.send_query( | ||||
|                     bbox = bbox, | ||||
|                     osm_types = osm_types, | ||||
|                     selector = sel, | ||||
|                     conditions = query_conditions,        # except for nature.... | ||||
|                     out = 'ids center tags' | ||||
|                     ) | ||||
|             except Exception as e: | ||||
|                 self.logger.error(f"Error fetching landmarks: {e}") | ||||
|                 continue | ||||
|  | ||||
|             return_list += self.xml_to_landmarks(result, landmarktype, preference_level) | ||||
|             return_list += self._to_landmarks(result, landmarktype, preference_level) | ||||
|  | ||||
|         self.logger.debug(f"Fetched {len(return_list)} landmarks of type {landmarktype} in {bbox}") | ||||
|  | ||||
|         return return_list | ||||
|  | ||||
|  | ||||
|     def xml_to_landmarks(self, root: ET.Element, landmarktype, preference_level) -> list[Landmark]: | ||||
|     def _to_landmarks(self, elements: list, landmarktype, preference_level) -> list[Landmark]: | ||||
|         """ | ||||
|         Parse the Overpass API result and extract landmarks. | ||||
|  | ||||
|         This method processes the XML root element returned by the Overpass API and  | ||||
|         This method processes the JSON elements returned by the Overpass API and  | ||||
|         extracts landmarks of types 'node', 'way', and 'relation'. It retrieves  | ||||
|         relevant information such as name, coordinates, and tags, and converts them  | ||||
|         into Landmark objects. | ||||
|  | ||||
|         Args: | ||||
|         root (ET.Element): The root element of the XML response from Overpass API. | ||||
|         elements (list): The elements of json response from Overpass API. | ||||
|         elem_type (str): The type of landmark (e.g., node, way, relation). | ||||
|  | ||||
|         Returns: | ||||
|         list[Landmark]: A list of Landmark objects extracted from the XML data. | ||||
|         list[Landmark]: A list of Landmark objects extracted from the JSON data. | ||||
|         """ | ||||
|         if root is None : | ||||
|         if elements is None : | ||||
|             return [] | ||||
|  | ||||
|         landmarks = [] | ||||
|         for osm_type in ['node', 'way', 'relation'] : | ||||
|             for elem in root.findall(osm_type): | ||||
|         for elem in elements: | ||||
|             osm_type = elem.get('type') | ||||
|  | ||||
|                 id, coords, name = get_base_info(elem, osm_type, with_name=True) | ||||
|  | ||||
|                 if name is None or coords is None : | ||||
|                     continue | ||||
|  | ||||
|                 tags = elem.findall('tag') | ||||
|  | ||||
|                 # Convert this to Landmark object | ||||
|                 landmark = Landmark(name=name, | ||||
|                                     type=landmarktype, | ||||
|                                     location=coords, | ||||
|                                     osm_id=id, | ||||
|                                     osm_type=osm_type, | ||||
|                                     attractiveness=0, | ||||
|                                     n_tags=len(tags)) | ||||
|  | ||||
|                 # Browse through tags to add information to landmark. | ||||
|                 for tag in tags: | ||||
|                     key = tag.get('k') | ||||
|                     value = tag.get('v') | ||||
|  | ||||
|                     # Skip this landmark if not suitable. | ||||
|                     if key == 'building:part' and value == 'yes' : | ||||
|                         break | ||||
|                     if 'disused:' in key : | ||||
|                         break | ||||
|                     if 'boundary:' in key : | ||||
|                         break | ||||
|                     if 'shop' in key and landmarktype != 'shopping' : | ||||
|                         break | ||||
|                     # if value == 'apartments' : | ||||
|                     #     break | ||||
|  | ||||
|                     # Fill in the other attributes. | ||||
|                     if key == 'image' : | ||||
|                         landmark.image_url = value | ||||
|                     if key == 'website' : | ||||
|                         landmark.website_url = value | ||||
|                     if key == 'place_of_worship' : | ||||
|                         landmark.is_place_of_worship = True | ||||
|                     if key == 'wikipedia' : | ||||
|                         landmark.wiki_url = value | ||||
|                     if key == 'name:en' : | ||||
|                         landmark.name_en = value | ||||
|                     if 'building:' in key or 'pay' in key : | ||||
|                         landmark.n_tags -= 1 | ||||
|  | ||||
|                     # Set the duration. | ||||
|                     if value in ['museum', 'aquarium', 'planetarium'] : | ||||
|                         landmark.duration = 60 | ||||
|                     elif value == 'viewpoint' : | ||||
|                         landmark.is_viewpoint = True | ||||
|                         landmark.duration = 10 | ||||
|                     elif value == 'cathedral' : | ||||
|                         landmark.is_place_of_worship = False | ||||
|                         landmark.duration = 10 | ||||
|  | ||||
|                 else: | ||||
|                     self.set_landmark_score(landmark, landmarktype, preference_level) | ||||
|                     landmarks.append(landmark) | ||||
|             id, coords, name = get_base_info(elem, osm_type, with_name=True) | ||||
|  | ||||
|             if name is None or coords is None : | ||||
|                 continue | ||||
|  | ||||
|             tags = elem.get('tags') | ||||
|  | ||||
|             # Convert this to Landmark object | ||||
|             landmark = Landmark(name=name, | ||||
|                                 type=landmarktype, | ||||
|                                 location=coords, | ||||
|                                 osm_id=id, | ||||
|                                 osm_type=osm_type, | ||||
|                                 attractiveness=0, | ||||
|                                 n_tags=len(tags)) | ||||
|  | ||||
|             # self.logger.debug('added landmark.') | ||||
|  | ||||
|             # Browse through tags to add information to landmark. | ||||
|             for key, value in tags.items(): | ||||
|  | ||||
|                 # Skip this landmark if not suitable. | ||||
|                 if key == 'building:part' and value == 'yes' : | ||||
|                     break | ||||
|                 if 'disused:' in key : | ||||
|                     break | ||||
|                 if 'boundary:' in key : | ||||
|                     break | ||||
|                 if 'shop' in key and landmarktype != 'shopping' : | ||||
|                     break | ||||
|                 # if value == 'apartments' : | ||||
|                 #     break | ||||
|  | ||||
|                 # Fill in the other attributes. | ||||
|                 if key == 'image' : | ||||
|                     landmark.image_url = value | ||||
|                 if key == 'website' : | ||||
|                     landmark.website_url = value | ||||
|                 if key == 'place_of_worship' : | ||||
|                     landmark.is_place_of_worship = True | ||||
|                 if key == 'wikipedia' : | ||||
|                     landmark.wiki_url = value | ||||
|                 if key == 'name:en' : | ||||
|                     landmark.name_en = value | ||||
|                 if 'building:' in key or 'pay' in key : | ||||
|                     landmark.n_tags -= 1 | ||||
|  | ||||
|                 # Set the duration. | ||||
|                 if value in ['museum', 'aquarium', 'planetarium'] : | ||||
|                     landmark.duration = 60 | ||||
|                 elif value == 'viewpoint' : | ||||
|                     landmark.is_viewpoint = True | ||||
|                     landmark.duration = 10 | ||||
|                 elif value == 'cathedral' : | ||||
|                     landmark.is_place_of_worship = False | ||||
|                     landmark.duration = 10 | ||||
|  | ||||
|             else: | ||||
|                 self.set_landmark_score(landmark, landmarktype, preference_level) | ||||
|                 landmarks.append(landmark) | ||||
|  | ||||
|             continue | ||||
|  | ||||
|         return landmarks | ||||
|  | ||||
| def dict_to_selector_list(d: dict) -> list: | ||||
|   | ||||
| @@ -1,10 +1,9 @@ | ||||
| """Module for finding public toilets around given coordinates.""" | ||||
| import logging | ||||
| import xml.etree.ElementTree as ET | ||||
|  | ||||
| from ..overpass.overpass import Overpass, get_base_info | ||||
| from ..structs.landmark import Toilets | ||||
| from ..constants import OSM_CACHE_DIR | ||||
| from .utils import create_bbox | ||||
|  | ||||
|  | ||||
| # silence the overpass logger | ||||
| @@ -41,7 +40,7 @@ class ToiletsManager: | ||||
|         self.location = location | ||||
|  | ||||
|         # Setup the caching in the Overpass class. | ||||
|         self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR) | ||||
|         self.overpass = Overpass() | ||||
|  | ||||
|  | ||||
|     def generate_toilet_list(self) -> list[Toilets] : | ||||
| @@ -53,73 +52,71 @@ class ToiletsManager: | ||||
|         list[Toilets]: A list of `Toilets` objects containing detailed information  | ||||
|                        about the toilets found around the given coordinates. | ||||
|         """ | ||||
|         bbox = tuple((self.radius, self.location[0], self.location[1])) | ||||
|         bbox = create_bbox(self.location, self.radius) | ||||
|         osm_types = ['node', 'way', 'relation'] | ||||
|         toilets_list = [] | ||||
|  | ||||
|         query = self.overpass.build_query( | ||||
|                 area = bbox, | ||||
|                 osm_types = osm_types, | ||||
|                 selector = '"amenity"="toilets"', | ||||
|                 out = 'ids center tags' | ||||
|                 ) | ||||
|         self.logger.debug(f"Query: {query}") | ||||
|  | ||||
|         query = Overpass.build_query( | ||||
|             bbox = bbox, | ||||
|             osm_types = osm_types, | ||||
|             selector = '"amenity"="toilets"', | ||||
|             out = 'ids center tags' | ||||
|             ) | ||||
|         try: | ||||
|             result = self.overpass.send_query(query) | ||||
|             result = self.overpass.fetch_data_from_api(query_str=query) | ||||
|         except Exception as e: | ||||
|             self.logger.error(f"Error fetching landmarks: {e}") | ||||
|             return None | ||||
|  | ||||
|         toilets_list = self.xml_to_toilets(result) | ||||
|         toilets_list = self.to_toilets(result) | ||||
|  | ||||
|         return toilets_list | ||||
|  | ||||
|  | ||||
|     def xml_to_toilets(self, root: ET.Element) -> list[Toilets]: | ||||
|     def to_toilets(self, elements: list) -> list[Toilets]: | ||||
|         """ | ||||
|         Parse the Overpass API result and extract landmarks. | ||||
|  | ||||
|         This method processes the XML root element returned by the Overpass API and  | ||||
|         This method processes the JSON elements returned by the Overpass API and  | ||||
|         extracts landmarks of types 'node', 'way', and 'relation'. It retrieves  | ||||
|         relevant information such as name, coordinates, and tags, and converts them  | ||||
|         into Landmark objects. | ||||
|  | ||||
|         Args: | ||||
|         root (ET.Element): The root element of the XML response from Overpass API. | ||||
|         list (osm elements): The root element of the JSON response from Overpass API. | ||||
|         elem_type (str): The type of landmark (e.g., node, way, relation). | ||||
|  | ||||
|         Returns: | ||||
|         list[Landmark]: A list of Landmark objects extracted from the XML data. | ||||
|         list[Landmark]: A list of Landmark objects extracted from the JSON data. | ||||
|         """ | ||||
|         if root is None : | ||||
|         if elements is None : | ||||
|             return [] | ||||
|  | ||||
|         toilets_list = [] | ||||
|         for osm_type in ['node', 'way', 'relation'] : | ||||
|             for elem in root.findall(osm_type): | ||||
|                 # Get coordinates and append them to the points list | ||||
|                 _, coords = get_base_info(elem, osm_type) | ||||
|                 if coords is None : | ||||
|                     continue | ||||
|         for elem in elements: | ||||
|             osm_type = elem.get('type') | ||||
|             # Get coordinates and append them to the points list | ||||
|             _, coords = get_base_info(elem, osm_type) | ||||
|             if coords is None : | ||||
|                 continue | ||||
|  | ||||
|                 toilets = Toilets(location=coords) | ||||
|             toilets = Toilets(location=coords) | ||||
|  | ||||
|                 # Extract tags as a dictionary | ||||
|                 tags = {tag.get('k'): tag.get('v') for tag in elem.findall('tag')} | ||||
|             # Extract tags as a dictionary | ||||
|             tags = elem.get('tags') | ||||
|  | ||||
|                 if 'wheelchair' in tags.keys() and tags['wheelchair'] == 'yes': | ||||
|                     toilets.wheelchair = True | ||||
|             if 'wheelchair' in tags.keys() and tags['wheelchair'] == 'yes': | ||||
|                 toilets.wheelchair = True | ||||
|  | ||||
|                 if 'changing_table' in tags.keys() and tags['changing_table'] == 'yes': | ||||
|                     toilets.changing_table = True | ||||
|             if 'changing_table' in tags.keys() and tags['changing_table'] == 'yes': | ||||
|                 toilets.changing_table = True | ||||
|  | ||||
|                 if 'fee' in tags.keys() and tags['fee'] == 'yes': | ||||
|                     toilets.fee = True | ||||
|             if 'fee' in tags.keys() and tags['fee'] == 'yes': | ||||
|                 toilets.fee = True | ||||
|  | ||||
|                 if 'opening_hours' in tags.keys() : | ||||
|                     toilets.opening_hours = tags['opening_hours'] | ||||
|             if 'opening_hours' in tags.keys() : | ||||
|                 toilets.opening_hours = tags['opening_hours'] | ||||
|  | ||||
|                 toilets_list.append(toilets) | ||||
|             toilets_list.append(toilets) | ||||
|  | ||||
|         return toilets_list | ||||
|   | ||||
							
								
								
									
										27
									
								
								backend/src/utils/utils.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										27
									
								
								backend/src/utils/utils.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,27 @@ | ||||
| """Various helper functions""" | ||||
| import math as m | ||||
|  | ||||
| def create_bbox(coords: tuple[float, float], radius: int): | ||||
|     """ | ||||
|     Create a bounding box around the given coordinates. | ||||
|  | ||||
|     Args: | ||||
|         coords (tuple[float, float]): The latitude and longitude of the center of the bounding box. | ||||
|         radius (int): The half-side length of the bounding box in meters. | ||||
|  | ||||
|     Returns: | ||||
|         tuple[float, float, float, float]: The minimum latitude, minimum longitude, maximum latitude, and maximum longitude | ||||
|                                             defining the bounding box. | ||||
|     """ | ||||
|     # Earth's radius in meters | ||||
|     R = 6378137 | ||||
|     lat, lon = coords | ||||
|     d_lat = radius / R | ||||
|     d_lon = radius / (R * m.cos(m.pi * lat / 180)) | ||||
|  | ||||
|     lat_min = lat - d_lat * 180 / m.pi | ||||
|     lat_max = lat + d_lat * 180 / m.pi | ||||
|     lon_min = lon - d_lon * 180 / m.pi | ||||
|     lon_max = lon + d_lon * 180 / m.pi | ||||
|  | ||||
|     return (lat_min, lon_min, lat_max, lon_max) | ||||
		Reference in New Issue
	
	Block a user