Compare commits
	
		
			23 Commits
		
	
	
		
			431ae7c670
			...
			6f54522b8c
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 6f54522b8c | |||
| 080ecd28ae | |||
| 21706ea7e6 | |||
| 83c1533e78 | |||
| 1f4815c991 | |||
| 699737bc40 | |||
| 1240f86d6e | |||
| 2a5023df4b | |||
| 581644a108 | |||
| f48dcf80c2 | |||
| 757773f433 | |||
| 25c2b6b0d1 | |||
| b527318eec | |||
| f2943eb3ad | |||
| 2ac8499dfb | |||
| 4a904c3d3c | |||
| 978cae290b | |||
| bab6cfe74e | |||
| 71abeabbd2 | |||
| f64e60ddf6 | |||
| d6f723bee1 | |||
| a3243431e0 | |||
| 3605408ebb | 
@@ -28,7 +28,7 @@ jobs:
 | 
			
		||||
      working-directory: backend
 | 
			
		||||
 | 
			
		||||
    - name: Run Tests
 | 
			
		||||
      run: pipenv run pytest src --html=report.html --self-contained-html --log-cli-level=INFO
 | 
			
		||||
      run: pipenv run pytest src --html=report.html --self-contained-html --log-cli-level=DEBUG
 | 
			
		||||
      working-directory: backend
 | 
			
		||||
 | 
			
		||||
    - name: Upload HTML report
 | 
			
		||||
 
 | 
			
		||||
@@ -445,7 +445,9 @@ disable=raw-checker-failed,
 | 
			
		||||
        logging-fstring-interpolation,
 | 
			
		||||
        duplicate-code,
 | 
			
		||||
        relative-beyond-top-level, 
 | 
			
		||||
        invalid-name
 | 
			
		||||
        invalid-name,
 | 
			
		||||
        too-many-arguments,
 | 
			
		||||
        too-many-positional-arguments
 | 
			
		||||
 | 
			
		||||
# Enable the message, report, category or checker with the given id(s). You can
 | 
			
		||||
# either give multiple identifier separated by comma (,) or put this option
 | 
			
		||||
 
 | 
			
		||||
										
											
												File diff suppressed because one or more lines are too long
											
										
									
								
							@@ -2,6 +2,7 @@
 | 
			
		||||
 | 
			
		||||
import os
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
from typing import List, Literal, Tuple
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
LOCATION_PREFIX = Path('src')
 | 
			
		||||
@@ -14,6 +15,8 @@ OPTIMIZER_PARAMETERS_PATH = PARAMETERS_DIR / 'optimizer_parameters.yaml'
 | 
			
		||||
cache_dir_string = os.getenv('OSM_CACHE_DIR', './cache')
 | 
			
		||||
OSM_CACHE_DIR = Path(cache_dir_string)
 | 
			
		||||
 | 
			
		||||
OSM_TYPES = List[Literal['way', 'node', 'relation']]
 | 
			
		||||
BBOX = Tuple[float, float, float, float]
 | 
			
		||||
 | 
			
		||||
MEMCACHED_HOST_PATH = os.getenv('MEMCACHED_HOST_PATH', None)
 | 
			
		||||
if MEMCACHED_HOST_PATH == "none":
 | 
			
		||||
 
 | 
			
		||||
@@ -3,7 +3,7 @@
 | 
			
		||||
import logging
 | 
			
		||||
import time
 | 
			
		||||
from contextlib import asynccontextmanager
 | 
			
		||||
from fastapi import FastAPI, HTTPException, Query
 | 
			
		||||
from fastapi import FastAPI, HTTPException, BackgroundTasks, Query
 | 
			
		||||
 | 
			
		||||
from .logging_config import configure_logging
 | 
			
		||||
from .structs.landmark import Landmark, Toilets
 | 
			
		||||
@@ -14,8 +14,10 @@ from .utils.landmarks_manager import LandmarkManager
 | 
			
		||||
from .utils.toilets_manager import ToiletsManager
 | 
			
		||||
from .optimization.optimizer import Optimizer
 | 
			
		||||
from .optimization.refiner import Refiner
 | 
			
		||||
from .overpass.overpass import fill_cache
 | 
			
		||||
from .cache import client as cache_client
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
manager = LandmarkManager()
 | 
			
		||||
@@ -35,11 +37,11 @@ async def lifespan(app: FastAPI):
 | 
			
		||||
app = FastAPI(lifespan=lifespan)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@app.post("/trip/new")
 | 
			
		||||
def new_trip(preferences: Preferences,
 | 
			
		||||
             start: tuple[float, float],
 | 
			
		||||
             end: tuple[float, float] | None = None) -> Trip:
 | 
			
		||||
             end: tuple[float, float] | None = None, 
 | 
			
		||||
             background_tasks: BackgroundTasks = None) -> Trip:
 | 
			
		||||
    """
 | 
			
		||||
    Main function to call the optimizer.
 | 
			
		||||
 | 
			
		||||
@@ -91,6 +93,9 @@ def new_trip(preferences: Preferences,
 | 
			
		||||
        preferences = preferences
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    if len(landmarks) == 0 :
 | 
			
		||||
        raise HTTPException(status_code=500, detail="No landmarks were found.")
 | 
			
		||||
 | 
			
		||||
    # insert start and finish to the landmarks list
 | 
			
		||||
    landmarks_short.insert(0, start_landmark)
 | 
			
		||||
    landmarks_short.append(end_landmark)
 | 
			
		||||
@@ -114,6 +119,9 @@ def new_trip(preferences: Preferences,
 | 
			
		||||
        refined_tour = refiner.refine_optimization(landmarks, base_tour,
 | 
			
		||||
                                               preferences.max_time_minute,
 | 
			
		||||
                                               preferences.detour_tolerance_minute)
 | 
			
		||||
    except TimeoutError as te :
 | 
			
		||||
        logger.error(f'Refiner failed : {str(te)} Using base tour.')
 | 
			
		||||
        refined_tour = base_tour
 | 
			
		||||
    except Exception as exc :
 | 
			
		||||
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(exc)}") from exc
 | 
			
		||||
 | 
			
		||||
@@ -127,6 +135,9 @@ def new_trip(preferences: Preferences,
 | 
			
		||||
    # upon creation of the trip, persistence of both the trip and its landmarks is ensured.
 | 
			
		||||
    trip = Trip.from_linked_landmarks(linked_tour, cache_client)
 | 
			
		||||
    logger.info(f'Generated a trip of {trip.total_time} minutes with {len(refined_tour)} landmarks in {round(t_generate_landmarks + t_first_stage + t_second_stage,3)} seconds.')
 | 
			
		||||
 | 
			
		||||
    background_tasks.add_task(fill_cache)
 | 
			
		||||
 | 
			
		||||
    return trip
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -55,6 +55,9 @@ class Optimizer:
 | 
			
		||||
            self.average_walking_speed = parameters['average_walking_speed']
 | 
			
		||||
            self.max_landmarks = parameters['max_landmarks']
 | 
			
		||||
            self.overshoot = parameters['overshoot']
 | 
			
		||||
            self.time_limit = parameters['time_limit']
 | 
			
		||||
            self.gap_rel = parameters['gap_rel']
 | 
			
		||||
            self.max_iter = parameters['max_iter']
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def init_ub_time(self, prob: pl.LpProblem, x: pl.LpVariable, L: int, landmarks: list[Landmark], max_time: int):
 | 
			
		||||
@@ -490,7 +493,18 @@ class Optimizer:
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def warm_start(self, x: list[pl.LpVariable], L: int) :
 | 
			
		||||
        """
 | 
			
		||||
        This function sets the initial values of the decision variables to a feasible solution.
 | 
			
		||||
        This can help the solver start with a feasible or heuristic solution,
 | 
			
		||||
        potentially speeding up convergence.
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
        x (list[pl.LpVariable]): A list of PuLP decision variables (binary variables).
 | 
			
		||||
        L (int): The size parameter, representing a dimension (likely related to a grid or matrix).
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
        list[pl.LpVariable]: The modified list of PuLP decision variables with initial values set.
 | 
			
		||||
        """
 | 
			
		||||
        for i in range(L*L) :
 | 
			
		||||
            x[i].setInitialValue(0)
 | 
			
		||||
 | 
			
		||||
@@ -573,7 +587,10 @@ class Optimizer:
 | 
			
		||||
        prob, x = self.pre_processing(L, landmarks, max_time, max_landmarks)
 | 
			
		||||
 | 
			
		||||
        # Solve the problem and extract results.
 | 
			
		||||
        prob.solve(pl.PULP_CBC_CMD(msg=False, gapRel=0.1, timeLimit=10, warmStart=False))
 | 
			
		||||
        try :
 | 
			
		||||
            prob.solve(pl.PULP_CBC_CMD(msg=False, timeLimit=self.time_limit+1, gapRel=self.gap_rel))
 | 
			
		||||
        except Exception as exc :
 | 
			
		||||
            raise Exception(f"No solution found: {exc}") from exc
 | 
			
		||||
        status = pl.LpStatus[prob.status]
 | 
			
		||||
        solution = [pl.value(var) for var in x]  # The values of the decision variables (will be 0 or 1)
 | 
			
		||||
 | 
			
		||||
@@ -588,18 +605,21 @@ class Optimizer:
 | 
			
		||||
        circles = self.is_connected(solution)
 | 
			
		||||
 | 
			
		||||
        i = 0
 | 
			
		||||
        timeout = 40
 | 
			
		||||
        while circles is not None :
 | 
			
		||||
            i += 1
 | 
			
		||||
            if i == timeout :
 | 
			
		||||
                self.logger.error(f'Timeout: No solution found after {timeout} iterations.')
 | 
			
		||||
                raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.")
 | 
			
		||||
            if i == self.max_iter :
 | 
			
		||||
                self.logger.error(f'Timeout: No solution found after {self.max_iter} iterations.')
 | 
			
		||||
                raise TimeoutError(f"Optimization took too long. No solution found after {self.max_iter} iterations.")
 | 
			
		||||
 | 
			
		||||
            for circle in circles :
 | 
			
		||||
                self.prevent_circle(prob, x, circle, L)
 | 
			
		||||
 | 
			
		||||
            # Solve the problem again
 | 
			
		||||
            prob.solve(pl.PULP_CBC_CMD(msg=False))
 | 
			
		||||
            try :
 | 
			
		||||
                prob.solve(pl.PULP_CBC_CMD(msg=False, timeLimit=self.time_limit, gapRel=self.gap_rel))
 | 
			
		||||
            except Exception as exc :
 | 
			
		||||
                raise Exception(f"No solution found: {exc}") from exc
 | 
			
		||||
 | 
			
		||||
            solution = [pl.value(var) for var in x]
 | 
			
		||||
 | 
			
		||||
            if pl.LpStatus[prob.status] != 'Optimal' :
 | 
			
		||||
@@ -614,5 +634,5 @@ class Optimizer:
 | 
			
		||||
        order = self.get_order(solution)
 | 
			
		||||
        tour =  [landmarks[i] for i in order]
 | 
			
		||||
 | 
			
		||||
        self.logger.debug(f"Re-optimized {i} times, objective value : {int(pl.value(prob.objective))}")
 | 
			
		||||
        self.logger.info(f"Re-optimized {i} times, objective value : {int(pl.value(prob.objective))}")
 | 
			
		||||
        return tour
 | 
			
		||||
 
 | 
			
		||||
@@ -1,9 +1,8 @@
 | 
			
		||||
"""Module defining the caching strategy for overpass requests."""
 | 
			
		||||
import os
 | 
			
		||||
import xml.etree.ElementTree as ET
 | 
			
		||||
import json
 | 
			
		||||
import hashlib
 | 
			
		||||
 | 
			
		||||
from ..constants import OSM_CACHE_DIR
 | 
			
		||||
from ..constants import OSM_CACHE_DIR, OSM_TYPES
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_cache_key(query: str) -> str:
 | 
			
		||||
@@ -17,10 +16,6 @@ def get_cache_key(query: str) -> str:
 | 
			
		||||
class CachingStrategyBase:
 | 
			
		||||
    """
 | 
			
		||||
    Base class for implementing caching strategies.
 | 
			
		||||
 | 
			
		||||
    This class defines the structure for a caching strategy with basic methods
 | 
			
		||||
    that must be implemented by subclasses. Subclasses should define how to
 | 
			
		||||
    retrieve, store, and close the cache.
 | 
			
		||||
    """
 | 
			
		||||
    def get(self, key):
 | 
			
		||||
        """Retrieve the cached data associated with the provided key."""
 | 
			
		||||
@@ -30,111 +25,108 @@ class CachingStrategyBase:
 | 
			
		||||
        """Store data in the cache with the specified key."""
 | 
			
		||||
        raise NotImplementedError('Subclass should implement set')
 | 
			
		||||
 | 
			
		||||
    def set_hollow(self, key, **kwargs):
 | 
			
		||||
        """Create a hollow (empty) cache entry with a specific key."""
 | 
			
		||||
        raise NotImplementedError('Subclass should implement set_hollow')
 | 
			
		||||
 | 
			
		||||
    def close(self):
 | 
			
		||||
        """Clean up or close any resources used by the caching strategy."""
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class XMLCache(CachingStrategyBase):
 | 
			
		||||
class JSONCache(CachingStrategyBase):
 | 
			
		||||
    """
 | 
			
		||||
    A caching strategy that stores and retrieves data in XML format.
 | 
			
		||||
 | 
			
		||||
    This class provides methods to cache data as XML files in a specified directory.
 | 
			
		||||
    The directory is automatically suffixed with '_XML' to distinguish it from other
 | 
			
		||||
    caching strategies. The data is stored and retrieved using XML serialization.
 | 
			
		||||
 | 
			
		||||
    Args:
 | 
			
		||||
        cache_dir (str): The base directory where XML cache files will be stored.
 | 
			
		||||
                         Defaults to 'OSM_CACHE_DIR' with a '_XML' suffix.
 | 
			
		||||
 | 
			
		||||
    Methods:
 | 
			
		||||
        get(key): Retrieve cached data from a XML file associated with the given key.
 | 
			
		||||
        set(key, value): Store data in a XML file with the specified key.
 | 
			
		||||
    A caching strategy that stores and retrieves data in JSON format.
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(self, cache_dir=OSM_CACHE_DIR):
 | 
			
		||||
        # Add the class name as a suffix to the directory
 | 
			
		||||
        self._cache_dir = f'{cache_dir}_XML'
 | 
			
		||||
        self._cache_dir = f'{cache_dir}'
 | 
			
		||||
        if not os.path.exists(self._cache_dir):
 | 
			
		||||
            os.makedirs(self._cache_dir)
 | 
			
		||||
 | 
			
		||||
    def _filename(self, key):
 | 
			
		||||
        return os.path.join(self._cache_dir, f'{key}.xml')
 | 
			
		||||
        return os.path.join(self._cache_dir, f'{key}.json')
 | 
			
		||||
 | 
			
		||||
    def get(self, key):
 | 
			
		||||
        """Retrieve XML data from the cache and parse it as an ElementTree."""
 | 
			
		||||
        """Retrieve JSON data from the cache and parse it as an ElementTree."""
 | 
			
		||||
        filename = self._filename(key)
 | 
			
		||||
        if os.path.exists(filename):
 | 
			
		||||
            try:
 | 
			
		||||
                # Parse and return the cached XML data
 | 
			
		||||
                tree = ET.parse(filename)
 | 
			
		||||
                return tree.getroot()  # Return the root element of the parsed XML
 | 
			
		||||
            except ET.ParseError:
 | 
			
		||||
                # print(f"Error parsing cached XML file: {filename}")
 | 
			
		||||
                return None
 | 
			
		||||
                # Open and parse the cached JSON data
 | 
			
		||||
                with open(filename, 'r', encoding='utf-8') as file:
 | 
			
		||||
                    data = json.load(file)
 | 
			
		||||
                # Return the data as a list of dicts.
 | 
			
		||||
                return data
 | 
			
		||||
            except json.JSONDecodeError:
 | 
			
		||||
                return None  # Return None if parsing fails
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    def set(self, key, value):
 | 
			
		||||
        """Save the XML data as an ElementTree to the cache."""
 | 
			
		||||
        """Save the JSON data as an ElementTree to the cache."""
 | 
			
		||||
        filename = self._filename(key)
 | 
			
		||||
        tree = ET.ElementTree(value)  # value is expected to be an ElementTree root element
 | 
			
		||||
        try:
 | 
			
		||||
            # Write the XML data to a file
 | 
			
		||||
            with open(filename, 'wb') as file:
 | 
			
		||||
                tree.write(file, encoding='utf-8', xml_declaration=True)
 | 
			
		||||
            # Write the JSON data to the cache file
 | 
			
		||||
            with open(filename, 'w', encoding='utf-8') as file:
 | 
			
		||||
                json.dump(value, file, ensure_ascii=False, indent=4)
 | 
			
		||||
        except IOError as e:
 | 
			
		||||
            raise IOError(f"Error writing to cache file: {filename} - {e}") from e
 | 
			
		||||
 | 
			
		||||
    def set_hollow(self, key, cell: tuple, osm_types: list,
 | 
			
		||||
                    selector: str, conditions: list=None, out='center'):
 | 
			
		||||
        """Create an empty placeholder cache entry for a future fill."""
 | 
			
		||||
        hollow_key = f'hollow_{key}'
 | 
			
		||||
        filename = self._filename(hollow_key)
 | 
			
		||||
 | 
			
		||||
        # Create the hollow JSON structure
 | 
			
		||||
        hollow_data = {
 | 
			
		||||
            "key": key,
 | 
			
		||||
            "cell": list(cell),
 | 
			
		||||
            "osm_types": list(osm_types),
 | 
			
		||||
            "selector": selector,
 | 
			
		||||
            "conditions": conditions,
 | 
			
		||||
            "out": out
 | 
			
		||||
        }
 | 
			
		||||
        # Write the hollow data to the cache file
 | 
			
		||||
        try:
 | 
			
		||||
            with open(filename, 'w', encoding='utf-8') as file:
 | 
			
		||||
                json.dump(hollow_data, file, ensure_ascii=False, indent=4)
 | 
			
		||||
        except IOError as e:
 | 
			
		||||
            raise IOError(f"Error writing hollow cache to file: {filename} - {e}") from e
 | 
			
		||||
 | 
			
		||||
    def close(self):
 | 
			
		||||
        """Cleanup method, if needed."""
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
class CachingStrategy:
 | 
			
		||||
    """
 | 
			
		||||
    A class to manage different caching strategies.
 | 
			
		||||
 | 
			
		||||
    This class provides an interface to switch between different caching strategies 
 | 
			
		||||
    (e.g., XMLCache, JSONCache) dynamically. It allows caching data in different formats, 
 | 
			
		||||
    depending on the strategy being used. By default, it uses the XMLCache strategy.
 | 
			
		||||
 | 
			
		||||
    Attributes:
 | 
			
		||||
    __strategy (CachingStrategyBase): The currently active caching strategy.
 | 
			
		||||
    __strategies (dict): A mapping between strategy names (as strings) and their corresponding
 | 
			
		||||
                         classes, allowing dynamic selection of caching strategies.
 | 
			
		||||
    """
 | 
			
		||||
    __strategy = XMLCache()  # Default caching strategy
 | 
			
		||||
    __strategy = JSONCache()  # Default caching strategy
 | 
			
		||||
    __strategies = {
 | 
			
		||||
        'XML': XMLCache,
 | 
			
		||||
        'JSON': JSONCache,
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def use(cls, strategy_name='XML', **kwargs):
 | 
			
		||||
        """
 | 
			
		||||
        Set the caching strategy based on the strategy_name provided.
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            strategy_name (str): The name of the caching strategy (e.g., 'XML').
 | 
			
		||||
            **kwargs: Additional keyword arguments to pass when initializing the strategy.
 | 
			
		||||
        """
 | 
			
		||||
        # If a previous strategy exists, close it
 | 
			
		||||
    def use(cls, strategy_name='JSON', **kwargs):
 | 
			
		||||
        if cls.__strategy:
 | 
			
		||||
            cls.__strategy.close()
 | 
			
		||||
 | 
			
		||||
        # Retrieve the strategy class based on the strategy name
 | 
			
		||||
        strategy_class = cls.__strategies.get(strategy_name)
 | 
			
		||||
 | 
			
		||||
        if not strategy_class:
 | 
			
		||||
            raise ValueError(f"Unknown caching strategy: {strategy_name}")
 | 
			
		||||
 | 
			
		||||
        # Instantiate the new strategy with the provided arguments
 | 
			
		||||
        cls.__strategy = strategy_class(**kwargs)
 | 
			
		||||
        return cls.__strategy
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def get(cls, key):
 | 
			
		||||
        """Get data from the current strategy's cache."""
 | 
			
		||||
        if not cls.__strategy:
 | 
			
		||||
            raise RuntimeError("Caching strategy has not been set.")
 | 
			
		||||
        return cls.__strategy.get(key)
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def set(cls, key, value):
 | 
			
		||||
        """Set data in the current strategy's cache."""
 | 
			
		||||
        if not cls.__strategy:
 | 
			
		||||
            raise RuntimeError("Caching strategy has not been set.")
 | 
			
		||||
        cls.__strategy.set(key, value)
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def set_hollow(cls, key, cell: tuple, osm_types: OSM_TYPES,
 | 
			
		||||
                    selector: str, conditions: list=None, out='center'):
 | 
			
		||||
        """Create a hollow cache entry."""
 | 
			
		||||
        cls.__strategy.set_hollow(key, cell, osm_types, selector, conditions, out)
 | 
			
		||||
 
 | 
			
		||||
@@ -1,14 +1,17 @@
 | 
			
		||||
"""Module allowing connexion to overpass api and fectch data from OSM."""
 | 
			
		||||
from typing import Literal, List
 | 
			
		||||
import os
 | 
			
		||||
import urllib
 | 
			
		||||
import math
 | 
			
		||||
import logging
 | 
			
		||||
import xml.etree.ElementTree as ET
 | 
			
		||||
import json
 | 
			
		||||
from typing import List, Tuple
 | 
			
		||||
 | 
			
		||||
from .caching_strategy import get_cache_key, CachingStrategy
 | 
			
		||||
from ..constants import OSM_CACHE_DIR
 | 
			
		||||
from ..constants import OSM_CACHE_DIR, OSM_TYPES, BBOX
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger('Overpass')
 | 
			
		||||
osm_types = List[Literal['way', 'node', 'relation']]
 | 
			
		||||
 | 
			
		||||
RESOLUTION = 0.05
 | 
			
		||||
CELL = Tuple[int, int]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Overpass :
 | 
			
		||||
@@ -16,7 +19,10 @@ class Overpass :
 | 
			
		||||
    Overpass class to manage the query building and sending to overpass api.
 | 
			
		||||
    The caching strategy is a part of this class and initialized upon creation of the Overpass object.
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(self, caching_strategy: str = 'XML', cache_dir: str = OSM_CACHE_DIR) :
 | 
			
		||||
    logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def __init__(self, caching_strategy: str = 'JSON', cache_dir: str = OSM_CACHE_DIR) :
 | 
			
		||||
        """
 | 
			
		||||
        Initialize the Overpass instance with the url, headers and caching strategy.
 | 
			
		||||
        """
 | 
			
		||||
@@ -25,17 +31,109 @@ class Overpass :
 | 
			
		||||
        self.caching_strategy = CachingStrategy.use(caching_strategy, cache_dir=cache_dir)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def build_query(self, area: tuple, osm_types: osm_types,
 | 
			
		||||
                    selector: str, conditions=[], out='center') -> str:
 | 
			
		||||
    def send_query(self, bbox: BBOX, osm_types: OSM_TYPES,
 | 
			
		||||
                    selector: str, conditions: list=None, out='center') -> List[dict]:
 | 
			
		||||
        """
 | 
			
		||||
        Sends the Overpass QL query to the Overpass API and returns the parsed json response.
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            bbox (tuple): Bounding box for the query.
 | 
			
		||||
            osm_types (list[str]): List of OSM element types (e.g., 'node', 'way').
 | 
			
		||||
            selector (str): Key or tag to filter OSM elements (e.g., 'highway').
 | 
			
		||||
            conditions (list): Optional list of additional filter conditions in Overpass QL format.
 | 
			
		||||
            out (str): Output format ('center', 'body', etc.). Defaults to 'center'.
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            list:   Parsed json response from the Overpass API, or cached data if available.
 | 
			
		||||
        """
 | 
			
		||||
        # Determine which grid cells overlap with this bounding box.
 | 
			
		||||
        overlapping_cells = Overpass._get_overlapping_cells(bbox)
 | 
			
		||||
 | 
			
		||||
        # Retrieve cached data and identify missing cache entries
 | 
			
		||||
        cached_responses, non_cached_cells = self._retrieve_cached_data(overlapping_cells, osm_types, selector, conditions, out)
 | 
			
		||||
 | 
			
		||||
        self.logger.info(f'Cache hit for {len(overlapping_cells)-len(non_cached_cells)}/{len(overlapping_cells)} quadrants.')
 | 
			
		||||
 | 
			
		||||
        # If there is no missing data, return the cached responses after filtering.
 | 
			
		||||
        if not non_cached_cells :
 | 
			
		||||
            return Overpass._filter_landmarks(cached_responses, bbox)
 | 
			
		||||
 | 
			
		||||
        # If there is no cached data, fetch all from Overpass.
 | 
			
		||||
        elif not cached_responses :
 | 
			
		||||
            query_str = Overpass.build_query(bbox, osm_types, selector, conditions, out)
 | 
			
		||||
            return self.fetch_data_from_api(query_str)
 | 
			
		||||
 | 
			
		||||
        # Hybrid cache: some data from Overpass, some data from cache.
 | 
			
		||||
        else :
 | 
			
		||||
            # Resize the bbox for smaller search area and build new query string.
 | 
			
		||||
            non_cached_bbox = Overpass._get_non_cached_bbox(non_cached_cells, bbox)
 | 
			
		||||
            query_str = Overpass.build_query(non_cached_bbox, osm_types, selector, conditions, out)
 | 
			
		||||
            non_cached_responses = self.fetch_data_from_api(query_str)
 | 
			
		||||
            return Overpass._filter_landmarks(cached_responses, bbox) + non_cached_responses
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def fetch_data_from_api(self, query_str: str) -> List[dict]:
 | 
			
		||||
        """
 | 
			
		||||
        Fetch data from the Overpass API and return the json data.
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            query_str (str): The Overpass query string.
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            dict: Combined cached and fetched data.
 | 
			
		||||
        """
 | 
			
		||||
        try:
 | 
			
		||||
            data = urllib.parse.urlencode({'data': query_str}).encode('utf-8')
 | 
			
		||||
            request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers)
 | 
			
		||||
 | 
			
		||||
            with urllib.request.urlopen(request) as response:
 | 
			
		||||
                response_data = response.read().decode('utf-8')  # Convert the HTTPResponse to a string
 | 
			
		||||
                data = json.loads(response_data)  # Load the JSON from the string
 | 
			
		||||
                elements = data.get('elements', [])
 | 
			
		||||
                # self.logger.debug(f'Query = {query_str}')
 | 
			
		||||
                return elements
 | 
			
		||||
 | 
			
		||||
        except urllib.error.URLError as e:
 | 
			
		||||
            self.logger.error(f"Error connecting to Overpass API: {e}")
 | 
			
		||||
            raise ConnectionError(f"Error connecting to Overpass API: {e}") from e
 | 
			
		||||
        except Exception as exc :
 | 
			
		||||
            raise Exception(f'An unexpected error occured: {str(exc)}') from exc
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def fill_cache(self, json_data: dict) :
 | 
			
		||||
        """
 | 
			
		||||
        Fill cache with data by using a hollow cache entry's information.
 | 
			
		||||
        """
 | 
			
		||||
        query_str, cache_key = Overpass._build_query_from_hollow(json_data)
 | 
			
		||||
        try:
 | 
			
		||||
            data = urllib.parse.urlencode({'data': query_str}).encode('utf-8')
 | 
			
		||||
            request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers)
 | 
			
		||||
 | 
			
		||||
            with urllib.request.urlopen(request) as response:
 | 
			
		||||
 | 
			
		||||
                # Convert the HTTPResponse to a string and load data
 | 
			
		||||
                response_data = response.read().decode('utf-8')  
 | 
			
		||||
                data = json.loads(response_data)
 | 
			
		||||
 | 
			
		||||
                # Get elements and set cache
 | 
			
		||||
                elements = data.get('elements', [])
 | 
			
		||||
                self.caching_strategy.set(cache_key, elements)
 | 
			
		||||
                self.logger.debug(f'Cache set for {cache_key}')
 | 
			
		||||
        except urllib.error.URLError as e:
 | 
			
		||||
            raise ConnectionError(f"Error connecting to Overpass API: {e}") from e
 | 
			
		||||
        except Exception as exc :
 | 
			
		||||
            raise Exception(f'An unexpected error occured: {str(exc)}') from exc
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def build_query(bbox: BBOX, osm_types: OSM_TYPES,
 | 
			
		||||
                    selector: str, conditions: list=None, out='center') -> str:
 | 
			
		||||
        """
 | 
			
		||||
        Constructs a query string for the Overpass API to retrieve OpenStreetMap (OSM) data.
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            area (tuple): A tuple representing the geographical search area, typically in the format 
 | 
			
		||||
                        (radius, latitude, longitude). The first element is a string like "around:2000" 
 | 
			
		||||
                        specifying the search radius, and the second and third elements represent 
 | 
			
		||||
                        the latitude and longitude as floats or strings.
 | 
			
		||||
            bbox (tuple): A tuple representing the geographical search area, typically in the format 
 | 
			
		||||
                        (lat_min, lon_min, lat_max, lon_max).
 | 
			
		||||
            osm_types (list[str]): A list of OSM element types to search for. Must be one or more of 
 | 
			
		||||
                                    'Way', 'Node', or 'Relation'.
 | 
			
		||||
            selector (str): The key or tag to filter the OSM elements (e.g., 'amenity', 'highway', etc.).
 | 
			
		||||
@@ -52,82 +150,203 @@ class Overpass :
 | 
			
		||||
        Notes:
 | 
			
		||||
            - If no conditions are provided, the query will just use the `selector` to filter the OSM 
 | 
			
		||||
            elements without additional constraints.
 | 
			
		||||
            - The search area must always formatted as "(radius, lat, lon)".
 | 
			
		||||
        """
 | 
			
		||||
        if not isinstance(conditions, list) :
 | 
			
		||||
            conditions = [conditions]
 | 
			
		||||
        if not isinstance(osm_types, list) :
 | 
			
		||||
            osm_types = [osm_types]
 | 
			
		||||
        query = '[out:json];('
 | 
			
		||||
 | 
			
		||||
        query = '('
 | 
			
		||||
        # convert the bbox to string.
 | 
			
		||||
        bbox_str = f"({','.join(map(str, bbox))})"
 | 
			
		||||
 | 
			
		||||
        # Round the radius to nearest 50 and coordinates to generate less queries
 | 
			
		||||
        if area[0] > 500 :
 | 
			
		||||
            search_radius = round(area[0] / 50) * 50
 | 
			
		||||
            loc = tuple((round(area[1], 2), round(area[2], 2)))
 | 
			
		||||
        else :
 | 
			
		||||
            search_radius = round(area[0] / 25) * 25
 | 
			
		||||
            loc = tuple((round(area[1], 3), round(area[2], 3)))
 | 
			
		||||
 | 
			
		||||
        search_area = f"(around:{search_radius}, {str(loc[0])}, {str(loc[1])})"
 | 
			
		||||
 | 
			
		||||
        if conditions :
 | 
			
		||||
        if conditions is not None and len(conditions) > 0:
 | 
			
		||||
            conditions = '(if: ' + ' && '.join(conditions) + ')'
 | 
			
		||||
        else :
 | 
			
		||||
            conditions = ''
 | 
			
		||||
 | 
			
		||||
        for elem in osm_types :
 | 
			
		||||
            query += elem + '[' + selector + ']' + conditions + search_area + ';'
 | 
			
		||||
            query += elem + '[' + selector + ']' + conditions + bbox_str + ';'
 | 
			
		||||
 | 
			
		||||
        query += ');' + f'out {out};'
 | 
			
		||||
 | 
			
		||||
        return query
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def send_query(self, query: str) -> ET:
 | 
			
		||||
    def _retrieve_cached_data(self, overlapping_cells: CELL, osm_types: OSM_TYPES,
 | 
			
		||||
                              selector: str, conditions: list, out: str) -> Tuple[List[dict], list[CELL]]:
 | 
			
		||||
        """
 | 
			
		||||
        Sends the Overpass QL query to the Overpass API and returns the parsed JSON response.
 | 
			
		||||
        Retrieve cached data and identify missing cache quadrants.
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            query (str): The Overpass QL query to be sent to the Overpass API.
 | 
			
		||||
            overlapping_cells (list): Cells to check for cached data.
 | 
			
		||||
            osm_types (list): OSM types (e.g., 'node', 'way').
 | 
			
		||||
            selector (str): Key or tag to filter OSM elements.
 | 
			
		||||
            conditions (list): Additional conditions to apply.
 | 
			
		||||
            out (str): Output format.
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            dict: The parsed JSON response from the Overpass API, or None if the request fails.
 | 
			
		||||
            tuple: A tuple containing:
 | 
			
		||||
                - cached_responses (list): List of cached data found.
 | 
			
		||||
                - non_cached_cells (list(tuple)): List of cells with missing data.
 | 
			
		||||
        """
 | 
			
		||||
        cell_key_dict = {}
 | 
			
		||||
        for cell in overlapping_cells :
 | 
			
		||||
            for elem in osm_types :
 | 
			
		||||
                key_str = f"{elem}[{selector}]{conditions}({','.join(map(str, cell))})"
 | 
			
		||||
 | 
			
		||||
            cell_key_dict[cell] = get_cache_key(key_str)
 | 
			
		||||
 | 
			
		||||
        cached_responses = []
 | 
			
		||||
        non_cached_cells = []
 | 
			
		||||
 | 
			
		||||
        # Retrieve the cached data and mark the missing entries as hollow
 | 
			
		||||
        for cell, key in cell_key_dict.items():
 | 
			
		||||
            cached_data = self.caching_strategy.get(key)
 | 
			
		||||
            if cached_data is not None :
 | 
			
		||||
                cached_responses += cached_data
 | 
			
		||||
            else:
 | 
			
		||||
                self.caching_strategy.set_hollow(key, cell, osm_types, selector, conditions, out)
 | 
			
		||||
                non_cached_cells.append(cell)
 | 
			
		||||
 | 
			
		||||
        return cached_responses, non_cached_cells
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _build_query_from_hollow(json_data: dict) -> Tuple[str, str]:
 | 
			
		||||
        """
 | 
			
		||||
        Build query string using information from a hollow cache entry.
 | 
			
		||||
        """
 | 
			
		||||
        # Extract values from the JSON object
 | 
			
		||||
        key = json_data.get('key')
 | 
			
		||||
        cell = tuple(json_data.get('cell'))
 | 
			
		||||
        bbox = Overpass._get_bbox_from_grid_cell(cell)
 | 
			
		||||
        osm_types = json_data.get('osm_types')
 | 
			
		||||
        selector = json_data.get('selector')
 | 
			
		||||
        conditions = json_data.get('conditions')
 | 
			
		||||
        out = json_data.get('out')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
        query_str = Overpass.build_query(bbox, osm_types, selector, conditions, out)
 | 
			
		||||
        return query_str, key
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _get_overlapping_cells(query_bbox: tuple) -> List[CELL]:
 | 
			
		||||
        """
 | 
			
		||||
        Returns a set of all grid cells that overlap with the given bounding box.
 | 
			
		||||
        """
 | 
			
		||||
        # Extract location from the query bbox
 | 
			
		||||
        lat_min, lon_min, lat_max, lon_max = query_bbox
 | 
			
		||||
 | 
			
		||||
        min_lat_cell, min_lon_cell = Overpass._get_grid_cell(lat_min, lon_min)
 | 
			
		||||
        max_lat_cell, max_lon_cell = Overpass._get_grid_cell(lat_max, lon_max)
 | 
			
		||||
 | 
			
		||||
        overlapping_cells = set()
 | 
			
		||||
        for lat_idx in range(min_lat_cell, max_lat_cell + 1):
 | 
			
		||||
            for lon_idx in range(min_lon_cell, max_lon_cell + 1):
 | 
			
		||||
                overlapping_cells.add((lat_idx, lon_idx))
 | 
			
		||||
 | 
			
		||||
        return overlapping_cells
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _get_grid_cell(lat: float, lon: float) -> CELL:
 | 
			
		||||
        """
 | 
			
		||||
        Returns the grid cell coordinates for a given latitude and longitude.
 | 
			
		||||
        Each grid cell is 0.05°lat x 0.05°lon resolution in size.
 | 
			
		||||
        """
 | 
			
		||||
        lat_index = math.floor(lat / RESOLUTION)
 | 
			
		||||
        lon_index = math.floor(lon / RESOLUTION)
 | 
			
		||||
        return (lat_index, lon_index)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _get_bbox_from_grid_cell(cell: CELL) -> BBOX:
 | 
			
		||||
        """
 | 
			
		||||
        Returns the bounding box for a given grid cell index.
 | 
			
		||||
        Each grid cell is resolution x resolution in size.
 | 
			
		||||
 | 
			
		||||
        The bounding box is returned as (min_lat, min_lon, max_lat, max_lon).
 | 
			
		||||
        """
 | 
			
		||||
        # Calculate the southwest (min_lat, min_lon) corner of the bounding box
 | 
			
		||||
        min_lat = round(cell[0] * RESOLUTION, 2)
 | 
			
		||||
        min_lon = round(cell[1] * RESOLUTION, 2)
 | 
			
		||||
 | 
			
		||||
        # Calculate the northeast (max_lat, max_lon) corner of the bounding box
 | 
			
		||||
        max_lat = round((cell[0] + 1) * RESOLUTION, 2)
 | 
			
		||||
        max_lon = round((cell[1] + 1) * RESOLUTION, 2)
 | 
			
		||||
 | 
			
		||||
        return (min_lat, min_lon, max_lat, max_lon)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _get_non_cached_bbox(non_cached_cells: List[CELL], original_bbox: BBOX):
 | 
			
		||||
        """
 | 
			
		||||
        Calculate the non-cached bounding box by excluding cached cells.
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
            non_cached_cells (list): The list of cells that were not found in the cache.
 | 
			
		||||
            original_bbox (tuple): The original bounding box (min_lat, min_lon, max_lat, max_lon).
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
            tuple: The new bounding box that excludes cached cells, or None if all cells are cached.
 | 
			
		||||
        """
 | 
			
		||||
        if not non_cached_cells:
 | 
			
		||||
            return None  # All cells were cached
 | 
			
		||||
 | 
			
		||||
        # Initialize the non-cached bounding box with extreme values
 | 
			
		||||
        min_lat, min_lon, max_lat, max_lon = float('inf'), float('inf'), float('-inf'), float('-inf')
 | 
			
		||||
 | 
			
		||||
        # Iterate over non-cached cells to find the new bounding box
 | 
			
		||||
        for cell in non_cached_cells:
 | 
			
		||||
            cell_min_lat, cell_min_lon, cell_max_lat, cell_max_lon = Overpass._get_bbox_from_grid_cell(cell)
 | 
			
		||||
 | 
			
		||||
            min_lat = min(min_lat, cell_min_lat)
 | 
			
		||||
            min_lon = min(min_lon, cell_min_lon)
 | 
			
		||||
            max_lat = max(max_lat, cell_max_lat)
 | 
			
		||||
            max_lon = max(max_lon, cell_max_lon)
 | 
			
		||||
 | 
			
		||||
        # If no update to bounding box, return the original
 | 
			
		||||
        if min_lat == float('inf') or min_lon == float('inf'):
 | 
			
		||||
            return None
 | 
			
		||||
 | 
			
		||||
        return (max(min_lat, original_bbox[0]), 
 | 
			
		||||
                max(min_lon, original_bbox[1]), 
 | 
			
		||||
                min(max_lat, original_bbox[2]), 
 | 
			
		||||
                min(max_lon, original_bbox[3]))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _filter_landmarks(elements: List[dict], bbox: BBOX) -> List[dict]:
 | 
			
		||||
        """
 | 
			
		||||
        Filters elements based on whether their coordinates are inside the given bbox.
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
        - elements (list of dict): List of elements containing coordinates.
 | 
			
		||||
        - bbox (tuple): A bounding box defined as (min_lat, min_lon, max_lat, max_lon).
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
        - list: A list of elements whose coordinates are inside the bounding box.
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
        # Generate a cache key for the current query
 | 
			
		||||
        cache_key = get_cache_key(query)
 | 
			
		||||
        filtered_elements = []
 | 
			
		||||
        min_lat, min_lon, max_lat, max_lon = bbox
 | 
			
		||||
 | 
			
		||||
        # Try to fetch the result from the cache
 | 
			
		||||
        cached_response = self.caching_strategy.get(cache_key)
 | 
			
		||||
        if cached_response is not None :
 | 
			
		||||
            logger.debug("Cache hit.")
 | 
			
		||||
            return cached_response
 | 
			
		||||
        for elem in elements:
 | 
			
		||||
            # Extract coordinates based on the 'type' of element
 | 
			
		||||
            if elem.get('type') != 'node':
 | 
			
		||||
                center = elem.get('center', {})
 | 
			
		||||
                lat = float(center.get('lat', 0))
 | 
			
		||||
                lon = float(center.get('lon', 0))
 | 
			
		||||
            else:
 | 
			
		||||
                lat = float(elem.get('lat', 0))
 | 
			
		||||
                lon = float(elem.get('lon', 0))
 | 
			
		||||
 | 
			
		||||
        # Prepare the data to be sent as POST request, encoded as bytes
 | 
			
		||||
        data = urllib.parse.urlencode({'data': query}).encode('utf-8')
 | 
			
		||||
            # Check if the coordinates fall within the given bounding box
 | 
			
		||||
            if min_lat <= lat <= max_lat and min_lon <= lon <= max_lon:
 | 
			
		||||
                filtered_elements.append(elem)
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            # Create a Request object with the specified URL, data, and headers
 | 
			
		||||
            request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers)
 | 
			
		||||
 | 
			
		||||
            # Send the request and read the response
 | 
			
		||||
            with urllib.request.urlopen(request) as response:
 | 
			
		||||
                # Read and decode the response
 | 
			
		||||
                response_data = response.read().decode('utf-8')
 | 
			
		||||
                root = ET.fromstring(response_data)
 | 
			
		||||
 | 
			
		||||
                # Cache the response data as an ElementTree root
 | 
			
		||||
                self.caching_strategy.set(cache_key, root)
 | 
			
		||||
                logger.debug("Response data added to cache.")
 | 
			
		||||
 | 
			
		||||
                return root
 | 
			
		||||
 | 
			
		||||
        except urllib.error.URLError as e:
 | 
			
		||||
            raise ConnectionError(f"Error connecting to Overpass API: {e}") from e
 | 
			
		||||
        return filtered_elements
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) :
 | 
			
		||||
def get_base_info(elem: dict, osm_type: OSM_TYPES, with_name=False) :
 | 
			
		||||
    """
 | 
			
		||||
    Extracts base information (coordinates, OSM ID, and optionally a name) from an OSM element.
 | 
			
		||||
 | 
			
		||||
@@ -136,7 +355,7 @@ def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) :
 | 
			
		||||
    extracting coordinates either directly or from a center tag, depending on the element type.
 | 
			
		||||
 | 
			
		||||
    Args:
 | 
			
		||||
        elem (ET.Element): The XML element representing the OSM entity.
 | 
			
		||||
        elem (dict): The JSON element representing the OSM entity.
 | 
			
		||||
        osm_type (str): The type of the OSM entity (e.g., 'node', 'way'). If 'node', the coordinates
 | 
			
		||||
                        are extracted directly from the element; otherwise, from the 'center' tag.
 | 
			
		||||
        with_name (bool): Whether to extract and return the name of the element. If True, it attempts
 | 
			
		||||
@@ -150,7 +369,7 @@ def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) :
 | 
			
		||||
    """
 | 
			
		||||
    # 1. extract coordinates
 | 
			
		||||
    if osm_type != 'node' :
 | 
			
		||||
        center = elem.find('center')
 | 
			
		||||
        center = elem.get('center')
 | 
			
		||||
        lat = float(center.get('lat'))
 | 
			
		||||
        lon = float(center.get('lon'))
 | 
			
		||||
 | 
			
		||||
@@ -165,7 +384,31 @@ def get_base_info(elem: ET.Element, osm_type: osm_types, with_name=False) :
 | 
			
		||||
 | 
			
		||||
    # 3. Extract name if specified and return
 | 
			
		||||
    if with_name :
 | 
			
		||||
        name = elem.find("tag[@k='name']").get('v') if elem.find("tag[@k='name']") is not None else None
 | 
			
		||||
        name = elem.get('tags', {}).get('name')
 | 
			
		||||
        return osm_id, coords, name
 | 
			
		||||
    else :
 | 
			
		||||
        return osm_id, coords
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def fill_cache():
 | 
			
		||||
    """
 | 
			
		||||
    Scans the specified cache directory for files starting with 'hollow_' and attempts to load
 | 
			
		||||
    their contents as JSON to fill the cache of the Overpass system.
 | 
			
		||||
    """
 | 
			
		||||
    overpass = Overpass()
 | 
			
		||||
 | 
			
		||||
    with os.scandir(OSM_CACHE_DIR) as it:
 | 
			
		||||
        for entry in it:
 | 
			
		||||
            if entry.is_file() and entry.name.startswith('hollow_'):
 | 
			
		||||
 | 
			
		||||
                try :
 | 
			
		||||
                    # Read the whole file content as a string
 | 
			
		||||
                    with open(entry.path, 'r') as f:
 | 
			
		||||
                        # load data and fill the cache with the query and key
 | 
			
		||||
                        json_data = json.load(f)
 | 
			
		||||
                        overpass.fill_cache(json_data)
 | 
			
		||||
                    # Now delete the file as the cache is filled
 | 
			
		||||
                    os.remove(entry.path)
 | 
			
		||||
 | 
			
		||||
                except Exception as exc :
 | 
			
		||||
                    overpass.logger.error(f'An error occured while parsing file {entry.path} as .json file')
 | 
			
		||||
 
 | 
			
		||||
@@ -1,4 +1,4 @@
 | 
			
		||||
city_bbox_side: 7500 #m
 | 
			
		||||
max_bbox_side: 4000   #m
 | 
			
		||||
radius_close_to: 50
 | 
			
		||||
church_coeff: 0.55
 | 
			
		||||
nature_coeff: 1.4
 | 
			
		||||
@@ -8,5 +8,5 @@ image_bonus: 1.1
 | 
			
		||||
viewpoint_bonus: 5
 | 
			
		||||
wikipedia_bonus: 1.25
 | 
			
		||||
name_bonus: 3
 | 
			
		||||
N_important: 40
 | 
			
		||||
N_important: 60
 | 
			
		||||
pay_bonus: -1
 | 
			
		||||
 
 | 
			
		||||
@@ -4,3 +4,6 @@ average_walking_speed: 4.8
 | 
			
		||||
max_landmarks: 10
 | 
			
		||||
max_landmarks_refiner: 20
 | 
			
		||||
overshoot: 0.0016
 | 
			
		||||
time_limit: 1
 | 
			
		||||
gap_rel: 0.05
 | 
			
		||||
max_iter: 40
 | 
			
		||||
@@ -2,7 +2,7 @@
 | 
			
		||||
 | 
			
		||||
from typing import Optional, Literal
 | 
			
		||||
from uuid import uuid4, UUID
 | 
			
		||||
from pydantic import BaseModel, Field
 | 
			
		||||
from pydantic import BaseModel, ConfigDict, Field
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Output to frontend
 | 
			
		||||
@@ -144,8 +144,4 @@ class Toilets(BaseModel) :
 | 
			
		||||
        """
 | 
			
		||||
        return f'Toilets @{self.location}'
 | 
			
		||||
 | 
			
		||||
    class Config:
 | 
			
		||||
        """
 | 
			
		||||
        This allows us to easily convert the model to and from dictionaries
 | 
			
		||||
        """
 | 
			
		||||
        from_attributes = True
 | 
			
		||||
    model_config = ConfigDict(from_attributes=True)
 | 
			
		||||
 
 | 
			
		||||
@@ -27,11 +27,13 @@ def test_turckheim(client, request):    # pylint: disable=redefined-outer-name
 | 
			
		||||
        "/trip/new",
 | 
			
		||||
        json={
 | 
			
		||||
            "preferences": {"sightseeing": {"type": "sightseeing", "score": 5},
 | 
			
		||||
            "nature": {"type": "nature", "score": 5},
 | 
			
		||||
            "shopping": {"type": "shopping", "score": 5},
 | 
			
		||||
            "nature": {"type": "nature", "score": 0},
 | 
			
		||||
            "shopping": {"type": "shopping", "score": 0},
 | 
			
		||||
            "max_time_minute": duration_minutes,
 | 
			
		||||
            "detour_tolerance_minute": 0},
 | 
			
		||||
            "start": [48.084588, 7.280405]
 | 
			
		||||
            # "start": [48.084588, 7.280405]
 | 
			
		||||
            # "start": [45.74445023349939, 4.8222687890538865]
 | 
			
		||||
            "start": [45.75156398104873, 4.827154464827647]
 | 
			
		||||
            }
 | 
			
		||||
        )
 | 
			
		||||
    result = response.json()
 | 
			
		||||
@@ -51,11 +53,11 @@ def test_turckheim(client, request):    # pylint: disable=redefined-outer-name
 | 
			
		||||
    assert response.status_code == 200  # check for successful planning
 | 
			
		||||
    assert isinstance(landmarks, list)  # check that the return type is a list
 | 
			
		||||
    assert len(landmarks) > 2           # check that there is something to visit
 | 
			
		||||
    assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
 | 
			
		||||
    assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}"
 | 
			
		||||
    assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}"
 | 
			
		||||
    # assert 2!= 3
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_bellecour(client, request) :   # pylint: disable=redefined-outer-name
 | 
			
		||||
    """
 | 
			
		||||
    Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area.
 | 
			
		||||
@@ -97,10 +99,9 @@ def test_bellecour(client, request) :   # pylint: disable=redefined-outer-name
 | 
			
		||||
    assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}"
 | 
			
		||||
    assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_cologne(client, request) :   # pylint: disable=redefined-outer-name
 | 
			
		||||
    """
 | 
			
		||||
    Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area.
 | 
			
		||||
    Test n°3 : Custom test in Cologne to ensure proper decision making in crowded area.
 | 
			
		||||
    
 | 
			
		||||
    Args:
 | 
			
		||||
        client:
 | 
			
		||||
@@ -141,7 +142,7 @@ def test_cologne(client, request) :   # pylint: disable=redefined-outer-name
 | 
			
		||||
 | 
			
		||||
def test_strasbourg(client, request) :   # pylint: disable=redefined-outer-name
 | 
			
		||||
    """
 | 
			
		||||
    Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area.
 | 
			
		||||
    Test n°4 : Custom test in Strasbourg to ensure proper decision making in crowded area.
 | 
			
		||||
    
 | 
			
		||||
    Args:
 | 
			
		||||
        client:
 | 
			
		||||
@@ -182,7 +183,7 @@ def test_strasbourg(client, request) :   # pylint: disable=redefined-outer-name
 | 
			
		||||
 | 
			
		||||
def test_zurich(client, request) :   # pylint: disable=redefined-outer-name
 | 
			
		||||
    """
 | 
			
		||||
    Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area.
 | 
			
		||||
    Test n°5 : Custom test in Zurich to ensure proper decision making in crowded area.
 | 
			
		||||
    
 | 
			
		||||
    Args:
 | 
			
		||||
        client:
 | 
			
		||||
@@ -223,24 +224,24 @@ def test_zurich(client, request) :   # pylint: disable=redefined-outer-name
 | 
			
		||||
 | 
			
		||||
def test_paris(client, request) :   # pylint: disable=redefined-outer-name
 | 
			
		||||
    """
 | 
			
		||||
    Test n°2 : Custom test in Paris (les Halles) centre to ensure proper decision making in crowded area.
 | 
			
		||||
    Test n°6 : Custom test in Paris (les Halles) centre to ensure proper decision making in crowded area.
 | 
			
		||||
    
 | 
			
		||||
    Args:
 | 
			
		||||
        client:
 | 
			
		||||
        request:
 | 
			
		||||
    """
 | 
			
		||||
    start_time = time.time()  # Start timer
 | 
			
		||||
    duration_minutes = 300
 | 
			
		||||
    duration_minutes = 200
 | 
			
		||||
 | 
			
		||||
    response = client.post(
 | 
			
		||||
        "/trip/new",
 | 
			
		||||
        json={
 | 
			
		||||
            "preferences": {"sightseeing": {"type": "sightseeing", "score": 5},
 | 
			
		||||
                            "nature": {"type": "nature", "score": 5},
 | 
			
		||||
                            "nature": {"type": "nature", "score": 0},
 | 
			
		||||
                            "shopping": {"type": "shopping", "score": 5},
 | 
			
		||||
                            "max_time_minute": duration_minutes,
 | 
			
		||||
                            "detour_tolerance_minute": 0},
 | 
			
		||||
            "start": [48.86248803298562, 2.346451131285925]
 | 
			
		||||
            "start": [48.85468881798671, 2.3423925755998374]
 | 
			
		||||
            }
 | 
			
		||||
        )
 | 
			
		||||
    result = response.json()
 | 
			
		||||
@@ -264,7 +265,7 @@ def test_paris(client, request) :   # pylint: disable=redefined-outer-name
 | 
			
		||||
 | 
			
		||||
def test_new_york(client, request) :   # pylint: disable=redefined-outer-name
 | 
			
		||||
    """
 | 
			
		||||
    Test n°2 : Custom test in New York (les Halles) centre to ensure proper decision making in crowded area.
 | 
			
		||||
    Test n°7 : Custom test in New York to ensure proper decision making in crowded area.
 | 
			
		||||
    
 | 
			
		||||
    Args:
 | 
			
		||||
        client:
 | 
			
		||||
@@ -305,7 +306,7 @@ def test_new_york(client, request) :   # pylint: disable=redefined-outer-name
 | 
			
		||||
 | 
			
		||||
def test_shopping(client, request) :   # pylint: disable=redefined-outer-name
 | 
			
		||||
    """
 | 
			
		||||
    Test n°3 : Custom test in Lyon centre to ensure shopping clusters are found.
 | 
			
		||||
    Test n°8 : Custom test in Lyon centre to ensure shopping clusters are found.
 | 
			
		||||
    
 | 
			
		||||
    Args:
 | 
			
		||||
        client:
 | 
			
		||||
@@ -334,8 +335,8 @@ def test_shopping(client, request) :   # pylint: disable=redefined-outer-name
 | 
			
		||||
    # Add details to report
 | 
			
		||||
    log_trip_details(request, landmarks, result['total_time'], duration_minutes)
 | 
			
		||||
 | 
			
		||||
    for elem in landmarks :
 | 
			
		||||
        print(elem)
 | 
			
		||||
    # for elem in landmarks :
 | 
			
		||||
    #     print(elem)
 | 
			
		||||
 | 
			
		||||
    # checks :
 | 
			
		||||
    assert response.status_code == 200  # check for successful planning
 | 
			
		||||
 
 | 
			
		||||
@@ -9,7 +9,8 @@ from pydantic import BaseModel
 | 
			
		||||
from ..overpass.overpass import Overpass, get_base_info
 | 
			
		||||
from ..structs.landmark import Landmark
 | 
			
		||||
from .get_time_distance import get_distance
 | 
			
		||||
from ..constants import OSM_CACHE_DIR
 | 
			
		||||
from .utils import create_bbox
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# silence the overpass logger
 | 
			
		||||
@@ -79,8 +80,7 @@ class ClusterManager:
 | 
			
		||||
            bbox: The bounding box coordinates (around:radius, center_lat, center_lon).
 | 
			
		||||
        """
 | 
			
		||||
        # Setup the caching in the Overpass class.
 | 
			
		||||
        self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR)
 | 
			
		||||
 | 
			
		||||
        self.overpass = Overpass()
 | 
			
		||||
 | 
			
		||||
        self.cluster_type = cluster_type
 | 
			
		||||
        if cluster_type == 'shopping' :
 | 
			
		||||
@@ -95,27 +95,24 @@ class ClusterManager:
 | 
			
		||||
            raise NotImplementedError("Please choose only an available option for cluster detection")
 | 
			
		||||
 | 
			
		||||
        # Initialize the points for cluster detection
 | 
			
		||||
        query = self.overpass.build_query(
 | 
			
		||||
            area = bbox,
 | 
			
		||||
        try:
 | 
			
		||||
            result = self.overpass.send_query(
 | 
			
		||||
            bbox = bbox,
 | 
			
		||||
            osm_types = osm_types,
 | 
			
		||||
            selector = sel,
 | 
			
		||||
            out = out
 | 
			
		||||
        )
 | 
			
		||||
        self.logger.debug(f"Cluster query: {query}")
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            result = self.overpass.send_query(query)
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            self.logger.error(f"Error fetching landmarks: {e}")
 | 
			
		||||
            self.logger.error(f"Error fetching clusters: {e}")
 | 
			
		||||
 | 
			
		||||
        if result is None :
 | 
			
		||||
            self.logger.error(f"Error fetching {cluster_type} clusters, overpass query returned None.")
 | 
			
		||||
            self.logger.debug(f"Found no {cluster_type} clusters, overpass query returned no datapoints.")
 | 
			
		||||
            self.valid = False
 | 
			
		||||
 | 
			
		||||
        else :
 | 
			
		||||
            points = []
 | 
			
		||||
            for osm_type in osm_types :
 | 
			
		||||
                for elem in result.findall(osm_type):
 | 
			
		||||
            for elem in result:
 | 
			
		||||
                osm_type = elem.get('type')
 | 
			
		||||
                
 | 
			
		||||
                # Get coordinates and append them to the points list
 | 
			
		||||
                _, coords = get_base_info(elem, osm_type)
 | 
			
		||||
@@ -137,7 +134,7 @@ class ClusterManager:
 | 
			
		||||
 | 
			
		||||
                # Check that there are is least 1 cluster
 | 
			
		||||
                if len(set(labels)) > 1 :
 | 
			
		||||
                    self.logger.debug(f"Found {len(set(labels))} different clusters.")
 | 
			
		||||
                    self.logger.info(f"Found {len(set(labels))} different {cluster_type} clusters.")
 | 
			
		||||
                    # Separate clustered points and noise points
 | 
			
		||||
                    self.cluster_points = self.all_points[labels != -1]
 | 
			
		||||
                    self.cluster_labels = labels[labels != -1]
 | 
			
		||||
@@ -145,7 +142,7 @@ class ClusterManager:
 | 
			
		||||
                    self.valid = True
 | 
			
		||||
 | 
			
		||||
                else :
 | 
			
		||||
                    self.logger.debug(f"Detected 0 {cluster_type} clusters.")
 | 
			
		||||
                    self.logger.info(f"Found 0 {cluster_type} clusters.")
 | 
			
		||||
                    self.valid = False
 | 
			
		||||
 | 
			
		||||
            else :
 | 
			
		||||
@@ -218,8 +215,7 @@ class ClusterManager:
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
        # Define the bounding box for a given radius around the coordinates
 | 
			
		||||
        lat, lon = cluster.centroid
 | 
			
		||||
        bbox = (1000, lat, lon)
 | 
			
		||||
        bbox = create_bbox(cluster.centroid, 1000)
 | 
			
		||||
        
 | 
			
		||||
        # Query neighborhoods and shopping malls
 | 
			
		||||
        selectors = ['"place"~"^(suburb|neighborhood|neighbourhood|quarter|city_block)$"']
 | 
			
		||||
@@ -238,25 +234,22 @@ class ClusterManager:
 | 
			
		||||
        osm_types = ['node', 'way', 'relation']
 | 
			
		||||
 | 
			
		||||
        for sel in selectors :
 | 
			
		||||
            query = self.overpass.build_query(
 | 
			
		||||
                area = bbox,
 | 
			
		||||
            try:
 | 
			
		||||
                result = self.overpass.send_query(bbox = bbox,
 | 
			
		||||
                                                  osm_types = osm_types,
 | 
			
		||||
                                                  selector = sel,
 | 
			
		||||
                                                  out = 'ids center'
 | 
			
		||||
                                                  )
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                result = self.overpass.send_query(query)
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                self.logger.error(f"Error fetching landmarks: {e}")
 | 
			
		||||
                self.logger.error(f"Error fetching clusters: {e}")
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            if result is None :
 | 
			
		||||
                self.logger.error(f"Error fetching landmarks: {e}")
 | 
			
		||||
                self.logger.error(f"Error fetching clusters: {e}")
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            for osm_type in osm_types :
 | 
			
		||||
                for elem in result.findall(osm_type):
 | 
			
		||||
            for elem in result:
 | 
			
		||||
                osm_type = elem.get('type')
 | 
			
		||||
 | 
			
		||||
                id, coords, name = get_base_info(elem, osm_type, with_name=True)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1,19 +1,15 @@
 | 
			
		||||
"""Module used to import data from OSM and arrange them in categories."""
 | 
			
		||||
import logging
 | 
			
		||||
import xml.etree.ElementTree as ET
 | 
			
		||||
import yaml
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
from ..structs.preferences import Preferences
 | 
			
		||||
from ..structs.landmark import Landmark
 | 
			
		||||
from .take_most_important import take_most_important
 | 
			
		||||
from .cluster_manager import ClusterManager
 | 
			
		||||
from ..overpass.overpass import Overpass, get_base_info
 | 
			
		||||
from .utils import create_bbox
 | 
			
		||||
 | 
			
		||||
from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH, OSM_CACHE_DIR
 | 
			
		||||
 | 
			
		||||
# silence the overpass logger
 | 
			
		||||
logging.getLogger('Overpass').setLevel(level=logging.CRITICAL)
 | 
			
		||||
from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class LandmarkManager:
 | 
			
		||||
@@ -37,8 +33,7 @@ class LandmarkManager:
 | 
			
		||||
 | 
			
		||||
        with LANDMARK_PARAMETERS_PATH.open('r') as f:
 | 
			
		||||
            parameters = yaml.safe_load(f)
 | 
			
		||||
            self.max_bbox_side = parameters['city_bbox_side']
 | 
			
		||||
            self.radius_close_to = parameters['radius_close_to']
 | 
			
		||||
            self.max_bbox_side = parameters['max_bbox_side']
 | 
			
		||||
            self.church_coeff = parameters['church_coeff']
 | 
			
		||||
            self.nature_coeff = parameters['nature_coeff']
 | 
			
		||||
            self.overall_coeff = parameters['overall_coeff']
 | 
			
		||||
@@ -56,7 +51,7 @@ class LandmarkManager:
 | 
			
		||||
            self.detour_factor = parameters['detour_factor']
 | 
			
		||||
 | 
			
		||||
        # Setup the caching in the Overpass class.
 | 
			
		||||
        self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR)
 | 
			
		||||
        self.overpass = Overpass()
 | 
			
		||||
 | 
			
		||||
        self.logger.info('LandmakManager successfully initialized.')
 | 
			
		||||
 | 
			
		||||
@@ -80,39 +75,39 @@ class LandmarkManager:
 | 
			
		||||
        """
 | 
			
		||||
        self.logger.debug('Starting to fetch landmarks...')
 | 
			
		||||
        max_walk_dist = int((preferences.max_time_minute/2)/60*self.walking_speed*1000/self.detour_factor)
 | 
			
		||||
        reachable_bbox_side = min(max_walk_dist, self.max_bbox_side)
 | 
			
		||||
        radius = min(max_walk_dist, int(self.max_bbox_side/2))
 | 
			
		||||
 | 
			
		||||
        # use set to avoid duplicates, this requires some __methods__ to be set in Landmark
 | 
			
		||||
        all_landmarks = set()
 | 
			
		||||
 | 
			
		||||
        # Create a bbox using the around technique, tuple of strings
 | 
			
		||||
        bbox = tuple((min(2000, reachable_bbox_side/2), center_coordinates[0], center_coordinates[1]))
 | 
			
		||||
        bbox = create_bbox(center_coordinates, radius)
 | 
			
		||||
 | 
			
		||||
        # list for sightseeing
 | 
			
		||||
        if preferences.sightseeing.score != 0:
 | 
			
		||||
            self.logger.debug('Fetching sightseeing landmarks...')
 | 
			
		||||
            current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['sightseeing'], preferences.sightseeing.type, preferences.sightseeing.score)
 | 
			
		||||
            all_landmarks.update(current_landmarks)
 | 
			
		||||
            self.logger.debug('Fetching sightseeing clusters...')
 | 
			
		||||
            self.logger.info(f'Found {len(current_landmarks)} sightseeing landmarks')
 | 
			
		||||
 | 
			
		||||
            # special pipeline for historic neighborhoods
 | 
			
		||||
            neighborhood_manager = ClusterManager(bbox, 'sightseeing')
 | 
			
		||||
            historic_clusters = neighborhood_manager.generate_clusters()
 | 
			
		||||
            all_landmarks.update(historic_clusters)
 | 
			
		||||
            self.logger.debug('Sightseeing clusters done')
 | 
			
		||||
 | 
			
		||||
        # list for nature
 | 
			
		||||
        if preferences.nature.score != 0:
 | 
			
		||||
            self.logger.debug('Fetching nature landmarks...')
 | 
			
		||||
            current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['nature'], preferences.nature.type, preferences.nature.score)
 | 
			
		||||
            all_landmarks.update(current_landmarks)
 | 
			
		||||
            self.logger.info(f'Found {len(current_landmarks)} nature landmarks')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
        # list for shopping
 | 
			
		||||
        if preferences.shopping.score != 0:
 | 
			
		||||
            self.logger.debug('Fetching shopping landmarks...')
 | 
			
		||||
            current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['shopping'], preferences.shopping.type, preferences.shopping.score)
 | 
			
		||||
            self.logger.debug('Fetching shopping clusters...')
 | 
			
		||||
            self.logger.info(f'Found {len(current_landmarks)} shopping landmarks')
 | 
			
		||||
 | 
			
		||||
            # set time for all shopping activites :
 | 
			
		||||
            for landmark in current_landmarks :
 | 
			
		||||
@@ -123,8 +118,6 @@ class LandmarkManager:
 | 
			
		||||
            shopping_manager = ClusterManager(bbox, 'shopping')
 | 
			
		||||
            shopping_clusters = shopping_manager.generate_clusters()
 | 
			
		||||
            all_landmarks.update(shopping_clusters)
 | 
			
		||||
            self.logger.debug('Shopping clusters done')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
        landmarks_constrained = take_most_important(all_landmarks, self.n_important)
 | 
			
		||||
@@ -179,7 +172,7 @@ class LandmarkManager:
 | 
			
		||||
        """
 | 
			
		||||
        return_list = []
 | 
			
		||||
 | 
			
		||||
        if landmarktype == 'nature' : query_conditions = []
 | 
			
		||||
        if landmarktype == 'nature' : query_conditions = None
 | 
			
		||||
        else : query_conditions = ['count_tags()>5']
 | 
			
		||||
 | 
			
		||||
        # caution, when applying a list of selectors, overpass will search for elements that match ALL selectors simultaneously
 | 
			
		||||
@@ -190,60 +183,58 @@ class LandmarkManager:
 | 
			
		||||
            osm_types = ['way', 'relation']
 | 
			
		||||
 | 
			
		||||
            if 'viewpoint' in sel :
 | 
			
		||||
                query_conditions = []
 | 
			
		||||
                query_conditions = None
 | 
			
		||||
                osm_types.append('node')
 | 
			
		||||
 | 
			
		||||
            query = self.overpass.build_query(
 | 
			
		||||
                area = bbox,
 | 
			
		||||
            # Send the overpass query
 | 
			
		||||
            try:
 | 
			
		||||
                result = self.overpass.send_query(
 | 
			
		||||
                    bbox = bbox,
 | 
			
		||||
                    osm_types = osm_types,
 | 
			
		||||
                    selector = sel,
 | 
			
		||||
                    conditions = query_conditions,        # except for nature....
 | 
			
		||||
                out = 'center'
 | 
			
		||||
                    out = 'ids center tags'
 | 
			
		||||
                    )
 | 
			
		||||
            self.logger.debug(f"Query: {query}")
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                result = self.overpass.send_query(query)
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                self.logger.error(f"Error fetching landmarks: {e}")
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            return_list += self.xml_to_landmarks(result, landmarktype, preference_level)
 | 
			
		||||
            return_list += self._to_landmarks(result, landmarktype, preference_level)
 | 
			
		||||
 | 
			
		||||
        self.logger.debug(f"Fetched {len(return_list)} landmarks of type {landmarktype} in {bbox}")
 | 
			
		||||
 | 
			
		||||
        return return_list
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def xml_to_landmarks(self, root: ET.Element, landmarktype, preference_level) -> list[Landmark]:
 | 
			
		||||
    def _to_landmarks(self, elements: list, landmarktype, preference_level) -> list[Landmark]:
 | 
			
		||||
        """
 | 
			
		||||
        Parse the Overpass API result and extract landmarks.
 | 
			
		||||
 | 
			
		||||
        This method processes the XML root element returned by the Overpass API and 
 | 
			
		||||
        This method processes the JSON elements returned by the Overpass API and 
 | 
			
		||||
        extracts landmarks of types 'node', 'way', and 'relation'. It retrieves 
 | 
			
		||||
        relevant information such as name, coordinates, and tags, and converts them 
 | 
			
		||||
        into Landmark objects.
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
        root (ET.Element): The root element of the XML response from Overpass API.
 | 
			
		||||
        elements (list): The elements of json response from Overpass API.
 | 
			
		||||
        elem_type (str): The type of landmark (e.g., node, way, relation).
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
        list[Landmark]: A list of Landmark objects extracted from the XML data.
 | 
			
		||||
        list[Landmark]: A list of Landmark objects extracted from the JSON data.
 | 
			
		||||
        """
 | 
			
		||||
        if root is None :
 | 
			
		||||
        if elements is None :
 | 
			
		||||
            return []
 | 
			
		||||
 | 
			
		||||
        landmarks = []
 | 
			
		||||
        for osm_type in ['node', 'way', 'relation'] :
 | 
			
		||||
            for elem in root.findall(osm_type):
 | 
			
		||||
        for elem in elements:
 | 
			
		||||
            osm_type = elem.get('type')
 | 
			
		||||
 | 
			
		||||
            id, coords, name = get_base_info(elem, osm_type, with_name=True)
 | 
			
		||||
 | 
			
		||||
            if name is None or coords is None :
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
                tags = elem.findall('tag')
 | 
			
		||||
            tags = elem.get('tags')
 | 
			
		||||
 | 
			
		||||
            # Convert this to Landmark object
 | 
			
		||||
            landmark = Landmark(name=name,
 | 
			
		||||
@@ -254,10 +245,10 @@ class LandmarkManager:
 | 
			
		||||
                                attractiveness=0,
 | 
			
		||||
                                n_tags=len(tags))
 | 
			
		||||
 | 
			
		||||
            # self.logger.debug('added landmark.')
 | 
			
		||||
 | 
			
		||||
            # Browse through tags to add information to landmark.
 | 
			
		||||
                for tag in tags:
 | 
			
		||||
                    key = tag.get('k')
 | 
			
		||||
                    value = tag.get('v')
 | 
			
		||||
            for key, value in tags.items():
 | 
			
		||||
 | 
			
		||||
                # Skip this landmark if not suitable.
 | 
			
		||||
                if key == 'building:part' and value == 'yes' :
 | 
			
		||||
 
 | 
			
		||||
@@ -1,10 +1,9 @@
 | 
			
		||||
"""Module for finding public toilets around given coordinates."""
 | 
			
		||||
import logging
 | 
			
		||||
import xml.etree.ElementTree as ET
 | 
			
		||||
 | 
			
		||||
from ..overpass.overpass import Overpass, get_base_info
 | 
			
		||||
from ..structs.landmark import Toilets
 | 
			
		||||
from ..constants import OSM_CACHE_DIR
 | 
			
		||||
from .utils import create_bbox
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# silence the overpass logger
 | 
			
		||||
@@ -41,7 +40,7 @@ class ToiletsManager:
 | 
			
		||||
        self.location = location
 | 
			
		||||
 | 
			
		||||
        # Setup the caching in the Overpass class.
 | 
			
		||||
        self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR)
 | 
			
		||||
        self.overpass = Overpass()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def generate_toilet_list(self) -> list[Toilets] :
 | 
			
		||||
@@ -53,51 +52,49 @@ class ToiletsManager:
 | 
			
		||||
        list[Toilets]: A list of `Toilets` objects containing detailed information 
 | 
			
		||||
                       about the toilets found around the given coordinates.
 | 
			
		||||
        """
 | 
			
		||||
        bbox = tuple((self.radius, self.location[0], self.location[1]))
 | 
			
		||||
        bbox = create_bbox(self.location, self.radius)
 | 
			
		||||
        osm_types = ['node', 'way', 'relation']
 | 
			
		||||
        toilets_list = []
 | 
			
		||||
 | 
			
		||||
        query = self.overpass.build_query(
 | 
			
		||||
                area = bbox,
 | 
			
		||||
        query = Overpass.build_query(
 | 
			
		||||
            bbox = bbox,
 | 
			
		||||
            osm_types = osm_types,
 | 
			
		||||
            selector = '"amenity"="toilets"',
 | 
			
		||||
            out = 'ids center tags'
 | 
			
		||||
            )
 | 
			
		||||
        self.logger.debug(f"Query: {query}")
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            result = self.overpass.send_query(query)
 | 
			
		||||
            result = self.overpass.fetch_data_from_api(query_str=query)
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            self.logger.error(f"Error fetching landmarks: {e}")
 | 
			
		||||
            return None
 | 
			
		||||
 | 
			
		||||
        toilets_list = self.xml_to_toilets(result)
 | 
			
		||||
        toilets_list = self.to_toilets(result)
 | 
			
		||||
 | 
			
		||||
        return toilets_list
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def xml_to_toilets(self, root: ET.Element) -> list[Toilets]:
 | 
			
		||||
    def to_toilets(self, elements: list) -> list[Toilets]:
 | 
			
		||||
        """
 | 
			
		||||
        Parse the Overpass API result and extract landmarks.
 | 
			
		||||
 | 
			
		||||
        This method processes the XML root element returned by the Overpass API and 
 | 
			
		||||
        This method processes the JSON elements returned by the Overpass API and 
 | 
			
		||||
        extracts landmarks of types 'node', 'way', and 'relation'. It retrieves 
 | 
			
		||||
        relevant information such as name, coordinates, and tags, and converts them 
 | 
			
		||||
        into Landmark objects.
 | 
			
		||||
 | 
			
		||||
        Args:
 | 
			
		||||
        root (ET.Element): The root element of the XML response from Overpass API.
 | 
			
		||||
        list (osm elements): The root element of the JSON response from Overpass API.
 | 
			
		||||
        elem_type (str): The type of landmark (e.g., node, way, relation).
 | 
			
		||||
 | 
			
		||||
        Returns:
 | 
			
		||||
        list[Landmark]: A list of Landmark objects extracted from the XML data.
 | 
			
		||||
        list[Landmark]: A list of Landmark objects extracted from the JSON data.
 | 
			
		||||
        """
 | 
			
		||||
        if root is None :
 | 
			
		||||
        if elements is None :
 | 
			
		||||
            return []
 | 
			
		||||
 | 
			
		||||
        toilets_list = []
 | 
			
		||||
        for osm_type in ['node', 'way', 'relation'] :
 | 
			
		||||
            for elem in root.findall(osm_type):
 | 
			
		||||
        for elem in elements:
 | 
			
		||||
            osm_type = elem.get('type')
 | 
			
		||||
            # Get coordinates and append them to the points list
 | 
			
		||||
            _, coords = get_base_info(elem, osm_type)
 | 
			
		||||
            if coords is None :
 | 
			
		||||
@@ -106,7 +103,7 @@ class ToiletsManager:
 | 
			
		||||
            toilets = Toilets(location=coords)
 | 
			
		||||
 | 
			
		||||
            # Extract tags as a dictionary
 | 
			
		||||
                tags = {tag.get('k'): tag.get('v') for tag in elem.findall('tag')}
 | 
			
		||||
            tags = elem.get('tags')
 | 
			
		||||
 | 
			
		||||
            if 'wheelchair' in tags.keys() and tags['wheelchair'] == 'yes':
 | 
			
		||||
                toilets.wheelchair = True
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										27
									
								
								backend/src/utils/utils.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										27
									
								
								backend/src/utils/utils.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,27 @@
 | 
			
		||||
"""Various helper functions"""
 | 
			
		||||
import math as m
 | 
			
		||||
 | 
			
		||||
def create_bbox(coords: tuple[float, float], radius: int):
 | 
			
		||||
    """
 | 
			
		||||
    Create a bounding box around the given coordinates.
 | 
			
		||||
 | 
			
		||||
    Args:
 | 
			
		||||
        coords (tuple[float, float]): The latitude and longitude of the center of the bounding box.
 | 
			
		||||
        radius (int): The half-side length of the bounding box in meters.
 | 
			
		||||
 | 
			
		||||
    Returns:
 | 
			
		||||
        tuple[float, float, float, float]: The minimum latitude, minimum longitude, maximum latitude, and maximum longitude
 | 
			
		||||
                                            defining the bounding box.
 | 
			
		||||
    """
 | 
			
		||||
    # Earth's radius in meters
 | 
			
		||||
    R = 6378137
 | 
			
		||||
    lat, lon = coords
 | 
			
		||||
    d_lat = radius / R
 | 
			
		||||
    d_lon = radius / (R * m.cos(m.pi * lat / 180))
 | 
			
		||||
 | 
			
		||||
    lat_min = lat - d_lat * 180 / m.pi
 | 
			
		||||
    lat_max = lat + d_lat * 180 / m.pi
 | 
			
		||||
    lon_min = lon - d_lon * 180 / m.pi
 | 
			
		||||
    lon_max = lon + d_lon * 180 / m.pi
 | 
			
		||||
 | 
			
		||||
    return (lat_min, lon_min, lat_max, lon_max)
 | 
			
		||||
		Reference in New Issue
	
	Block a user