working cache
Some checks failed
Build and deploy the backend to staging / Build and push image (pull_request) Successful in 2m19s
Run linting on the backend code / Build (pull_request) Successful in 25s
Run testing on the backend code / Build (pull_request) Failing after 7m37s
Build and deploy the backend to staging / Deploy to staging (pull_request) Successful in 24s

This commit is contained in:
Helldragon67 2025-01-23 08:04:26 +01:00
parent c668158341
commit ca40de82dd
7 changed files with 389 additions and 388 deletions

1
backend/.gitignore vendored
View File

@ -1,4 +1,5 @@
# osm-cache and wikidata cache
cache_XML/
cache/
apicache/

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,133 @@
import os
import xml.etree.ElementTree as ET
import hashlib
import ujson
from ..constants import OSM_CACHE_DIR
def get_cache_key(query: str) -> str:
"""
Generate a unique cache key for the query using a hash function.
This ensures that queries with different parameters are cached separately.
"""
return hashlib.md5(query.encode('utf-8')).hexdigest()
class CachingStrategyBase:
def get(self, key):
raise NotImplementedError('Subclass should implement get')
def set(self, key, data):
raise NotImplementedError('Subclass should implement set')
def close(self):
pass
# For later use if xml does not suit well
class JSONCache(CachingStrategyBase):
def __init__(self, cache_dir=OSM_CACHE_DIR):
# Add the class name as a suffix to the directory
self._cache_dir = f'{cache_dir}_JSON'
if not os.path.exists(self._cache_dir):
os.makedirs(self._cache_dir)
def _filename(self, key):
return os.path.join(self._cache_dir, f'{key}.json')
def get(self, key):
filename = self._filename(key)
if os.path.exists(filename):
with open(filename, 'r') as file:
return ujson.load(file)
return None
def set(self, key, value):
with open(self._filename(key), 'w') as file:
ujson.dump(value, file)
class XMLCache(CachingStrategyBase):
def __init__(self, cache_dir=OSM_CACHE_DIR):
# Add the class name as a suffix to the directory
self._cache_dir = f'{cache_dir}_XML'
if not os.path.exists(self._cache_dir):
os.makedirs(self._cache_dir)
def _filename(self, key):
return os.path.join(self._cache_dir, f'{key}.xml')
def get(self, key):
"""Retrieve XML data from the cache and parse it as an ElementTree."""
filename = self._filename(key)
if os.path.exists(filename):
try:
# Parse and return the cached XML data
tree = ET.parse(filename)
return tree.getroot() # Return the root element of the parsed XML
except ET.ParseError:
print(f"Error parsing cached XML file: {filename}")
return None
return None
def set(self, key, value):
"""Save the XML data as an ElementTree to the cache."""
filename = self._filename(key)
tree = ET.ElementTree(value) # value is expected to be an ElementTree root element
try:
# Write the XML data to a file
with open(filename, 'wb') as file:
tree.write(file, encoding='utf-8', xml_declaration=True)
except IOError as e:
print(f"Error writing to cache file: {filename} - {e}")
class CachingStrategy:
__strategy = XMLCache() # Default caching strategy
# Dictionary to map string identifiers to caching strategy classes
__strategies = {
'XML': XMLCache,
'JSON': JSONCache,
# Add more strategies here if needed
}
@classmethod
def use(cls, strategy_name='XML', **kwargs):
"""
Set the caching strategy based on the strategy_name provided.
Args:
strategy_name (str): The name of the caching strategy (e.g., 'XML').
**kwargs: Additional keyword arguments to pass when initializing the strategy.
"""
# If a previous strategy exists, close it
if cls.__strategy:
cls.__strategy.close()
# Retrieve the strategy class based on the strategy name
strategy_class = cls.__strategies.get(strategy_name)
if not strategy_class:
raise ValueError(f"Unknown caching strategy: {strategy_name}")
# Instantiate the new strategy with the provided arguments
cls.__strategy = strategy_class(**kwargs)
return cls.__strategy
@classmethod
def get(cls, key):
"""Get data from the current strategy's cache."""
if not cls.__strategy:
raise RuntimeError("Caching strategy has not been set.")
return cls.__strategy.get(key)
@classmethod
def set(cls, key, value):
"""Set data in the current strategy's cache."""
if not cls.__strategy:
raise RuntimeError("Caching strategy has not been set.")
cls.__strategy.set(key, value)

View File

@ -0,0 +1,114 @@
from typing import Literal, List
import urllib
import json
import xml.etree.ElementTree as ET
from .caching_strategy import get_cache_key, CachingStrategy
ElementTypes = List[Literal['way', 'node', 'relation']]
def build_query(area: tuple, element_types: ElementTypes, selector: str,
conditions=[], out='center'):
"""
Constructs a query string for the Overpass API to retrieve OpenStreetMap (OSM) data.
Args:
area (tuple): A tuple representing the geographical search area, typically in the format
(radius, latitude, longitude). The first element is a string like "around:2000"
specifying the search radius, and the second and third elements represent
the latitude and longitude as floats or strings.
element_types (list[str]): A list of OSM element types to search for. Must be one or more of
'Way', 'Node', or 'Relation'.
selector (str): The key or tag to filter the OSM elements (e.g., 'amenity', 'highway', etc.).
conditions (list, optional): A list of conditions to apply as additional filters for the
selected OSM elements. The conditions should be written in
the Overpass QL format, and they are combined with '&&' if
multiple are provided. Defaults to an empty list.
out (str, optional): Specifies the output type, such as 'center', 'body', or 'tags'.
Defaults to 'center'.
Returns:
str: The constructed Overpass QL query string.
Notes:
- If no conditions are provided, the query will just use the `selector` to filter the OSM
elements without additional constraints.
- The search area must always formatted as "(radius, lat, lon)".
"""
if not isinstance(conditions, list) :
conditions = [conditions]
query = '('
# Round the radius to nearest 50 and coordinates to generate less queries
search_radius = round(area[0] / 50) * 50
loc = tuple((round(area[1], 2), round(area[2], 2)))
search_area = f"(around:{search_radius}, {str(loc[0])}, {str(loc[1])})"
if conditions :
conditions = '(if: ' + ' && '.join(conditions) + ')'
else :
conditions = ''
for elem in element_types :
query += elem + '[' + selector + ']' + conditions + search_area + ';'
query += ');' + f'out {out};'
return query
def send_overpass_query(query: str, use_cache: bool = True) -> dict:
"""
Sends the Overpass QL query to the Overpass API and returns the parsed JSON response.
Args:
query (str): The Overpass QL query to be sent to the Overpass API.
Returns:
dict: The parsed JSON response from the Overpass API, or None if the request fails.
"""
# Generate a cache key for the current query
cache_key = get_cache_key(query)
# Try to fetch the result from the cache
cached_response = CachingStrategy.get(cache_key)
if cached_response:
print("Cache hit!")
return cached_response
# Define the Overpass API endpoint
overpass_url = "https://overpass-api.de/api/interpreter"
# Prepare the data to be sent as POST request, encoded as bytes
data = urllib.parse.urlencode({'data': query}).encode('utf-8')
# Create a custom header with a User-Agent
headers = {
'User-Agent': 'Mozilla/5.0 (compatible; OverpassQuery/1.0; +http://example.com)',
}
try:
# Create a Request object with the specified URL, data, and headers
request = urllib.request.Request(overpass_url, data=data, headers=headers)
# Send the request and read the response
with urllib.request.urlopen(request) as response:
# Read and decode the response
response_data = response.read().decode('utf-8')
root = ET.fromstring(response_data)
# Cache the response data as an ElementTree root
CachingStrategy.set(cache_key, root)
return root
except urllib.error.URLError as e:
print(f"Error connecting to Overpass API: {e}")
return None
except json.JSONDecodeError:
print("Error decoding the JSON response from Overpass API.")
return None

View File

@ -11,7 +11,7 @@ def client():
"""Client used to call the app."""
return TestClient(app)
'''
def test_turckheim(client, request): # pylint: disable=redefined-outer-name
"""
Test n°1 : Custom test in Turckheim to ensure small villages are also supported.
@ -54,7 +54,7 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
assert len(landmarks) > 2 # check that there is something to visit
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
# assert 2==3
'''
def test_bellecour(client, request) : # pylint: disable=redefined-outer-name
"""
@ -97,7 +97,7 @@ def test_bellecour(client, request) : # pylint: disable=redefined-outer-name
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
# assert 2 == 3
'''
def test_cologne(client, request) : # pylint: disable=redefined-outer-name
"""
Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area.
@ -336,7 +336,7 @@ def test_shopping(client, request) : # pylint: disable=redefined-outer-name
assert response.status_code == 200 # check for successful planning
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
'''
# def test_new_trip_single_prefs(client):
# response = client.post(

View File

@ -1,12 +1,15 @@
"""Module used to import data from OSM and arrange them in categories."""
import logging
import yaml
import xml.etree.ElementTree as ET
from ..structs.preferences import Preferences
from ..structs.landmark import Landmark
from .take_most_important import take_most_important
from .cluster_manager import ClusterManager
from .overpass import OverpassQueryBuilder, send_overpass_query, parse_result
from ..overpass.overpass import build_query, send_overpass_query
from ..overpass.caching_strategy import CachingStrategy
from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH, OSM_CACHE_DIR
@ -53,8 +56,7 @@ class LandmarkManager:
self.walking_speed = parameters['average_walking_speed']
self.detour_factor = parameters['detour_factor']
# self.overpass = Overpass()
# CachingStrategy.use(JSON, cacheDir=OSM_CACHE_DIR)
CachingStrategy.use('XML', cache_dir=OSM_CACHE_DIR)
self.logger.info('LandmakManager successfully initialized.')
@ -84,35 +86,32 @@ class LandmarkManager:
all_landmarks = set()
# Create a bbox using the around technique, tuple of strings
bbox = tuple((f"around:{min(2000, reachable_bbox_side/2)}", str(center_coordinates[0]), str(center_coordinates[1])))
bbox = tuple((min(2000, reachable_bbox_side/2), center_coordinates[0], center_coordinates[1]))
# list for sightseeing
if preferences.sightseeing.score != 0:
self.logger.debug('Fetching sightseeing landmarks...')
score_function = lambda score: score * 10 * preferences.sightseeing.score / 5
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['sightseeing'], preferences.sightseeing.type, score_function)
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['sightseeing'], preferences.sightseeing.type, preferences.sightseeing.score)
all_landmarks.update(current_landmarks)
self.logger.debug('Fetching sightseeing clusters...')
# special pipeline for historic neighborhoods
neighborhood_manager = ClusterManager(bbox, 'sightseeing')
historic_clusters = neighborhood_manager.generate_clusters()
all_landmarks.update(historic_clusters)
self.logger.debug('Sightseeing clusters done')
# neighborhood_manager = ClusterManager(bbox, 'sightseeing')
# historic_clusters = neighborhood_manager.generate_clusters()
# all_landmarks.update(historic_clusters)
# self.logger.debug('Sightseeing clusters done')
# list for nature
if preferences.nature.score != 0:
self.logger.debug('Fetching nature landmarks...')
score_function = lambda score: score * 10 * self.nature_coeff * preferences.nature.score / 5
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['nature'], preferences.nature.type, score_function)
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['nature'], preferences.nature.type, preferences.nature.score)
all_landmarks.update(current_landmarks)
# list for shopping
if preferences.shopping.score != 0:
self.logger.debug('Fetching shopping landmarks...')
score_function = lambda score: score * 10 * preferences.shopping.score / 5
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['shopping'], preferences.shopping.type, score_function)
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['shopping'], preferences.shopping.type, preferences.shopping.score)
self.logger.debug('Fetching shopping clusters...')
# set time for all shopping activites :
@ -121,10 +120,10 @@ class LandmarkManager:
all_landmarks.update(current_landmarks)
# special pipeline for shopping malls
shopping_manager = ClusterManager(bbox, 'shopping')
shopping_clusters = shopping_manager.generate_clusters()
all_landmarks.update(shopping_clusters)
self.logger.debug('Shopping clusters done')
# shopping_manager = ClusterManager(bbox, 'shopping')
# shopping_clusters = shopping_manager.generate_clusters()
# all_landmarks.update(shopping_clusters)
# self.logger.debug('Shopping clusters done')
@ -133,8 +132,19 @@ class LandmarkManager:
return all_landmarks, landmarks_constrained
def set_score(self, landmark: Landmark, landmarktype: str, preference_level: int) :
def set_landmark_score(self, landmark: Landmark, landmarktype: str, preference_level: int) :
"""
Calculate and set the attractiveness score for a given landmark.
This method evaluates the landmark's attractiveness based on its properties
(number of tags, presence of Wikipedia URL, image, website, and whether it's
a place of worship) and adjusts the score using the user's preference level.
Args:
landmark (Landmark): The landmark object to score.
landmarktype (str): The type of the landmark (currently unused).
preference_level (int): The user's preference level for this landmark type.
"""
score = landmark.n_tags**self.tag_exponent
if landmark.wiki_url :
score *= self.wikipedia_bonus
@ -144,11 +154,13 @@ class LandmarkManager:
score *= self.wikipedia_bonus
if landmark.is_place_of_worship :
score *= self.church_coeff
if landmarktype == 'nature' :
score *= self.nature_coeff
landmark.attractiveness = int(score * preference_level)
landmark.attractiveness = int(score * preference_level * 2)
'''
def fetch_landmarks(self, bbox: tuple, amenity_selector: dict, landmarktype: str, score_function: callable) -> list[Landmark]:
def fetch_landmarks(self, bbox: tuple, amenity_selector: dict, landmarktype: str, preference_level: int) -> list[Landmark]:
"""
Fetches landmarks of a specified type from OpenStreetMap (OSM) within a bounding box centered on given coordinates.
@ -183,165 +195,7 @@ class LandmarkManager:
query_conditions = []
element_types.append('node')
query = OverpassQueryBuilder(
area = bbox,
element_types = element_types,
selector = sel,
conditions = query_conditions, # except for nature....
out = 'center'
)
self.logger.debug(f"Query: {query}")
try:
result = self.overpass.query(query)
except Exception as e:
self.logger.error(f"Error fetching landmarks: {e}")
continue
for elem in result.elements():
name = elem.tag('name')
location = (elem.centerLat(), elem.centerLon())
osm_type = elem.type() # Add type: 'way' or 'relation'
osm_id = elem.id() # Add OSM id
# TODO: exclude these from the get go
# handle unprecise and no-name locations
if name is None or location[0] is None:
if osm_type == 'node' and 'viewpoint' in elem.tags().values():
name = 'Viewpoint'
name_en = 'Viewpoint'
location = (elem.lat(), elem.lon())
else :
continue
# skip if part of another building
if 'building:part' in elem.tags().keys() and elem.tag('building:part') == 'yes':
continue
elem_type = landmarktype # Add the landmark type as 'sightseeing,
n_tags = len(elem.tags().keys()) # Add number of tags
score = n_tags**self.tag_exponent # Add score
duration = 5 # Set base duration to 5 minutes
# skip = False # Set skipping parameter to false
tag_values = set(elem.tags().values()) # Store tag values
# Retrieve image, name and website :
image_url = elem.tag('image')
website_url = elem.tag('website')
if website_url is None :
website_url = elem.tag('wikipedia')
name_en = elem.tag('name:en')
if elem_type != "nature" and elem.tag('leisure') == "park":
elem_type = "nature"
if elem.tag('wikipedia') is not None :
score += self.wikipedia_bonus
# Skip element if it is an administrative boundary or a disused thing or it is an appartement and useless amenities
if elem.tag('boundary') is not None or elem.tag('disused') is not None:
continue
if 'apartments' in elem.tags().values():
continue
if elem.tag('historic') is not None and elem.tag('historic') in ['manor', 'optical_telegraph', 'pound', 'shieling', 'wayside_cross']:
continue
# Adjust scoring, browse through tag keys using wildcards
for tag_key in elem.tags().keys():
if "pay" in tag_key:
# payment options are misleading and should not count for the scoring.
score += self.pay_bonus
if "building:" in tag_key:
# do not count the building description as being particularly useful
n_tags -= 1
# if landmarktype != "shopping":
# if "shop" in tag_key:
# skip = True
# break
# if tag_key == "building" and elem.tag('building') in ['retail', 'supermarket', 'parking']:
# skip = True
# break
# if skip:
# continue
score = score_function(score)
if "place_of_worship" in tag_values :
if 'cathedral' in tag_values :
duration = 10
else :
score *= self.church_coeff
elif 'viewpoint' in tag_values :
# viewpoints must count more
score = score * self.viewpoint_bonus
elif "museum" in tag_values or "aquarium" in tag_values or "planetarium" in tag_values:
duration = 60
# finally create our own landmark object
landmark = Landmark(
name = name,
type = elem_type,
location = location,
osm_type = osm_type,
osm_id = osm_id,
attractiveness = int(score),
must_do = False,
n_tags = int(n_tags),
duration = int(duration),
name_en = name_en,
image_url = image_url,
website_url = website_url
)
return_list.append(landmark)
self.logger.debug(f"Fetched {len(return_list)} landmarks of type {landmarktype} in {bbox}")
return return_list
'''
def fetch_landmarks(self, bbox: tuple, amenity_selector: dict, landmarktype: str, score_function: callable) -> list[Landmark]:
"""
Fetches landmarks of a specified type from OpenStreetMap (OSM) within a bounding box centered on given coordinates.
Args:
bbox (tuple[float, float, float, float]): The bounding box coordinates (around:radius, center_lat, center_lon).
amenity_selector (dict): The Overpass API query selector for the desired landmark type.
landmarktype (str): The type of the landmark (e.g., 'sightseeing', 'nature', 'shopping').
score_function (callable): The function to compute the score of the landmark based on its attributes.
Returns:
list[Landmark]: A list of Landmark objects that were fetched and filtered based on the provided criteria.
Notes:
- Landmarks are fetched using Overpass API queries.
- Selectors are translated from the dictionary to the Overpass query format. (e.g., 'amenity'='place_of_worship')
- Landmarks are filtered based on various conditions including tags and type.
- Scores are assigned to landmarks based on their attributes and surrounding elements.
"""
return_list = []
if landmarktype == 'nature' : query_conditions = []
else : query_conditions = ['count_tags()>5']
# caution, when applying a list of selectors, overpass will search for elements that match ALL selectors simultaneously
# we need to split the selectors into separate queries and merge the results
for sel in dict_to_selector_list(amenity_selector):
# self.logger.debug(f"Current selector: {sel}")
element_types = ['way', 'relation']
if 'viewpoint' in sel :
query_conditions = []
element_types.append('node')
query = OverpassQueryBuilder(
query = build_query(
area = bbox,
element_types = element_types,
selector = sel,
@ -356,12 +210,110 @@ class LandmarkManager:
self.logger.error(f"Error fetching landmarks: {e}")
continue
return_list = parse_result(result, landmarktype)
return_list += self.parse_overpass_result(result, landmarktype, preference_level)
self.logger.debug(f"Fetched {len(return_list)} landmarks of type {landmarktype} in {bbox}")
return return_list
def parse_overpass_result(self, root: ET.Element, landmarktype, preference_level) -> list[Landmark]:
"""
Parse the Overpass API result and extract landmarks.
This method processes the XML root element returned by the Overpass API and
extracts landmarks of types 'node', 'way', and 'relation'. It retrieves
relevant information such as name, coordinates, and tags, and converts them
into Landmark objects.
Args:
root (ET.Element): The root element of the XML response from Overpass API.
elem_type (str): The type of landmark (e.g., node, way, relation).
Returns:
list[Landmark]: A list of Landmark objects extracted from the XML data.
"""
if root is None :
return []
landmarks = []
for osm_type in ['node', 'way', 'relation'] :
for elem in root.findall(osm_type):
# self.logger.debug('new landmark')
# Extract basic info from the landmark.
name = elem.find("tag[@k='name']").get('v') if elem.find("tag[@k='name']") is not None else None
center = elem.find('center')
tags = elem.findall('tag')
# Extract the center latitude and longitude if available.
if name is not None and center is not None:
lat = float(center.get('lat'))
lon = float(center.get('lon'))
coords = tuple((lat, lon))
else :
continue
# Convert this to Landmark object
landmark = Landmark(name=name,
type=landmarktype,
location=coords,
osm_id=elem.get('id'),
osm_type=osm_type,
attractiveness=0,
n_tags=len(tags))
# Browse through tags to add information to landmark.
for tag in tags:
key = tag.get('k')
value = tag.get('v')
# Skip this landmark if not suitable.
if key == 'building:part' and value == 'yes' :
break
if 'disused:' in key :
break
if 'boundary:' in key :
break
if 'shop' in key and landmarktype != 'shopping' :
break
# if value == 'apartments' :
# break
# Fill in the other attributes.
if key == 'image' :
landmark.image_url = value
if key == 'website' :
landmark.website_url = value
if key == 'place_of_worship' :
landmark.is_place_of_worship = True
if key == 'wikipedia' :
landmark.wiki_url = value
if key == 'name:en' :
landmark.name_en = value
if 'building:' in key or 'pay' in key :
landmark.n_tags -= 1
# Set the duration.
if value in ['museum', 'aquarium', 'planetarium'] :
landmark.duration = 60
elif value == 'viewpoint' :
landmark.is_viewpoint = True
landmark.duration = 10
elif value == 'cathedral' :
landmark.is_place_of_worship = False
landmark.duration = 10
else :
landmark.duration = 5
else:
self.set_landmark_score(landmark, landmarktype, preference_level)
landmarks.append(landmark)
# self.logger.debug('new landmark added')
continue
return landmarks
def dict_to_selector_list(d: dict) -> list:
"""
Convert a dictionary of key-value pairs to a list of Overpass query strings.

View File

@ -1,199 +0,0 @@
from typing import Literal, List
import urllib
import json
import xml.etree.ElementTree as ET
from ..structs.landmark import Landmark
ElementTypes = List[Literal['way', 'node', 'relation']]
def OverpassQueryBuilder(area: tuple, element_types: ElementTypes, selector: str,
conditions=[], out='center'):
"""
Constructs a query string for the Overpass API to retrieve OpenStreetMap (OSM) data.
Args:
area (tuple): A tuple representing the geographical search area, typically in the format
(radius, latitude, longitude). The first element is a string like "around:2000"
specifying the search radius, and the second and third elements represent
the latitude and longitude as floats or strings.
element_types (list[str]): A list of OSM element types to search for. Must be one or more of
'Way', 'Node', or 'Relation'.
selector (str): The key or tag to filter the OSM elements (e.g., 'amenity', 'highway', etc.).
conditions (list, optional): A list of conditions to apply as additional filters for the
selected OSM elements. The conditions should be written in
the Overpass QL format, and they are combined with '&&' if
multiple are provided. Defaults to an empty list.
out (str, optional): Specifies the output type, such as 'center', 'body', or 'tags'.
Defaults to 'center'.
Returns:
str: The constructed Overpass QL query string.
Notes:
- If no conditions are provided, the query will just use the `selector` to filter the OSM
elements without additional constraints.
- The search area must always formatted as "(radius, lat, lon)".
"""
if not isinstance(conditions, list) :
conditions = [conditions]
query = '('
search_area = f"({', '.join(map(str, area))})"
if conditions :
conditions = '(if: ' + ' && '.join(conditions) + ')'
else :
conditions = ''
for elem in element_types :
query += elem + '[' + selector + ']' + conditions + search_area + ';'
query += ');' + f'out {out};'
return query
def send_overpass_query(query: str) -> dict:
"""
Sends the Overpass QL query to the Overpass API and returns the parsed JSON response.
Args:
query (str): The Overpass QL query to be sent to the Overpass API.
Returns:
dict: The parsed JSON response from the Overpass API, or None if the request fails.
"""
# Define the Overpass API endpoint
overpass_url = "https://overpass-api.de/api/interpreter"
# Prepare the data to be sent as POST request, encoded as bytes
data = urllib.parse.urlencode({'data': query}).encode('utf-8')
# Create a custom header with a User-Agent
headers = {
'User-Agent': 'Mozilla/5.0 (compatible; OverpassQuery/1.0; +http://example.com)',
}
try:
# Create a Request object with the specified URL, data, and headers
request = urllib.request.Request(overpass_url, data=data, headers=headers)
# Send the request and read the response
with urllib.request.urlopen(request) as response:
# Read and decode the response
response_data = response.read().decode('utf-8')
return ET.fromstring(response_data)
except urllib.error.URLError as e:
print(f"Error connecting to Overpass API: {e}")
return None
except json.JSONDecodeError:
print("Error decoding the JSON response from Overpass API.")
return None
def parse_result(root: ET.Element, elem_type) -> List[Landmark]:
landmarks = []
if root is None :
return landmarks
for osm_type in ['node', 'way', 'relation'] :
for elem in root.findall(osm_type):
# Extract basic info from the landmark.
name = elem.find("tag[@k='name']").get('v') if elem.find("tag[@k='name']") is not None else None
center = elem.find('center')
tags = elem.findall('tag')
# Extract the center latitude and longitude if available.
if name is not None and center is not None:
lat = float(center.get('lat'))
lon = float(center.get('lon'))
coords = tuple((lat, lon))
else :
continue
# Convert this to Landmark object
landmark = Landmark(name=name,
type=elem_type,
location=coords,
osm_id=elem.get('id'),
osm_type=osm_type,
attractiveness=0,
n_tags=len(tags))
# Browse through tags to add information to landmark.
for tag in tags:
key = tag.get('k')
value = tag.get('v')
# Skip this landmark if not suitable.
if key == 'building:part' and value == 'yes' :
break
if 'disused:' in key :
break
if 'boundary:' in key :
break
if 'shop' in key and elem_type != 'shopping' :
break
# if value == 'apartments' :
# break
# Fill in the other attributes.
if key == 'image' :
landmark.image_url = value
if key == 'website' :
landmark.website_url = value
if key == 'place_of_worship' :
landmark.is_place_of_worship = True
if key == 'wikipedia' :
landmark.wiki_url = value
if key == 'name:en' :
landmark.name_en = value
if 'building:' in key or 'pay' in key :
landmark.n_tags -= 1
# Set the duration.
if value in ['museum', 'aquarium', 'planetarium'] :
landmark.duration = 60
elif value == 'viewpoint' :
landmark.is_viewpoint = True
landmark.duration = 10
elif value == 'cathedral' :
landmark.is_place_of_worship = False
landmark.duration = 10
else :
landmark.duration = 5
else:
set_score(landmark, elem_type)
landmarks.append(landmark)
continue
return landmarks
def set_score(landmark: Landmark, landmarktype: str) :
score = landmark.n_tags**1.15
if landmark.wiki_url :
score *= 1.1
if landmark.image_url :
score *= 1.1
if landmark.website_url :
score *= 1.1
if landmark.is_place_of_worship :
score *= 0.65
if landmark.is_viewpoint :
# print(f"{landmark.name}: n_tags={landmark.n_tags} and score={score*3*1.35*10}")
score *= 3
if landmarktype == 'nature' :
score *= 1.35
landmark.attractiveness = int(score * 10)