amazing cache #55
| @@ -28,7 +28,7 @@ jobs: | |||||||
|       working-directory: backend |       working-directory: backend | ||||||
|  |  | ||||||
|     - name: Run Tests |     - name: Run Tests | ||||||
|       run: pipenv run pytest src --html=report.html --self-contained-html --log-cli-level=INFO |       run: pipenv run pytest src --html=report.html --self-contained-html --log-cli-level=DEBUG | ||||||
|       working-directory: backend |       working-directory: backend | ||||||
|  |  | ||||||
|     - name: Upload HTML report |     - name: Upload HTML report | ||||||
|   | |||||||
| @@ -445,7 +445,9 @@ disable=raw-checker-failed, | |||||||
|         logging-fstring-interpolation, |         logging-fstring-interpolation, | ||||||
|         duplicate-code, |         duplicate-code, | ||||||
|         relative-beyond-top-level,  |         relative-beyond-top-level,  | ||||||
|         invalid-name |         invalid-name, | ||||||
|  |         too-many-arguments, | ||||||
|  |         too-many-positional-arguments | ||||||
|  |  | ||||||
| # Enable the message, report, category or checker with the given id(s). You can | # Enable the message, report, category or checker with the given id(s). You can | ||||||
| # either give multiple identifier separated by comma (,) or put this option | # either give multiple identifier separated by comma (,) or put this option | ||||||
|   | |||||||
File diff suppressed because one or more lines are too long
							| @@ -2,6 +2,7 @@ | |||||||
|  |  | ||||||
| import os | import os | ||||||
| from pathlib import Path | from pathlib import Path | ||||||
|  | from typing import List, Literal, Tuple | ||||||
|  |  | ||||||
|  |  | ||||||
| LOCATION_PREFIX = Path('src') | LOCATION_PREFIX = Path('src') | ||||||
| @@ -14,6 +15,8 @@ OPTIMIZER_PARAMETERS_PATH = PARAMETERS_DIR / 'optimizer_parameters.yaml' | |||||||
| cache_dir_string = os.getenv('OSM_CACHE_DIR', './cache') | cache_dir_string = os.getenv('OSM_CACHE_DIR', './cache') | ||||||
| OSM_CACHE_DIR = Path(cache_dir_string) | OSM_CACHE_DIR = Path(cache_dir_string) | ||||||
|  |  | ||||||
|  | OSM_TYPES = List[Literal['way', 'node', 'relation']] | ||||||
|  | BBOX = Tuple[float, float, float, float] | ||||||
|  |  | ||||||
| MEMCACHED_HOST_PATH = os.getenv('MEMCACHED_HOST_PATH', None) | MEMCACHED_HOST_PATH = os.getenv('MEMCACHED_HOST_PATH', None) | ||||||
| if MEMCACHED_HOST_PATH == "none": | if MEMCACHED_HOST_PATH == "none": | ||||||
|   | |||||||
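
The two names added to constants.py are plain `typing` aliases: they carry no runtime behaviour and only document the expected shapes. A minimal sketch of their intended use (the consuming function is invented for illustration):

```python
from typing import List, Literal, Tuple

OSM_TYPES = List[Literal['way', 'node', 'relation']]
BBOX = Tuple[float, float, float, float]

def query_area(bbox: BBOX, osm_types: OSM_TYPES) -> None:
    """Hypothetical consumer; the annotations document the expected shapes."""
    min_lat, min_lon, max_lat, max_lon = bbox   # always four floats
    for elem in osm_types:                      # each one of 'way'/'node'/'relation'
        print(elem, min_lat, min_lon, max_lat, max_lon)

query_area((45.75, 4.80, 45.80, 4.85), ['node', 'way'])
```
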
| @@ -3,7 +3,7 @@ | |||||||
| import logging | import logging | ||||||
| import time | import time | ||||||
| from contextlib import asynccontextmanager | from contextlib import asynccontextmanager | ||||||
| from fastapi import FastAPI, HTTPException, Query | from fastapi import FastAPI, HTTPException, BackgroundTasks, Query | ||||||
|  |  | ||||||
| from .logging_config import configure_logging | from .logging_config import configure_logging | ||||||
| from .structs.landmark import Landmark, Toilets | from .structs.landmark import Landmark, Toilets | ||||||
| @@ -14,8 +14,10 @@ from .utils.landmarks_manager import LandmarkManager | |||||||
| from .utils.toilets_manager import ToiletsManager | from .utils.toilets_manager import ToiletsManager | ||||||
| from .optimization.optimizer import Optimizer | from .optimization.optimizer import Optimizer | ||||||
| from .optimization.refiner import Refiner | from .optimization.refiner import Refiner | ||||||
|  | from .overpass.overpass import fill_cache | ||||||
| from .cache import client as cache_client | from .cache import client as cache_client | ||||||
|  |  | ||||||
|  |  | ||||||
| logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||||
|  |  | ||||||
| manager = LandmarkManager() | manager = LandmarkManager() | ||||||
| @@ -35,11 +37,11 @@ async def lifespan(app: FastAPI): | |||||||
| app = FastAPI(lifespan=lifespan) | app = FastAPI(lifespan=lifespan) | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
| @app.post("/trip/new") | @app.post("/trip/new") | ||||||
| def new_trip(preferences: Preferences, | def new_trip(preferences: Preferences, | ||||||
|              start: tuple[float, float], |              start: tuple[float, float], | ||||||
|              end: tuple[float, float] | None = None) -> Trip: |              end: tuple[float, float] | None = None,  | ||||||
|  |              background_tasks: BackgroundTasks = None) -> Trip: | ||||||
|     """ |     """ | ||||||
|     Main function to call the optimizer. |     Main function to call the optimizer. | ||||||
|  |  | ||||||
| @@ -91,6 +93,9 @@ def new_trip(preferences: Preferences, | |||||||
|         preferences = preferences |         preferences = preferences | ||||||
|     ) |     ) | ||||||
|  |  | ||||||
|  |     if len(landmarks) == 0 : | ||||||
|  |         raise HTTPException(status_code=500, detail="No landmarks were found.") | ||||||
|  |  | ||||||
|     # insert start and finish to the landmarks list |     # insert start and finish to the landmarks list | ||||||
|     landmarks_short.insert(0, start_landmark) |     landmarks_short.insert(0, start_landmark) | ||||||
|     landmarks_short.append(end_landmark) |     landmarks_short.append(end_landmark) | ||||||
| @@ -114,6 +119,9 @@ def new_trip(preferences: Preferences, | |||||||
|         refined_tour = refiner.refine_optimization(landmarks, base_tour, |         refined_tour = refiner.refine_optimization(landmarks, base_tour, | ||||||
|                                                preferences.max_time_minute, |                                                preferences.max_time_minute, | ||||||
|                                                preferences.detour_tolerance_minute) |                                                preferences.detour_tolerance_minute) | ||||||
|  |     except TimeoutError as te : | ||||||
|  |         logger.error(f'Refiner failed: {str(te)}. Falling back to the base tour.') | ||||||
|  |         refined_tour = base_tour | ||||||
|     except Exception as exc : |     except Exception as exc : | ||||||
|         raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(exc)}") from exc |         raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(exc)}") from exc | ||||||
|  |  | ||||||
| @@ -127,6 +135,9 @@ def new_trip(preferences: Preferences, | |||||||
|     # upon creation of the trip, persistence of both the trip and its landmarks is ensured. |     # upon creation of the trip, persistence of both the trip and its landmarks is ensured. | ||||||
|     trip = Trip.from_linked_landmarks(linked_tour, cache_client) |     trip = Trip.from_linked_landmarks(linked_tour, cache_client) | ||||||
|     logger.info(f'Generated a trip of {trip.total_time} minutes with {len(refined_tour)} landmarks in {round(t_generate_landmarks + t_first_stage + t_second_stage,3)} seconds.') |     logger.info(f'Generated a trip of {trip.total_time} minutes with {len(refined_tour)} landmarks in {round(t_generate_landmarks + t_first_stage + t_second_stage,3)} seconds.') | ||||||
|  |  | ||||||
|  |     background_tasks.add_task(fill_cache) | ||||||
|  |  | ||||||
|     return trip |     return trip | ||||||
|  |  | ||||||
|  |  | ||||||
|   | |||||||
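
The `BackgroundTasks` parameter is what ties the trip endpoint to the new hollow-cache mechanism: `fill_cache` is queued during the request but only runs after the response has been sent, so the client never waits for the cache refresh. A standalone sketch of the pattern (endpoint and task body invented for illustration):

```python
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def fill_cache() -> None:
    # Stand-in for the real cache-filling job.
    print("filling cache after the response was sent")

@app.post("/demo")
def demo(background_tasks: BackgroundTasks) -> dict:
    background_tasks.add_task(fill_cache)   # queued now, runs after the response
    return {"status": "ok"}                 # returned to the client immediately
```
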
| @@ -55,6 +55,9 @@ class Optimizer: | |||||||
|             self.average_walking_speed = parameters['average_walking_speed'] |             self.average_walking_speed = parameters['average_walking_speed'] | ||||||
|             self.max_landmarks = parameters['max_landmarks'] |             self.max_landmarks = parameters['max_landmarks'] | ||||||
|             self.overshoot = parameters['overshoot'] |             self.overshoot = parameters['overshoot'] | ||||||
|  |             self.time_limit = parameters['time_limit'] | ||||||
|  |             self.gap_rel = parameters['gap_rel'] | ||||||
|  |             self.max_iter = parameters['max_iter'] | ||||||
|  |  | ||||||
|  |  | ||||||
|     def init_ub_time(self, prob: pl.LpProblem, x: pl.LpVariable, L: int, landmarks: list[Landmark], max_time: int): |     def init_ub_time(self, prob: pl.LpProblem, x: pl.LpVariable, L: int, landmarks: list[Landmark], max_time: int): | ||||||
| @@ -490,7 +493,18 @@ class Optimizer: | |||||||
|  |  | ||||||
|  |  | ||||||
|     def warm_start(self, x: list[pl.LpVariable], L: int) : |     def warm_start(self, x: list[pl.LpVariable], L: int) : | ||||||
|  |         """ | ||||||
|  |         Set the decision variables to a feasible initial solution. | ||||||
|  |         Giving the solver a warm start can speed up convergence. | ||||||
|  |  | ||||||
|  |         Args: | ||||||
|  |         x (list[pl.LpVariable]): A list of binary PuLP decision variables. | ||||||
|  |         L (int): The number of landmarks; x encodes an L x L matrix. | ||||||
|  |  | ||||||
|  |         Returns: | ||||||
|  |         list[pl.LpVariable]: The modified list of PuLP decision variables with initial values set. | ||||||
|  |         """ | ||||||
|         for i in range(L*L) : |         for i in range(L*L) : | ||||||
|             x[i].setInitialValue(0) |             x[i].setInitialValue(0) | ||||||
|  |  | ||||||
| @@ -573,7 +587,10 @@ class Optimizer: | |||||||
|         prob, x = self.pre_processing(L, landmarks, max_time, max_landmarks) |         prob, x = self.pre_processing(L, landmarks, max_time, max_landmarks) | ||||||
|  |  | ||||||
|         # Solve the problem and extract results. |         # Solve the problem and extract results. | ||||||
|         prob.solve(pl.PULP_CBC_CMD(msg=False, gapRel=0.1, timeLimit=10, warmStart=False)) |         try : | ||||||
|  |             prob.solve(pl.PULP_CBC_CMD(msg=False, timeLimit=self.time_limit+1, gapRel=self.gap_rel)) | ||||||
|  |         except Exception as exc : | ||||||
|  |             raise Exception(f"No solution found: {exc}") from exc | ||||||
|         status = pl.LpStatus[prob.status] |         status = pl.LpStatus[prob.status] | ||||||
|         solution = [pl.value(var) for var in x]  # The values of the decision variables (will be 0 or 1) |         solution = [pl.value(var) for var in x]  # The values of the decision variables (will be 0 or 1) | ||||||
|  |  | ||||||
| @@ -588,18 +605,21 @@ class Optimizer: | |||||||
|         circles = self.is_connected(solution) |         circles = self.is_connected(solution) | ||||||
|  |  | ||||||
|         i = 0 |         i = 0 | ||||||
|         timeout = 40 |  | ||||||
|         while circles is not None : |         while circles is not None : | ||||||
|             i += 1 |             i += 1 | ||||||
|             if i == timeout : |             if i == self.max_iter : | ||||||
|                 self.logger.error(f'Timeout: No solution found after {timeout} iterations.') |                 self.logger.error(f'Timeout: No solution found after {self.max_iter} iterations.') | ||||||
|                 raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.") |                 raise TimeoutError(f"Optimization took too long. No solution found after {self.max_iter} iterations.") | ||||||
|  |  | ||||||
|             for circle in circles : |             for circle in circles : | ||||||
|                 self.prevent_circle(prob, x, circle, L) |                 self.prevent_circle(prob, x, circle, L) | ||||||
|  |  | ||||||
|             # Solve the problem again |             # Solve the problem again | ||||||
|             prob.solve(pl.PULP_CBC_CMD(msg=False)) |             try : | ||||||
|  |                 prob.solve(pl.PULP_CBC_CMD(msg=False, timeLimit=self.time_limit, gapRel=self.gap_rel)) | ||||||
|  |             except Exception as exc : | ||||||
|  |                 raise Exception(f"No solution found: {exc}") from exc | ||||||
|  |  | ||||||
|             solution = [pl.value(var) for var in x] |             solution = [pl.value(var) for var in x] | ||||||
|  |  | ||||||
|             if pl.LpStatus[prob.status] != 'Optimal' : |             if pl.LpStatus[prob.status] != 'Optimal' : | ||||||
| @@ -614,5 +634,5 @@ class Optimizer: | |||||||
|         order = self.get_order(solution) |         order = self.get_order(solution) | ||||||
|         tour =  [landmarks[i] for i in order] |         tour =  [landmarks[i] for i in order] | ||||||
|  |  | ||||||
|         self.logger.debug(f"Re-optimized {i} times, objective value : {int(pl.value(prob.objective))}") |         self.logger.info(f"Re-optimized {i} times, objective value : {int(pl.value(prob.objective))}") | ||||||
|         return tour |         return tour | ||||||
|   | |||||||
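
For context on the three new solver knobs: `timeLimit` caps CBC's wall-clock time per solve, `gapRel` lets it stop once within the given relative gap of the optimum, and `setInitialValue` is the PuLP call that `warm_start` builds on. A self-contained sketch with made-up problem data:

```python
import pulp as pl

prob = pl.LpProblem("demo", pl.LpMaximize)
x = [pl.LpVariable(f"x_{i}", cat=pl.LpBinary) for i in range(4)]
prob += pl.lpSum((i + 1) * x[i] for i in range(4))   # objective: weighted sum
prob += pl.lpSum(x) <= 2                             # constraint: pick at most two

for var in x:
    var.setInitialValue(0)   # feasible starting point for a warm start

# Stop after 1 s or once within 5 % of the optimum, whichever comes first.
prob.solve(pl.PULP_CBC_CMD(msg=False, timeLimit=1, gapRel=0.05, warmStart=True))
print(pl.LpStatus[prob.status], [pl.value(v) for v in x])
```
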
| @@ -1,9 +1,8 @@ | |||||||
| """Module defining the caching strategy for overpass requests.""" |  | ||||||
| import os | import os | ||||||
| import xml.etree.ElementTree as ET | import json | ||||||
| import hashlib | import hashlib | ||||||
|  |  | ||||||
| from ..constants import OSM_CACHE_DIR | from ..constants import OSM_CACHE_DIR, OSM_TYPES | ||||||
|  |  | ||||||
|  |  | ||||||
| def get_cache_key(query: str) -> str: | def get_cache_key(query: str) -> str: | ||||||
| @@ -17,10 +16,6 @@ def get_cache_key(query: str) -> str: | |||||||
| class CachingStrategyBase: | class CachingStrategyBase: | ||||||
|     """ |     """ | ||||||
|     Base class for implementing caching strategies. |     Base class for implementing caching strategies. | ||||||
|  |  | ||||||
|     This class defines the structure for a caching strategy with basic methods |  | ||||||
|     that must be implemented by subclasses. Subclasses should define how to |  | ||||||
|     retrieve, store, and close the cache. |  | ||||||
|     """ |     """ | ||||||
|     def get(self, key): |     def get(self, key): | ||||||
|         """Retrieve the cached data associated with the provided key.""" |         """Retrieve the cached data associated with the provided key.""" | ||||||
| @@ -30,111 +25,108 @@ class CachingStrategyBase: | |||||||
|         """Store data in the cache with the specified key.""" |         """Store data in the cache with the specified key.""" | ||||||
|         raise NotImplementedError('Subclass should implement set') |         raise NotImplementedError('Subclass should implement set') | ||||||
|  |  | ||||||
|  |     def set_hollow(self, key, **kwargs): | ||||||
|  |         """Create a hollow (empty) cache entry with a specific key.""" | ||||||
|  |         raise NotImplementedError('Subclass should implement set_hollow') | ||||||
|  |  | ||||||
|     def close(self): |     def close(self): | ||||||
|         """Clean up or close any resources used by the caching strategy.""" |         """Clean up or close any resources used by the caching strategy.""" | ||||||
|  |  | ||||||
|  |  | ||||||
| class XMLCache(CachingStrategyBase): | class JSONCache(CachingStrategyBase): | ||||||
|     """ |     """ | ||||||
|     A caching strategy that stores and retrieves data in XML format. |     A caching strategy that stores and retrieves data in JSON format. | ||||||
|  |  | ||||||
|     This class provides methods to cache data as XML files in a specified directory. |  | ||||||
|     The directory is automatically suffixed with '_XML' to distinguish it from other |  | ||||||
|     caching strategies. The data is stored and retrieved using XML serialization. |  | ||||||
|  |  | ||||||
|     Args: |  | ||||||
|         cache_dir (str): The base directory where XML cache files will be stored. |  | ||||||
|                          Defaults to 'OSM_CACHE_DIR' with a '_XML' suffix. |  | ||||||
|  |  | ||||||
|     Methods: |  | ||||||
|         get(key): Retrieve cached data from a XML file associated with the given key. |  | ||||||
|         set(key, value): Store data in a XML file with the specified key. |  | ||||||
|     """ |     """ | ||||||
|     def __init__(self, cache_dir=OSM_CACHE_DIR): |     def __init__(self, cache_dir=OSM_CACHE_DIR): | ||||||
|         # Add the class name as a suffix to the directory |         # Add the class name as a suffix to the directory | ||||||
|         self._cache_dir = f'{cache_dir}_XML' |         self._cache_dir = f'{cache_dir}' | ||||||
|         if not os.path.exists(self._cache_dir): |         if not os.path.exists(self._cache_dir): | ||||||
|             os.makedirs(self._cache_dir) |             os.makedirs(self._cache_dir) | ||||||
|  |  | ||||||
|     def _filename(self, key): |     def _filename(self, key): | ||||||
|         return os.path.join(self._cache_dir, f'{key}.xml') |         return os.path.join(self._cache_dir, f'{key}.json') | ||||||
|  |  | ||||||
|     def get(self, key): |     def get(self, key): | ||||||
|         """Retrieve XML data from the cache and parse it as an ElementTree.""" |         """Retrieve JSON data from the cache and parse it as an ElementTree.""" | ||||||
|         filename = self._filename(key) |         filename = self._filename(key) | ||||||
|         if os.path.exists(filename): |         if os.path.exists(filename): | ||||||
|             try: |             try: | ||||||
|                 # Parse and return the cached XML data |                 # Open and parse the cached JSON data | ||||||
|                 tree = ET.parse(filename) |                 with open(filename, 'r', encoding='utf-8') as file: | ||||||
|                 return tree.getroot()  # Return the root element of the parsed XML |                     data = json.load(file) | ||||||
|             except ET.ParseError: |                 # Return the data as a list of dicts. | ||||||
|                 # print(f"Error parsing cached XML file: {filename}") |                 return data | ||||||
|                 return None |             except json.JSONDecodeError: | ||||||
|  |                 return None  # Return None if parsing fails | ||||||
|         return None |         return None | ||||||
|  |  | ||||||
|     def set(self, key, value): |     def set(self, key, value): | ||||||
|         """Save the XML data as an ElementTree to the cache.""" |         """Save the JSON data as an ElementTree to the cache.""" | ||||||
|         filename = self._filename(key) |         filename = self._filename(key) | ||||||
|         tree = ET.ElementTree(value)  # value is expected to be an ElementTree root element |  | ||||||
|         try: |         try: | ||||||
|             # Write the XML data to a file |             # Write the JSON data to the cache file | ||||||
|             with open(filename, 'wb') as file: |             with open(filename, 'w', encoding='utf-8') as file: | ||||||
|                 tree.write(file, encoding='utf-8', xml_declaration=True) |                 json.dump(value, file, ensure_ascii=False, indent=4) | ||||||
|         except IOError as e: |         except IOError as e: | ||||||
|             raise IOError(f"Error writing to cache file: {filename} - {e}") from e |             raise IOError(f"Error writing to cache file: {filename} - {e}") from e | ||||||
|  |  | ||||||
|  |     def set_hollow(self, key, cell: tuple, osm_types: list, | ||||||
|  |                     selector: str, conditions: list=None, out='center'): | ||||||
|  |         """Create an empty placeholder cache entry for a future fill.""" | ||||||
|  |         hollow_key = f'hollow_{key}' | ||||||
|  |         filename = self._filename(hollow_key) | ||||||
|  |  | ||||||
|  |         # Create the hollow JSON structure | ||||||
|  |         hollow_data = { | ||||||
|  |             "key": key, | ||||||
|  |             "cell": list(cell), | ||||||
|  |             "osm_types": list(osm_types), | ||||||
|  |             "selector": selector, | ||||||
|  |             "conditions": conditions, | ||||||
|  |             "out": out | ||||||
|  |         } | ||||||
|  |         # Write the hollow data to the cache file | ||||||
|  |         try: | ||||||
|  |             with open(filename, 'w', encoding='utf-8') as file: | ||||||
|  |                 json.dump(hollow_data, file, ensure_ascii=False, indent=4) | ||||||
|  |         except IOError as e: | ||||||
|  |             raise IOError(f"Error writing hollow cache to file: {filename} - {e}") from e | ||||||
|  |  | ||||||
|  |     def close(self): | ||||||
|  |         """Cleanup method, if needed.""" | ||||||
|  |         pass | ||||||
|  |  | ||||||
| class CachingStrategy: | class CachingStrategy: | ||||||
|     """ |     """ | ||||||
|     A class to manage different caching strategies. |     A class to manage different caching strategies. | ||||||
|  |  | ||||||
|     This class provides an interface to switch between different caching strategies  |  | ||||||
|     (e.g., XMLCache, JSONCache) dynamically. It allows caching data in different formats,  |  | ||||||
|     depending on the strategy being used. By default, it uses the XMLCache strategy. |  | ||||||
|  |  | ||||||
|     Attributes: |  | ||||||
|     __strategy (CachingStrategyBase): The currently active caching strategy. |  | ||||||
|     __strategies (dict): A mapping between strategy names (as strings) and their corresponding |  | ||||||
|                          classes, allowing dynamic selection of caching strategies. |  | ||||||
|     """ |     """ | ||||||
|     __strategy = XMLCache()  # Default caching strategy |     __strategy = JSONCache()  # Default caching strategy | ||||||
|     __strategies = { |     __strategies = { | ||||||
|         'XML': XMLCache, |         'JSON': JSONCache, | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     @classmethod |     @classmethod | ||||||
|     def use(cls, strategy_name='XML', **kwargs): |     def use(cls, strategy_name='JSON', **kwargs): | ||||||
|         """ |  | ||||||
|         Set the caching strategy based on the strategy_name provided. |  | ||||||
|  |  | ||||||
|         Args: |  | ||||||
|             strategy_name (str): The name of the caching strategy (e.g., 'XML'). |  | ||||||
|             **kwargs: Additional keyword arguments to pass when initializing the strategy. |  | ||||||
|         """ |  | ||||||
|         # If a previous strategy exists, close it |  | ||||||
|         if cls.__strategy: |         if cls.__strategy: | ||||||
|             cls.__strategy.close() |             cls.__strategy.close() | ||||||
|  |  | ||||||
|         # Retrieve the strategy class based on the strategy name |  | ||||||
|         strategy_class = cls.__strategies.get(strategy_name) |         strategy_class = cls.__strategies.get(strategy_name) | ||||||
|  |  | ||||||
|         if not strategy_class: |         if not strategy_class: | ||||||
|             raise ValueError(f"Unknown caching strategy: {strategy_name}") |             raise ValueError(f"Unknown caching strategy: {strategy_name}") | ||||||
|  |  | ||||||
|         # Instantiate the new strategy with the provided arguments |  | ||||||
|         cls.__strategy = strategy_class(**kwargs) |         cls.__strategy = strategy_class(**kwargs) | ||||||
|         return cls.__strategy |         return cls.__strategy | ||||||
|  |  | ||||||
|     @classmethod |     @classmethod | ||||||
|     def get(cls, key): |     def get(cls, key): | ||||||
|         """Get data from the current strategy's cache.""" |  | ||||||
|         if not cls.__strategy: |  | ||||||
|             raise RuntimeError("Caching strategy has not been set.") |  | ||||||
|         return cls.__strategy.get(key) |         return cls.__strategy.get(key) | ||||||
|  |  | ||||||
|     @classmethod |     @classmethod | ||||||
|     def set(cls, key, value): |     def set(cls, key, value): | ||||||
|         """Set data in the current strategy's cache.""" |  | ||||||
|         if not cls.__strategy: |  | ||||||
|             raise RuntimeError("Caching strategy has not been set.") |  | ||||||
|         cls.__strategy.set(key, value) |         cls.__strategy.set(key, value) | ||||||
|  |  | ||||||
|  |     @classmethod | ||||||
|  |     def set_hollow(cls, key, cell: tuple, osm_types: OSM_TYPES, | ||||||
|  |                     selector: str, conditions: list=None, out='center'): | ||||||
|  |         """Create a hollow cache entry.""" | ||||||
|  |         cls.__strategy.set_hollow(key, cell, osm_types, selector, conditions, out) | ||||||
|   | |||||||
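
To make the hollow-entry flow concrete, here is a hypothetical round trip through the strategy; the import path, query string, and cell index are assumptions for illustration, not taken from the diff:

```python
from caching_strategy import CachingStrategy, get_cache_key  # assumed import path

CachingStrategy.use('JSON', cache_dir='./cache')

key = get_cache_key("node[amenity](45.75,4.8,45.8,4.85)")

# On a cache miss, record a hollow entry describing the query to run later...
CachingStrategy.set_hollow(key, cell=(915, 96), osm_types=['node'],
                           selector='amenity')

# ...and once the data has actually been fetched, fill the real entry.
CachingStrategy.set(key, [{"type": "node", "id": 1, "lat": 45.76, "lon": 4.81}])
assert CachingStrategy.get(key) is not None
```
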
| @@ -1,14 +1,17 @@ | |||||||
| """Module allowing connexion to overpass api and fectch data from OSM.""" | """Module allowing connexion to overpass api and fectch data from OSM.""" | ||||||
| from typing import Literal, List | import os | ||||||
| import urllib | import urllib | ||||||
|  | import math | ||||||
| import logging | import logging | ||||||
| import xml.etree.ElementTree as ET | import json | ||||||
|  | from typing import List, Tuple | ||||||
|  |  | ||||||
| from .caching_strategy import get_cache_key, CachingStrategy | from .caching_strategy import get_cache_key, CachingStrategy | ||||||
| from ..constants import OSM_CACHE_DIR | from ..constants import OSM_CACHE_DIR, OSM_TYPES, BBOX | ||||||
|  |  | ||||||
| logger = logging.getLogger('Overpass') |  | ||||||
| osm_types = List[Literal['way', 'node', 'relation']] | RESOLUTION = 0.05 | ||||||
|  | CELL = Tuple[int, int] | ||||||
|  |  | ||||||
|  |  | ||||||
| class Overpass : | class Overpass : | ||||||
| @@ -16,7 +19,10 @@ class Overpass : | |||||||
|     Overpass class to manage the query building and sending to overpass api. |     Overpass class to manage the query building and sending to overpass api. | ||||||
|     The caching strategy is a part of this class and initialized upon creation of the Overpass object. |     The caching strategy is a part of this class and initialized upon creation of the Overpass object. | ||||||
|     """ |     """ | ||||||
|     def __init__(self, caching_strategy: str = 'XML', cache_dir: str = OSM_CACHE_DIR) : |     logger = logging.getLogger(__name__) | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     def __init__(self, caching_strategy: str = 'JSON', cache_dir: str = OSM_CACHE_DIR) : | ||||||
|         """ |         """ | ||||||
|         Initialize the Overpass instance with the url, headers and caching strategy. |         Initialize the Overpass instance with the url, headers and caching strategy. | ||||||
|         """ |         """ | ||||||
| @@ -25,17 +31,109 @@ class Overpass : | |||||||
|         self.caching_strategy = CachingStrategy.use(caching_strategy, cache_dir=cache_dir) |         self.caching_strategy = CachingStrategy.use(caching_strategy, cache_dir=cache_dir) | ||||||
|  |  | ||||||
|  |  | ||||||
|     @classmethod |     def send_query(self, bbox: BBOX, osm_types: OSM_TYPES, | ||||||
|     def build_query(self, area: tuple, osm_types: osm_types, |                     selector: str, conditions: list=None, out='center') -> List[dict]: | ||||||
|                     selector: str, conditions=[], out='center') -> str: |         """ | ||||||
|  |         Sends the Overpass QL query to the Overpass API and returns the parsed json response. | ||||||
|  |  | ||||||
|  |         Args: | ||||||
|  |             bbox (tuple): Bounding box for the query. | ||||||
|  |             osm_types (list[str]): List of OSM element types (e.g., 'node', 'way'). | ||||||
|  |             selector (str): Key or tag to filter OSM elements (e.g., 'highway'). | ||||||
|  |             conditions (list): Optional list of additional filter conditions in Overpass QL format. | ||||||
|  |             out (str): Output format ('center', 'body', etc.). Defaults to 'center'. | ||||||
|  |  | ||||||
|  |         Returns: | ||||||
|  |             list: Parsed JSON response from the Overpass API, or cached data if available. | ||||||
|  |         """ | ||||||
|  |         # Determine which grid cells overlap with this bounding box. | ||||||
|  |         overlapping_cells = Overpass._get_overlapping_cells(bbox) | ||||||
|  |  | ||||||
|  |         # Retrieve cached data and identify missing cache entries | ||||||
|  |         cached_responses, non_cached_cells = self._retrieve_cached_data(overlapping_cells, osm_types, selector, conditions, out) | ||||||
|  |  | ||||||
|  |         self.logger.info(f'Cache hit for {len(overlapping_cells)-len(non_cached_cells)}/{len(overlapping_cells)} grid cells.') | ||||||
|  |  | ||||||
|  |         # If there is no missing data, return the cached responses after filtering. | ||||||
|  |         if not non_cached_cells : | ||||||
|  |             return Overpass._filter_landmarks(cached_responses, bbox) | ||||||
|  |  | ||||||
|  |         # If there is no cached data, fetch all from Overpass. | ||||||
|  |         elif not cached_responses : | ||||||
|  |             query_str = Overpass.build_query(bbox, osm_types, selector, conditions, out) | ||||||
|  |             return self.fetch_data_from_api(query_str) | ||||||
|  |  | ||||||
|  |         # Hybrid cache: some data from Overpass, some data from cache. | ||||||
|  |         else : | ||||||
|  |             # Resize the bbox for smaller search area and build new query string. | ||||||
|  |             non_cached_bbox = Overpass._get_non_cached_bbox(non_cached_cells, bbox) | ||||||
|  |             query_str = Overpass.build_query(non_cached_bbox, osm_types, selector, conditions, out) | ||||||
|  |             non_cached_responses = self.fetch_data_from_api(query_str) | ||||||
|  |             return Overpass._filter_landmarks(cached_responses, bbox) + non_cached_responses | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     def fetch_data_from_api(self, query_str: str) -> List[dict]: | ||||||
|  |         """ | ||||||
|  |         Fetch data from the Overpass API and return the parsed JSON elements. | ||||||
|  |  | ||||||
|  |         Args: | ||||||
|  |             query_str (str): The Overpass query string. | ||||||
|  |  | ||||||
|  |         Returns: | ||||||
|  |             list: Elements parsed from the Overpass API response. | ||||||
|  |         """ | ||||||
|  |         try: | ||||||
|  |             data = urllib.parse.urlencode({'data': query_str}).encode('utf-8') | ||||||
|  |             request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers) | ||||||
|  |  | ||||||
|  |             with urllib.request.urlopen(request) as response: | ||||||
|  |                 response_data = response.read().decode('utf-8')  # Convert the HTTPResponse to a string | ||||||
|  |                 data = json.loads(response_data)  # Load the JSON from the string | ||||||
|  |                 elements = data.get('elements', []) | ||||||
|  |                 # self.logger.debug(f'Query = {query_str}') | ||||||
|  |                 return elements | ||||||
|  |  | ||||||
|  |         except urllib.error.URLError as e: | ||||||
|  |             self.logger.error(f"Error connecting to Overpass API: {e}") | ||||||
|  |             raise ConnectionError(f"Error connecting to Overpass API: {e}") from e | ||||||
|  |         except Exception as exc : | ||||||
|  |             raise Exception(f'An unexpected error occurred: {str(exc)}') from exc | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     def fill_cache(self, json_data: dict) : | ||||||
|  |         """ | ||||||
|  |         Fill cache with data by using a hollow cache entry's information. | ||||||
|  |         """ | ||||||
|  |         query_str, cache_key = Overpass._build_query_from_hollow(json_data) | ||||||
|  |         try: | ||||||
|  |             data = urllib.parse.urlencode({'data': query_str}).encode('utf-8') | ||||||
|  |             request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers) | ||||||
|  |  | ||||||
|  |             with urllib.request.urlopen(request) as response: | ||||||
|  |  | ||||||
|  |                 # Convert the HTTPResponse to a string and load data | ||||||
|  |                 response_data = response.read().decode('utf-8')   | ||||||
|  |                 data = json.loads(response_data) | ||||||
|  |  | ||||||
|  |                 # Get elements and set cache | ||||||
|  |                 elements = data.get('elements', []) | ||||||
|  |                 self.caching_strategy.set(cache_key, elements) | ||||||
|  |                 self.logger.debug(f'Cache set for {cache_key}') | ||||||
|  |         except urllib.error.URLError as e: | ||||||
|  |             raise ConnectionError(f"Error connecting to Overpass API: {e}") from e | ||||||
|  |         except Exception as exc : | ||||||
|  |             raise Exception(f'An unexpected error occurred: {str(exc)}') from exc | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     @staticmethod | ||||||
|  |     def build_query(bbox: BBOX, osm_types: OSM_TYPES, | ||||||
|  |                     selector: str, conditions: list=None, out='center') -> str: | ||||||
|         """ |         """ | ||||||
|         Constructs a query string for the Overpass API to retrieve OpenStreetMap (OSM) data. |         Constructs a query string for the Overpass API to retrieve OpenStreetMap (OSM) data. | ||||||
|  |  | ||||||
|         Args: |         Args: | ||||||
|             area (tuple): A tuple representing the geographical search area, typically in the format  |             bbox (tuple): A tuple representing the geographical search area, typically in the format  | ||||||
|                         (radius, latitude, longitude). The first element is a string like "around:2000"  |                         (lat_min, lon_min, lat_max, lon_max). | ||||||
|                         specifying the search radius, and the second and third elements represent  |  | ||||||
|                         the latitude and longitude as floats or strings. |  | ||||||
|             osm_types (list[str]): A list of OSM element types to search for. Must be one or more of  |             osm_types (list[str]): A list of OSM element types to search for. Must be one or more of  | ||||||
|                                     'Way', 'Node', or 'Relation'. |                                     'Way', 'Node', or 'Relation'. | ||||||
|             selector (str): The key or tag to filter the OSM elements (e.g., 'amenity', 'highway', etc.). |             selector (str): The key or tag to filter the OSM elements (e.g., 'amenity', 'highway', etc.). | ||||||
| @@ -52,82 +150,203 @@ class Overpass : | |||||||
|         Notes: |         Notes: | ||||||
|             - If no conditions are provided, the query will just use the `selector` to filter the OSM  |             - If no conditions are provided, the query will just use the `selector` to filter the OSM  | ||||||
|             elements without additional constraints. |             elements without additional constraints. | ||||||
|             - The search area must always formatted as "(radius, lat, lon)". |  | ||||||
|         """ |         """ | ||||||
|         if not isinstance(conditions, list) : |         query = '[out:json];(' | ||||||
|             conditions = [conditions] |  | ||||||
|         if not isinstance(osm_types, list) : |  | ||||||
|             osm_types = [osm_types] |  | ||||||
|  |  | ||||||
|         query = '(' |         # convert the bbox to string. | ||||||
|  |         bbox_str = f"({','.join(map(str, bbox))})" | ||||||
|  |  | ||||||
|         # Round the radius to nearest 50 and coordinates to generate less queries |         if conditions is not None and len(conditions) > 0: | ||||||
|         if area[0] > 500 : |  | ||||||
|             search_radius = round(area[0] / 50) * 50 |  | ||||||
|             loc = tuple((round(area[1], 2), round(area[2], 2))) |  | ||||||
|         else : |  | ||||||
|             search_radius = round(area[0] / 25) * 25 |  | ||||||
|             loc = tuple((round(area[1], 3), round(area[2], 3))) |  | ||||||
|  |  | ||||||
|         search_area = f"(around:{search_radius}, {str(loc[0])}, {str(loc[1])})" |  | ||||||
|  |  | ||||||
|         if conditions : |  | ||||||
|             conditions = '(if: ' + ' && '.join(conditions) + ')' |             conditions = '(if: ' + ' && '.join(conditions) + ')' | ||||||
|         else : |         else : | ||||||
|             conditions = '' |             conditions = '' | ||||||
|  |  | ||||||
|         for elem in osm_types : |         for elem in osm_types : | ||||||
|             query += elem + '[' + selector + ']' + conditions + search_area + ';' |             query += elem + '[' + selector + ']' + conditions + bbox_str + ';' | ||||||
|  |  | ||||||
|         query += ');' + f'out {out};' |         query += ');' + f'out {out};' | ||||||
|  |  | ||||||
|         return query |         return query | ||||||
|  |  | ||||||
|  |  | ||||||
|     def send_query(self, query: str) -> ET: |     def _retrieve_cached_data(self, overlapping_cells: List[CELL], osm_types: OSM_TYPES, | ||||||
|  |                               selector: str, conditions: list, out: str) -> Tuple[List[dict], List[CELL]]: | ||||||
|         """ |         """ | ||||||
|         Sends the Overpass QL query to the Overpass API and returns the parsed JSON response. |         Retrieve cached data and identify grid cells missing from the cache. | ||||||
|  |  | ||||||
|         Args: |         Args: | ||||||
|             query (str): The Overpass QL query to be sent to the Overpass API. |             overlapping_cells (list): Cells to check for cached data. | ||||||
|  |             osm_types (list): OSM types (e.g., 'node', 'way'). | ||||||
|  |             selector (str): Key or tag to filter OSM elements. | ||||||
|  |             conditions (list): Additional conditions to apply. | ||||||
|  |             out (str): Output format. | ||||||
|  |  | ||||||
|         Returns: |         Returns: | ||||||
|             dict: The parsed JSON response from the Overpass API, or None if the request fails. |             tuple: A tuple containing: | ||||||
|  |                 - cached_responses (list): List of cached data found. | ||||||
|  |                 - non_cached_cells (list(tuple)): List of cells with missing data. | ||||||
|  |         """ | ||||||
|  |         cell_key_dict = {} | ||||||
|  |         for cell in overlapping_cells : | ||||||
|  |             # Build one key per cell, covering every requested osm_type. | ||||||
|  |             key_str = '' | ||||||
|  |             for elem in osm_types : | ||||||
|  |                 key_str += f"{elem}[{selector}]{conditions}({','.join(map(str, cell))})" | ||||||
|  |             cell_key_dict[cell] = get_cache_key(key_str) | ||||||
|  |  | ||||||
|  |         cached_responses = [] | ||||||
|  |         non_cached_cells = [] | ||||||
|  |  | ||||||
|  |         # Retrieve the cached data and mark the missing entries as hollow | ||||||
|  |         for cell, key in cell_key_dict.items(): | ||||||
|  |             cached_data = self.caching_strategy.get(key) | ||||||
|  |             if cached_data is not None : | ||||||
|  |                 cached_responses += cached_data | ||||||
|  |             else: | ||||||
|  |                 self.caching_strategy.set_hollow(key, cell, osm_types, selector, conditions, out) | ||||||
|  |                 non_cached_cells.append(cell) | ||||||
|  |  | ||||||
|  |         return cached_responses, non_cached_cells | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     @staticmethod | ||||||
|  |     def _build_query_from_hollow(json_data: dict) -> Tuple[str, str]: | ||||||
|  |         """ | ||||||
|  |         Build query string using information from a hollow cache entry. | ||||||
|  |         """ | ||||||
|  |         # Extract values from the JSON object | ||||||
|  |         key = json_data.get('key') | ||||||
|  |         cell = tuple(json_data.get('cell')) | ||||||
|  |         bbox = Overpass._get_bbox_from_grid_cell(cell) | ||||||
|  |         osm_types = json_data.get('osm_types') | ||||||
|  |         selector = json_data.get('selector') | ||||||
|  |         conditions = json_data.get('conditions') | ||||||
|  |         out = json_data.get('out') | ||||||
|  |  | ||||||
|  |  | ||||||
|  |         query_str = Overpass.build_query(bbox, osm_types, selector, conditions, out) | ||||||
|  |         return query_str, key | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     @staticmethod | ||||||
|  |     def _get_overlapping_cells(query_bbox: BBOX) -> set[CELL]: | ||||||
|  |         """ | ||||||
|  |         Returns a set of all grid cells that overlap with the given bounding box. | ||||||
|  |         """ | ||||||
|  |         # Extract location from the query bbox | ||||||
|  |         lat_min, lon_min, lat_max, lon_max = query_bbox | ||||||
|  |  | ||||||
|  |         min_lat_cell, min_lon_cell = Overpass._get_grid_cell(lat_min, lon_min) | ||||||
|  |         max_lat_cell, max_lon_cell = Overpass._get_grid_cell(lat_max, lon_max) | ||||||
|  |  | ||||||
|  |         overlapping_cells = set() | ||||||
|  |         for lat_idx in range(min_lat_cell, max_lat_cell + 1): | ||||||
|  |             for lon_idx in range(min_lon_cell, max_lon_cell + 1): | ||||||
|  |                 overlapping_cells.add((lat_idx, lon_idx)) | ||||||
|  |  | ||||||
|  |         return overlapping_cells | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     @staticmethod | ||||||
|  |     def _get_grid_cell(lat: float, lon: float) -> CELL: | ||||||
|  |         """ | ||||||
|  |         Returns the grid cell coordinates for a given latitude and longitude. | ||||||
|  |         Each grid cell spans RESOLUTION° (0.05°) of latitude and longitude. | ||||||
|  |         """ | ||||||
|  |         lat_index = math.floor(lat / RESOLUTION) | ||||||
|  |         lon_index = math.floor(lon / RESOLUTION) | ||||||
|  |         return (lat_index, lon_index) | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     @staticmethod | ||||||
|  |     def _get_bbox_from_grid_cell(cell: CELL) -> BBOX: | ||||||
|  |         """ | ||||||
|  |         Returns the bounding box for a given grid cell index. | ||||||
|  |         Each grid cell is RESOLUTION x RESOLUTION degrees in size. | ||||||
|  |  | ||||||
|  |         The bounding box is returned as (min_lat, min_lon, max_lat, max_lon). | ||||||
|  |         """ | ||||||
|  |         # Calculate the southwest (min_lat, min_lon) corner of the bounding box | ||||||
|  |         min_lat = round(cell[0] * RESOLUTION, 2) | ||||||
|  |         min_lon = round(cell[1] * RESOLUTION, 2) | ||||||
|  |  | ||||||
|  |         # Calculate the northeast (max_lat, max_lon) corner of the bounding box | ||||||
|  |         max_lat = round((cell[0] + 1) * RESOLUTION, 2) | ||||||
|  |         max_lon = round((cell[1] + 1) * RESOLUTION, 2) | ||||||
|  |  | ||||||
|  |         return (min_lat, min_lon, max_lat, max_lon) | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     @staticmethod | ||||||
|  |     def _get_non_cached_bbox(non_cached_cells: List[CELL], original_bbox: BBOX): | ||||||
|  |         """ | ||||||
|  |         Calculate the non-cached bounding box by excluding cached cells. | ||||||
|  |  | ||||||
|  |         Args: | ||||||
|  |             non_cached_cells (list): The list of cells that were not found in the cache. | ||||||
|  |             original_bbox (tuple): The original bounding box (min_lat, min_lon, max_lat, max_lon). | ||||||
|  |  | ||||||
|  |         Returns: | ||||||
|  |             tuple: The new bounding box that excludes cached cells, or None if all cells are cached. | ||||||
|  |         """ | ||||||
|  |         if not non_cached_cells: | ||||||
|  |             return None  # All cells were cached | ||||||
|  |  | ||||||
|  |         # Initialize the non-cached bounding box with extreme values | ||||||
|  |         min_lat, min_lon, max_lat, max_lon = float('inf'), float('inf'), float('-inf'), float('-inf') | ||||||
|  |  | ||||||
|  |         # Iterate over non-cached cells to find the new bounding box | ||||||
|  |         for cell in non_cached_cells: | ||||||
|  |             cell_min_lat, cell_min_lon, cell_max_lat, cell_max_lon = Overpass._get_bbox_from_grid_cell(cell) | ||||||
|  |  | ||||||
|  |             min_lat = min(min_lat, cell_min_lat) | ||||||
|  |             min_lon = min(min_lon, cell_min_lon) | ||||||
|  |             max_lat = max(max_lat, cell_max_lat) | ||||||
|  |             max_lon = max(max_lon, cell_max_lon) | ||||||
|  |  | ||||||
|  |         # If the bounding box was never updated, there is nothing left to fetch. | ||||||
|  |         if min_lat == float('inf') or min_lon == float('inf'): | ||||||
|  |             return None | ||||||
|  |  | ||||||
|  |         return (max(min_lat, original_bbox[0]),  | ||||||
|  |                 max(min_lon, original_bbox[1]),  | ||||||
|  |                 min(max_lat, original_bbox[2]),  | ||||||
|  |                 min(max_lon, original_bbox[3])) | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     @staticmethod | ||||||
|  |     def _filter_landmarks(elements: List[dict], bbox: BBOX) -> List[dict]: | ||||||
|  |         """ | ||||||
|  |         Filters elements based on whether their coordinates are inside the given bbox. | ||||||
|  |  | ||||||
|  |         Args: | ||||||
|  |         - elements (list of dict): List of elements containing coordinates. | ||||||
|  |         - bbox (tuple): A bounding box defined as (min_lat, min_lon, max_lat, max_lon). | ||||||
|  |  | ||||||
|  |         Returns: | ||||||
|  |         - list: A list of elements whose coordinates are inside the bounding box. | ||||||
|         """ |         """ | ||||||
|  |  | ||||||
|         # Generate a cache key for the current query |         filtered_elements = [] | ||||||
|         cache_key = get_cache_key(query) |         min_lat, min_lon, max_lat, max_lon = bbox | ||||||
|  |  | ||||||
|         # Try to fetch the result from the cache |         for elem in elements: | ||||||
|         cached_response = self.caching_strategy.get(cache_key) |             # Extract coordinates based on the 'type' of element | ||||||
|         if cached_response is not None : |             if elem.get('type') != 'node': | ||||||
|             logger.debug("Cache hit.") |                 center = elem.get('center', {}) | ||||||
|             return cached_response |                 lat = float(center.get('lat', 0)) | ||||||
|  |                 lon = float(center.get('lon', 0)) | ||||||
|  |             else: | ||||||
|  |                 lat = float(elem.get('lat', 0)) | ||||||
|  |                 lon = float(elem.get('lon', 0)) | ||||||
|  |  | ||||||
|         # Prepare the data to be sent as POST request, encoded as bytes |             # Check if the coordinates fall within the given bounding box | ||||||
|         data = urllib.parse.urlencode({'data': query}).encode('utf-8') |             if min_lat <= lat <= max_lat and min_lon <= lon <= max_lon: | ||||||
|  |                 filtered_elements.append(elem) | ||||||
|  |  | ||||||
|         try: |         return filtered_elements | ||||||
|             # Create a Request object with the specified URL, data, and headers |  | ||||||
|             request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers) |  | ||||||
|  |  | ||||||
|             # Send the request and read the response |  | ||||||
|             with urllib.request.urlopen(request) as response: |  | ||||||
|                 # Read and decode the response |  | ||||||
|                 response_data = response.read().decode('utf-8') |  | ||||||
|                 root = ET.fromstring(response_data) |  | ||||||
|  |  | ||||||
|                 # Cache the response data as an ElementTree root |  | ||||||
|                 self.caching_strategy.set(cache_key, root) |  | ||||||
|                 logger.debug("Response data added to cache.") |  | ||||||
|  |  | ||||||
|                 return root |  | ||||||
|  |  | ||||||
|         except urllib.error.URLError as e: |  | ||||||
|             raise ConnectionError(f"Error connecting to Overpass API: {e}") from e |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) : | def get_base_info(elem: dict, osm_type: str, with_name=False) : | ||||||
|     """ |     """ | ||||||
|     Extracts base information (coordinates, OSM ID, and optionally a name) from an OSM element. |     Extracts base information (coordinates, OSM ID, and optionally a name) from an OSM element. | ||||||
|  |  | ||||||
| @@ -136,7 +355,7 @@ def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) : | |||||||
|     extracting coordinates either directly or from a center tag, depending on the element type. |     extracting coordinates either directly or from a center tag, depending on the element type. | ||||||
|  |  | ||||||
|     Args: |     Args: | ||||||
|         elem (ET.Element): The XML element representing the OSM entity. |         elem (dict): The JSON element representing the OSM entity. | ||||||
|         osm_type (str): The type of the OSM entity (e.g., 'node', 'way'). If 'node', the coordinates |         osm_type (str): The type of the OSM entity (e.g., 'node', 'way'). If 'node', the coordinates | ||||||
|                         are extracted directly from the element; otherwise, from the 'center' tag. |                         are extracted directly from the element; otherwise, from the 'center' tag. | ||||||
|         with_name (bool): Whether to extract and return the name of the element. If True, it attempts |         with_name (bool): Whether to extract and return the name of the element. If True, it attempts | ||||||
| @@ -150,7 +369,7 @@ def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) : | |||||||
|     """ |     """ | ||||||
|     # 1. extract coordinates |     # 1. extract coordinates | ||||||
|     if osm_type != 'node' : |     if osm_type != 'node' : | ||||||
|         center = elem.find('center') |         center = elem.get('center') | ||||||
|         lat = float(center.get('lat')) |         lat = float(center.get('lat')) | ||||||
|         lon = float(center.get('lon')) |         lon = float(center.get('lon')) | ||||||
|  |  | ||||||
| @@ -165,7 +384,31 @@ def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) : | |||||||
|  |  | ||||||
|     # 3. Extract name if specified and return |     # 3. Extract name if specified and return | ||||||
|     if with_name : |     if with_name : | ||||||
|         name = elem.find("tag[@k='name']").get('v') if elem.find("tag[@k='name']") is not None else None |         name = elem.get('tags', {}).get('name') | ||||||
|         return osm_id, coords, name |         return osm_id, coords, name | ||||||
|     else : |     else : | ||||||
|         return osm_id, coords |         return osm_id, coords | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def fill_cache(): | ||||||
|  |     """ | ||||||
|  |     Scans the specified cache directory for files starting with 'hollow_' and attempts to load | ||||||
|  |     their contents as JSON to fill the cache of the Overpass system. | ||||||
|  |     """ | ||||||
|  |     overpass = Overpass() | ||||||
|  |  | ||||||
|  |     with os.scandir(OSM_CACHE_DIR) as it: | ||||||
|  |         for entry in it: | ||||||
|  |             if entry.is_file() and entry.name.startswith('hollow_'): | ||||||
|  |  | ||||||
|  |                 try : | ||||||
|  |                     # Read the whole file content as a string | ||||||
|  |                     with open(entry.path, 'r', encoding='utf-8') as f: | ||||||
|  |                         # load data and fill the cache with the query and key | ||||||
|  |                         json_data = json.load(f) | ||||||
|  |                         overpass.fill_cache(json_data) | ||||||
|  |                     # Now delete the file as the cache is filled | ||||||
|  |                     os.remove(entry.path) | ||||||
|  |  | ||||||
|  |                 except Exception as exc : | ||||||
|  |                     overpass.logger.error(f'An error occurred while filling the cache from {entry.path}: {exc}') | ||||||
|   | |||||||
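
The grid arithmetic is easiest to sanity-check in isolation. This standalone snippet mirrors `_get_grid_cell`, `_get_overlapping_cells`, and `_get_bbox_from_grid_cell` at the same 0.05° resolution; the coordinates are just an example around Lyon:

```python
import math

RESOLUTION = 0.05

def grid_cell(lat: float, lon: float) -> tuple:
    return (math.floor(lat / RESOLUTION), math.floor(lon / RESOLUTION))

bbox = (45.75, 4.82, 45.79, 4.88)   # (min_lat, min_lon, max_lat, max_lon)
la0, lo0 = grid_cell(bbox[0], bbox[1])
la1, lo1 = grid_cell(bbox[2], bbox[3])
cells = {(i, j) for i in range(la0, la1 + 1) for j in range(lo0, lo1 + 1)}
print(sorted(cells))   # [(915, 96), (915, 97)] -- two cells cover this bbox

# Mapping a cell back to its bounding box (SW then NE corner):
i, j = 915, 96
print((round(i * RESOLUTION, 2), round(j * RESOLUTION, 2),
       round((i + 1) * RESOLUTION, 2), round((j + 1) * RESOLUTION, 2)))
# -> (45.75, 4.8, 45.8, 4.85)
```
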
| @@ -1,4 +1,4 @@ | |||||||
| city_bbox_side: 7500 #m | max_bbox_side: 4000   #m | ||||||
| radius_close_to: 50 | radius_close_to: 50 | ||||||
| church_coeff: 0.55 | church_coeff: 0.55 | ||||||
| nature_coeff: 1.4 | nature_coeff: 1.4 | ||||||
| @@ -8,5 +8,5 @@ image_bonus: 1.1 | |||||||
| viewpoint_bonus: 5 | viewpoint_bonus: 5 | ||||||
| wikipedia_bonus: 1.25 | wikipedia_bonus: 1.25 | ||||||
| name_bonus: 3 | name_bonus: 3 | ||||||
| N_important: 40 | N_important: 60 | ||||||
| pay_bonus: -1 | pay_bonus: -1 | ||||||
|   | |||||||
| @@ -4,3 +4,6 @@ average_walking_speed: 4.8 | |||||||
| max_landmarks: 10 | max_landmarks: 10 | ||||||
| max_landmarks_refiner: 20 | max_landmarks_refiner: 20 | ||||||
| overshoot: 0.0016 | overshoot: 0.0016 | ||||||
|  | time_limit: 1 | ||||||
|  | gap_rel: 0.05 | ||||||
|  | max_iter: 40 | ||||||
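
These three values feed `Optimizer.__init__` in the diff above. A sketch of how they are presumably loaded; the exact path is an assumption based on `OPTIMIZER_PARAMETERS_PATH` in constants.py:

```python
import yaml

# Path assumed from constants.py (PARAMETERS_DIR / 'optimizer_parameters.yaml').
with open('src/parameters/optimizer_parameters.yaml', encoding='utf-8') as f:
    params = yaml.safe_load(f)

time_limit = params['time_limit']   # seconds CBC may spend per solve
gap_rel = params['gap_rel']         # accept solutions within 5 % of optimal
max_iter = params['max_iter']       # cap on circle-elimination re-solves
```
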
| @@ -2,7 +2,7 @@ | |||||||
|  |  | ||||||
| from typing import Optional, Literal | from typing import Optional, Literal | ||||||
| from uuid import uuid4, UUID | from uuid import uuid4, UUID | ||||||
| from pydantic import BaseModel, Field | from pydantic import BaseModel, ConfigDict, Field | ||||||
|  |  | ||||||
|  |  | ||||||
| # Output to frontend | # Output to frontend | ||||||
| @@ -144,8 +144,4 @@ class Toilets(BaseModel) : | |||||||
|         """ |         """ | ||||||
|         return f'Toilets @{self.location}' |         return f'Toilets @{self.location}' | ||||||
|  |  | ||||||
|     class Config: |     model_config = ConfigDict(from_attributes=True) | ||||||
|         """ |  | ||||||
|         This allows us to easily convert the model to and from dictionaries |  | ||||||
|         """ |  | ||||||
|         from_attributes = True |  | ||||||
|   | |||||||
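
The `Toilets` change is the standard Pydantic v2 migration: the nested `class Config` becomes `model_config = ConfigDict(...)`. A minimal sketch with an invented model showing what `from_attributes` enables:

```python
from pydantic import BaseModel, ConfigDict

class Spot(BaseModel):
    model_config = ConfigDict(from_attributes=True)
    lat: float
    lon: float

class Row:
    """Stand-in for an ORM row or any plain attribute-bearing object."""
    lat = 45.75
    lon = 4.82

# from_attributes lets pydantic read plain attributes instead of dict keys.
print(Spot.model_validate(Row()))   # lat=45.75 lon=4.82
```
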
| @@ -27,11 +27,13 @@ def test_turckheim(client, request):    # pylint: disable=redefined-outer-name | |||||||
|         "/trip/new", |         "/trip/new", | ||||||
|         json={ |         json={ | ||||||
|             "preferences": {"sightseeing": {"type": "sightseeing", "score": 5}, |             "preferences": {"sightseeing": {"type": "sightseeing", "score": 5}, | ||||||
|             "nature": {"type": "nature", "score": 5}, |             "nature": {"type": "nature", "score": 0}, | ||||||
|             "shopping": {"type": "shopping", "score": 5}, |             "shopping": {"type": "shopping", "score": 0}, | ||||||
|             "max_time_minute": duration_minutes, |             "max_time_minute": duration_minutes, | ||||||
|             "detour_tolerance_minute": 0}, |             "detour_tolerance_minute": 0}, | ||||||
|             "start": [48.084588, 7.280405] |             # "start": [48.084588, 7.280405] | ||||||
|  |             # "start": [45.74445023349939, 4.8222687890538865] | ||||||
|  |             "start": [45.75156398104873, 4.827154464827647] | ||||||
|             } |             } | ||||||
|         ) |         ) | ||||||
|     result = response.json() |     result = response.json() | ||||||
| @@ -51,11 +53,11 @@ def test_turckheim(client, request):    # pylint: disable=redefined-outer-name | |||||||
|     assert response.status_code == 200  # check for successful planning |     assert response.status_code == 200  # check for successful planning | ||||||
|     assert isinstance(landmarks, list)  # check that the return type is a list |     assert isinstance(landmarks, list)  # check that the return type is a list | ||||||
|     assert len(landmarks) > 2           # check that there is something to visit |     assert len(landmarks) > 2           # check that there is something to visit | ||||||
|  |     assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds" | ||||||
|     assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}" |     assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}" | ||||||
|     assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}" |     assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}" | ||||||
|     # assert 2!= 3 |     # assert 2!= 3 | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_bellecour(client, request) :   # pylint: disable=redefined-outer-name | def test_bellecour(client, request) :   # pylint: disable=redefined-outer-name | ||||||
|     """ |     """ | ||||||
|     Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area. |     Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area. | ||||||
| @@ -97,10 +99,9 @@ def test_bellecour(client, request) :   # pylint: disable=redefined-outer-name | |||||||
|     assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}" |     assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}" | ||||||
|     assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}" |     assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}" | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_cologne(client, request) :   # pylint: disable=redefined-outer-name | def test_cologne(client, request) :   # pylint: disable=redefined-outer-name | ||||||
|     """ |     """ | ||||||
|     Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area. |     Test n°3 : Custom test in Cologne to ensure proper decision making in a crowded area. | ||||||
|      |      | ||||||
|     Args: |     Args: | ||||||
|         client: |         client: | ||||||
| @@ -141,7 +142,7 @@ def test_cologne(client, request) :   # pylint: disable=redefined-outer-name | |||||||
|  |  | ||||||
| def test_strasbourg(client, request) :   # pylint: disable=redefined-outer-name | def test_strasbourg(client, request) :   # pylint: disable=redefined-outer-name | ||||||
|     """ |     """ | ||||||
|     Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area. |     Test n°4 : Custom test in Strasbourg to ensure proper decision making in a crowded area. | ||||||
|      |      | ||||||
|     Args: |     Args: | ||||||
|         client: |         client: | ||||||
| @@ -182,7 +183,7 @@ def test_strasbourg(client, request) :   # pylint: disable=redefined-outer-name | |||||||
|  |  | ||||||
| def test_zurich(client, request) :   # pylint: disable=redefined-outer-name | def test_zurich(client, request) :   # pylint: disable=redefined-outer-name | ||||||
|     """ |     """ | ||||||
|     Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area. |     Test n°5 : Custom test in Zurich to ensure proper decision making in a crowded area. | ||||||
|      |      | ||||||
|     Args: |     Args: | ||||||
|         client: |         client: | ||||||
| @@ -223,24 +224,24 @@ def test_zurich(client, request) :   # pylint: disable=redefined-outer-name | |||||||
|  |  | ||||||
| def test_paris(client, request) :   # pylint: disable=redefined-outer-name | def test_paris(client, request) :   # pylint: disable=redefined-outer-name | ||||||
|     """ |     """ | ||||||
|     Test n°2 : Custom test in Paris (les Halles) centre to ensure proper decision making in crowded area. |     Test n°6 : Custom test in Paris (les Halles) centre to ensure proper decision making in a crowded area. | ||||||
|      |      | ||||||
|     Args: |     Args: | ||||||
|         client: |         client: | ||||||
|         request: |         request: | ||||||
|     """ |     """ | ||||||
|     start_time = time.time()  # Start timer |     start_time = time.time()  # Start timer | ||||||
|     duration_minutes = 300 |     duration_minutes = 200 | ||||||
|  |  | ||||||
|     response = client.post( |     response = client.post( | ||||||
|         "/trip/new", |         "/trip/new", | ||||||
|         json={ |         json={ | ||||||
|             "preferences": {"sightseeing": {"type": "sightseeing", "score": 5}, |             "preferences": {"sightseeing": {"type": "sightseeing", "score": 5}, | ||||||
|                             "nature": {"type": "nature", "score": 5}, |                             "nature": {"type": "nature", "score": 0}, | ||||||
|                             "shopping": {"type": "shopping", "score": 5}, |                             "shopping": {"type": "shopping", "score": 5}, | ||||||
|                             "max_time_minute": duration_minutes, |                             "max_time_minute": duration_minutes, | ||||||
|                             "detour_tolerance_minute": 0}, |                             "detour_tolerance_minute": 0}, | ||||||
|             "start": [48.86248803298562, 2.346451131285925] |             "start": [48.85468881798671, 2.3423925755998374] | ||||||
|             } |             } | ||||||
|         ) |         ) | ||||||
|     result = response.json() |     result = response.json() | ||||||
| @@ -264,7 +265,7 @@ def test_paris(client, request) :   # pylint: disable=redefined-outer-name | |||||||
|  |  | ||||||
| def test_new_york(client, request) :   # pylint: disable=redefined-outer-name | def test_new_york(client, request) :   # pylint: disable=redefined-outer-name | ||||||
|     """ |     """ | ||||||
|     Test n°2 : Custom test in New York (les Halles) centre to ensure proper decision making in crowded area. |     Test n°7 : Custom test in New York to ensure proper decision making in a crowded area. | ||||||
|      |      | ||||||
|     Args: |     Args: | ||||||
|         client: |         client: | ||||||
| @@ -305,7 +306,7 @@ def test_new_york(client, request) :   # pylint: disable=redefined-outer-name | |||||||
|  |  | ||||||
| def test_shopping(client, request) :   # pylint: disable=redefined-outer-name | def test_shopping(client, request) :   # pylint: disable=redefined-outer-name | ||||||
|     """ |     """ | ||||||
|     Test n°3 : Custom test in Lyon centre to ensure shopping clusters are found. |     Test n°8 : Custom test in Lyon centre to ensure shopping clusters are found. | ||||||
|      |      | ||||||
|     Args: |     Args: | ||||||
|         client: |         client: | ||||||
| @@ -334,8 +335,8 @@ def test_shopping(client, request) :   # pylint: disable=redefined-outer-name | |||||||
|     # Add details to report |     # Add details to report | ||||||
|     log_trip_details(request, landmarks, result['total_time'], duration_minutes) |     log_trip_details(request, landmarks, result['total_time'], duration_minutes) | ||||||
|  |  | ||||||
|     for elem in landmarks : |     # for elem in landmarks : | ||||||
|         print(elem) |     #     print(elem) | ||||||
|  |  | ||||||
|     # checks : |     # checks : | ||||||
|     assert response.status_code == 200  # check for successful planning |     assert response.status_code == 200  # check for successful planning | ||||||
|   | |||||||
| @@ -9,7 +9,8 @@ from pydantic import BaseModel | |||||||
| from ..overpass.overpass import Overpass, get_base_info | from ..overpass.overpass import Overpass, get_base_info | ||||||
| from ..structs.landmark import Landmark | from ..structs.landmark import Landmark | ||||||
| from .get_time_distance import get_distance | from .get_time_distance import get_distance | ||||||
| from ..constants import OSM_CACHE_DIR | from .utils import create_bbox | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
| # silence the overpass logger | # silence the overpass logger | ||||||
| @@ -79,8 +80,7 @@ class ClusterManager: | |||||||
|             bbox: The bounding box coordinates (around:radius, center_lat, center_lon). |             bbox: The bounding box coordinates (lat_min, lon_min, lat_max, lon_max). | ||||||
|         """ |         """ | ||||||
|         # Setup the caching in the Overpass class. |         # Setup the caching in the Overpass class. | ||||||
|         self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR) |         self.overpass = Overpass() | ||||||
|  |  | ||||||
|  |  | ||||||
|         self.cluster_type = cluster_type |         self.cluster_type = cluster_type | ||||||
|         if cluster_type == 'shopping' : |         if cluster_type == 'shopping' : | ||||||
| @@ -95,27 +95,24 @@ class ClusterManager: | |||||||
|             raise NotImplementedError("Please choose only an available option for cluster detection") |             raise NotImplementedError("Please choose only an available option for cluster detection") | ||||||
|  |  | ||||||
|         # Initialize the points for cluster detection |         # Initialize the points for cluster detection | ||||||
|         query = self.overpass.build_query( |         try: | ||||||
|             area = bbox, |             result = self.overpass.send_query( | ||||||
|  |             bbox = bbox, | ||||||
|             osm_types = osm_types, |             osm_types = osm_types, | ||||||
|             selector = sel, |             selector = sel, | ||||||
|             out = out |             out = out | ||||||
|         ) |         ) | ||||||
|         self.logger.debug(f"Cluster query: {query}") |  | ||||||
|  |  | ||||||
|         try: |  | ||||||
|             result = self.overpass.send_query(query) |  | ||||||
|         except Exception as e: |         except Exception as e: | ||||||
|             self.logger.error(f"Error fetching landmarks: {e}") |             self.logger.error(f"Error fetching clusters: {e}") | ||||||
|  |  | ||||||
|         if result is None : |         if result is None : | ||||||
|             self.logger.error(f"Error fetching {cluster_type} clusters, overpass query returned None.") |             self.logger.debug(f"Found no {cluster_type} clusters, overpass query returned no datapoints.") | ||||||
|             self.valid = False |             self.valid = False | ||||||
|  |  | ||||||
|         else : |         else : | ||||||
|             points = [] |             points = [] | ||||||
|             for osm_type in osm_types : |             for elem in result: | ||||||
|                 for elem in result.findall(osm_type): |                 osm_type = elem.get('type') | ||||||
|                  |                  | ||||||
|                 # Get coordinates and append them to the points list |                 # Get coordinates and append them to the points list | ||||||
|                 _, coords = get_base_info(elem, osm_type) |                 _, coords = get_base_info(elem, osm_type) | ||||||
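The refactor changes `send_query` from taking a prebuilt query string and returning an XML root to taking the query parameters directly and returning a flat list of JSON elements. A condensed sketch of the new call pattern, reusing only the API shown in this diff (centre coordinates and selector are placeholders):

```python
# Sketch only: assumes the JSON-returning Overpass.send_query used above.
overpass = Overpass()
bbox = create_bbox((45.7515, 4.8271), 1000)  # placeholder centre, 1 km radius

elements = overpass.send_query(
    bbox=bbox,
    osm_types=['node', 'way', 'relation'],
    selector='"shop"="mall"',                # hypothetical selector
    out='ids center',
)

for elem in elements or []:                  # send_query may return None
    osm_type = elem.get('type')              # 'node' | 'way' | 'relation'
    _, coords = get_base_info(elem, osm_type)
```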
| @@ -137,7 +134,7 @@ class ClusterManager: | |||||||
|  |  | ||||||
|                 # Check that there is at least 1 cluster |                 # Check that there is at least 1 cluster | ||||||
|                 if len(set(labels)) > 1 : |                 if len(set(labels)) > 1 : | ||||||
|                     self.logger.debug(f"Found {len(set(labels))} different clusters.") |                     self.logger.info(f"Found {len(set(labels))} different {cluster_type} clusters.") | ||||||
|                     # Separate clustered points and noise points |                     # Separate clustered points and noise points | ||||||
|                     self.cluster_points = self.all_points[labels != -1] |                     self.cluster_points = self.all_points[labels != -1] | ||||||
|                     self.cluster_labels = labels[labels != -1] |                     self.cluster_labels = labels[labels != -1] | ||||||
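The `labels != -1` filtering above follows the scikit-learn DBSCAN convention, where noise points receive the label `-1`; the clustering call itself is not visible in this hunk, so the sketch below only illustrates that convention with synthetic points:

```python
import numpy as np
from sklearn.cluster import DBSCAN

# Two nearby points form a cluster; the distant third is flagged as noise (-1).
points = np.array([[45.7500, 4.8200], [45.7501, 4.8201], [45.9000, 4.9900]])
labels = DBSCAN(eps=0.001, min_samples=2).fit(points).labels_  # -> [0, 0, -1]

cluster_points = points[labels != -1]   # keep clustered points
cluster_labels = labels[labels != -1]   # drop the noise label
```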
| @@ -145,7 +142,7 @@ class ClusterManager: | |||||||
|                     self.valid = True |                     self.valid = True | ||||||
|  |  | ||||||
|                 else : |                 else : | ||||||
|                     self.logger.debug(f"Detected 0 {cluster_type} clusters.") |                     self.logger.info(f"Found 0 {cluster_type} clusters.") | ||||||
|                     self.valid = False |                     self.valid = False | ||||||
|  |  | ||||||
|             else : |             else : | ||||||
| @@ -218,8 +215,7 @@ class ClusterManager: | |||||||
|         """ |         """ | ||||||
|  |  | ||||||
|         # Define the bounding box for a given radius around the coordinates |         # Define the bounding box for a given radius around the coordinates | ||||||
|         lat, lon = cluster.centroid |         bbox = create_bbox(cluster.centroid, 1000) | ||||||
|         bbox = (1000, lat, lon) |  | ||||||
|          |          | ||||||
|         # Query neighborhoods and shopping malls |         # Query neighborhoods and shopping malls | ||||||
|         selectors = ['"place"~"^(suburb|neighborhood|neighbourhood|quarter|city_block)$"'] |         selectors = ['"place"~"^(suburb|neighborhood|neighbourhood|quarter|city_block)$"'] | ||||||
| @@ -238,25 +234,22 @@ class ClusterManager: | |||||||
|         osm_types = ['node', 'way', 'relation'] |         osm_types = ['node', 'way', 'relation'] | ||||||
|  |  | ||||||
|         for sel in selectors : |         for sel in selectors : | ||||||
|             query = self.overpass.build_query( |             try: | ||||||
|                 area = bbox, |                 result = self.overpass.send_query(bbox = bbox, | ||||||
|                                                   osm_types = osm_types, |                                                   osm_types = osm_types, | ||||||
|                                                   selector = sel, |                                                   selector = sel, | ||||||
|                                                   out = 'ids center' |                                                   out = 'ids center' | ||||||
|                                                   ) |                                                   ) | ||||||
|  |  | ||||||
|             try: |  | ||||||
|                 result = self.overpass.send_query(query) |  | ||||||
|             except Exception as e: |             except Exception as e: | ||||||
|                 self.logger.error(f"Error fetching landmarks: {e}") |                 self.logger.error(f"Error fetching clusters: {e}") | ||||||
|                 continue |                 continue | ||||||
|  |  | ||||||
|             if result is None : |             if result is None : | ||||||
|                 self.logger.error(f"Error fetching landmarks: {e}") |                 self.logger.error(f"Error fetching clusters: {e}") | ||||||
|                 continue |                 continue | ||||||
|  |  | ||||||
|             for osm_type in osm_types : |             for elem in result: | ||||||
|                 for elem in result.findall(osm_type): |                 osm_type = elem.get('type') | ||||||
|  |  | ||||||
|                 id, coords, name = get_base_info(elem, osm_type, with_name=True) |                 id, coords, name = get_base_info(elem, osm_type, with_name=True) | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1,19 +1,15 @@ | |||||||
| """Module used to import data from OSM and arrange them in categories.""" | """Module used to import data from OSM and arrange them in categories.""" | ||||||
| import logging | import logging | ||||||
| import xml.etree.ElementTree as ET |  | ||||||
| import yaml | import yaml | ||||||
|  |  | ||||||
|  |  | ||||||
| from ..structs.preferences import Preferences | from ..structs.preferences import Preferences | ||||||
| from ..structs.landmark import Landmark | from ..structs.landmark import Landmark | ||||||
| from .take_most_important import take_most_important | from .take_most_important import take_most_important | ||||||
| from .cluster_manager import ClusterManager | from .cluster_manager import ClusterManager | ||||||
| from ..overpass.overpass import Overpass, get_base_info | from ..overpass.overpass import Overpass, get_base_info | ||||||
|  | from .utils import create_bbox | ||||||
|  |  | ||||||
| from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH, OSM_CACHE_DIR | from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH | ||||||
|  |  | ||||||
| # silence the overpass logger |  | ||||||
| logging.getLogger('Overpass').setLevel(level=logging.CRITICAL) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class LandmarkManager: | class LandmarkManager: | ||||||
| @@ -37,8 +33,7 @@ class LandmarkManager: | |||||||
|  |  | ||||||
|         with LANDMARK_PARAMETERS_PATH.open('r') as f: |         with LANDMARK_PARAMETERS_PATH.open('r') as f: | ||||||
|             parameters = yaml.safe_load(f) |             parameters = yaml.safe_load(f) | ||||||
|             self.max_bbox_side = parameters['city_bbox_side'] |             self.max_bbox_side = parameters['max_bbox_side'] | ||||||
|             self.radius_close_to = parameters['radius_close_to'] |  | ||||||
|             self.church_coeff = parameters['church_coeff'] |             self.church_coeff = parameters['church_coeff'] | ||||||
|             self.nature_coeff = parameters['nature_coeff'] |             self.nature_coeff = parameters['nature_coeff'] | ||||||
|             self.overall_coeff = parameters['overall_coeff'] |             self.overall_coeff = parameters['overall_coeff'] | ||||||
| @@ -56,7 +51,7 @@ class LandmarkManager: | |||||||
|             self.detour_factor = parameters['detour_factor'] |             self.detour_factor = parameters['detour_factor'] | ||||||
|  |  | ||||||
|         # Setup the caching in the Overpass class. |         # Setup the caching in the Overpass class. | ||||||
|         self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR) |         self.overpass = Overpass() | ||||||
|  |  | ||||||
|         self.logger.info('LandmarkManager successfully initialized.') |         self.logger.info('LandmarkManager successfully initialized.') | ||||||
|  |  | ||||||
| @@ -80,39 +75,39 @@ class LandmarkManager: | |||||||
|         """ |         """ | ||||||
|         self.logger.debug('Starting to fetch landmarks...') |         self.logger.debug('Starting to fetch landmarks...') | ||||||
|         max_walk_dist = int((preferences.max_time_minute/2)/60*self.walking_speed*1000/self.detour_factor) |         max_walk_dist = int((preferences.max_time_minute/2)/60*self.walking_speed*1000/self.detour_factor) | ||||||
|         reachable_bbox_side = min(max_walk_dist, self.max_bbox_side) |         radius = min(max_walk_dist, int(self.max_bbox_side/2)) | ||||||
|  |  | ||||||
|         # use set to avoid duplicates, this requires some __methods__ to be set in Landmark |         # use set to avoid duplicates, this requires some __methods__ to be set in Landmark | ||||||
|         all_landmarks = set() |         all_landmarks = set() | ||||||
|  |  | ||||||
|         # Create a bbox using the around technique, tuple of strings |         # Create a bbox around the start coordinates, (lat_min, lon_min, lat_max, lon_max) | ||||||
|         bbox = tuple((min(2000, reachable_bbox_side/2), center_coordinates[0], center_coordinates[1])) |         bbox = create_bbox(center_coordinates, radius) | ||||||
|  |  | ||||||
|         # list for sightseeing |         # list for sightseeing | ||||||
|         if preferences.sightseeing.score != 0: |         if preferences.sightseeing.score != 0: | ||||||
|             self.logger.debug('Fetching sightseeing landmarks...') |             self.logger.debug('Fetching sightseeing landmarks...') | ||||||
|             current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['sightseeing'], preferences.sightseeing.type, preferences.sightseeing.score) |             current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['sightseeing'], preferences.sightseeing.type, preferences.sightseeing.score) | ||||||
|             all_landmarks.update(current_landmarks) |             all_landmarks.update(current_landmarks) | ||||||
|             self.logger.debug('Fetching sightseeing clusters...') |             self.logger.info(f'Found {len(current_landmarks)} sightseeing landmarks') | ||||||
|  |  | ||||||
|             # special pipeline for historic neighborhoods |             # special pipeline for historic neighborhoods | ||||||
|             neighborhood_manager = ClusterManager(bbox, 'sightseeing') |             neighborhood_manager = ClusterManager(bbox, 'sightseeing') | ||||||
|             historic_clusters = neighborhood_manager.generate_clusters() |             historic_clusters = neighborhood_manager.generate_clusters() | ||||||
|             all_landmarks.update(historic_clusters) |             all_landmarks.update(historic_clusters) | ||||||
|             self.logger.debug('Sightseeing clusters done') |  | ||||||
|  |  | ||||||
|         # list for nature |         # list for nature | ||||||
|         if preferences.nature.score != 0: |         if preferences.nature.score != 0: | ||||||
|             self.logger.debug('Fetching nature landmarks...') |             self.logger.debug('Fetching nature landmarks...') | ||||||
|             current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['nature'], preferences.nature.type, preferences.nature.score) |             current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['nature'], preferences.nature.type, preferences.nature.score) | ||||||
|             all_landmarks.update(current_landmarks) |             all_landmarks.update(current_landmarks) | ||||||
|  |             self.logger.info(f'Found {len(current_landmarks)} nature landmarks') | ||||||
|  |  | ||||||
|  |  | ||||||
|         # list for shopping |         # list for shopping | ||||||
|         if preferences.shopping.score != 0: |         if preferences.shopping.score != 0: | ||||||
|             self.logger.debug('Fetching shopping landmarks...') |             self.logger.debug('Fetching shopping landmarks...') | ||||||
|             current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['shopping'], preferences.shopping.type, preferences.shopping.score) |             current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['shopping'], preferences.shopping.type, preferences.shopping.score) | ||||||
|             self.logger.debug('Fetching shopping clusters...') |             self.logger.info(f'Found {len(current_landmarks)} shopping landmarks') | ||||||
|  |  | ||||||
|             # set time for all shopping activities : |             # set time for all shopping activities : | ||||||
|             for landmark in current_landmarks : |             for landmark in current_landmarks : | ||||||
| @@ -123,8 +118,6 @@ class LandmarkManager: | |||||||
|             shopping_manager = ClusterManager(bbox, 'shopping') |             shopping_manager = ClusterManager(bbox, 'shopping') | ||||||
|             shopping_clusters = shopping_manager.generate_clusters() |             shopping_clusters = shopping_manager.generate_clusters() | ||||||
|             all_landmarks.update(shopping_clusters) |             all_landmarks.update(shopping_clusters) | ||||||
|             self.logger.debug('Shopping clusters done') |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
|         landmarks_constrained = take_most_important(all_landmarks, self.n_important) |         landmarks_constrained = take_most_important(all_landmarks, self.n_important) | ||||||
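The radius derivation at the top of this hunk packs several unit conversions into one expression; a worked example with assumed parameter values (walking speed, detour factor, and bbox side are placeholders for the YAML-configured values):

```python
# Assumed values, for illustration only.
max_time_minute = 120   # user's time budget
walking_speed = 4.8     # km/h, placeholder
detour_factor = 1.4     # placeholder
max_bbox_side = 4000    # metres, placeholder

# Half the time budget in hours, times the speed in metres/hour,
# discounted by the detour factor -> reachable one-way distance in metres.
max_walk_dist = int((max_time_minute / 2) / 60 * walking_speed * 1000 / detour_factor)
radius = min(max_walk_dist, int(max_bbox_side / 2))
print(max_walk_dist, radius)  # 3428 2000
```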
| @@ -179,7 +172,7 @@ class LandmarkManager: | |||||||
|         """ |         """ | ||||||
|         return_list = [] |         return_list = [] | ||||||
|  |  | ||||||
|         if landmarktype == 'nature' : query_conditions = [] |         if landmarktype == 'nature' : query_conditions = None | ||||||
|         else : query_conditions = ['count_tags()>5'] |         else : query_conditions = ['count_tags()>5'] | ||||||
|  |  | ||||||
|         # caution, when applying a list of selectors, overpass will search for elements that match ALL selectors simultaneously |         # caution, when applying a list of selectors, overpass will search for elements that match ALL selectors simultaneously | ||||||
| @@ -190,60 +183,58 @@ class LandmarkManager: | |||||||
|             osm_types = ['way', 'relation'] |             osm_types = ['way', 'relation'] | ||||||
|  |  | ||||||
|             if 'viewpoint' in sel : |             if 'viewpoint' in sel : | ||||||
|                 query_conditions = [] |                 query_conditions = None | ||||||
|                 osm_types.append('node') |                 osm_types.append('node') | ||||||
|  |  | ||||||
|             query = self.overpass.build_query( |             # Send the overpass query | ||||||
|                 area = bbox, |             try: | ||||||
|  |                 result = self.overpass.send_query( | ||||||
|  |                     bbox = bbox, | ||||||
|                     osm_types = osm_types, |                     osm_types = osm_types, | ||||||
|                     selector = sel, |                     selector = sel, | ||||||
|                 conditions = query_conditions,        # except for nature.... |                     conditions = query_conditions,        # None for nature and viewpoints | ||||||
|                 out = 'center' |                     out = 'ids center tags' | ||||||
|                     ) |                     ) | ||||||
|             self.logger.debug(f"Query: {query}") |  | ||||||
|  |  | ||||||
|             try: |  | ||||||
|                 result = self.overpass.send_query(query) |  | ||||||
|             except Exception as e: |             except Exception as e: | ||||||
|                 self.logger.error(f"Error fetching landmarks: {e}") |                 self.logger.error(f"Error fetching landmarks: {e}") | ||||||
|                 continue |                 continue | ||||||
|  |  | ||||||
|             return_list += self.xml_to_landmarks(result, landmarktype, preference_level) |             return_list += self._to_landmarks(result, landmarktype, preference_level) | ||||||
|  |  | ||||||
|         self.logger.debug(f"Fetched {len(return_list)} landmarks of type {landmarktype} in {bbox}") |         self.logger.debug(f"Fetched {len(return_list)} landmarks of type {landmarktype} in {bbox}") | ||||||
|  |  | ||||||
|         return return_list |         return return_list | ||||||
|  |  | ||||||
|  |  | ||||||
|     def xml_to_landmarks(self, root: ET.Element, landmarktype, preference_level) -> list[Landmark]: |     def _to_landmarks(self, elements: list, landmarktype, preference_level) -> list[Landmark]: | ||||||
|         """ |         """ | ||||||
|         Parse the Overpass API result and extract landmarks. |         Parse the Overpass API result and extract landmarks. | ||||||
|  |  | ||||||
|         This method processes the XML root element returned by the Overpass API and  |         This method processes the JSON elements returned by the Overpass API and  | ||||||
|         extracts landmarks of types 'node', 'way', and 'relation'. It retrieves  |         extracts landmarks of types 'node', 'way', and 'relation'. It retrieves  | ||||||
|         relevant information such as name, coordinates, and tags, and converts them  |         relevant information such as name, coordinates, and tags, and converts them  | ||||||
|         into Landmark objects. |         into Landmark objects. | ||||||
|  |  | ||||||
|         Args: |         Args: | ||||||
|         root (ET.Element): The root element of the XML response from Overpass API. |         elements (list): The list of elements from the JSON response of the Overpass API. | ||||||
|         elem_type (str): The type of landmark (e.g., node, way, relation). |         landmarktype (str): The category of landmarks to extract (e.g. 'sightseeing'). | ||||||
|  |  | ||||||
|         Returns: |         Returns: | ||||||
|         list[Landmark]: A list of Landmark objects extracted from the XML data. |         list[Landmark]: A list of Landmark objects extracted from the JSON data. | ||||||
|         """ |         """ | ||||||
|         if root is None : |         if elements is None : | ||||||
|             return [] |             return [] | ||||||
|  |  | ||||||
|         landmarks = [] |         landmarks = [] | ||||||
|         for osm_type in ['node', 'way', 'relation'] : |         for elem in elements: | ||||||
|             for elem in root.findall(osm_type): |             osm_type = elem.get('type') | ||||||
|  |  | ||||||
|             id, coords, name = get_base_info(elem, osm_type, with_name=True) |             id, coords, name = get_base_info(elem, osm_type, with_name=True) | ||||||
|  |  | ||||||
|             if name is None or coords is None : |             if name is None or coords is None : | ||||||
|                 continue |                 continue | ||||||
|  |  | ||||||
|                 tags = elem.findall('tag') |             tags = elem.get('tags', {})   # default {} guards untagged elements | ||||||
|  |  | ||||||
|             # Convert this to Landmark object |             # Convert this to Landmark object | ||||||
|             landmark = Landmark(name=name, |             landmark = Landmark(name=name, | ||||||
| @@ -254,10 +245,10 @@ class LandmarkManager: | |||||||
|                                 attractiveness=0, |                                 attractiveness=0, | ||||||
|                                 n_tags=len(tags)) |                                 n_tags=len(tags)) | ||||||
|  |  | ||||||
|  |             # self.logger.debug('added landmark.') | ||||||
|  |  | ||||||
|             # Browse through tags to add information to landmark. |             # Browse through tags to add information to landmark. | ||||||
|                 for tag in tags: |             for key, value in tags.items(): | ||||||
|                     key = tag.get('k') |  | ||||||
|                     value = tag.get('v') |  | ||||||
|  |  | ||||||
|                 # Skip this landmark if not suitable. |                 # Skip this landmark if not suitable. | ||||||
|                 if key == 'building:part' and value == 'yes' : |                 if key == 'building:part' and value == 'yes' : | ||||||
|   | |||||||
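`_to_landmarks` and the cluster code above both assume the standard Overpass JSON element shape produced by `out ids center tags`; a representative element (all values invented):

```python
# Hypothetical way element in Overpass JSON output format.
elem = {
    "type": "way",                                   # read via elem.get('type')
    "id": 123456789,
    "center": {"lat": 45.7578, "lon": 4.8320},       # present with `out center`
    "tags": {"tourism": "attraction", "name": "Example"},
}

osm_type = elem.get("type")
tags = elem.get("tags", {})   # default {} guards untagged elements
```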
| @@ -1,10 +1,9 @@ | |||||||
| """Module for finding public toilets around given coordinates.""" | """Module for finding public toilets around given coordinates.""" | ||||||
| import logging | import logging | ||||||
| import xml.etree.ElementTree as ET |  | ||||||
|  |  | ||||||
| from ..overpass.overpass import Overpass, get_base_info | from ..overpass.overpass import Overpass, get_base_info | ||||||
| from ..structs.landmark import Toilets | from ..structs.landmark import Toilets | ||||||
| from ..constants import OSM_CACHE_DIR | from .utils import create_bbox | ||||||
|  |  | ||||||
|  |  | ||||||
| # silence the overpass logger | # silence the overpass logger | ||||||
| @@ -41,7 +40,7 @@ class ToiletsManager: | |||||||
|         self.location = location |         self.location = location | ||||||
|  |  | ||||||
|         # Setup the caching in the Overpass class. |         # Setup the caching in the Overpass class. | ||||||
|         self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR) |         self.overpass = Overpass() | ||||||
|  |  | ||||||
|  |  | ||||||
|     def generate_toilet_list(self) -> list[Toilets] : |     def generate_toilet_list(self) -> list[Toilets] : | ||||||
| @@ -53,51 +52,49 @@ class ToiletsManager: | |||||||
|         list[Toilets]: A list of `Toilets` objects containing detailed information  |         list[Toilets]: A list of `Toilets` objects containing detailed information  | ||||||
|                        about the toilets found around the given coordinates. |                        about the toilets found around the given coordinates. | ||||||
|         """ |         """ | ||||||
|         bbox = tuple((self.radius, self.location[0], self.location[1])) |         bbox = create_bbox(self.location, self.radius) | ||||||
|         osm_types = ['node', 'way', 'relation'] |         osm_types = ['node', 'way', 'relation'] | ||||||
|         toilets_list = [] |         toilets_list = [] | ||||||
|  |  | ||||||
|         query = self.overpass.build_query( |         query = Overpass.build_query( | ||||||
|                 area = bbox, |             bbox = bbox, | ||||||
|             osm_types = osm_types, |             osm_types = osm_types, | ||||||
|             selector = '"amenity"="toilets"', |             selector = '"amenity"="toilets"', | ||||||
|             out = 'ids center tags' |             out = 'ids center tags' | ||||||
|             ) |             ) | ||||||
|         self.logger.debug(f"Query: {query}") |  | ||||||
|  |  | ||||||
|         try: |         try: | ||||||
|             result = self.overpass.send_query(query) |             result = self.overpass.fetch_data_from_api(query_str=query) | ||||||
|         except Exception as e: |         except Exception as e: | ||||||
|             self.logger.error(f"Error fetching landmarks: {e}") |             self.logger.error(f"Error fetching landmarks: {e}") | ||||||
|             return None |             return None | ||||||
|  |  | ||||||
|         toilets_list = self.xml_to_toilets(result) |         toilets_list = self.to_toilets(result) | ||||||
|  |  | ||||||
|         return toilets_list |         return toilets_list | ||||||
|  |  | ||||||
|  |  | ||||||
|     def xml_to_toilets(self, root: ET.Element) -> list[Toilets]: |     def to_toilets(self, elements: list) -> list[Toilets]: | ||||||
|         """ |         """ | ||||||
|         Parse the Overpass API result and extract toilets. |         Parse the Overpass API result and extract toilets. | ||||||
|  |  | ||||||
|         This method processes the XML root element returned by the Overpass API and  |         This method processes the JSON elements returned by the Overpass API and  | ||||||
|         extracts toilets of types 'node', 'way', and 'relation'. It retrieves  |         extracts toilets of types 'node', 'way', and 'relation'. It retrieves  | ||||||
|         relevant information such as coordinates and tags, and converts them  |         relevant information such as coordinates and tags, and converts them  | ||||||
|         into Toilets objects. |         into Toilets objects. | ||||||
|  |  | ||||||
|         Args: |         Args: | ||||||
|         root (ET.Element): The root element of the XML response from Overpass API. |         elements (list): The list of elements from the JSON response of the Overpass API. | ||||||
|  |  | ||||||
|         Returns: |         Returns: | ||||||
|         list[Landmark]: A list of Landmark objects extracted from the XML data. |         list[Toilets]: A list of Toilets objects extracted from the JSON data. | ||||||
|         """ |         """ | ||||||
|         if root is None : |         if elements is None : | ||||||
|             return [] |             return [] | ||||||
|  |  | ||||||
|         toilets_list = [] |         toilets_list = [] | ||||||
|         for osm_type in ['node', 'way', 'relation'] : |         for elem in elements: | ||||||
|             for elem in root.findall(osm_type): |             osm_type = elem.get('type') | ||||||
|             # Get coordinates and append them to the points list |             # Get coordinates and append them to the points list | ||||||
|             _, coords = get_base_info(elem, osm_type) |             _, coords = get_base_info(elem, osm_type) | ||||||
|             if coords is None : |             if coords is None : | ||||||
| @@ -106,7 +103,7 @@ class ToiletsManager: | |||||||
|             toilets = Toilets(location=coords) |             toilets = Toilets(location=coords) | ||||||
|  |  | ||||||
|             # Extract tags as a dictionary |             # Extract tags as a dictionary | ||||||
|                 tags = {tag.get('k'): tag.get('v') for tag in elem.findall('tag')} |             tags = elem.get('tags', {})   # default {} guards untagged elements | ||||||
|  |  | ||||||
|             if 'wheelchair' in tags.keys() and tags['wheelchair'] == 'yes': |             if 'wheelchair' in tags.keys() and tags['wheelchair'] == 'yes': | ||||||
|                 toilets.wheelchair = True |                 toilets.wheelchair = True | ||||||
|   | |||||||
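Unlike the managers above, `generate_toilet_list` keeps the explicit two-step flow of building the query string and sending it through `fetch_data_from_api`; a condensed sketch reusing only the calls shown in this file (centre and radius are placeholders):

```python
# Sketch of the two-step flow; 500 m radius is a placeholder.
bbox = create_bbox((45.7515, 4.8271), 500)

query = Overpass.build_query(
    bbox=bbox,
    osm_types=['node', 'way', 'relation'],
    selector='"amenity"="toilets"',
    out='ids center tags',
)
elements = Overpass().fetch_data_from_api(query_str=query)
```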
							
								
								
									
backend/src/utils/utils.py (new file, 27 lines)
							| @@ -0,0 +1,27 @@ | |||||||
|  | """Various helper functions""" | ||||||
|  | import math as m | ||||||
|  |  | ||||||
|  | def create_bbox(coords: tuple[float, float], radius: int): | ||||||
|  |     """ | ||||||
|  |     Create a bounding box around the given coordinates. | ||||||
|  |  | ||||||
|  |     Args: | ||||||
|  |         coords (tuple[float, float]): The latitude and longitude of the center of the bounding box. | ||||||
|  |         radius (int): The half-side length of the bounding box in meters. | ||||||
|  |  | ||||||
|  |     Returns: | ||||||
|  |         tuple[float, float, float, float]: The minimum latitude, minimum longitude, maximum latitude, and maximum longitude | ||||||
|  |                                             defining the bounding box. | ||||||
|  |     """ | ||||||
|  |     # Earth's radius in meters | ||||||
|  |     R = 6378137 | ||||||
|  |     lat, lon = coords | ||||||
|  |     d_lat = radius / R | ||||||
|  |     d_lon = radius / (R * m.cos(m.pi * lat / 180)) | ||||||
|  |  | ||||||
|  |     lat_min = lat - d_lat * 180 / m.pi | ||||||
|  |     lat_max = lat + d_lat * 180 / m.pi | ||||||
|  |     lon_min = lon - d_lon * 180 / m.pi | ||||||
|  |     lon_max = lon + d_lon * 180 / m.pi | ||||||
|  |  | ||||||
|  |     return (lat_min, lon_min, lat_max, lon_max) | ||||||
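A quick sanity check of `create_bbox`, using the Bellecour start point from the Lyon test (printed values are approximate):

```python
# 1 km half-side box around the Bellecour test start point.
bbox = create_bbox((45.75156398104873, 4.827154464827647), 1000)

# The box spans ~0.018 deg of latitude and a slightly wider longitude
# range at this latitude (the longitude offset is divided by cos(lat)).
print(bbox)  # ~ (45.7426, 4.8143, 45.7606, 4.8400)
```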