"""Fetch, score and filter OpenStreetMap landmarks through the Overpass API."""
import math
import yaml
import logging

from OSMPythonTools.overpass import Overpass, overpassQueryBuilder
from OSMPythonTools.cachingStrategy import CachingStrategy, JSON

from ..structs.preferences import Preferences
from ..structs.landmark import Landmark
from .take_most_important import take_most_important
from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH, OSM_CACHE_DIR

# silence the overpass logger
logging.getLogger('OSMPythonTools').setLevel(level=logging.CRITICAL)


class LandmarkManager:
    """
    Query OpenStreetMap (through Overpass) for landmarks and score them.

    All tuning parameters are read from the YAML files referenced by
    AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH and
    OPTIMIZER_PARAMETERS_PATH when the manager is instantiated.
    """

    logger = logging.getLogger(__name__)

    radius_close_to: int    # radius in meters
    church_coeff: float     # coeff to adjust score of churches
    nature_coeff: float     # coeff to adjust score of parks
    overall_coeff: float    # coeff to adjust weight of tags
    N_important: int        # number of important landmarks to consider


    def __init__(self) -> None:
        """Load the selectors and parameters from disk and set up the Overpass client."""
        with AMENITY_SELECTORS_PATH.open('r') as f:
            self.amenity_selectors = yaml.safe_load(f)

        with LANDMARK_PARAMETERS_PATH.open('r') as f:
            parameters = yaml.safe_load(f)
            self.max_bbox_side = parameters['city_bbox_side']
            self.radius_close_to = parameters['radius_close_to']
            self.church_coeff = parameters['church_coeff']
            self.nature_coeff = parameters['nature_coeff']
            self.overall_coeff = parameters['overall_coeff']
            self.tag_exponent = parameters['tag_exponent']
            self.image_bonus = parameters['image_bonus']
            self.name_bonus = parameters['name_bonus']
            self.wikipedia_bonus = parameters['wikipedia_bonus']
            self.viewpoint_bonus = parameters['viewpoint_bonus']
            self.pay_bonus = parameters['pay_bonus']
            self.N_important = parameters['N_important']

        with OPTIMIZER_PARAMETERS_PATH.open('r') as f:
            parameters = yaml.safe_load(f)
            self.walking_speed = parameters['average_walking_speed']
            self.detour_factor = parameters['detour_factor']

        self.overpass = Overpass()
        CachingStrategy.use(JSON, cacheDir=OSM_CACHE_DIR)


    def generate_landmarks_list(self, center_coordinates: tuple[float, float], preferences: Preferences) -> tuple[list[Landmark], list[Landmark]]:
        """
        Generate and prioritize a list of landmarks based on user preferences.

        This method fetches landmarks from various categories (sightseeing, nature, shopping) based on the user's preferences
        and current location. A category is skipped entirely when its preference score is 0. Duplicates are removed and the
        most important landmarks are then selected with `take_most_important`.

        Parameters:
            center_coordinates (tuple[float, float]): The latitude and longitude of the center location around which to search.
            preferences (Preferences): The user's preference settings that influence the landmark selection.

        Returns:
            tuple[list[Landmark], list[Landmark]]:
                - All landmarks that were found. NOTE: this collection is currently a `set`, not a `list`.
                - The most important landmarks, as returned by `take_most_important`.
        """
        # Half of the time budget is used for the outward reach; the factor 1000 suggests that
        # walking_speed is expressed in km/h (assumption, confirm against the parameter file).
        max_walk_dist = (preferences.max_time_minute/2)/60*self.walking_speed*1000/self.detour_factor
        reachable_bbox_side = min(max_walk_dist, self.max_bbox_side)

        # use set to avoid duplicates, this requires some __methods__ to be set in Landmark
        all_landmarks = set()

        bbox = self.create_bbox(center_coordinates, reachable_bbox_side)

        # list for sightseeing
        if preferences.sightseeing.score != 0:
            score_function = lambda score: score * 10 * preferences.sightseeing.score / 5
            current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['sightseeing'], preferences.sightseeing.type, score_function)
            all_landmarks.update(current_landmarks)

        # list for nature
        if preferences.nature.score != 0:
            score_function = lambda score: score * 10 * self.nature_coeff * preferences.nature.score / 5
            current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['nature'], preferences.nature.type, score_function)
            all_landmarks.update(current_landmarks)

        # list for shopping
        if preferences.shopping.score != 0:
            score_function = lambda score: score * 10 * preferences.shopping.score / 5
            current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['shopping'], preferences.shopping.type, score_function)

            # set time for all shopping activities
            for landmark in current_landmarks:
                landmark.duration = 45
            all_landmarks.update(current_landmarks)

        landmarks_constrained = take_most_important(all_landmarks, self.N_important)
        self.logger.info(f'Generated {len(all_landmarks)} landmarks around {center_coordinates}, and constrained to {len(landmarks_constrained)} most important ones.')

        return all_landmarks, landmarks_constrained


    def count_elements_close_to(self, coordinates: tuple[float, float]) -> int:
        """
        Count the OpenStreetMap ways and relations close to the given location.

        A square bounding box of half-width `self.radius_close_to` (converted from meters to degrees)
        is built around the coordinates and queried through Overpass. Nodes are part of the query
        but only ways and relations are counted.

        Args:
            coordinates (tuple[float, float]): The latitude and longitude of the location to search around.

        Returns:
            int: The number of ways and relations inside the bounding box.
                 Returns 0 if an error occurs during the query.
        """
        lat = coordinates[0]
        lon = coordinates[1]

        radius = self.radius_close_to

        # Angle (in degrees) subtended by `radius` meters on a sphere of radius 6371000 m.
        # The same angle is applied to the longitude, without latitude correction.
        alpha = (180 * radius) / (6371000 * math.pi)
        bbox = {'latLower':lat-alpha,'lonLower':lon-alpha,'latHigher':lat+alpha,'lonHigher': lon+alpha}

        # Build the query to find elements within the radius
        radius_query = overpassQueryBuilder(
            bbox=[bbox['latLower'],
                  bbox['lonLower'],
                  bbox['latHigher'],
                  bbox['lonHigher']],
            elementType=['node', 'way', 'relation']
        )

        try:
            radius_result = self.overpass.query(radius_query)
            N_elem = radius_result.countWays() + radius_result.countRelations()
        except Exception as e:
            # Best effort: a failed query must not abort the scoring, but it should be visible.
            self.logger.warning(f"Could not count elements close to {coordinates}: {e}")
            return 0

        self.logger.debug(f"There are {N_elem} ways/relations within {radius}m")
        return N_elem


    def create_bbox(self, coordinates: tuple[float, float], reachable_bbox_side: int) -> tuple[float, float, float, float]:
        """
        Create a bounding box around the given coordinates.

        Args:
            coordinates (tuple[float, float]): The latitude and longitude of the center of the bounding box.
            reachable_bbox_side (int): The side length of the bounding box in meters.

        Returns:
            tuple[float, float, float, float]: The minimum latitude, minimum longitude, maximum latitude, and maximum longitude
                                               defining the bounding box.
        """
        lat = coordinates[0]
        lon = coordinates[1]

        # Half the side length in km (since it's a square bbox)
        half_side_length_km = reachable_bbox_side / 2 / 1000

        # Convert distance to degrees
        lat_diff = half_side_length_km / 111  # 1 degree latitude is approximately 111 km
        lon_diff = half_side_length_km / (111 * math.cos(math.radians(lat)))  # Adjust for longitude based on latitude

        # Calculate bbox
        min_lat = lat - lat_diff
        max_lat = lat + lat_diff
        min_lon = lon - lon_diff
        max_lon = lon + lon_diff

        return min_lat, min_lon, max_lat, max_lon


    def fetch_landmarks(self, bbox: tuple, amenity_selector: dict, landmarktype: str, score_function: callable) -> list[Landmark]:
        """
        Fetches landmarks of a specified type from OpenStreetMap (OSM) within a bounding box.

        Args:
            bbox (tuple[float, float, float, float]): The bounding box coordinates (min_lat, min_lon, max_lat, max_lon).
            amenity_selector (dict): The Overpass API query selector for the desired landmark type.
            landmarktype (str): The type of the landmark (e.g., 'sightseeing', 'nature', 'shopping').
            score_function (callable): Maps the raw tag-based score to the final score of the landmark.

        Returns:
            list[Landmark]: A list of Landmark objects that were fetched and filtered based on the provided criteria.

        Notes:
            - Landmarks are fetched using Overpass API queries, one query per selector.
            - Selectors are translated from the dictionary to the Overpass query format. (e.g., 'amenity'='place_of_worship')
            - A failing query is logged and skipped; the remaining selectors are still processed.
            - Elements without a name or a location are dropped, except viewpoint nodes.
            - Disused amenities, building parts, apartments and (for non-shopping types) shops and
              retail/supermarket/parking buildings are dropped.
            - The raw score is `n_tags ** tag_exponent` plus per-tag bonuses, then passed through `score_function`.
        """
        return_list = []

        # caution, when applying a list of selectors, overpass will search for elements that match ALL selectors simultaneously
        # we need to split the selectors into separate queries and merge the results
        for sel in dict_to_selector_list(amenity_selector):
            self.logger.debug(f"Current selector: {sel}")

            # NOTE(review): an earlier assignment exempted landmarktype == 'nature' from the tag-count
            # condition, but it was always overwritten here and never had any effect. The effective
            # behaviour (only viewpoints are exempt) is kept; confirm whether nature should be exempt too.
            if 'viewpoint' in sel:
                # viewpoints are often mapped as sparsely tagged nodes
                query_conditions = []
                element_types = ['way', 'relation', 'node']
            else:
                query_conditions = ['count_tags()>5']
                element_types = ['way', 'relation']

            query = overpassQueryBuilder(
                bbox = bbox,
                elementType = element_types,
                # selector can in principle be a list already,
                # but it generates the intersection of the queries
                # we want the union
                selector = sel,
                conditions = query_conditions,
                includeCenter = True,
                out = 'body'
            )
            self.logger.debug(f"Query: {query}")

            try:
                result = self.overpass.query(query)
            except Exception as e:
                self.logger.error(f"Error fetching landmarks: {e}")
                continue

            for elem in result.elements():
                # Read the tags once; guard against elements that carry no tags at all.
                tags = elem.tags() or {}

                name = tags.get('name')
                name_en = None
                location = (elem.centerLat(), elem.centerLon())
                osm_type = elem.type()  # 'node', 'way' or 'relation'
                osm_id = elem.id()

                # TODO: exclude these from the get go
                # handle unprecise and no-name locations
                if name is None or location[0] is None:
                    if osm_type == 'node' and 'viewpoint' in tags.values():
                        # Only fall back to the generic name when the node has none of its own.
                        if name is None:
                            name = 'Viewpoint'
                            name_en = 'Viewpoint'
                        location = (elem.lat(), elem.lon())
                        if location[0] is None:
                            continue
                    else:
                        continue

                # skip if part of another building
                if tags.get('building:part') == 'yes':
                    continue

                elem_type = landmarktype
                n_tags = len(tags)
                score = n_tags**self.tag_exponent

                website_url = None
                image_url = None

                # Adjust scoring, browse through tag keys
                skip = False
                for tag_key, tag_value in tags.items():
                    if "pay" in tag_key:
                        # payment options are misleading and should not count for the scoring.
                        score += self.pay_bonus

                    if "disused" in tag_key:
                        # skip disused amenities
                        skip = True
                        break

                    if "name" in tag_key:
                        score += self.name_bonus

                    if "wiki" in tag_key:
                        # wikipedia entries count more
                        score += self.wikipedia_bonus

                    if "image" in tag_key:
                        # images must count more
                        score += self.image_bonus

                    # parks are re-labelled as nature, whatever category they were fetched for
                    if elem_type != "nature":
                        if "leisure" in tag_key and tags.get('leisure') == "park":
                            elem_type = "nature"

                    if landmarktype != "shopping":
                        if "shop" in tag_key:
                            skip = True
                            break

                        if tag_key == "building" and tag_value in ['retail', 'supermarket', 'parking']:
                            skip = True
                            break

                    # Extract image, website and english name
                    if tag_key in ['website', 'contact:website']:
                        website_url = tag_value
                    if tag_key == 'image':
                        image_url = tag_value
                    if tag_key == 'name:en':
                        name_en = tag_value

                if skip:
                    continue

                tag_values = tags.values()

                # Don't visit random apartments
                if 'apartments' in tag_values:
                    continue

                score = score_function(score)

                is_place_of_worship = "place_of_worship" in tag_values
                if is_place_of_worship:
                    score = score * self.church_coeff

                # Visit duration; the place-of-worship duration is part of the chain so that
                # it is no longer overwritten by the default.
                if 'viewpoint' in tag_values:
                    # viewpoints must count more
                    score += self.viewpoint_bonus
                    duration = 10
                elif "museum" in tag_values or "aquarium" in tag_values or "planetarium" in tag_values:
                    duration = 60
                elif is_place_of_worship:
                    duration = 10
                else:
                    duration = 5

                # finally create our own landmark object
                landmark = Landmark(
                    name = name,
                    type = elem_type,
                    location = location,
                    osm_type = osm_type,
                    osm_id = osm_id,
                    attractiveness = int(score),
                    must_do = False,
                    n_tags = int(n_tags),
                    duration = int(duration),
                    name_en = name_en,
                    image_url = image_url,
                    website_url = website_url
                )
                return_list.append(landmark)

        self.logger.debug(f"Fetched {len(return_list)} landmarks of type {landmarktype} in {bbox}")

        return return_list


def dict_to_selector_list(d: dict) -> list:
    """
    Convert a dictionary of key-value pairs to a list of Overpass query strings.

    A list value becomes a regex alternative (`key~"^(a|b)$"`), an empty string
    becomes a bare key-existence selector (`key`) and anything else becomes `key=value`.

    Args:
        d (dict): A dictionary of key-value pairs representing the selector.

    Returns:
        list: A list of strings representing the Overpass query selectors.
    """
    return_list = []
    for key, value in d.items():
        if isinstance(value, list):
            val = '|'.join(value)
            return_list.append(f'{key}~"^({val})$"')
        elif isinstance(value, str) and not value:
            return_list.append(f'{key}')
        else:
            return_list.append(f'{key}={value}')
    return return_list