amazing cache #55

Merged
kscheidecker merged 22 commits from backend/grid-based-cache into main 2025-01-30 12:40:36 +00:00
7 changed files with 196 additions and 193 deletions
Showing only changes of commit 978cae290b - Show all commits

File diff suppressed because one or more lines are too long

View File

@ -1,5 +1,5 @@
import os import os
import xml.etree.ElementTree as ET import json
import hashlib import hashlib
from ..constants import OSM_CACHE_DIR, OSM_TYPES from ..constants import OSM_CACHE_DIR, OSM_TYPES
@ -37,9 +37,9 @@ class CachingStrategyBase:
"""Clean up or close any resources used by the caching strategy.""" """Clean up or close any resources used by the caching strategy."""
class XMLCache(CachingStrategyBase): class JSONCache(CachingStrategyBase):
""" """
A caching strategy that stores and retrieves data in XML format. A caching strategy that stores and retrieves data in JSON format.
""" """
def __init__(self, cache_dir=OSM_CACHE_DIR): def __init__(self, cache_dir=OSM_CACHE_DIR):
# Add the class name as a suffix to the directory # Add the class name as a suffix to the directory
@ -48,27 +48,28 @@ class XMLCache(CachingStrategyBase):
os.makedirs(self._cache_dir) os.makedirs(self._cache_dir)
def _filename(self, key): def _filename(self, key):
return os.path.join(self._cache_dir, f'{key}.xml') return os.path.join(self._cache_dir, f'{key}.json')
def get(self, key): def get(self, key):
"""Retrieve XML data from the cache and parse it as an ElementTree.""" """Retrieve JSON data from the cache and parse it as an ElementTree."""
filename = self._filename(key) filename = self._filename(key)
if os.path.exists(filename): if os.path.exists(filename):
try: try:
# Parse and return the cached XML data # Open and parse the cached JSON data
tree = ET.parse(filename) with open(filename, 'r', encoding='utf-8') as file:
return tree.getroot() # Return the root element of the parsed XML data = json.load(file)
except ET.ParseError: return data # Return the parsed JSON data
return None except json.JSONDecodeError as err:
return None # Return None if parsing fails
return None return None
def set(self, key, value): def set(self, key, value):
"""Save the XML data as an ElementTree to the cache.""" """Save the JSON data as an ElementTree to the cache."""
filename = self._filename(key) filename = self._filename(key)
tree = ET.ElementTree(value) # value is expected to be an ElementTree root element
try: try:
with open(filename, 'wb') as file: # Write the JSON data to the cache file
tree.write(file, encoding='utf-8', xml_declaration=True) with open(filename, 'w', encoding='utf-8') as file:
json.dump(value, file, ensure_ascii=False, indent=4)
except IOError as e: except IOError as e:
raise IOError(f"Error writing to cache file: {filename} - {e}") from e raise IOError(f"Error writing to cache file: {filename} - {e}") from e
@ -77,23 +78,22 @@ class XMLCache(CachingStrategyBase):
"""Create an empty placeholder cache entry for a future fill.""" """Create an empty placeholder cache entry for a future fill."""
hollow_key = f'hollow_{key}' hollow_key = f'hollow_{key}'
filename = self._filename(hollow_key) filename = self._filename(hollow_key)
# Create the root element <cache>
root = ET.Element("params")
# Add sub-elements with provided values
ET.SubElement(root, "key").text = key
ET.SubElement(root, "cell").text = f"({cell[0]}, {cell[1]})"
ET.SubElement(root, "osm_types").text = ','.join(osm_types)
ET.SubElement(root, "selector").text = selector
ET.SubElement(root, "conditions").text = ','.join(conditions) if conditions else "none"
ET.SubElement(root, "out").text = out
# Create an ElementTree object from the root # Create the hollow JSON structure
tree = ET.ElementTree(root) hollow_data = {
"key": key,
# Write the XML to the file "cell": list(cell),
with open(filename, 'wb') as file: "osm_types": list(osm_types),
tree.write(file, encoding='utf-8', xml_declaration=True) "selector": selector,
"conditions": conditions if conditions else "none",
"out": out
}
# Write the hollow data to the cache file
try:
with open(filename, 'w', encoding='utf-8') as file:
json.dump(hollow_data, file, ensure_ascii=False, indent=4)
except IOError as e:
raise IOError(f"Error writing hollow cache to file: {filename} - {e}") from e
def close(self): def close(self):
"""Cleanup method, if needed.""" """Cleanup method, if needed."""
@ -103,13 +103,13 @@ class CachingStrategy:
""" """
A class to manage different caching strategies. A class to manage different caching strategies.
""" """
__strategy = XMLCache() # Default caching strategy __strategy = JSONCache() # Default caching strategy
__strategies = { __strategies = {
'XML': XMLCache, 'JSON': JSONCache,
} }
@classmethod @classmethod
def use(cls, strategy_name='XML', **kwargs): def use(cls, strategy_name='JSON', **kwargs):
if cls.__strategy: if cls.__strategy:
cls.__strategy.close() cls.__strategy.close()

View File

@ -3,7 +3,7 @@ import os
import urllib import urllib
import math import math
import logging import logging
import xml.etree.ElementTree as ET import json
from .caching_strategy import get_cache_key, CachingStrategy from .caching_strategy import get_cache_key, CachingStrategy
from ..constants import OSM_CACHE_DIR, OSM_TYPES from ..constants import OSM_CACHE_DIR, OSM_TYPES
@ -20,7 +20,7 @@ class Overpass :
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def __init__(self, caching_strategy: str = 'XML', cache_dir: str = OSM_CACHE_DIR) : def __init__(self, caching_strategy: str = 'JSON', cache_dir: str = OSM_CACHE_DIR) :
""" """
Initialize the Overpass instance with the url, headers and caching strategy. Initialize the Overpass instance with the url, headers and caching strategy.
""" """
@ -30,9 +30,9 @@ class Overpass :
def send_query(self, bbox: tuple, osm_types: OSM_TYPES, def send_query(self, bbox: tuple, osm_types: OSM_TYPES,
selector: str, conditions=[], out='center') -> ET: selector: str, conditions=[], out='center'):
""" """
Sends the Overpass QL query to the Overpass API and returns the parsed XML response. Sends the Overpass QL query to the Overpass API and returns the parsed json response.
Args: Args:
bbox (tuple): Bounding box for the query. bbox (tuple): Bounding box for the query.
@ -42,7 +42,7 @@ class Overpass :
out (str): Output format ('center', 'body', etc.). Defaults to 'center'. out (str): Output format ('center', 'body', etc.). Defaults to 'center'.
Returns: Returns:
ET.Element: Parsed XML response from the Overpass API, or cached data if available. dict: Parsed json response from the Overpass API, or cached data if available.
""" """
# Determine which grid cells overlap with this bounding box. # Determine which grid cells overlap with this bounding box.
overlapping_cells = Overpass._get_overlapping_cells(bbox) overlapping_cells = Overpass._get_overlapping_cells(bbox)
@ -59,10 +59,10 @@ class Overpass :
# Missing data: Make a query to Overpass API # Missing data: Make a query to Overpass API
query_str = Overpass.build_query(bbox, osm_types, selector, conditions, out) query_str = Overpass.build_query(bbox, osm_types, selector, conditions, out)
self.fetch_data_from_api(query_str) return self.fetch_data_from_api(query_str)
def fetch_data_from_api(self, query_str: str, cache_key: str = None) -> ET.Element: def fetch_data_from_api(self, query_str: str, cache_key: str = None) -> dict:
""" """
Fetch data from the Overpass API and update the cache. Fetch data from the Overpass API and update the cache.
@ -72,33 +72,37 @@ class Overpass :
hollow_cache_keys (list): Cache keys for missing data to be updated. hollow_cache_keys (list): Cache keys for missing data to be updated.
Returns: Returns:
ET.Element: Combined cached and fetched data. dict: Combined cached and fetched data.
""" """
try: try:
data = urllib.parse.urlencode({'data': query_str}).encode('utf-8') data = urllib.parse.urlencode({'data': query_str}).encode('utf-8')
request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers) request = urllib.request.Request(self.overpass_url, data=data, headers=self.headers)
with urllib.request.urlopen(request) as response: with urllib.request.urlopen(request) as response:
response_data = response.read().decode('utf-8') response_data = response.read().decode('utf-8') # Convert the HTTPResponse to a string
root = ET.fromstring(response_data) data = json.loads(response_data) # Load the JSON from the string
elements = data.get('elements', [])
if cache_key is not None : if cache_key is not None :
self.caching_strategy.set(cache_key, root) self.caching_strategy.set(cache_key, elements)
self.logger.debug(f'Cache set.') self.logger.debug(f'Cache set.')
else : else :
self.logger.debug(f'Cache miss. Fetching data through Overpass\nQuery = {query_str}') self.logger.debug(f'Cache miss. Fetching data through Overpass\nQuery = {query_str}')
return root return elements
except urllib.error.URLError as e: except urllib.error.URLError as e:
self.logger.error(f"Error connecting to Overpass API: {e}") self.logger.error(f"Error connecting to Overpass API: {e}")
raise ConnectionError(f"Error connecting to Overpass API: {e}") raise ConnectionError(f"Error connecting to Overpass API: {e}")
except Exception as exc :
raise Exception(f'An unexpected error occured: {str(exc)}') from exc
def fill_cache(self, xml_string: str) :
def fill_cache(self, json_data) :
""" """
Fill cache with data by using a hollow cache entry's information. Fill cache with data by using a hollow cache entry's information.
""" """
query_str, cache_key = Overpass._build_query_from_hollow(xml_string) query_str, cache_key = Overpass._build_query_from_hollow(json_data)
self.fetch_data_from_api(query_str, cache_key) self.fetch_data_from_api(query_str, cache_key)
@ -133,7 +137,7 @@ class Overpass :
if not isinstance(osm_types, list) : if not isinstance(osm_types, list) :
osm_types = [osm_types] osm_types = [osm_types]
query = '(' query = '[out:json];('
# convert the bbox to string. # convert the bbox to string.
bbox_str = f"({','.join(map(str, bbox))})" bbox_str = f"({','.join(map(str, bbox))})"
@ -190,21 +194,21 @@ class Overpass :
@staticmethod @staticmethod
def _build_query_from_hollow(xml_string): def _build_query_from_hollow(json_data):
""" """
Build query string using information from a hollow cache entry. Build query string using information from a hollow cache entry.
""" """
# Parse the XML string into an ElementTree object # Parse the JSON string into a dictionary
root = ET.fromstring(xml_string) data = json.loads(json_data)
# Extract values from the XML tree # Extract values from the JSON object
key = root.find('key').text key = data.get('key')
cell = tuple(map(float, root.find('cell').text.strip('()').split(','))) cell = tuple(data.get('cell'))
bbox = Overpass._get_bbox_from_grid_cell(cell[0], cell[1]) bbox = Overpass._get_bbox_from_grid_cell(cell[0], cell[1])
osm_types = root.find('osm_types').text.split(',') osm_types = data.get('osm_types')
selector = root.find('selector').text selector = data.get('selector')
conditions = root.find('conditions').text.split(',') if root.find('conditions').text != "none" else [] conditions = data.get('conditions') if data.get('conditions') != "none" else []
out = root.find('out').text out = data.get('out')
query_str = Overpass.build_query(bbox, osm_types, selector, conditions, out) query_str = Overpass.build_query(bbox, osm_types, selector, conditions, out)
@ -265,14 +269,14 @@ class Overpass :
""" """
Combines data from multiple cached responses into a single result. Combines data from multiple cached responses into a single result.
""" """
combined_data = ET.Element("osm") combined_data = []
for cached_data in cached_data_list: for cached_data in cached_data_list:
for element in cached_data: for element in cached_data:
combined_data.append(element) combined_data.append(element)
return combined_data return combined_data
def get_base_info(elem: ET.Element, osm_type: OSM_TYPES, with_name=False) : def get_base_info(elem: dict, osm_type: OSM_TYPES, with_name=False) :
""" """
Extracts base information (coordinates, OSM ID, and optionally a name) from an OSM element. Extracts base information (coordinates, OSM ID, and optionally a name) from an OSM element.
@ -281,7 +285,7 @@ def get_base_info(elem: ET.Element, osm_type: OSM_TYPES, with_name=False) :
extracting coordinates either directly or from a center tag, depending on the element type. extracting coordinates either directly or from a center tag, depending on the element type.
Args: Args:
elem (ET.Element): The XML element representing the OSM entity. elem (dict): The JSON element representing the OSM entity.
osm_type (str): The type of the OSM entity (e.g., 'node', 'way'). If 'node', the coordinates osm_type (str): The type of the OSM entity (e.g., 'node', 'way'). If 'node', the coordinates
are extracted directly from the element; otherwise, from the 'center' tag. are extracted directly from the element; otherwise, from the 'center' tag.
with_name (bool): Whether to extract and return the name of the element. If True, it attempts with_name (bool): Whether to extract and return the name of the element. If True, it attempts
@ -295,7 +299,7 @@ def get_base_info(elem: ET.Element, osm_type: OSM_TYPES, with_name=False) :
""" """
# 1. extract coordinates # 1. extract coordinates
if osm_type != 'node' : if osm_type != 'node' :
center = elem.find('center') center = elem.get('center')
lat = float(center.get('lat')) lat = float(center.get('lat'))
lon = float(center.get('lon')) lon = float(center.get('lon'))
@ -310,7 +314,7 @@ def get_base_info(elem: ET.Element, osm_type: OSM_TYPES, with_name=False) :
# 3. Extract name if specified and return # 3. Extract name if specified and return
if with_name : if with_name :
name = elem.find("tag[@k='name']").get('v') if elem.find("tag[@k='name']") is not None else None name = elem.get('tags', {}).get('name')
return osm_id, coords, name return osm_id, coords, name
else : else :
return osm_id, coords return osm_id, coords
@ -318,7 +322,7 @@ def get_base_info(elem: ET.Element, osm_type: OSM_TYPES, with_name=False) :
def fill_cache(): def fill_cache():
overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR) overpass = Overpass(caching_strategy='JSON', cache_dir=OSM_CACHE_DIR)
with os.scandir(OSM_CACHE_DIR) as it: with os.scandir(OSM_CACHE_DIR) as it:
for entry in it: for entry in it:
@ -326,10 +330,10 @@ def fill_cache():
# Read the whole file content as a string # Read the whole file content as a string
with open(entry.path, 'r') as f: with open(entry.path, 'r') as f:
xml_string = f.read() json_data = f.read()
# Fill the cache with the query and key # Fill the cache with the query and key
overpass.fill_cache(xml_string) overpass.fill_cache(json_data)
# Now delete the file as the cache is filled # Now delete the file as the cache is filled
os.remove(entry.path) os.remove(entry.path)

View File

@ -28,7 +28,7 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
json={ json={
"preferences": {"sightseeing": {"type": "sightseeing", "score": 5}, "preferences": {"sightseeing": {"type": "sightseeing", "score": 5},
"nature": {"type": "nature", "score": 5}, "nature": {"type": "nature", "score": 5},
"shopping": {"type": "shopping", "score": 5}, "shopping": {"type": "shopping", "score": 0},
"max_time_minute": duration_minutes, "max_time_minute": duration_minutes,
"detour_tolerance_minute": 0}, "detour_tolerance_minute": 0},
"start": [48.084588, 7.280405] "start": [48.084588, 7.280405]
@ -56,6 +56,7 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}" assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}"
# assert 2!= 3 # assert 2!= 3
def test_bellecour(client, request) : # pylint: disable=redefined-outer-name def test_bellecour(client, request) : # pylint: disable=redefined-outer-name
""" """
Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area. Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area.
@ -97,7 +98,6 @@ def test_bellecour(client, request) : # pylint: disable=redefined-outer-name
assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}" assert duration_minutes*0.8 < result['total_time'], f"Trip too short: {result['total_time']} instead of {duration_minutes}"
assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}" assert duration_minutes*1.2 > result['total_time'], f"Trip too long: {result['total_time']} instead of {duration_minutes}"
def test_cologne(client, request) : # pylint: disable=redefined-outer-name def test_cologne(client, request) : # pylint: disable=redefined-outer-name
""" """
Test n°3 : Custom test in Cologne to ensure proper decision making in crowded area. Test n°3 : Custom test in Cologne to ensure proper decision making in crowded area.

View File

@ -9,7 +9,6 @@ from pydantic import BaseModel
from ..overpass.overpass import Overpass, get_base_info from ..overpass.overpass import Overpass, get_base_info
from ..structs.landmark import Landmark from ..structs.landmark import Landmark
from .get_time_distance import get_distance from .get_time_distance import get_distance
from ..constants import OSM_CACHE_DIR
from .utils import create_bbox from .utils import create_bbox
@ -81,7 +80,7 @@ class ClusterManager:
bbox: The bounding box coordinates (around:radius, center_lat, center_lon). bbox: The bounding box coordinates (around:radius, center_lat, center_lon).
""" """
# Setup the caching in the Overpass class. # Setup the caching in the Overpass class.
self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR) self.overpass = Overpass()
self.cluster_type = cluster_type self.cluster_type = cluster_type
if cluster_type == 'shopping' : if cluster_type == 'shopping' :
@ -112,13 +111,13 @@ class ClusterManager:
else : else :
points = [] points = []
for osm_type in osm_types : for elem in result:
for elem in result.findall(osm_type): osm_type = elem.get('type')
# Get coordinates and append them to the points list # Get coordinates and append them to the points list
_, coords = get_base_info(elem, osm_type) _, coords = get_base_info(elem, osm_type)
if coords is not None : if coords is not None :
points.append(coords) points.append(coords)
if points : if points :
self.all_points = np.array(points) self.all_points = np.array(points)
@ -249,20 +248,20 @@ class ClusterManager:
self.logger.error(f"Error fetching landmarks: {e}") self.logger.error(f"Error fetching landmarks: {e}")
continue continue
for osm_type in osm_types : for elem in result:
for elem in result.findall(osm_type): osm_type = elem.get('type')
id, coords, name = get_base_info(elem, osm_type, with_name=True) id, coords, name = get_base_info(elem, osm_type, with_name=True)
if name is None or coords is None : if name is None or coords is None :
continue continue
d = get_distance(cluster.centroid, coords) d = get_distance(cluster.centroid, coords)
if d < min_dist : if d < min_dist :
min_dist = d min_dist = d
new_name = name new_name = name
osm_type = osm_type # Add type: 'way' or 'relation' osm_type = osm_type # Add type: 'way' or 'relation'
osm_id = id # Add OSM id osm_id = id # Add OSM id
return Landmark( return Landmark(
name=new_name, name=new_name,

View File

@ -12,7 +12,7 @@ from .cluster_manager import ClusterManager
from ..overpass.overpass import Overpass, get_base_info from ..overpass.overpass import Overpass, get_base_info
from .utils import create_bbox from .utils import create_bbox
from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH, OSM_CACHE_DIR from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH
class LandmarkManager: class LandmarkManager:
@ -54,7 +54,7 @@ class LandmarkManager:
self.detour_factor = parameters['detour_factor'] self.detour_factor = parameters['detour_factor']
# Setup the caching in the Overpass class. # Setup the caching in the Overpass class.
self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR) self.overpass = Overpass()
self.logger.info('LandmakManager successfully initialized.') self.logger.info('LandmakManager successfully initialized.')
@ -194,24 +194,24 @@ class LandmarkManager:
# Send the overpass query # Send the overpass query
try: try:
result = self.overpass.send_query( result = self.overpass.send_query(
bbox = bbox, bbox = bbox,
osm_types = osm_types, osm_types = osm_types,
selector = sel, selector = sel,
conditions = query_conditions, # except for nature.... conditions = query_conditions, # except for nature....
out = 'center' out = 'ids center tags'
) )
except Exception as e: except Exception as e:
self.logger.error(f"Error fetching landmarks: {e}") self.logger.error(f"Error fetching landmarks: {e}")
continue continue
return_list += self.xml_to_landmarks(result, landmarktype, preference_level) return_list += self.json_to_landmarks(result, landmarktype, preference_level)
self.logger.debug(f"Fetched {len(return_list)} landmarks of type {landmarktype} in {bbox}") self.logger.debug(f"Fetched {len(return_list)} landmarks of type {landmarktype} in {bbox}")
return return_list return return_list
def xml_to_landmarks(self, root: ET.Element, landmarktype, preference_level) -> list[Landmark]: def json_to_landmarks(self, elements: list, landmarktype, preference_level) -> list[Landmark]:
""" """
Parse the Overpass API result and extract landmarks. Parse the Overpass API result and extract landmarks.
@ -221,82 +221,83 @@ class LandmarkManager:
into Landmark objects. into Landmark objects.
Args: Args:
root (ET.Element): The root element of the XML response from Overpass API. elements (list): The elements of json response from Overpass API.
elem_type (str): The type of landmark (e.g., node, way, relation). elem_type (str): The type of landmark (e.g., node, way, relation).
Returns: Returns:
list[Landmark]: A list of Landmark objects extracted from the XML data. list[Landmark]: A list of Landmark objects extracted from the XML data.
""" """
if root is None : print(f'in landmarks manager : {type(elements)}')
if elements is None :
return [] return []
landmarks = [] landmarks = []
for osm_type in ['node', 'way', 'relation'] : for elem in elements:
for elem in root.findall(osm_type): osm_type = elem.get('type')
id, coords, name = get_base_info(elem, osm_type, with_name=True) id, coords, name = get_base_info(elem, osm_type, with_name=True)
if name is None or coords is None :
continue
tags = elem.findall('tag')
# Convert this to Landmark object
landmark = Landmark(name=name,
type=landmarktype,
location=coords,
osm_id=id,
osm_type=osm_type,
attractiveness=0,
n_tags=len(tags))
# Browse through tags to add information to landmark.
for tag in tags:
key = tag.get('k')
value = tag.get('v')
# Skip this landmark if not suitable.
if key == 'building:part' and value == 'yes' :
break
if 'disused:' in key :
break
if 'boundary:' in key :
break
if 'shop' in key and landmarktype != 'shopping' :
break
# if value == 'apartments' :
# break
# Fill in the other attributes.
if key == 'image' :
landmark.image_url = value
if key == 'website' :
landmark.website_url = value
if key == 'place_of_worship' :
landmark.is_place_of_worship = True
if key == 'wikipedia' :
landmark.wiki_url = value
if key == 'name:en' :
landmark.name_en = value
if 'building:' in key or 'pay' in key :
landmark.n_tags -= 1
# Set the duration.
if value in ['museum', 'aquarium', 'planetarium'] :
landmark.duration = 60
elif value == 'viewpoint' :
landmark.is_viewpoint = True
landmark.duration = 10
elif value == 'cathedral' :
landmark.is_place_of_worship = False
landmark.duration = 10
else:
self.set_landmark_score(landmark, landmarktype, preference_level)
landmarks.append(landmark)
if name is None or coords is None :
continue continue
tags = elem.get('tags')
# Convert this to Landmark object
landmark = Landmark(name=name,
type=landmarktype,
location=coords,
osm_id=id,
osm_type=osm_type,
attractiveness=0,
n_tags=len(tags))
# self.logger.debug('added landmark.')
# Browse through tags to add information to landmark.
for key, value in tags.items():
# Skip this landmark if not suitable.
if key == 'building:part' and value == 'yes' :
break
if 'disused:' in key :
break
if 'boundary:' in key :
break
if 'shop' in key and landmarktype != 'shopping' :
break
# if value == 'apartments' :
# break
# Fill in the other attributes.
if key == 'image' :
landmark.image_url = value
if key == 'website' :
landmark.website_url = value
if key == 'place_of_worship' :
landmark.is_place_of_worship = True
if key == 'wikipedia' :
landmark.wiki_url = value
if key == 'name:en' :
landmark.name_en = value
if 'building:' in key or 'pay' in key :
landmark.n_tags -= 1
# Set the duration.
if value in ['museum', 'aquarium', 'planetarium'] :
landmark.duration = 60
elif value == 'viewpoint' :
landmark.is_viewpoint = True
landmark.duration = 10
elif value == 'cathedral' :
landmark.is_place_of_worship = False
landmark.duration = 10
else:
self.set_landmark_score(landmark, landmarktype, preference_level)
landmarks.append(landmark)
continue
return landmarks return landmarks
def dict_to_selector_list(d: dict) -> list: def dict_to_selector_list(d: dict) -> list:

View File

@ -4,7 +4,6 @@ import xml.etree.ElementTree as ET
from ..overpass.overpass import Overpass, get_base_info from ..overpass.overpass import Overpass, get_base_info
from ..structs.landmark import Toilets from ..structs.landmark import Toilets
from ..constants import OSM_CACHE_DIR
from .utils import create_bbox from .utils import create_bbox
@ -42,7 +41,7 @@ class ToiletsManager:
self.location = location self.location = location
# Setup the caching in the Overpass class. # Setup the caching in the Overpass class.
self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR) self.overpass = Overpass()
def generate_toilet_list(self) -> list[Toilets] : def generate_toilet_list(self) -> list[Toilets] :
@ -70,55 +69,55 @@ class ToiletsManager:
self.logger.error(f"Error fetching landmarks: {e}") self.logger.error(f"Error fetching landmarks: {e}")
return None return None
toilets_list = self.xml_to_toilets(result) toilets_list = self.to_toilets(result)
return toilets_list return toilets_list
def xml_to_toilets(self, root: ET.Element) -> list[Toilets]: def to_toilets(self, elements: list) -> list[Toilets]:
""" """
Parse the Overpass API result and extract landmarks. Parse the Overpass API result and extract landmarks.
This method processes the XML root element returned by the Overpass API and This method processes the JSON elements returned by the Overpass API and
extracts landmarks of types 'node', 'way', and 'relation'. It retrieves extracts landmarks of types 'node', 'way', and 'relation'. It retrieves
relevant information such as name, coordinates, and tags, and converts them relevant information such as name, coordinates, and tags, and converts them
into Landmark objects. into Landmark objects.
Args: Args:
root (ET.Element): The root element of the XML response from Overpass API. list (osm elements): The root element of the JSON response from Overpass API.
elem_type (str): The type of landmark (e.g., node, way, relation). elem_type (str): The type of landmark (e.g., node, way, relation).
Returns: Returns:
list[Landmark]: A list of Landmark objects extracted from the XML data. list[Landmark]: A list of Landmark objects extracted from the XML data.
""" """
if root is None : if elements is None :
return [] return []
toilets_list = [] toilets_list = []
for osm_type in ['node', 'way', 'relation'] : for elem in elements:
for elem in root.findall(osm_type): osm_type = elem.get('type')
# Get coordinates and append them to the points list # Get coordinates and append them to the points list
_, coords = get_base_info(elem, osm_type) _, coords = get_base_info(elem, osm_type)
if coords is None : if coords is None :
continue continue
toilets = Toilets(location=coords) toilets = Toilets(location=coords)
# Extract tags as a dictionary # Extract tags as a dictionary
tags = {tag.get('k'): tag.get('v') for tag in elem.findall('tag')} tags = elem.get('tags')
if 'wheelchair' in tags.keys() and tags['wheelchair'] == 'yes': if 'wheelchair' in tags.keys() and tags['wheelchair'] == 'yes':
toilets.wheelchair = True toilets.wheelchair = True
if 'changing_table' in tags.keys() and tags['changing_table'] == 'yes': if 'changing_table' in tags.keys() and tags['changing_table'] == 'yes':
toilets.changing_table = True toilets.changing_table = True
if 'fee' in tags.keys() and tags['fee'] == 'yes': if 'fee' in tags.keys() and tags['fee'] == 'yes':
toilets.fee = True toilets.fee = True
if 'opening_hours' in tags.keys() : if 'opening_hours' in tags.keys() :
toilets.opening_hours = tags['opening_hours'] toilets.opening_hours = tags['opening_hours']
toilets_list.append(toilets) toilets_list.append(toilets)
return toilets_list return toilets_list