328 lines
14 KiB
Python
328 lines
14 KiB
Python
"""Module used to import data from OSM and arrange them in categories."""
|
|
import logging
|
|
import xml.etree.ElementTree as ET
|
|
import yaml
|
|
|
|
|
|
from ..structs.preferences import Preferences
|
|
from ..structs.landmark import Landmark
|
|
from .take_most_important import take_most_important
|
|
from .cluster_manager import ClusterManager
|
|
from ..overpass.overpass import Overpass, get_base_info
|
|
|
|
from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH, OSM_CACHE_DIR
|
|
|
|
# silence the overpass logger
|
|
logging.getLogger('Overpass').setLevel(level=logging.CRITICAL)
|
|
|
|
|
|
class LandmarkManager:
|
|
"""
|
|
Use this to manage landmarks.
|
|
Uses the overpass api to fetch landmarks and classify them.
|
|
"""
|
|
logger = logging.getLogger(__name__)
|
|
|
|
radius_close_to: int # radius in meters
|
|
church_coeff: float # coeff to adjsut score of churches
|
|
nature_coeff: float # coeff to adjust score of parks
|
|
overall_coeff: float # coeff to adjust weight of tags
|
|
n_important: int # number of important landmarks to consider
|
|
|
|
|
|
def __init__(self) -> None:
|
|
|
|
with AMENITY_SELECTORS_PATH.open('r') as f:
|
|
self.amenity_selectors = yaml.safe_load(f)
|
|
|
|
with LANDMARK_PARAMETERS_PATH.open('r') as f:
|
|
parameters = yaml.safe_load(f)
|
|
self.max_bbox_side = parameters['city_bbox_side']
|
|
self.radius_close_to = parameters['radius_close_to']
|
|
self.church_coeff = parameters['church_coeff']
|
|
self.nature_coeff = parameters['nature_coeff']
|
|
self.overall_coeff = parameters['overall_coeff']
|
|
self.tag_exponent = parameters['tag_exponent']
|
|
self.image_bonus = parameters['image_bonus']
|
|
self.name_bonus = parameters['name_bonus']
|
|
self.wikipedia_bonus = parameters['wikipedia_bonus']
|
|
self.viewpoint_bonus = parameters['viewpoint_bonus']
|
|
self.pay_bonus = parameters['pay_bonus']
|
|
self.n_important = parameters['N_important']
|
|
|
|
with OPTIMIZER_PARAMETERS_PATH.open('r') as f:
|
|
parameters = yaml.safe_load(f)
|
|
self.walking_speed = parameters['average_walking_speed']
|
|
self.detour_factor = parameters['detour_factor']
|
|
|
|
# Setup the caching in the Overpass class.
|
|
self.overpass = Overpass(caching_strategy='XML', cache_dir=OSM_CACHE_DIR)
|
|
|
|
self.logger.info('LandmakManager successfully initialized.')
|
|
|
|
|
|
def generate_landmarks_list(self, center_coordinates: tuple[float, float], preferences: Preferences) -> tuple[list[Landmark], list[Landmark]]:
|
|
"""
|
|
Generate and prioritize a list of landmarks based on user preferences.
|
|
|
|
This method fetches landmarks from various categories (sightseeing, nature, shopping) based on the user's preferences
|
|
and current location. It scores and corrects these landmarks, removes duplicates, and then selects the most important
|
|
landmarks based on a predefined criterion.
|
|
|
|
Args:
|
|
center_coordinates (tuple[float, float]): The latitude and longitude of the center location around which to search.
|
|
preferences (Preferences): The user's preference settings that influence the landmark selection.
|
|
|
|
Returns:
|
|
tuple[list[Landmark], list[Landmark]]:
|
|
- A list of all existing landmarks.
|
|
- A list of the most important landmarks based on the user's preferences.
|
|
"""
|
|
self.logger.debug('Starting to fetch landmarks...')
|
|
max_walk_dist = int((preferences.max_time_minute/2)/60*self.walking_speed*1000/self.detour_factor)
|
|
reachable_bbox_side = min(max_walk_dist, self.max_bbox_side)
|
|
|
|
# use set to avoid duplicates, this requires some __methods__ to be set in Landmark
|
|
all_landmarks = set()
|
|
|
|
# Create a bbox using the around technique, tuple of strings
|
|
bbox = tuple((min(2000, reachable_bbox_side/2), center_coordinates[0], center_coordinates[1]))
|
|
|
|
# list for sightseeing
|
|
if preferences.sightseeing.score != 0:
|
|
self.logger.debug('Fetching sightseeing landmarks...')
|
|
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['sightseeing'], preferences.sightseeing.type, preferences.sightseeing.score)
|
|
all_landmarks.update(current_landmarks)
|
|
self.logger.debug('Fetching sightseeing clusters...')
|
|
|
|
# special pipeline for historic neighborhoods
|
|
neighborhood_manager = ClusterManager(bbox, 'sightseeing')
|
|
historic_clusters = neighborhood_manager.generate_clusters()
|
|
all_landmarks.update(historic_clusters)
|
|
self.logger.debug('Sightseeing clusters done')
|
|
|
|
# list for nature
|
|
if preferences.nature.score != 0:
|
|
self.logger.debug('Fetching nature landmarks...')
|
|
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['nature'], preferences.nature.type, preferences.nature.score)
|
|
all_landmarks.update(current_landmarks)
|
|
|
|
|
|
# list for shopping
|
|
if preferences.shopping.score != 0:
|
|
self.logger.debug('Fetching shopping landmarks...')
|
|
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['shopping'], preferences.shopping.type, preferences.shopping.score)
|
|
self.logger.debug('Fetching shopping clusters...')
|
|
|
|
# set time for all shopping activites :
|
|
for landmark in current_landmarks :
|
|
landmark.duration = 30
|
|
all_landmarks.update(current_landmarks)
|
|
|
|
# special pipeline for shopping malls
|
|
shopping_manager = ClusterManager(bbox, 'shopping')
|
|
shopping_clusters = shopping_manager.generate_clusters()
|
|
all_landmarks.update(shopping_clusters)
|
|
self.logger.debug('Shopping clusters done')
|
|
|
|
|
|
|
|
landmarks_constrained = take_most_important(all_landmarks, self.n_important)
|
|
# self.logger.info(f'All landmarks generated : {len(all_landmarks)} landmarks around {center_coordinates}, and constrained to {len(landmarks_constrained)} most important ones.')
|
|
|
|
return all_landmarks, landmarks_constrained
|
|
|
|
def set_landmark_score(self, landmark: Landmark, landmarktype: str, preference_level: int) :
|
|
"""
|
|
Calculate and set the attractiveness score for a given landmark.
|
|
|
|
This method evaluates the landmark's attractiveness based on its properties
|
|
(number of tags, presence of Wikipedia URL, image, website, and whether it's
|
|
a place of worship) and adjusts the score using the user's preference level.
|
|
|
|
Args:
|
|
landmark (Landmark): The landmark object to score.
|
|
landmarktype (str): The type of the landmark (currently unused).
|
|
preference_level (int): The user's preference level for this landmark type.
|
|
"""
|
|
score = landmark.n_tags**self.tag_exponent
|
|
if landmark.wiki_url :
|
|
score *= self.wikipedia_bonus
|
|
if landmark.image_url :
|
|
score *= self.image_bonus
|
|
if landmark.website_url :
|
|
score *= self.wikipedia_bonus
|
|
if landmark.is_place_of_worship :
|
|
score *= self.church_coeff
|
|
if landmarktype == 'nature' :
|
|
score *= self.nature_coeff
|
|
|
|
landmark.attractiveness = int(score * preference_level * 2)
|
|
|
|
|
|
def fetch_landmarks(self, bbox: tuple, amenity_selector: dict, landmarktype: str, preference_level: int) -> list[Landmark]:
|
|
"""
|
|
Fetches landmarks of a specified type from OpenStreetMap (OSM) within a bounding box centered on given coordinates.
|
|
|
|
Args:
|
|
bbox (tuple[float, float, float, float]): The bounding box coordinates (around:radius, center_lat, center_lon).
|
|
amenity_selector (dict): The Overpass API query selector for the desired landmark type.
|
|
landmarktype (str): The type of the landmark (e.g., 'sightseeing', 'nature', 'shopping').
|
|
|
|
Returns:
|
|
list[Landmark]: A list of Landmark objects that were fetched and filtered based on the provided criteria.
|
|
|
|
Notes:
|
|
- Landmarks are fetched using Overpass API queries.
|
|
- Selectors are translated from the dictionary to the Overpass query format. (e.g., 'amenity'='place_of_worship')
|
|
- Landmarks are filtered based on various conditions including tags and type.
|
|
"""
|
|
return_list = []
|
|
|
|
if landmarktype == 'nature' : query_conditions = []
|
|
else : query_conditions = ['count_tags()>5']
|
|
|
|
# caution, when applying a list of selectors, overpass will search for elements that match ALL selectors simultaneously
|
|
# we need to split the selectors into separate queries and merge the results
|
|
for sel in dict_to_selector_list(amenity_selector):
|
|
# self.logger.debug(f"Current selector: {sel}")
|
|
|
|
osm_types = ['way', 'relation']
|
|
|
|
if 'viewpoint' in sel :
|
|
query_conditions = []
|
|
osm_types.append('node')
|
|
|
|
query = self.overpass.build_query(
|
|
area = bbox,
|
|
osm_types = osm_types,
|
|
selector = sel,
|
|
conditions = query_conditions, # except for nature....
|
|
out = 'center'
|
|
)
|
|
self.logger.debug(f"Query: {query}")
|
|
|
|
try:
|
|
result = self.overpass.send_query(query)
|
|
except Exception as e:
|
|
self.logger.error(f"Error fetching landmarks: {e}")
|
|
continue
|
|
|
|
return_list += self.xml_to_landmarks(result, landmarktype, preference_level)
|
|
|
|
self.logger.debug(f"Fetched {len(return_list)} landmarks of type {landmarktype} in {bbox}")
|
|
|
|
return return_list
|
|
|
|
|
|
def xml_to_landmarks(self, root: ET.Element, landmarktype, preference_level) -> list[Landmark]:
|
|
"""
|
|
Parse the Overpass API result and extract landmarks.
|
|
|
|
This method processes the XML root element returned by the Overpass API and
|
|
extracts landmarks of types 'node', 'way', and 'relation'. It retrieves
|
|
relevant information such as name, coordinates, and tags, and converts them
|
|
into Landmark objects.
|
|
|
|
Args:
|
|
root (ET.Element): The root element of the XML response from Overpass API.
|
|
elem_type (str): The type of landmark (e.g., node, way, relation).
|
|
|
|
Returns:
|
|
list[Landmark]: A list of Landmark objects extracted from the XML data.
|
|
"""
|
|
if root is None :
|
|
return []
|
|
|
|
landmarks = []
|
|
for osm_type in ['node', 'way', 'relation'] :
|
|
for elem in root.findall(osm_type):
|
|
|
|
id, coords, name = get_base_info(elem, osm_type, with_name=True)
|
|
|
|
if name is None or coords is None :
|
|
continue
|
|
|
|
tags = elem.findall('tag')
|
|
|
|
# Convert this to Landmark object
|
|
landmark = Landmark(name=name,
|
|
type=landmarktype,
|
|
location=coords,
|
|
osm_id=id,
|
|
osm_type=osm_type,
|
|
attractiveness=0,
|
|
n_tags=len(tags))
|
|
|
|
# Browse through tags to add information to landmark.
|
|
for tag in tags:
|
|
key = tag.get('k')
|
|
value = tag.get('v')
|
|
|
|
# Skip this landmark if not suitable.
|
|
if key == 'building:part' and value == 'yes' :
|
|
break
|
|
if 'disused:' in key :
|
|
break
|
|
if 'boundary:' in key :
|
|
break
|
|
if 'shop' in key and landmarktype != 'shopping' :
|
|
break
|
|
# if value == 'apartments' :
|
|
# break
|
|
|
|
# Fill in the other attributes.
|
|
if key == 'image' :
|
|
landmark.image_url = value
|
|
if key == 'website' :
|
|
landmark.website_url = value
|
|
if key == 'place_of_worship' :
|
|
landmark.is_place_of_worship = True
|
|
if key == 'wikipedia' :
|
|
landmark.wiki_url = value
|
|
if key == 'name:en' :
|
|
landmark.name_en = value
|
|
if 'building:' in key or 'pay' in key :
|
|
landmark.n_tags -= 1
|
|
|
|
# Set the duration.
|
|
if value in ['museum', 'aquarium', 'planetarium'] :
|
|
landmark.duration = 60
|
|
elif value == 'viewpoint' :
|
|
landmark.is_viewpoint = True
|
|
landmark.duration = 10
|
|
elif value == 'cathedral' :
|
|
landmark.is_place_of_worship = False
|
|
landmark.duration = 10
|
|
else :
|
|
landmark.duration = 5
|
|
|
|
else:
|
|
self.set_landmark_score(landmark, landmarktype, preference_level)
|
|
landmarks.append(landmark)
|
|
|
|
continue
|
|
|
|
return landmarks
|
|
|
|
def dict_to_selector_list(d: dict) -> list:
|
|
"""
|
|
Convert a dictionary of key-value pairs to a list of Overpass query strings.
|
|
|
|
Args:
|
|
d (dict): A dictionary of key-value pairs representing the selector.
|
|
|
|
Returns:
|
|
list: A list of strings representing the Overpass query selectors.
|
|
"""
|
|
return_list = []
|
|
for key, value in d.items():
|
|
if isinstance(value, list):
|
|
val = '|'.join(value)
|
|
return_list.append(f'{key}~"^({val})$"')
|
|
elif isinstance(value, str) and len(value) == 0:
|
|
return_list.append(f'{key}')
|
|
else:
|
|
return_list.append(f'{key}={value}')
|
|
return return_list
|