"""Module used to import data from OSM and arrange them in categories.""" import logging import yaml from ..structs.preferences import Preferences from ..structs.landmark import Landmark from ..utils.take_most_important import take_most_important from .cluster_manager import ClusterManager from ..overpass.overpass import Overpass, get_base_info from ..utils.bbox import create_bbox from ..constants import AMENITY_SELECTORS_PATH, LANDMARK_PARAMETERS_PATH, OPTIMIZER_PARAMETERS_PATH class LandmarkManager: """ Use this to manage landmarks. Uses the overpass api to fetch landmarks and classify them. """ logger = logging.getLogger(__name__) radius_close_to: int # radius in meters church_coeff: float # coeff to adjsut score of churches nature_coeff: float # coeff to adjust score of parks overall_coeff: float # coeff to adjust weight of tags n_important: int # number of important landmarks to consider def __init__(self) -> None: with AMENITY_SELECTORS_PATH.open('r') as f: self.amenity_selectors = yaml.safe_load(f) with LANDMARK_PARAMETERS_PATH.open('r') as f: parameters = yaml.safe_load(f) self.max_bbox_side = parameters['max_bbox_side'] self.church_coeff = parameters['church_coeff'] self.nature_coeff = parameters['nature_coeff'] self.overall_coeff = parameters['overall_coeff'] self.tag_exponent = parameters['tag_exponent'] self.image_bonus = parameters['image_bonus'] self.wikipedia_bonus = parameters['wikipedia_bonus'] self.viewpoint_bonus = parameters['viewpoint_bonus'] self.pay_bonus = parameters['pay_bonus'] self.n_important = parameters['N_important'] with OPTIMIZER_PARAMETERS_PATH.open('r') as f: parameters = yaml.safe_load(f) self.walking_speed = parameters['average_walking_speed'] self.detour_factor = parameters['detour_factor'] # Setup the caching in the Overpass class. self.overpass = Overpass() self.logger.info('LandmakManager successfully initialized.') def generate_landmarks_list(self, center_coordinates: tuple[float, float], preferences: Preferences) -> tuple[list[Landmark], list[Landmark]]: """ Generate and prioritize a list of landmarks based on user preferences. This method fetches landmarks from various categories (sightseeing, nature, shopping) based on the user's preferences and current location. It scores and corrects these landmarks, removes duplicates, and then selects the most important landmarks based on a predefined criterion. Args: center_coordinates (tuple[float, float]): The latitude and longitude of the center location around which to search. preferences (Preferences): The user's preference settings that influence the landmark selection. Returns: tuple[list[Landmark], list[Landmark]]: - A list of all existing landmarks. - A list of the most important landmarks based on the user's preferences. """ self.logger.debug('Starting to fetch landmarks...') max_walk_dist = int((preferences.max_time_minute/2)/60*self.walking_speed*1000/self.detour_factor) radius = min(max_walk_dist, int(self.max_bbox_side/2)) # use set to avoid duplicates, this requires some __methods__ to be set in Landmark all_landmarks = set() # Create a bbox using the around technique, tuple of strings bbox = create_bbox(center_coordinates, radius) # list for sightseeing if preferences.sightseeing.score != 0: self.logger.debug('Fetching sightseeing landmarks...') current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['sightseeing'], preferences.sightseeing.type, preferences.sightseeing.score) all_landmarks.update(current_landmarks) self.logger.info(f'Found {len(current_landmarks)} sightseeing landmarks') # special pipeline for historic neighborhoods neighborhood_manager = ClusterManager(bbox, 'sightseeing') historic_clusters = neighborhood_manager.generate_clusters() all_landmarks.update(historic_clusters) # list for nature if preferences.nature.score != 0: self.logger.debug('Fetching nature landmarks...') current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['nature'], preferences.nature.type, preferences.nature.score) all_landmarks.update(current_landmarks) self.logger.info(f'Found {len(current_landmarks)} nature landmarks') # list for shopping if preferences.shopping.score != 0: self.logger.debug('Fetching shopping landmarks...') current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['shopping'], preferences.shopping.type, preferences.shopping.score) self.logger.info(f'Found {len(current_landmarks)} shopping landmarks') # set time for all shopping activites : for landmark in current_landmarks : landmark.duration = 30 all_landmarks.update(current_landmarks) # special pipeline for shopping malls shopping_manager = ClusterManager(bbox, 'shopping') shopping_clusters = shopping_manager.generate_clusters() all_landmarks.update(shopping_clusters) landmarks_constrained = take_most_important(all_landmarks, self.n_important) # self.logger.info(f'All landmarks generated : {len(all_landmarks)} landmarks around {center_coordinates}, and constrained to {len(landmarks_constrained)} most important ones.') return all_landmarks, landmarks_constrained def set_landmark_score(self, landmark: Landmark, landmarktype: str, preference_level: int) : """ Calculate and set the attractiveness score for a given landmark. This method evaluates the landmark's attractiveness based on its properties (number of tags, presence of Wikipedia URL, image, website, and whether it's a place of worship) and adjusts the score using the user's preference level. Args: landmark (Landmark): The landmark object to score. landmarktype (str): The type of the landmark (currently unused). preference_level (int): The user's preference level for this landmark type. """ score = landmark.n_tags**self.tag_exponent if landmark.wiki_url : score *= self.wikipedia_bonus if landmark.image_url : score *= self.image_bonus if landmark.website_url : score *= self.wikipedia_bonus if landmark.is_place_of_worship : score *= self.church_coeff if landmark.is_viewpoint : score *= self.viewpoint_bonus if landmarktype == 'nature' : score *= self.nature_coeff landmark.attractiveness = int(score * preference_level * 2) def fetch_landmarks(self, bbox: tuple, amenity_selector: dict, landmarktype: str, preference_level: int) -> list[Landmark]: """ Fetches landmarks of a specified type from OpenStreetMap (OSM) within a bounding box centered on given coordinates. Args: bbox (tuple[float, float, float, float]): The bounding box coordinates (around:radius, center_lat, center_lon). amenity_selector (dict): The Overpass API query selector for the desired landmark type. landmarktype (str): The type of the landmark (e.g., 'sightseeing', 'nature', 'shopping'). Returns: list[Landmark]: A list of Landmark objects that were fetched and filtered based on the provided criteria. Notes: - Landmarks are fetched using Overpass API queries. - Selectors are translated from the dictionary to the Overpass query format. (e.g., 'amenity'='place_of_worship') - Landmarks are filtered based on various conditions including tags and type. """ return_list = [] if landmarktype == 'nature' : query_conditions = None else : query_conditions = ['count_tags()>5'] # caution, when applying a list of selectors, overpass will search for elements that match ALL selectors simultaneously # we need to split the selectors into separate queries and merge the results for sel in dict_to_selector_list(amenity_selector): # self.logger.debug(f"Current selector: {sel}") osm_types = ['way', 'relation'] if 'viewpoint' in sel : query_conditions = None osm_types.append('node') # Send the overpass query try: result = self.overpass.send_query( bbox = bbox, osm_types = osm_types, selector = sel, conditions = query_conditions, # except for nature.... out = 'ids center tags' ) except Exception as e: self.logger.error(f"Error fetching landmarks: {str(e)}") continue return_list += self._to_landmarks(result, landmarktype, preference_level) # self.logger.debug(f"Fetched {len(return_list)} landmarks of type {landmarktype} in {bbox}") return return_list def _to_landmarks(self, elements: list, landmarktype, preference_level) -> list[Landmark]: """ Parse the Overpass API result and extract landmarks. This method processes the JSON elements returned by the Overpass API and extracts landmarks of types 'node', 'way', and 'relation'. It retrieves relevant information such as name, coordinates, and tags, and converts them into Landmark objects. Args: elements (list): The elements of json response from Overpass API. elem_type (str): The type of landmark (e.g., node, way, relation). Returns: list[Landmark]: A list of Landmark objects extracted from the JSON data. """ if elements is None : return [] landmarks = [] for elem in elements: osm_type = elem.get('type') id, coords, name = get_base_info(elem, osm_type, with_name=True) if name is None or coords is None : continue tags = elem.get('tags') # Convert this to Landmark object landmark = Landmark(name=name, type=landmarktype, location=coords, osm_id=id, osm_type=osm_type, attractiveness=0, n_tags=len(tags)) # Browse through tags to add information to landmark. for key, value in tags.items(): # Skip this landmark if not suitable. if key == 'building:part' and value == 'yes' : break if 'disused:' in key : break if 'boundary:' in key : break if 'shop' in key and landmarktype != 'shopping' : break # if value == 'apartments' : # break # Fill in the other attributes. if key == 'image' : landmark.image_url = value if key == 'website' : landmark.website_url = value if value == 'place_of_worship' : landmark.is_place_of_worship = True if key == 'wikipedia' : landmark.wiki_url = value if key == 'name:en' : landmark.name_en = value if 'building:' in key or 'pay' in key : landmark.n_tags -= 1 # Set the duration. if value in ['museum', 'aquarium', 'planetarium'] : landmark.duration = 60 elif value == 'viewpoint' : landmark.is_viewpoint = True landmark.duration = 10 elif value == 'cathedral' : landmark.is_place_of_worship = False landmark.duration = 10 landmark.description, landmark.keywords = self.description_and_keywords(tags) self.set_landmark_score(landmark, landmarktype, preference_level) landmarks.append(landmark) continue return landmarks def description_and_keywords(self, tags: dict): """ """ # Extract relevant fields name = tags.get('name') importance = tags.get('importance', None) n_visitors = tags.get('tourism:visitors', None) height = tags.get('height') place_type = self.get_place_type(tags) date = self.get_date(tags) if place_type is None : return None, None # Start the description. if importance is None : if len(tags.keys()) < 5 : return None, None elif len(tags.keys()) < 10 : description = f"{name} is a well known {place_type}." elif len(tags.keys()) < 17 : importance = 'national' description = f"{name} is a {place_type} of national importance." else : importance = 'international' description = f"{name} is an internationally famous {place_type}." else : description = f"{name} is a {place_type} of {importance} importance." if height is not None and date is not None : description += f" This {place_type} was constructed in {date} and is ca. {height} meters high." elif height is not None : description += f" This {place_type} stands ca. {height} meters tall." elif date is not None: description += f" It was constructed in {date}." # Format the visitor number if n_visitors is not None : n_visitors = int(n_visitors) if n_visitors < 1000000 : description += f" It welcomes {int(n_visitors/1000)} thousand visitors every year." else : description += f" It welcomes {round(n_visitors/1000000, 1)} million visitors every year." # Set the keywords. keywords = {"importance": importance, "height": height, "place_type": place_type, "date": date} return description, keywords def get_place_type(self, data): amenity = data.get('amenity', None) building = data.get('building', None) historic = data.get('historic', None) leisure = data.get('leisure') if historic and historic != "yes": return historic if building and building not in ["yes", "civic", "government", "apartments", "residential", "commericial", "industrial", "retail", "religious", "public", "service"]: return building if amenity: return amenity if leisure: return leisure return None def get_date(self, data): construction_date = data.get('construction_date', None) opening_date = data.get('opening_date', None) start_date = data.get('start_date', None) year_of_construction = data.get('year_of_construction', None) # Prioritize based on availability if construction_date: return construction_date if start_date: return start_date if year_of_construction: return year_of_construction if opening_date: return opening_date return None def dict_to_selector_list(d: dict) -> list: """ Convert a dictionary of key-value pairs to a list of Overpass query strings. Args: d (dict): A dictionary of key-value pairs representing the selector. Returns: list: A list of strings representing the Overpass query selectors. """ return_list = [] for key, value in d.items(): if isinstance(value, list): val = '|'.join(value) return_list.append(f'{key}~"^({val})$"') elif isinstance(value, str) and len(value) == 0: return_list.append(f'{key}') else: return_list.append(f'{key}={value}') return return_list