linting
Some checks failed
Build and deploy the backend to staging / Build and push image (pull_request) Successful in 2m51s
Run linting on the backend code / Build (pull_request) Successful in 37s
Run testing on the backend code / Build (pull_request) Failing after 3m8s
Build and deploy the backend to staging / Deploy to staging (pull_request) Successful in 24s

This commit is contained in:
Helldragon67 2025-01-23 11:28:41 +01:00
parent 78f1dcaab4
commit b9356dc4ee
8 changed files with 173 additions and 91 deletions

View File

@ -1,3 +1,4 @@
"""Module defining the caching strategy for overpass requests."""
import os
import xml.etree.ElementTree as ET
import hashlib
@ -15,18 +16,41 @@ def get_cache_key(query: str) -> str:
class CachingStrategyBase:
"""
Base class for implementing caching strategies.
This class defines the structure for a caching strategy with basic methods
that must be implemented by subclasses. Subclasses should define how to
retrieve, store, and close the cache.
"""
def get(self, key):
"""Retrieve the cached data associated with the provided key."""
raise NotImplementedError('Subclass should implement get')
def set(self, key, data):
def set(self, key, value):
"""Store data in the cache with the specified key."""
raise NotImplementedError('Subclass should implement set')
def close(self):
pass
"""Clean up or close any resources used by the caching strategy."""
# For later use if xml does not suit well
class JSONCache(CachingStrategyBase):
"""
A caching strategy that stores and retrieves data in JSON format.
This class provides methods to cache data as JSON files in a specified directory.
The directory is automatically suffixed with '_JSON' to distinguish it from other
caching strategies. The data is stored and retrieved using JSON serialization.
Args:
cache_dir (str): The base directory where JSON cache files will be stored.
Defaults to 'OSM_CACHE_DIR' with a '_JSON' suffix.
Methods:
get(key): Retrieve cached data from a JSON file associated with the given key.
set(key, value): Store data in a JSON file with the specified key.
"""
def __init__(self, cache_dir=OSM_CACHE_DIR):
# Add the class name as a suffix to the directory
self._cache_dir = f'{cache_dir}_JSON'
@ -39,16 +63,31 @@ class JSONCache(CachingStrategyBase):
def get(self, key):
filename = self._filename(key)
if os.path.exists(filename):
with open(filename, 'r') as file:
with open(filename, 'r', encoding='utf-8') as file:
return ujson.load(file)
return None
def set(self, key, value):
with open(self._filename(key), 'w') as file:
with open(self._filename(key), 'w', encoding='utf-8') as file:
ujson.dump(value, file)
class XMLCache(CachingStrategyBase):
"""
A caching strategy that stores and retrieves data in XML format.
This class provides methods to cache data as XML files in a specified directory.
The directory is automatically suffixed with '_XML' to distinguish it from other
caching strategies. The data is stored and retrieved using XML serialization.
Args:
cache_dir (str): The base directory where XML cache files will be stored.
Defaults to 'OSM_CACHE_DIR' with a '_XML' suffix.
Methods:
get(key): Retrieve cached data from a XML file associated with the given key.
set(key, value): Store data in a XML file with the specified key.
"""
def __init__(self, cache_dir=OSM_CACHE_DIR):
# Add the class name as a suffix to the directory
self._cache_dir = f'{cache_dir}_XML'
@ -84,13 +123,22 @@ class XMLCache(CachingStrategyBase):
class CachingStrategy:
__strategy = XMLCache() # Default caching strategy
"""
A class to manage different caching strategies.
# Dictionary to map string identifiers to caching strategy classes
This class provides an interface to switch between different caching strategies
(e.g., XMLCache, JSONCache) dynamically. It allows caching data in different formats,
depending on the strategy being used. By default, it uses the XMLCache strategy.
Attributes:
__strategy (CachingStrategyBase): The currently active caching strategy.
__strategies (dict): A mapping between strategy names (as strings) and their corresponding
classes, allowing dynamic selection of caching strategies.
"""
__strategy = XMLCache() # Default caching strategy
__strategies = {
'XML': XMLCache,
'JSON': JSONCache,
# Add more strategies here if needed
}
@classmethod
@ -105,13 +153,13 @@ class CachingStrategy:
# If a previous strategy exists, close it
if cls.__strategy:
cls.__strategy.close()
# Retrieve the strategy class based on the strategy name
strategy_class = cls.__strategies.get(strategy_name)
if not strategy_class:
raise ValueError(f"Unknown caching strategy: {strategy_name}")
# Instantiate the new strategy with the provided arguments
cls.__strategy = strategy_class(**kwargs)
return cls.__strategy
@ -129,5 +177,3 @@ class CachingStrategy:
if not cls.__strategy:
raise RuntimeError("Caching strategy has not been set.")
cls.__strategy.set(key, value)

View File

@ -1,3 +1,4 @@
"""Module allowing connection to the Overpass API and fetching data from OSM."""
from typing import Literal, List
import urllib
import json
@ -9,8 +10,8 @@ from .caching_strategy import get_cache_key, CachingStrategy
ElementTypes = List[Literal['way', 'node', 'relation']]
def build_query(area: tuple, element_types: ElementTypes, selector: str,
conditions=[], out='center'):
def build_query(area: tuple, element_types: ElementTypes,
selector: str, conditions=[], out='center'):
"""
Constructs a query string for the Overpass API to retrieve OpenStreetMap (OSM) data.
@ -62,7 +63,7 @@ def build_query(area: tuple, element_types: ElementTypes, selector: str,
return query
def send_overpass_query(query: str) -> dict:
def send_query(query: str) -> dict:
"""
Sends the Overpass QL query to the Overpass API and returns the parsed JSON response.
@ -96,7 +97,7 @@ def send_overpass_query(query: str) -> dict:
try:
# Create a Request object with the specified URL, data, and headers
request = urllib.request.Request(overpass_url, data=data, headers=headers)
# Send the request and read the response
with urllib.request.urlopen(request) as response:
# Read and decode the response

View File

@ -122,7 +122,7 @@ def dict_to_selector_list(d: dict) -> list:
return return_list
def send_overpass_query(query: str) -> dict:
def send_query(query: str) -> dict:
"""
Sends the Overpass QL query to the Overpass API and returns the parsed JSON response.
@ -280,7 +280,7 @@ for sel in dict_to_selector_list(amenity_selector):
out='center')
print(query + '\n')
root = send_overpass_query(query)
root = send_query(query)
landmarks += parse_result(root, 'nature')

View File

@ -5,11 +5,9 @@ from typing import Literal
import numpy as np
from sklearn.cluster import DBSCAN
from pydantic import BaseModel
# from OSMPythonTools.overpass import Overpass, overpassQueryBuilder
# from OSMPythonTools.cachingStrategy import CachingStrategy, JSON
from ..overpass.overpass import build_query, send_overpass_query
from ..overpass.caching_strategy import CachingStrategy
from ..overpass.overpass import build_query, send_query
from ..overpass.caching_strategy import CachingStrategy
from ..structs.landmark import Landmark
from .get_time_distance import get_distance
from ..constants import OSM_CACHE_DIR
@ -81,8 +79,6 @@ class ClusterManager:
Args:
bbox: The bounding box coordinates (around:radius, center_lat, center_lon).
"""
# self.overpass = Overpass()
# CachingStrategy.use(JSON, cacheDir=OSM_CACHE_DIR)
CachingStrategy.use('XML', cache_dir=OSM_CACHE_DIR)
self.cluster_type = cluster_type
@ -94,6 +90,8 @@ class ClusterManager:
osm_types = ['way']
sel = '"historic"~"^(monument|building|yes)$"'
out = 'ids center'
else :
raise NotImplementedError("Please choose only an available option for cluster detection")
# Initialize the points for cluster detection
query = build_query(
@ -105,25 +103,25 @@ class ClusterManager:
self.logger.debug(f"Cluster query: {query}")
try:
result = send_overpass_query(query)
result = send_query(query)
except Exception as e:
self.logger.error(f"Error fetching landmarks: {e}")
if result is None :
self.logger.error(f"Error fetching {cluster_type} clusters, overpass query returned None.")
self.valid = False
else :
points = []
for osm_type in osm_types :
for elem in result.findall(osm_type):
center = elem.find('center')
if osm_type != 'node' :
center = elem.find('center')
lat = float(center.get('lat'))
lon = float(center.get('lon'))
points.append(tuple((lat, lon)))
else :
lat = float(elem.get('lat'))
lon = float(elem.get('lon'))
@ -136,7 +134,7 @@ class ClusterManager:
if self.cluster_type == 'shopping' and len(self.all_points) > 200 :
dbscan = DBSCAN(eps=0.00118, min_samples=15, algorithm='kd_tree') # for large cities
elif self.cluster_type == 'sightseeing' :
dbscan = DBSCAN(eps=0.003, min_samples=10, algorithm='kd_tree') # for historic neighborhoods
dbscan = DBSCAN(eps=0.0025, min_samples=15, algorithm='kd_tree') # for historic neighborhoods
else :
dbscan = DBSCAN(eps=0.00075, min_samples=10, algorithm='kd_tree') # for small cities
@ -249,7 +247,7 @@ class ClusterManager:
)
try:
result = send_overpass_query(query)
result = send_query(query)
except Exception as e:
self.logger.error(f"Error fetching landmarks: {e}")
continue
@ -270,7 +268,7 @@ class ClusterManager:
if osm_type != 'node' :
lat = float(center.get('lat'))
lon = float(center.get('lon'))
else :
lat = float(elem.get('lat'))
lon = float(elem.get('lon'))
@ -290,7 +288,7 @@ class ClusterManager:
return Landmark(
name=new_name,
type=self.cluster_type,
location=cluster.centroid, # TODO: use the fact the we can also recognize streets.
location=cluster.centroid, # later: use the fact that we can also recognize streets.
attractiveness=cluster.importance,
n_tags=0,
osm_id=osm_id,

View File

@ -1,14 +1,14 @@
"""Module used to import data from OSM and arrange them in categories."""
import logging
import yaml
import xml.etree.ElementTree as ET
import yaml
from ..structs.preferences import Preferences
from ..structs.landmark import Landmark
from .take_most_important import take_most_important
from .cluster_manager import ClusterManager
from ..overpass.overpass import build_query, send_overpass_query
from ..overpass.overpass import build_query, send_query
from ..overpass.caching_strategy import CachingStrategy
from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH, OSM_CACHE_DIR
@ -205,11 +205,11 @@ class LandmarkManager:
self.logger.debug(f"Query: {query}")
try:
result = send_overpass_query(query)
result = send_query(query)
except Exception as e:
self.logger.error(f"Error fetching landmarks: {e}")
continue
return_list += self.xml_to_landmarks(result, landmarktype, preference_level)
self.logger.debug(f"Fetched {len(return_list)} landmarks of type {landmarktype} in {bbox}")
@ -240,23 +240,27 @@ class LandmarkManager:
for osm_type in ['node', 'way', 'relation'] :
for elem in root.findall(osm_type):
name = elem.find("tag[@k='name']").get('v') if elem.find("tag[@k='name']") is not None else None
center = elem.find('center')
tags = elem.findall('tag')
# Extract the center latitude and longitude if available.
if name is not None and center is not None:
if osm_type != 'node' :
center = elem.find('center')
lat = float(center.get('lat'))
lon = float(center.get('lon'))
coords = tuple((lat, lon))
else :
lat = float(elem.get('lat'))
lon = float(elem.get('lon'))
coords = tuple((lat, lon))
if name is None or coords is None :
continue
# Convert this to Landmark object
landmark = Landmark(name=name,
type=landmarktype,
location=coords,
osm_id=elem.get('id'),
osm_id=elem.get('id'),
osm_type=osm_type,
attractiveness=0,
n_tags=len(tags))
@ -277,7 +281,7 @@ class LandmarkManager:
break
# if value == 'apartments' :
# break
# Fill in the other attributes.
if key == 'image' :
landmark.image_url = value
@ -291,7 +295,7 @@ class LandmarkManager:
landmark.name_en = value
if 'building:' in key or 'pay' in key :
landmark.n_tags -= 1
# Set the duration.
if value in ['museum', 'aquarium', 'planetarium'] :
landmark.duration = 60
@ -304,12 +308,10 @@ class LandmarkManager:
else :
landmark.duration = 5
else:
# add them to cache here before setting the score
# name should be : 'osm_type + str(osm_id) + 'json'
else:
self.set_landmark_score(landmark, landmarktype, preference_level)
landmarks.append(landmark)
# self.logger.debug('new landmark added')
continue
return landmarks

View File

@ -153,10 +153,10 @@ class Optimizer:
up_ind_y = upper_ind[1]
# Loop over the upper triangular indices, excluding diagonal elements
for i in range(len(up_ind_x)):
if up_ind_x[i] != up_ind_y[i]:
for i, up_ind in enumerate(up_ind_x):
if up_ind != up_ind_y[i]:
# Add (L*L-L)/2 constraints to break symmetry
prob += (x[up_ind_x[i]*L + up_ind_y[i]] + x[up_ind_y[i]*L + up_ind_x[i]] <= 1)
prob += (x[up_ind*L + up_ind_y[i]] + x[up_ind_y[i]*L + up_ind] <= 1)
def init_eq_not_stay(self, prob: pl.LpProblem, x: pl.LpVariable, L: int):

View File

@ -6,7 +6,8 @@ from shapely import buffer, LineString, Point, Polygon, MultiPoint, concave_hull
from ..structs.landmark import Landmark
from . import get_time_distance, take_most_important
from .get_time_distance import get_time
from .take_most_important import take_most_important
from .optimizer import Optimizer
from ..constants import OPTIMIZER_PARAMETERS_PATH
@ -195,7 +196,7 @@ class Refiner :
# Step 4: Use nearest neighbor heuristic to visit all landmarks
while unvisited_landmarks:
nearest_landmark = min(unvisited_landmarks, key=lambda lm: get_time_distance.get_time(current_landmark.location, lm.location))
nearest_landmark = min(unvisited_landmarks, key=lambda lm: get_time(current_landmark.location, lm.location))
path.append(nearest_landmark)
coordinates.append(nearest_landmark.location)
current_landmark = nearest_landmark
@ -238,7 +239,7 @@ class Refiner :
if self.is_in_area(area, landmark.location) and landmark.name not in visited_names:
second_order_landmarks.append(landmark)
return take_most_important.take_most_important(second_order_landmarks, int(self.max_landmarks_refiner*0.75))
return take_most_important(second_order_landmarks, int(self.max_landmarks_refiner*0.75))
# Try fix the shortest path using shapely

View File

@ -1,8 +1,9 @@
"""Module for finding public toilets around given coordinates."""
import logging
from OSMPythonTools.overpass import Overpass, overpassQueryBuilder
from OSMPythonTools.cachingStrategy import CachingStrategy, JSON
import xml.etree.ElementTree as ET
from ..overpass.overpass import build_query, send_query
from ..overpass.caching_strategy import CachingStrategy
from ..structs.landmark import Toilets
from ..constants import OSM_CACHE_DIR
@ -39,8 +40,7 @@ class ToiletsManager:
self.radius = radius
self.location = location
self.overpass = Overpass()
CachingStrategy.use(JSON, cacheDir=OSM_CACHE_DIR)
CachingStrategy.use('XML', cacheDir=OSM_CACHE_DIR)
def generate_toilet_list(self) -> list[Toilets] :
@ -53,49 +53,83 @@ class ToiletsManager:
about the toilets found around the given coordinates.
"""
bbox = tuple((f"around:{self.radius}", str(self.location[0]), str(self.location[1])))
osm_types = ['node', 'way', 'relation']
toilets_list = []
query = overpassQueryBuilder(
bbox = bbox,
elementType = ['node', 'way', 'relation'],
# selector can in principle be a list already,
# but it generates the intersection of the queries
# we want the union
query = build_query(
area = bbox,
element_types = osm_types,
selector = ['"amenity"="toilets"'],
includeCenter = True,
out = 'center'
out = 'ids center tags'
)
self.logger.debug(f"Query: {query}")
try:
result = self.overpass.query(query)
result = send_query(query)
except Exception as e:
self.logger.error(f"Error fetching landmarks: {e}")
return None
for elem in result.elements():
location = (elem.centerLat(), elem.centerLon())
# handle unprecise and no-name locations
if location[0] is None:
location = (elem.lat(), elem.lon())
else :
continue
toilets = Toilets(location=location)
if 'wheelchair' in elem.tags().keys() and elem.tag('wheelchair') == 'yes':
toilets.wheelchair = True
if 'changing_table' in elem.tags().keys() and elem.tag('changing_table') == 'yes':
toilets.changing_table = True
if 'fee' in elem.tags().keys() and elem.tag('fee') == 'yes':
toilets.fee = True
if 'opening_hours' in elem.tags().keys() :
toilets.opening_hours = elem.tag('opening_hours')
toilets_list.append(toilets)
toilets_list = self.xml_to_toilets(result)
return toilets_list
def xml_to_toilets(self, root: ET.Element) -> list[Toilets]:
    """
    Parse the Overpass API result and extract toilets locations.

    This method processes the XML root element returned by the Overpass API and
    extracts elements of types 'node', 'way', and 'relation'. It retrieves the
    coordinates and the relevant tags, and converts them into Toilets objects.

    Args:
        root (ET.Element): The root element of the XML response from Overpass API.

    Returns:
        list[Toilets]: A list of Toilets objects extracted from the XML data.
    """
    if root is None:
        return []

    toilets_list = []
    for osm_type in ['node', 'way', 'relation']:
        for elem in root.findall(osm_type):
            # Extract the latitude and longitude.
            if osm_type != 'node':
                # 'way' and 'relation' elements carry their coordinates in a
                # <center> child (present because the query uses out='... center').
                center = elem.find('center')
                if center is None:
                    # No usable coordinates for this element; skip it.
                    continue
                lat = float(center.get('lat'))
                lon = float(center.get('lon'))
            else:
                lat = float(elem.get('lat'))
                lon = float(elem.get('lon'))
            location = (lat, lon)

            toilets = Toilets(location=location)

            # Extract tags as a dictionary.
            tags = {tag.get('k'): tag.get('v') for tag in elem.findall('tag')}

            # BUG FIX: 'tags' is a plain dict — the original called it
            # (tags().keys()), which raises TypeError at runtime.
            if tags.get('wheelchair') == 'yes':
                toilets.wheelchair = True
            if tags.get('changing_table') == 'yes':
                toilets.changing_table = True
            if tags.get('fee') == 'yes':
                toilets.fee = True
            # BUG FIX: ET.Element.tag is a string attribute, not a method;
            # read the value from the parsed tags dict instead.
            if 'opening_hours' in tags:
                toilets.opening_hours = tags['opening_hours']

            toilets_list.append(toilets)

    return toilets_list