413 lines
17 KiB
Python
413 lines
17 KiB
Python
"""Module used to import data from OSM and arrange them in categories."""
|
|
import logging
|
|
import yaml
|
|
|
|
from ..structs.preferences import Preferences
|
|
from ..structs.landmark import Landmark
|
|
from .take_most_important import take_most_important
|
|
from .cluster_manager import ClusterManager
|
|
from ..overpass.overpass import Overpass, get_base_info
|
|
from .utils import create_bbox
|
|
|
|
from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH
|
|
|
|
|
|
class LandmarkManager:
    """
    Use this to manage landmarks.

    Uses the Overpass API to fetch landmarks and classify them: raw OSM
    elements are converted to `Landmark` objects, annotated from their tags,
    scored, and finally reduced to the most important ones.
    """
    logger = logging.getLogger(__name__)

    radius_close_to: int    # radius in meters
    church_coeff: float     # coeff to adjust score of churches
    nature_coeff: float     # coeff to adjust score of parks
    overall_coeff: float    # coeff to adjust weight of tags
    n_important: int        # number of important landmarks to consider

    # Values of the OSM `building` tag that are too generic to be used as a
    # human readable place type in a description.
    _GENERIC_BUILDING_VALUES = frozenset({
        "yes", "civic", "government", "apartments", "residential",
        "commercial", "industrial", "retail", "religious", "public", "service",
    })

    def __init__(self) -> None:
        """Load the selectors and tuning parameters from their YAML files and create the Overpass client."""
        with AMENITY_SELECTORS_PATH.open('r') as f:
            self.amenity_selectors = yaml.safe_load(f)

        with LANDMARK_PARAMETERS_PATH.open('r') as f:
            parameters = yaml.safe_load(f)
        self.max_bbox_side = parameters['max_bbox_side']
        self.church_coeff = parameters['church_coeff']
        self.nature_coeff = parameters['nature_coeff']
        self.overall_coeff = parameters['overall_coeff']
        self.tag_exponent = parameters['tag_exponent']
        self.image_bonus = parameters['image_bonus']
        self.wikipedia_bonus = parameters['wikipedia_bonus']
        self.viewpoint_bonus = parameters['viewpoint_bonus']
        self.pay_bonus = parameters['pay_bonus']
        self.n_important = parameters['N_important']

        with OPTIMIZER_PARAMETERS_PATH.open('r') as f:
            parameters = yaml.safe_load(f)
        self.walking_speed = parameters['average_walking_speed']
        self.detour_factor = parameters['detour_factor']

        # Setup the caching in the Overpass class.
        self.overpass = Overpass()

        self.logger.info('LandmakManager successfully initialized.')

    def generate_landmarks_list(self, center_coordinates: tuple[float, float], preferences: Preferences) -> tuple[list[Landmark], list[Landmark]]:
        """
        Generate and prioritize a list of landmarks based on user preferences.

        Landmarks are fetched per category (sightseeing, nature, shopping), but
        only for categories whose preference score is non-zero. Sightseeing and
        shopping additionally get clusters from `ClusterManager`. Everything is
        collected in a set to drop duplicates, then `take_most_important`
        selects the `n_important` most relevant landmarks.

        Args:
            center_coordinates (tuple[float, float]): The latitude and longitude of the center location around which to search.
            preferences (Preferences): The user's preference settings that influence the landmark selection.

        Returns:
            tuple[list[Landmark], list[Landmark]]:
                - All landmarks that were found. NOTE: this is currently a `set`, not a list.
                - The most important landmarks, as returned by `take_most_important`.
        """
        self.logger.debug('Starting to fetch landmarks...')

        # Search radius: what can be walked in half of the available time,
        # shortened by the detour factor and capped at half the max bbox side.
        # NOTE(review): the *1000 suggests walking_speed is in km/h and the radius in meters - confirm.
        max_walk_dist = int((preferences.max_time_minute/2)/60*self.walking_speed*1000/self.detour_factor)
        radius = min(max_walk_dist, int(self.max_bbox_side/2))

        # use set to avoid duplicates, this requires some __methods__ to be set in Landmark
        all_landmarks = set()

        # Create a bbox using the around technique, tuple of strings
        bbox = create_bbox(center_coordinates, radius)

        # Sightseeing landmarks.
        if preferences.sightseeing.score != 0:
            self.logger.debug('Fetching sightseeing landmarks...')
            current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['sightseeing'], preferences.sightseeing.type, preferences.sightseeing.score)
            all_landmarks.update(current_landmarks)
            self.logger.info(f'Found {len(current_landmarks)} sightseeing landmarks')

            # special pipeline for historic neighborhoods
            neighborhood_manager = ClusterManager(bbox, 'sightseeing')
            historic_clusters = neighborhood_manager.generate_clusters()
            all_landmarks.update(historic_clusters)

        # Nature landmarks.
        if preferences.nature.score != 0:
            self.logger.debug('Fetching nature landmarks...')
            current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['nature'], preferences.nature.type, preferences.nature.score)
            all_landmarks.update(current_landmarks)
            self.logger.info(f'Found {len(current_landmarks)} nature landmarks')

        # Shopping landmarks.
        if preferences.shopping.score != 0:
            self.logger.debug('Fetching shopping landmarks...')
            current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['shopping'], preferences.shopping.type, preferences.shopping.score)
            self.logger.info(f'Found {len(current_landmarks)} shopping landmarks')

            # Every shopping activity gets the same fixed duration.
            for landmark in current_landmarks:
                landmark.duration = 30
            all_landmarks.update(current_landmarks)

            # special pipeline for shopping malls
            shopping_manager = ClusterManager(bbox, 'shopping')
            shopping_clusters = shopping_manager.generate_clusters()
            all_landmarks.update(shopping_clusters)

        landmarks_constrained = take_most_important(all_landmarks, self.n_important)

        return all_landmarks, landmarks_constrained

    def set_landmark_score(self, landmark: Landmark, landmarktype: str, preference_level: int) -> None:
        """
        Calculate and set the attractiveness score for a given landmark.

        The base score is `n_tags ** tag_exponent`. It is then multiplied by a
        factor for each property the landmark has (Wikipedia URL, image,
        website, place of worship, viewpoint) and by `nature_coeff` for nature
        landmarks. The result is scaled by twice the preference level and
        stored, truncated to an int, in `landmark.attractiveness`.

        Args:
            landmark (Landmark): The landmark object to score (modified in place).
            landmarktype (str): The type of the landmark; 'nature' landmarks get `nature_coeff` applied.
            preference_level (int): The user's preference level for this landmark type.
        """
        score = landmark.n_tags**self.tag_exponent
        if landmark.wiki_url:
            score *= self.wikipedia_bonus
        if landmark.image_url:
            score *= self.image_bonus
        if landmark.website_url:
            # NOTE(review): a website gets the wikipedia bonus; there is no dedicated website parameter.
            score *= self.wikipedia_bonus
        if landmark.is_place_of_worship:
            score *= self.church_coeff
        if landmark.is_viewpoint:
            score *= self.viewpoint_bonus
        if landmarktype == 'nature':
            score *= self.nature_coeff

        landmark.attractiveness = int(score * preference_level * 2)

    def fetch_landmarks(self, bbox: tuple, amenity_selector: dict, landmarktype: str, preference_level: int) -> list[Landmark]:
        """
        Fetch landmarks of a specified type from OpenStreetMap (OSM) within a bounding box.

        Args:
            bbox (tuple): The search area as produced by `create_bbox`.
            amenity_selector (dict): The selectors for the desired landmark type, see `dict_to_selector_list`.
            landmarktype (str): The type of the landmark (e.g., 'sightseeing', 'nature', 'shopping').
            preference_level (int): The user's preference level, forwarded to the scoring.

        Returns:
            list[Landmark]: The landmarks that were fetched and kept.

        Notes:
            - One Overpass query is sent per selector. A query that raises is logged and skipped,
              so a single failure does not discard the other results.
            - Every category except 'nature' only keeps elements with more than 5 tags.
              Viewpoint selectors are exempt from that filter and also include nodes.
        """
        return_list = []

        # Default filter for this category, viewpoint selectors override it below.
        base_conditions = None if landmarktype == 'nature' else ['count_tags()>5']

        # caution, when applying a list of selectors, overpass will search for elements that match ALL selectors simultaneously
        # we need to split the selectors into separate queries and merge the results
        for sel in dict_to_selector_list(amenity_selector):
            osm_types = ['way', 'relation']

            # Decide the filter per selector, so that the viewpoint exemption
            # does not leak into the selectors processed afterwards.
            query_conditions = base_conditions
            if 'viewpoint' in sel:
                query_conditions = None
                osm_types.append('node')

            # Send the overpass query
            try:
                result = self.overpass.send_query(
                    bbox = bbox,
                    osm_types = osm_types,
                    selector = sel,
                    conditions = query_conditions,
                    out = 'ids center tags'
                    )
            except Exception as e:
                self.logger.error(f"Error fetching landmarks: {e}")
                continue

            return_list += self._to_landmarks(result, landmarktype, preference_level)

        return return_list

    @staticmethod
    def _is_unsuitable_tag(key: str, value: str, landmarktype: str) -> bool:
        """Tell whether a single OSM tag disqualifies its element from being a landmark."""
        if key == 'building:part' and value == 'yes':
            return True
        if 'disused:' in key or 'boundary:' in key:
            return True
        # Shops are only wanted when explicitly looking for shopping landmarks.
        return 'shop' in key and landmarktype != 'shopping'

    def _to_landmarks(self, elements: list, landmarktype, preference_level) -> list[Landmark]:
        """
        Parse the Overpass API result and extract landmarks.

        Elements without a name or coordinates are ignored, as are elements
        carrying a disqualifying tag (see `_is_unsuitable_tag`). The remaining
        ones are converted to `Landmark` objects, annotated from their tags,
        given a description and keywords, and scored.

        Args:
            elements (list): The elements of the json response from the Overpass API, may be None.
            landmarktype (str): The type assigned to every created landmark.
            preference_level (int): The user's preference level, used for scoring.

        Returns:
            list[Landmark]: A list of Landmark objects extracted from the JSON data.
        """
        if elements is None:
            return []

        landmarks = []
        for elem in elements:
            osm_type = elem.get('type')
            osm_id, coords, name = get_base_info(elem, osm_type, with_name=True)

            if name is None or coords is None:
                continue

            tags = elem.get('tags') or {}

            # Skip this landmark if not suitable.
            if any(self._is_unsuitable_tag(key, value, landmarktype) for key, value in tags.items()):
                continue

            # Convert this to Landmark object
            landmark = Landmark(name=name,
                                type=landmarktype,
                                location=coords,
                                osm_id=osm_id,
                                osm_type=osm_type,
                                attractiveness=0,
                                n_tags=len(tags))

            # Browse through tags to add information to landmark.
            for key, value in tags.items():
                if key == 'image':
                    landmark.image_url = value
                if key == 'website':
                    landmark.website_url = value
                if value == 'place_of_worship':
                    landmark.is_place_of_worship = True
                if key == 'wikipedia':
                    landmark.wiki_url = value
                if key == 'name:en':
                    landmark.name_en = value
                if 'building:' in key or 'pay' in key:
                    # These tags do not count towards the tag-based score.
                    landmark.n_tags -= 1

                # Set the duration.
                if value in ['museum', 'aquarium', 'planetarium']:
                    landmark.duration = 60
                elif value == 'viewpoint':
                    landmark.is_viewpoint = True
                    landmark.duration = 10
                elif value == 'cathedral':
                    # Cathedrals are excluded from the place-of-worship coefficient.
                    landmark.is_place_of_worship = False
                    landmark.duration = 10

            landmark.description, landmark.keywords = self.description_and_keywords(tags)
            self.set_landmark_score(landmark, landmarktype, preference_level)
            landmarks.append(landmark)

        return landmarks

    @staticmethod
    def _visitors_sentence(name, raw_visitors) -> str:
        """Build the sentence about yearly visitors, or '' when the value is missing, unparsable or too small."""
        if raw_visitors is None:
            return ""
        try:
            n_visitors = int(raw_visitors)
        except (TypeError, ValueError):
            # OSM values are free-form text, a non-numeric one must not break the parsing.
            return ""

        # Check the larger unit first so that only one sentence is produced.
        if n_visitors > 1000000:
            return f" {name} welcomes {round(n_visitors/1000000, 1)} million visitors every year."
        if n_visitors > 1000:
            return f" {name} welcomes {int(n_visitors/1000)} thousand visitors every year."
        return ""

    def description_and_keywords(self, tags: dict):
        """
        Build a short description and a keyword dict from the OSM tags of an element.

        When the tags carry no explicit `importance`, it is estimated from the
        number of tags: fewer than 5 gives no description at all, fewer than 10
        a "well known" place without importance, fewer than 17 'national',
        otherwise 'international'.

        Args:
            tags (dict): The OSM tags of the element.

        Returns:
            tuple: `(description, keywords)`, or `(None, None)` when no place type can be
            determined or the element has too few tags. `keywords` holds the keys
            'importance', 'height', 'place_type' and 'date'.
        """
        # Extract relevant fields
        name = tags.get('name')
        importance = tags.get('importance', None)
        n_visitors = tags.get('tourism:visitors', None)
        height = tags.get('height')
        place_type = self.get_place_type(tags)
        date = self.get_date(tags)

        if place_type is None:
            return None, None

        # Start the description.
        if importance is None:
            n_tags = len(tags)
            if n_tags < 5:
                return None, None
            if n_tags < 10:
                description = f"{name} is a well known {place_type}."
            elif n_tags < 17:
                importance = 'national'
                description = f"{name} is a {place_type} of national importance."
            else:
                importance = 'international'
                description = f"{name} is an internationally famous {place_type}."
        else:
            description = f"{name} is a {place_type} of {importance} importance."

        if height is not None and date is not None:
            description += f" This {place_type} was constructed in {date} and is ca. {height} meters high."
        elif height is not None:
            description += f" This {place_type} stands ca. {height} meters tall."
        elif date is not None:
            description += f" It was constructed in {date}."

        # Format the visitor number
        description += self._visitors_sentence(name, n_visitors)

        # Set the keywords.
        keywords = {"importance": importance,
                    "height": height,
                    "place_type": place_type,
                    "date": date}

        return description, keywords

    def get_place_type(self, data):
        """
        Pick the most descriptive place type from the OSM tags.

        Priority: `historic` (unless it is just "yes"), then `building` (unless
        its value is too generic), then `amenity`, then `leisure`.

        Args:
            data (dict): The OSM tags of the element.

        Returns:
            The chosen tag value, or None when none of the tags is usable.
        """
        historic = data.get('historic', None)
        if historic and historic != "yes":
            return historic

        building = data.get('building', None)
        if building and building not in self._GENERIC_BUILDING_VALUES:
            return building

        amenity = data.get('amenity', None)
        if amenity:
            return amenity

        leisure = data.get('leisure')
        if leisure:
            return leisure

        return None

    def get_date(self, data):
        """
        Return the first available date tag of an element.

        Priority: `construction_date`, `start_date`, `year_of_construction`,
        `opening_date`.

        Args:
            data (dict): The OSM tags of the element.

        Returns:
            The raw tag value, or None when no date tag is set.
        """
        for key in ('construction_date', 'start_date', 'year_of_construction', 'opening_date'):
            value = data.get(key, None)
            if value:
                return value

        return None
|
|
|
|
|
|
|
|
|
|
def dict_to_selector_list(d: dict) -> list:
    """
    Translate a selector dictionary into Overpass selector strings.

    Each entry produces exactly one selector, in the order of the dictionary:
        - a list value becomes an anchored regex alternation: key~"^(a|b)$"
        - an empty string value becomes a bare key selector: key
        - any other value becomes an equality selector: key=value

    Args:
        d (dict): A dictionary of key-value pairs representing the selector.

    Returns:
        list: A list of strings representing the Overpass query selectors.
    """
    def _as_selector(tag, spec) -> str:
        # Several accepted values: match any of them exactly.
        if isinstance(spec, list):
            alternatives = '|'.join(spec)
            return f'{tag}~"^({alternatives})$"'
        # Empty string: the mere presence of the key is enough.
        if isinstance(spec, str) and not spec:
            return f'{tag}'
        return f'{tag}={spec}'

    return [_as_selector(tag, spec) for tag, spec in d.items()]
|
|
|
|
|