diff --git a/backend/src/landmarks_manager.py b/backend/src/landmarks_manager.py index 9d4ce9b..53fa012 100644 --- a/backend/src/landmarks_manager.py +++ b/backend/src/landmarks_manager.py @@ -1,308 +1,326 @@ import math as m -import json, os +import yaml +import logging -from typing import List, Tuple, Optional +from typing import List, Tuple from OSMPythonTools.overpass import Overpass, overpassQueryBuilder +from OSMPythonTools import cachingStrategy from pywikibot import ItemPage, Site from pywikibot import config config.put_throttle = 0 config.maxlag = 0 -from structs.landmarks import Landmark, LandmarkType from structs.preferences import Preferences, Preference +from structs.landmarks import Landmark +from utils import take_most_important +import constants -SIGHTSEEING = LandmarkType(landmark_type='sightseeing') -NATURE = LandmarkType(landmark_type='nature') -SHOPPING = LandmarkType(landmark_type='shopping') +SIGHTSEEING = 'sightseeing' +NATURE = 'nature' +SHOPPING = 'shopping' -# Include the json here -# Create a list of all things to visit given some preferences and a city. Ready for the optimizer -def generate_landmarks(preferences: Preferences, coordinates: Tuple[float, float]) : - l_sights, l_nature, l_shop = get_amenities() - L = [] +class LandmarkManager: - # List for sightseeing - if preferences.sightseeing.score != 0 : - L1 = get_landmarks(l_sights, SIGHTSEEING, coordinates=coordinates) - correct_score(L1, preferences.sightseeing) - L += L1 - - # List for nature - if preferences.nature.score != 0 : - L2 = get_landmarks(l_nature, NATURE, coordinates=coordinates) - correct_score(L2, preferences.nature) - L += L2 - - # List for shopping - if preferences.shopping.score != 0 : - L3 = get_landmarks(l_shop, SHOPPING, coordinates=coordinates) - correct_score(L3, preferences.shopping) - L += L3 + logger = logging.getLogger(__name__) - L = remove_duplicates(L) + city_bbox_side: int # bbox side in meters + radius_close_to: int # radius in meters + church_coeff: float # coeff to adjsut score of churches + park_coeff: float # coeff to adjust score of parks + tag_coeff: float # coeff to adjust weight of tags + N_important: int # number of important landmarks to consider - return L, take_most_important(L) + preferences: Preferences # preferences of visit + location: Tuple[float] # coordinates around which to find a path + def __init__(self, preferences: Preferences, coordinates: Tuple[float, float]) -> None: -# Helper function to gather the amenities list -def get_amenities() -> List[List[str]] : - - # Get the list of amenities from the files - sightseeing = get_list('/amenities/sightseeing.am') - nature = get_list('/amenities/nature.am') - shopping = get_list('/amenities/shopping.am') + with constants.AMENITY_SELECTORS_PATH.open('r') as f: + self.amenity_selectors = yaml.safe_load(f) - return sightseeing, nature, shopping - - -# Helper function to read a .am file and generate the corresponding list -def get_list(path: str) -> List[str] : - - with open(os.path.dirname(os.path.abspath(__file__)) + path) as f : - content = f.readlines() - - amenities = [] - for line in content : - if not line.startswith('#') : - amenities.append(line.strip('\n')) - - return amenities - - -# Take the most important landmarks from the list -def take_most_important(L: List[Landmark], N = 0) -> List[Landmark] : - - # Read the parameters from the file - with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/landmarks_manager.params', "r") as f : - parameters = json.loads(f.read()) - N_important = parameters['N important'] - - L_copy = [] - L_clean = [] - scores = [0]*len(L) - names = [] - name_id = {} - - for i, elem in enumerate(L) : - if elem.name not in names : - names.append(elem.name) - name_id[elem.name] = [i] - L_copy.append(elem) - else : - name_id[elem.name] += [i] - scores = [] - for j in name_id[elem.name] : - scores.append(L[j].attractiveness) - best_id = max(range(len(scores)), key=scores.__getitem__) - t = name_id[elem.name][best_id] - if t == i : - for old in L_copy : - if old.name == elem.name : - old.attractiveness = L[t].attractiveness - - scores = [0]*len(L_copy) - for i, elem in enumerate(L_copy) : - scores[i] = elem.attractiveness - - res = sorted(range(len(scores)), key = lambda sub: scores[sub])[-(N_important-N):] - - for i, elem in enumerate(L_copy) : - if i in res : - L_clean.append(elem) - - return L_clean - - -# Remove duplicate elements and elements with low score -def remove_duplicates(L: List[Landmark]) -> List[Landmark] : - """ - Removes duplicate landmarks based on their names from the given list. - - Parameters: - L (List[Landmark]): A list of Landmark objects. - - Returns: - List[Landmark]: A list of unique Landmark objects based on their names. - """ - - L_clean = [] - names = [] - - for landmark in L : - if landmark.name in names: - continue - - - else : - names.append(landmark.name) - L_clean.append(landmark) - - return L_clean - - -# Correct the score of a list of landmarks by taking into account preference settings -def correct_score(L: List[Landmark], preference: Preference) : - - if len(L) == 0 : - return - - if L[0].type != preference.type : - raise TypeError(f"LandmarkType {preference.type} does not match the type of Landmark {L[0].name}") - - for elem in L : - elem.attractiveness = int(elem.attractiveness*preference.score/5) # arbitrary computation - - -# Function to count elements within a certain radius of a location -def count_elements_within_radius(coordinates: Tuple[float, float], radius: int) -> int: - - lat = coordinates[0] - lon = coordinates[1] - - alpha = (180*radius)/(6371000*m.pi) - bbox = {'latLower':lat-alpha,'lonLower':lon-alpha,'latHigher':lat+alpha,'lonHigher': lon+alpha} + with constants.LANDMARK_PARAMETERS_PATH.open('r') as f: + parameters = yaml.safe_load(f) + self.city_bbox_side = parameters['city_bbox_side'] + self.radius_close_to = parameters['radius_close_to'] + self.church_coeff = parameters['church_coeff'] + self.park_coeff = parameters['park_coeff'] + self.tag_coeff = parameters['tag_coeff'] + self.N_important = parameters['N_important'] - # Build the query to find elements within the radius - radius_query = overpassQueryBuilder(bbox=[bbox['latLower'],bbox['lonLower'],bbox['latHigher'],bbox['lonHigher']], - elementType=['node', 'way', 'relation']) - - try : - overpass = Overpass() - radius_result = overpass.query(radius_query) - N_elem = radius_result.countWays() + radius_result.countRelations() - #print(f"There are {N_elem} ways/relations within 50m") - if N_elem is None : - return 0 - return N_elem - - except : - return 0 + self.preferences = preferences + self.location = coordinates -# Creates a bounding box around given coordinates, side_length in meters -def create_bbox(coordinates: Tuple[float, float], side_length: int) -> Tuple[float, float, float, float]: - - lat = coordinates[0] - lon = coordinates[1] + def generate_landmarks_list(self) -> Tuple[List[Landmark], List[Landmark]] : + """ + Generate and prioritize a list of landmarks based on user preferences. - # Half the side length in km (since it's a square bbox) - half_side_length_km = side_length / 2 / 1000 + This method fetches landmarks from various categories (sightseeing, nature, shopping) based on the user's preferences + and current location. It scores and corrects these landmarks, removes duplicates, and then selects the most important + landmarks based on a predefined criterion. - # Convert distance to degrees - lat_diff = half_side_length_km / 111 # 1 degree latitude is approximately 111 km - lon_diff = half_side_length_km / (111 * m.cos(m.radians(lat))) # Adjust for longitude based on latitude + Returns: + Tuple[List[Landmark], List[Landmark]]: + - A list of all existing landmarks. + - A list of the most important landmarks based on the user's preferences. + """ - # Calculate bbox - min_lat = lat - lat_diff - max_lat = lat + lat_diff - min_lon = lon - lon_diff - max_lon = lon + lon_diff + L = [] - return min_lat, min_lon, max_lat, max_lon - - -def get_landmarks(list_amenity: list, landmarktype: LandmarkType, coordinates: Tuple[float, float]) -> List[Landmark] : - - # Read the parameters from the file - with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/landmarks_manager.params', "r") as f : - parameters = json.loads(f.read()) - tag_coeff = parameters['tag coeff'] - park_coeff = parameters['park coeff'] - church_coeff = parameters['church coeff'] - radius = parameters['radius close to'] - bbox_side = parameters['city bbox side'] - - # Create bbox around start location - bbox = create_bbox(coordinates, bbox_side) - - # Initialize some variables - N = 0 - L = [] - overpass = Overpass() - - for amenity in list_amenity : - query = overpassQueryBuilder(bbox=bbox, elementType=['way', 'relation'], selector=amenity, includeCenter=True, out='body') - result = overpass.query(query) - N += result.countElements() - - for elem in result.elements(): - - name = elem.tag('name') # Add name - location = (elem.centerLat(), elem.centerLon()) # Add coordinates (lat, lon) - - # skip if unprecise location - if name is None or location[0] is None: - continue - - # skip if unused - if 'disused:leisure' in elem.tags().keys(): - continue - - # skip if part of another building - if 'building:part' in elem.tags().keys() and elem.tag('building:part') == 'yes': - continue + # List for sightseeing + if self.preferences.sightseeing.score != 0 : + L1 = self.fetch_landmarks(self.amenity_selectors['sightseeing'], SIGHTSEEING, coordinates=self.location) + self.correct_score(L1, self.preferences.sightseeing) + L += L1 + # List for nature + if self.preferences.nature.score != 0 : + L2 = self.fetch_landmarks(self.amenity_selectors['nature'], NATURE, coordinates=self.location) + self.correct_score(L2, self.preferences.nature) + L += L2 + + # List for shopping + if self.preferences.shopping.score != 0 : + L3 = self.fetch_landmarks(self.amenity_selectors['shopping'], SHOPPING, coordinates=self.location) + self.correct_score(L3, self.preferences.shopping) + L += L3 + + L = self.remove_duplicates(L) + + return L, take_most_important(L, self.N_important) + + + def remove_duplicates(self, landmarks: List[Landmark]) -> List[Landmark] : + """ + Removes duplicate landmarks based on their names from the given list. Only retains the landmark with highest score + + Parameters: + landmarks (List[Landmark]): A list of Landmark objects. + + Returns: + List[Landmark]: A list of unique Landmark objects based on their names. + """ + + L_clean = [] + names = [] + + for landmark in landmarks : + if landmark.name in names: + continue else : - osm_type = elem.type() # Add type : 'way' or 'relation' - osm_id = elem.id() # Add OSM id - elem_type = landmarktype # Add the landmark type as 'sightseeing, - n_tags = len(elem.tags().keys()) # Add number of tags + names.append(landmark.name) + L_clean.append(landmark) + + return L_clean + - # remove specific tags - skip = False - for tag in elem.tags().keys() : - if "pay" in tag : - n_tags -= 1 # discard payment options for tags + def correct_score(self, landmarks: List[Landmark], preference: Preference) : + """ + Adjust the attractiveness score of each landmark in the list based on user preferences. - if "disused" in tag : - skip = True # skip disused amenities - break + This method updates the attractiveness of each landmark by scaling it according to the user's preference score. + The score adjustment is computed using a simple linear transformation based on the preference score. - if "wikipedia" in tag : - n_tags += 3 # wikipedia entries count more + Args: + landmarks (List[Landmark]): A list of landmarks whose scores need to be corrected. + preference (Preference): The user's preference settings that influence the attractiveness score adjustment. - if tag == "wikidata" : - Q = elem.tag('wikidata') - site = Site("wikidata", "wikidata") - item = ItemPage(site, Q) - item.get() - n_languages = len(item.labels) - n_tags += n_languages/10 + Raises: + TypeError: If the type of any landmark in the list does not match the expected type in the preference. + """ - if elem_type != LandmarkType(landmark_type="nature") : - if "leisure" in tag and elem.tag('leisure') == "park": - elem_type = LandmarkType(landmark_type="nature") + if len(landmarks) == 0 : + return + + if landmarks[0].type != preference.type : + raise TypeError(f"LandmarkType {preference.type} does not match the type of Landmark {landmarks[0].name}") - if amenity not in ["'shop'='department_store'", "'shop'='mall'"] : - if "shop" in tag : - skip = True - break + for elem in landmarks : + elem.attractiveness = int(elem.attractiveness*preference.score/5) # arbitrary computation - if tag == "building" and elem.tag('building') in ['retail', 'supermarket', 'parking']: - skip = True - break - if skip: + def count_elements_close_to(self, coordinates: Tuple[float, float]) -> int: + """ + Count the number of OpenStreetMap elements (nodes, ways, relations) within a specified radius of the given location. + + This function constructs a bounding box around the specified coordinates based on the radius. It then queries + OpenStreetMap data to count the number of elements within that bounding box. + + Args: + coordinates (Tuple[float, float]): The latitude and longitude of the location to search around. + + Returns: + int: The number of elements (nodes, ways, relations) within the specified radius. Returns 0 if no elements + are found or if an error occurs during the query. + """ + + lat = coordinates[0] + lon = coordinates[1] + + radius = self.radius_close_to + + alpha = (180*radius) / (6371000*m.pi) + bbox = {'latLower':lat-alpha,'lonLower':lon-alpha,'latHigher':lat+alpha,'lonHigher': lon+alpha} + + # Build the query to find elements within the radius + radius_query = overpassQueryBuilder(bbox=[bbox['latLower'],bbox['lonLower'],bbox['latHigher'],bbox['lonHigher']], + elementType=['node', 'way', 'relation']) + + try : + overpass = Overpass() + radius_result = overpass.query(radius_query) + N_elem = radius_result.countWays() + radius_result.countRelations() + #print(f"There are {N_elem} ways/relations within 50m") + if N_elem is None : + return 0 + return N_elem + except : + return 0 + + + def create_bbox(self, coordinates: Tuple[float, float]) -> Tuple[float, float, float, float]: + """ + Create a bounding box around the given coordinates. + + Args: + coordinates (Tuple[float, float]): The latitude and longitude of the center of the bounding box. + + Returns: + Tuple[float, float, float, float]: The minimum latitude, minimum longitude, maximum latitude, and maximum longitude + defining the bounding box. + """ + + lat = coordinates[0] + lon = coordinates[1] + + # Half the side length in km (since it's a square bbox) + half_side_length_km = self.city_bbox_side / 2 / 1000 + + # Convert distance to degrees + lat_diff = half_side_length_km / 111 # 1 degree latitude is approximately 111 km + lon_diff = half_side_length_km / (111 * m.cos(m.radians(lat))) # Adjust for longitude based on latitude + + # Calculate bbox + min_lat = lat - lat_diff + max_lat = lat + lat_diff + min_lon = lon - lon_diff + max_lon = lon + lon_diff + + return min_lat, min_lon, max_lat, max_lon + + + def fetch_landmarks(self, list_amenity: list, landmarktype: str, coordinates: Tuple[float, float]) -> List[Landmark] : + """ + Fetches landmarks of a specified type from OpenStreetMap (OSM) within a bounding box centered on given coordinates. + + Args: + list_amenity (list): A list of OSM amenity queries to be used for fetching landmarks. + These queries are typically used to filter results (e.g., [''amenity'='place_of_worship']). + landmarktype (str): The type of the landmark (e.g., 'sightseeing', 'nature', 'shopping'). + coordinates (Tuple[float, float]): The central coordinates (latitude, longitude) for the bounding box. + + Returns: + List[Landmark]: A list of Landmark objects that were fetched and filtered based on the provided criteria. + + Notes: + - The bounding box is created around the given coordinates with a side length defined by `self.city_bbox_side`. + - Landmarks are fetched using Overpass API queries. + - Landmarks are filtered based on various conditions including tags and type. + - Scores are assigned to landmarks based on their attributes and surrounding elements. + """ + + # Create bbox around start location + bbox = self.create_bbox(coordinates) + + # Initialize some variables + N = 0 + L = [] + overpass = Overpass() + + for amenity in list_amenity : + query = overpassQueryBuilder(bbox=bbox, elementType=['way', 'relation'], selector=amenity, includeCenter=True, out='body') + result = overpass.query(query) + N += result.countElements() + + for elem in result.elements(): + + name = elem.tag('name') # Add name + location = (elem.centerLat(), elem.centerLon()) # Add coordinates (lat, lon) + + # skip if unprecise location + if name is None or location[0] is None: continue - # Add score of given landmark based on the number of surrounding elements. Penalty for churches as there are A LOT - if amenity == "'amenity'='place_of_worship'" : - #score = int((count_elements_within_radius(location, radius) + (n_tags*tag_coeff) )*church_coeff) - score = int((count_elements_within_radius(location, radius) + ((n_tags**1.2)*tag_coeff) )*church_coeff) - elif amenity == "'leisure'='park'" : - score = int((count_elements_within_radius(location, radius) + ((n_tags**1.2)*tag_coeff) )*park_coeff) + # skip if unused + if 'disused:leisure' in elem.tags().keys(): + continue + + # skip if part of another building + if 'building:part' in elem.tags().keys() and elem.tag('building:part') == 'yes': + continue + else : - score = int(count_elements_within_radius(location, radius) + ((n_tags**1.2)*tag_coeff)) + osm_type = elem.type() # Add type : 'way' or 'relation' + osm_id = elem.id() # Add OSM id + elem_type = landmarktype # Add the landmark type as 'sightseeing, + n_tags = len(elem.tags().keys()) # Add number of tags - if score is not None : - # Generate the landmark and append it to the list - #print(f"There are {n_tags} tags on this Landmark. Total score : {score}\n") - landmark = Landmark(name=name, type=elem_type, location=location, osm_type=osm_type, osm_id=osm_id, attractiveness=score, must_do=False, n_tags=int(n_tags)) - L.append(landmark) + # remove specific tags + skip = False + for tag in elem.tags().keys() : + if "pay" in tag : + n_tags -= 1 # discard payment options for tags - return L + if "disused" in tag : + skip = True # skip disused amenities + break + + if "wikipedia" in tag : + n_tags += 3 # wikipedia entries count more + + if tag == "wikidata" : + Q = elem.tag('wikidata') + site = Site("wikidata", "wikidata") + item = ItemPage(site, Q) + item.get() + n_languages = len(item.labels) + n_tags += n_languages/10 + + if elem_type != "nature" : + if "leisure" in tag and elem.tag('leisure') == "park": + elem_type = "nature" + + if amenity not in ["'shop'='department_store'", "'shop'='mall'"] : + if "shop" in tag : + skip = True + break + + if tag == "building" and elem.tag('building') in ['retail', 'supermarket', 'parking']: + skip = True + break + + if skip: + continue + + # Add score of given landmark based on the number of surrounding elements. Penalty for churches as there are A LOT + if amenity == "'amenity'='place_of_worship'" : + #score = int((count_elements_close_to(location, radius) + (n_tags*tag_coeff) )*church_coeff) + score = int((self.count_elements_close_to(location) + ((n_tags**1.2)*self.tag_coeff) )*self.church_coeff) + elif amenity == "'leisure'='park'" : + score = int((self.count_elements_close_to(location) + ((n_tags**1.2)*self.tag_coeff) )*self.park_coeff) + else : + score = int(self.count_elements_close_to(location) + ((n_tags**1.2)*self.tag_coeff)) + + if score is not None : + + # Generate the landmark and append it to the list + #print(f"There are {n_tags} tags on this Landmark. Total score : {score}\n") + landmark = Landmark(name=name, type=elem_type, location=location, osm_type=osm_type, osm_id=osm_id, attractiveness=score, must_do=False, n_tags=int(n_tags)) + L.append(landmark) + + return L diff --git a/backend/src/optimizer.py b/backend/src/optimizer.py new file mode 100644 index 0000000..6e52de1 --- /dev/null +++ b/backend/src/optimizer.py @@ -0,0 +1,570 @@ +import yaml, logging +import numpy as np + +from typing import List, Tuple +from scipy.optimize import linprog +from collections import defaultdict, deque +from geopy.distance import geodesic + +from structs.landmarks import Landmark +import constants + + + + + +class Optimizer: + + logger = logging.getLogger(__name__) + + landmarks: List[Landmark] # list of landmarks + max_time: int = None # max visit time (in minutes) + detour: int = None # accepted max detour time (in minutes) + detour_factor: float # detour factor of straight line vs real distance in cities + average_walking_speed: float # average walking speed of adult + max_landmarks: int # max number of landmarks to visit + + + def __init__(self, max_time: int, landmarks: List[Landmark]) : + self.max_time = max_time + self.landmarks = landmarks + + # load parameters from file + with constants.OPTIMIZER_PARAMETERS_PATH.open('r') as f: + parameters = yaml.safe_load(f) + self.detour_factor = parameters['detour_factor'] + self.average_walking_speed = parameters['average_walking_speed'] + self.max_landmarks = parameters['max_landmarks'] + + + def print_res(self, L: List[Landmark]): + """ + Print the suggested order of landmarks to visit and log the total time walked. + + Args: + L (List[Landmark]): List of landmarks in the suggested visit order. + """ + + self.logger.info(f'The following order is suggested : ') + dist = 0 + for elem in L : + if elem.time_to_reach_next is not None : + print('- ' + elem.name + ', time to reach next = ' + str(elem.time_to_reach_next)) + dist += elem.time_to_reach_next + else : + print('- ' + elem.name) + self.logger.info(f'Minutes walked : {dist}') + self.logger.info(f'Visited {len(L)-2} landmarks') + + + # Prevent the use of a particular solution + def prevent_config(self, resx): + """ + Prevent the use of a particular solution by adding constraints to the optimization. + + Args: + resx (List[float]): List of edge weights. + + Returns: + Tuple[List[int], List[int]]: A tuple containing a new row for constraint matrix and new value for upper bound vector. + """ + + for i, elem in enumerate(resx): + resx[i] = round(elem) + + N = len(resx) # Number of edges + L = int(np.sqrt(N)) # Number of landmarks + + nonzeroind = np.nonzero(resx)[0] # the return is a little funky so I use the [0] + nonzero_tup = np.unravel_index(nonzeroind, (L,L)) + + ind_a = nonzero_tup[0].tolist() + vertices_visited = ind_a + vertices_visited.remove(0) + + ones = [1]*L + h = [0]*N + for i in range(L) : + if i in vertices_visited : + h[i*L:i*L+L] = ones + + return h, [len(vertices_visited)-1] + + + # Prevents the creation of the same circle (both directions) + def prevent_circle(self, circle_vertices: list, L: int) : + """ + Prevent circular paths by by adding constraints to the optimization. + + Args: + circle_vertices (list): List of vertices forming a circle. + L (int): Number of landmarks. + + Returns: + Tuple[np.ndarray, List[int]]: A tuple containing a new row for constraint matrix and new value for upper bound vector. + """ + + l1 = [0]*L*L + l2 = [0]*L*L + for i, node in enumerate(circle_vertices[:-1]) : + next = circle_vertices[i+1] + + l1[node*L + next] = 1 + l2[next*L + node] = 1 + + s = circle_vertices[0] + g = circle_vertices[-1] + + l1[g*L + s] = 1 + l2[s*L + g] = 1 + + return np.vstack((l1, l2)), [0, 0] + + + def is_connected(self, resx) : + """ + Determine the order of visits and detect any circular paths in the given configuration. + + Args: + resx (list): List of edge weights. + + Returns: + Tuple[List[int], Optional[List[List[int]]]]: A tuple containing the visit order and a list of any detected circles. + """ + + # first round the results to have only 0-1 values + for i, elem in enumerate(resx): + resx[i] = round(elem) + + N = len(resx) # length of res + L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def. + + nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0] + nonzero_tup = np.unravel_index(nonzeroind, (L,L)) + + ind_a = nonzero_tup[0].tolist() + ind_b = nonzero_tup[1].tolist() + + # Step 1: Create a graph representation + graph = defaultdict(list) + for a, b in zip(ind_a, ind_b): + graph[a].append(b) + + # Step 2: Function to perform BFS/DFS to extract journeys + def get_journey(start): + journey_nodes = [] + visited = set() + stack = deque([start]) + + while stack: + node = stack.pop() + if node not in visited: + visited.add(node) + journey_nodes.append(node) + for neighbor in graph[node]: + if neighbor not in visited: + stack.append(neighbor) + + return journey_nodes + + # Step 3: Extract all journeys + all_journeys_nodes = [] + visited_nodes = set() + + for node in ind_a: + if node not in visited_nodes: + journey_nodes = get_journey(node) + all_journeys_nodes.append(journey_nodes) + visited_nodes.update(journey_nodes) + + for l in all_journeys_nodes : + if 0 in l : + order = l + all_journeys_nodes.remove(l) + break + + if len(all_journeys_nodes) == 0 : + return order, None + + return order, all_journeys_nodes + + + def get_time(self, p1: Tuple[float, float], p2: Tuple[float, float]) -> int : + """ + Calculate the time in minutes to travel from one location to another. + + Args: + p1 (Tuple[float, float]): Coordinates of the starting location. + p2 (Tuple[float, float]): Coordinates of the destination. + + Returns: + int: Time to travel from p1 to p2 in minutes. + """ + + # Compute the straight-line distance in km + if p1 == p2 : + return 0 + else: + dist = geodesic(p1, p2).kilometers + + # Consider the detour factor for average cityto deterline walking distance (in km) + walk_dist = dist*self.detour_factor + + # Time to walk this distance (in minutes) + walk_time = walk_dist/self.average_walking_speed*60 + + return round(walk_time) + + + def init_ub_dist(self, landmarks: List[Landmark], max_steps: int): + """ + Initialize the objective function coefficients and inequality constraints for the optimization problem. + + This function computes the distances between all landmarks and stores their attractiveness to maximize sightseeing. + The goal is to maximize the objective function subject to the constraints A*x < b and A_eq*x = b_eq. + + Args: + landmarks (List[Landmark]): List of landmarks. + max_steps (int): Maximum number of steps allowed. + + Returns: + Tuple[List[float], List[float], List[int]]: Objective function coefficients, inequality constraint coefficients, and the right-hand side of the inequality constraint. + """ + + # Objective function coefficients. a*x1 + b*x2 + c*x3 + ... + c = [] + # Coefficients of inequality constraints (left-hand side) + A_ub = [] + + for spot1 in landmarks : + dist_table = [0]*len(landmarks) + c.append(-spot1.attractiveness) + for j, spot2 in enumerate(landmarks) : + t = self.get_time(spot1.location, spot2.location) + dist_table[j] = t + closest = sorted(dist_table)[:22] + for i, dist in enumerate(dist_table) : + if dist not in closest : + dist_table[i] = 32700 + A_ub += dist_table + c = c*len(landmarks) + + return c, A_ub, [max_steps] + + + def respect_number(self, L: int): + """ + Generate constraints to ensure each landmark is visited only once and cap the total number of visited landmarks. + + Args: + L (int): Number of landmarks. + + Returns: + Tuple[np.ndarray, List[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. + """ + + ones = [1]*L + zeros = [0]*L + A = ones + zeros*(L-1) + b = [1] + for i in range(L-1) : + h_new = zeros*i + ones + zeros*(L-1-i) + A = np.vstack((A, h_new)) + b.append(1) + + A = np.vstack((A, ones*L)) + b.append(self.max_landmarks+1) + + return A, b + + + # Constraint to not have d14 and d41 simultaneously. Does not prevent cyclic paths with more elements + def break_sym(self, L): + """ + Generate constraints to prevent simultaneous travel between two landmarks in both directions. + + Args: + L (int): Number of landmarks. + + Returns: + Tuple[np.ndarray, List[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. + """ + + upper_ind = np.triu_indices(L,0,L) + + up_ind_x = upper_ind[0] + up_ind_y = upper_ind[1] + + A = [0]*L*L + b = [1] + + for i, _ in enumerate(up_ind_x[1:]) : + l = [0]*L*L + if up_ind_x[i] != up_ind_y[i] : + l[up_ind_x[i]*L + up_ind_y[i]] = 1 + l[up_ind_y[i]*L + up_ind_x[i]] = 1 + + A = np.vstack((A,l)) + b.append(1) + + return A, b + + + def init_eq_not_stay(self, L: int): + """ + Generate constraints to prevent staying in the same position (e.g., removing d11, d22, d33, etc.). + + Args: + L (int): Number of landmarks. + + Returns: + Tuple[List[np.ndarray], List[int]]: Equality constraint coefficients and the right-hand side of the equality constraints. + """ + + l = [0]*L*L + + for i in range(L) : + for j in range(L) : + if j == i : + l[j + i*L] = 1 + + l = np.array(np.array(l), dtype=np.int8) + + return [l], [0] + + + def respect_user_must_do(self, landmarks: List[Landmark]) : + """ + Generate constraints to ensure that landmarks marked as 'must_do' are included in the optimization. + + Args: + landmarks (List[Landmark]): List of landmarks, where some are marked as 'must_do'. + + Returns: + Tuple[np.ndarray, List[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. + """ + + L = len(landmarks) + A = [0]*L*L + b = [0] + + for i, elem in enumerate(landmarks[1:]) : + if elem.must_do is True and elem.name not in ['finish', 'start']: + l = [0]*L*L + l[i*L:i*L+L] = [1]*L # set mandatory departures from landmarks tagged as 'must_do' + + A = np.vstack((A,l)) + b.append(1) + + return A, b + + + def respect_user_must_avoid(self, landmarks: List[Landmark]) : + """ + Generate constraints to ensure that landmarks marked as 'must_avoid' are skipped in the optimization. + + Args: + landmarks (List[Landmark]): List of landmarks, where some are marked as 'must_avoid'. + + Returns: + Tuple[np.ndarray, List[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. + """ + + L = len(landmarks) + A = [0]*L*L + b = [0] + + for i, elem in enumerate(landmarks[1:]) : + if elem.must_avoid is True and elem.name not in ['finish', 'start']: + l = [0]*L*L + l[i*L:i*L+L] = [1]*L + + A = np.vstack((A,l)) + b.append(0) # prevent departures from landmarks tagged as 'must_do' + + return A, b + + + # Constraint to ensure start at start and finish at goal + def respect_start_finish(self, L: int): + """ + Generate constraints to ensure that the optimization starts at the designated start landmark and finishes at the goal landmark. + + Args: + L (int): Number of landmarks. + + Returns: + Tuple[np.ndarray, List[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. + """ + + l_start = [1]*L + [0]*L*(L-1) # sets departures only for start (horizontal ones) + l_start[L-1] = 0 # prevents the jump from start to finish + l_goal = [0]*L*L # sets arrivals only for finish (vertical ones) + l_L = [0]*L*(L-1) + [1]*L # prevents arrivals at start and departures from goal + for k in range(L-1) : # sets only vertical ones for goal (go to) + l_L[k*L] = 1 + if k != 0 : + l_goal[k*L+L-1] = 1 + + A = np.vstack((l_start, l_goal)) + b = [1, 1] + A = np.vstack((A,l_L)) + b.append(0) + + return A, b + + + def respect_order(self, L: int): + """ + Generate constraints to tie the optimization problem together and prevent stacked ones, although this does not fully prevent circles. + + Args: + L (int): Number of landmarks. + + Returns: + Tuple[np.ndarray, List[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. + """ + + A = [0]*L*L + b = [0] + for i in range(L-1) : # Prevent stacked ones + if i == 0 or i == L-1: # Don't touch start or finish + continue + else : + l = [0]*L + l[i] = -1 + l = l*L + for j in range(L) : + l[i*L + j] = 1 + + A = np.vstack((A,l)) + b.append(0) + + return A, b + + + def link_list(self, order: List[int], landmarks: List[Landmark])->List[Landmark] : + """ + Compute the time to reach from each landmark to the next and create a list of landmarks with updated travel times. + + Args: + order (List[int]): List of indices representing the order of landmarks to visit. + landmarks (List[Landmark]): List of all landmarks. + + Returns: + List[Landmark]]: The updated linked list of landmarks with travel times + """ + + L = [] + j = 0 + while j < len(order)-1 : + # get landmarks involved + elem = landmarks[order[j]] + next = landmarks[order[j+1]] + + # get attributes + elem.time_to_reach_next = self.get_time(elem.location, next.location) + elem.must_do = True + elem.location = (round(elem.location[0], 5), round(elem.location[1], 5)) + elem.next_uuid = next.uuid + L.append(elem) + j += 1 + + next.location = (round(next.location[0], 5), round(next.location[1], 5)) + next.must_do = True + L.append(next) + + return L + + + # Main optimization pipeline + def solve_optimization (self) : + """ + Main optimization pipeline to solve the landmark visiting problem. + + This method sets up and solves a linear programming problem with constraints to find an optimal tour of landmarks, + considering user-defined must-visit landmarks, start and finish points, and ensuring no cycles are present. + + Returns: + List[Landmark]: The optimized tour of landmarks with updated travel times, or None if no valid solution is found. + """ + + L = len(self.landmarks) + + # SET CONSTRAINTS FOR INEQUALITY + c, A_ub, b_ub = self.init_ub_dist(self.landmarks, self.max_time) # Add the distances from each landmark to the other + A, b = self.respect_number(L) # Respect max number of visits (no more possible stops than landmarks). + A_ub = np.vstack((A_ub, A), dtype=np.int16) + b_ub += b + A, b = self.break_sym(L) # break the 'zig-zag' symmetry + A_ub = np.vstack((A_ub, A), dtype=np.int16) + b_ub += b + + + # SET CONSTRAINTS FOR EQUALITY + A_eq, b_eq = self.init_eq_not_stay(L) # Force solution not to stay in same place + A, b = self.respect_user_must_do(self.landmarks) # Check if there are user_defined must_see. Also takes care of start/goal + A_eq = np.vstack((A_eq, A), dtype=np.int8) + b_eq += b + A, b = self.respect_start_finish(L) # Force start and finish positions + A_eq = np.vstack((A_eq, A), dtype=np.int8) + b_eq += b + A, b = self.respect_order(L) # Respect order of visit (only works when max_steps is limiting factor) + A_eq = np.vstack((A_eq, A), dtype=np.int8) + b_eq += b + + # SET BOUNDS FOR DECISION VARIABLE (x can only be 0 or 1) + x_bounds = [(0, 1)]*L*L + + # Solve linear programming problem + res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3) + + # Raise error if no solution is found + if not res.success : + raise ArithmeticError("No solution could be found, the problem is overconstrained. Please adapt your must_dos") + + # If there is a solution, we're good to go, just check for connectiveness + else : + order, circles = self.is_connected(res.x) + #nodes, edges = is_connected(res.x) + i = 0 + timeout = 80 + while circles is not None and i < timeout: + A, b = self.prevent_config(res.x) + A_ub = np.vstack((A_ub, A)) + b_ub += b + #A_ub, b_ub = prevent_circle(order, len(landmarks), A_ub, b_ub) + for circle in circles : + A, b = self.prevent_circle(circle, L) + A_eq = np.vstack((A_eq, A)) + b_eq += b + res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3) + if not res.success : + self.logger.info("Solving failed because of overconstrained problem") + return None + order, circles = self.is_connected(res.x) + #nodes, edges = is_connected(res.x) + if circles is None : + break + print(i) + i += 1 + + if i == timeout : + raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.") + + # Add the times to reach and stop optimizing + tour = self.link_list(order, self.landmarks) + + # logging + if i != 0 : + self.logger.info(f"Neded to recompute paths {i} times because of unconnected loops...") + self.print_res(tour) # how to do better ? + self.logger.info(f"Total score : {int(-res.fun)}") + + return tour + + + + + + diff --git a/backend/src/optimizer_v4.py b/backend/src/optimizer_v4.py deleted file mode 100644 index 0eecb97..0000000 --- a/backend/src/optimizer_v4.py +++ /dev/null @@ -1,446 +0,0 @@ -import numpy as np -import json, os - -from typing import List, Tuple -from scipy.optimize import linprog -from collections import defaultdict, deque -from geopy.distance import geodesic - -from structs.landmarks import Landmark - - -# Function to print the result -def print_res(L: List[Landmark]): - - print('The following order is suggested : ') - - dist = 0 - for elem in L : - if elem.time_to_reach_next is not None : - print('- ' + elem.name + ', time to reach next = ' + str(elem.time_to_reach_next)) - dist += elem.time_to_reach_next - else : - print('- ' + elem.name) - - print("\nMinutes walked : " + str(dist)) - print(f"Visited {len(L)-2} landmarks") - - -# Prevent the use of a particular solution -def prevent_config(resx): - - for i, elem in enumerate(resx): - resx[i] = round(elem) - - N = len(resx) # Number of edges - L = int(np.sqrt(N)) # Number of landmarks - - nonzeroind = np.nonzero(resx)[0] # the return is a little funky so I use the [0] - nonzero_tup = np.unravel_index(nonzeroind, (L,L)) - - ind_a = nonzero_tup[0].tolist() - vertices_visited = ind_a - vertices_visited.remove(0) - - ones = [1]*L - h = [0]*N - for i in range(L) : - if i in vertices_visited : - h[i*L:i*L+L] = ones - - return h, [len(vertices_visited)-1] - - -# Prevents the creation of the same circle (both directions) -def prevent_circle(circle_vertices: list, L: int) : - - l1 = [0]*L*L - l2 = [0]*L*L - for i, node in enumerate(circle_vertices[:-1]) : - next = circle_vertices[i+1] - - l1[node*L + next] = 1 - l2[next*L + node] = 1 - - s = circle_vertices[0] - g = circle_vertices[-1] - - l1[g*L + s] = 1 - l2[s*L + g] = 1 - - return np.vstack((l1, l2)), [0, 0] - - -# Returns the order of visit as well as any circles if there are some -def is_connected(resx) : - - # first round the results to have only 0-1 values - for i, elem in enumerate(resx): - resx[i] = round(elem) - - N = len(resx) # length of res - L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def. - n_edges = resx.sum() # number of edges - - nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0] - - nonzero_tup = np.unravel_index(nonzeroind, (L,L)) - - ind_a = nonzero_tup[0].tolist() - ind_b = nonzero_tup[1].tolist() - - # Step 1: Create a graph representation - graph = defaultdict(list) - for a, b in zip(ind_a, ind_b): - graph[a].append(b) - - # Step 2: Function to perform BFS/DFS to extract journeys - def get_journey(start): - journey_nodes = [] - visited = set() - stack = deque([start]) - - while stack: - node = stack.pop() - if node not in visited: - visited.add(node) - journey_nodes.append(node) - for neighbor in graph[node]: - if neighbor not in visited: - stack.append(neighbor) - - return journey_nodes - - # Step 3: Extract all journeys - all_journeys_nodes = [] - visited_nodes = set() - - for node in ind_a: - if node not in visited_nodes: - journey_nodes = get_journey(node) - all_journeys_nodes.append(journey_nodes) - visited_nodes.update(journey_nodes) - - for l in all_journeys_nodes : - if 0 in l : - order = l - all_journeys_nodes.remove(l) - break - - if len(all_journeys_nodes) == 0 : - return order, None - - return order, all_journeys_nodes - - -# Function that returns the time in minutes from one location to another -def get_time(p1: Tuple[float, float], p2: Tuple[float, float], detour: float, speed: float) : - - # Compute the straight-line distance in km - if p1 == p2 : - return 0 - else: - dist = geodesic(p1, p2).kilometers - - # Consider the detour factor for average cityto deterline walking distance (in km) - walk_dist = dist*detour - - # Time to walk this distance (in minutes) - walk_time = walk_dist/speed*60 - - return round(walk_time) - - -# Initialize A and c. Compute the distances from all landmarks to each other and store attractiveness -# We want to maximize the sightseeing : max(c) st. A*x < b and A_eq*x = b_eq -def init_ub_dist(landmarks: List[Landmark], max_steps: int): - - with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f : - parameters = json.loads(f.read()) - detour = parameters['detour factor'] - speed = parameters['average walking speed'] - - # Objective function coefficients. a*x1 + b*x2 + c*x3 + ... - c = [] - # Coefficients of inequality constraints (left-hand side) - A_ub = [] - - for spot1 in landmarks : - dist_table = [0]*len(landmarks) - c.append(-spot1.attractiveness) - for j, spot2 in enumerate(landmarks) : - t = get_time(spot1.location, spot2.location, detour, speed) - dist_table[j] = t - closest = sorted(dist_table)[:22] - for i, dist in enumerate(dist_table) : - if dist not in closest : - dist_table[i] = 32700 - A_ub += dist_table - c = c*len(landmarks) - - return c, A_ub, [max_steps] - - -# Constraint to respect only one travel per landmark. Also caps the total number of visited landmarks -def respect_number(L: int, max_landmarks: int): - - ones = [1]*L - zeros = [0]*L - A = ones + zeros*(L-1) - b = [1] - for i in range(L-1) : - h_new = zeros*i + ones + zeros*(L-1-i) - A = np.vstack((A, h_new)) - b.append(1) - - if max_landmarks is None : - # Read the parameters from the file - with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f : - parameters = json.loads(f.read()) - max_landmarks = parameters['max landmarks'] - - A = np.vstack((A, ones*L)) - b.append(max_landmarks+1) - - return A, b - - -# Constraint to not have d14 and d41 simultaneously. Does not prevent cyclic paths with more elements -def break_sym(L): - - upper_ind = np.triu_indices(L,0,L) - - up_ind_x = upper_ind[0] - up_ind_y = upper_ind[1] - - A = [0]*L*L # useless row to prevent overhead ? better solution welcomed - # A[up_ind_x[0]*L + up_ind_y[0]] = 1 - # A[up_ind_y[0]*L + up_ind_x[0]] = 1 - b = [1] - - for i, _ in enumerate(up_ind_x[1:]) : - l = [0]*L*L - if up_ind_x[i] != up_ind_y[i] : - l[up_ind_x[i]*L + up_ind_y[i]] = 1 - l[up_ind_y[i]*L + up_ind_x[i]] = 1 - - A = np.vstack((A,l)) - b.append(1) - - return A, b - - -# Constraint to not stay in position. Removes d11, d22, d33, etc. -def init_eq_not_stay(L: int): - l = [0]*L*L - - for i in range(L) : - for j in range(L) : - if j == i : - l[j + i*L] = 1 - - l = np.array(np.array(l), dtype=np.int8) - - return [l], [0] - - -# Go through the landmarks and force the optimizer to use landmarks marked as must_do -def respect_user_must_do(landmarks: List[Landmark]) : - L = len(landmarks) - - A = [0]*L*L - b = [0] - for i, elem in enumerate(landmarks[1:]) : - if elem.must_do is True and elem.name not in ['finish', 'start']: - l = [0]*L*L - l[i*L:i*L+L] = [1]*L # set mandatory departures from landmarks tagged as 'must_do' - - A = np.vstack((A,l)) - b.append(1) - - return A, b - - -# Constraint to ensure start at start and finish at goal -def respect_start_finish(L: int): - l_start = [1]*L + [0]*L*(L-1) # sets departures only for start (horizontal ones) - l_start[L-1] = 0 # prevents the jump from start to finish - l_goal = [0]*L*L # sets arrivals only for finish (vertical ones) - l_L = [0]*L*(L-1) + [1]*L # prevents arrivals at start and departures from goal - for k in range(L-1) : # sets only vertical ones for goal (go to) - l_L[k*L] = 1 - if k != 0 : - l_goal[k*L+L-1] = 1 - - - A = np.vstack((l_start, l_goal)) - b = [1, 1] - A = np.vstack((A,l_L)) - b.append(0) - - return A, b - - -# Constraint to tie the problem together. Necessary but not sufficient to avoid circles -def respect_order(L: int): - - A = [0]*L*L # useless row to reduce overhead ? better solution is welcome - b = [0] - for i in range(L-1) : # Prevent stacked ones - if i == 0 or i == L-1: # Don't touch start or finish - continue - else : - l = [0]*L - l[i] = -1 - l = l*L - for j in range(L) : - l[i*L + j] = 1 - - A = np.vstack((A,l)) - b.append(0) - - return A, b - - -# Computes the time to reach from each landmark to the next -def link_list(order: List[int], landmarks: List[Landmark])->List[Landmark] : - - # Read the parameters from the file - with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f : - parameters = json.loads(f.read()) - detour_factor = parameters['detour factor'] - speed = parameters['average walking speed'] - - L = [] - j = 0 - total_dist = 0 - while j < len(order)-1 : - elem = landmarks[order[j]] - next = landmarks[order[j+1]] - - d = get_time(elem.location, next.location, detour_factor, speed) - elem.time_to_reach_next = d - elem.must_do = True - elem.location = (round(elem.location[0], 5), round(elem.location[1], 5)) - L.append(elem) - j += 1 - total_dist += d - - next.location = (round(next.location[0], 5), round(next.location[1], 5)) - L.append(next) - - return L, total_dist - - -# Same as link_list but does it on a already ordered list -def link_list_simple(ordered_visit: List[Landmark])-> List[Landmark] : - - # Read the parameters from the file - with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f : - parameters = json.loads(f.read()) - detour_factor = parameters['detour factor'] - speed = parameters['average walking speed'] - - L = [] - j = 0 - total_dist = 0 - while j < len(ordered_visit)-1 : - elem = ordered_visit[j] - next = ordered_visit[j+1] - - elem.next_uuid = next.uuid - d = get_time(elem.location, next.location, detour_factor, speed) - elem.time_to_reach_next = d - if elem.name not in ['start', 'finish'] : - elem.must_do = True - L.append(elem) - j += 1 - total_dist += d - - L.append(next) - - return L, total_dist - - -# Main optimization pipeline -def solve_optimization (landmarks :List[Landmark], max_steps: int, printing_details: bool, max_landmarks = None) : - - L = len(landmarks) - - # SET CONSTRAINTS FOR INEQUALITY - c, A_ub, b_ub = init_ub_dist(landmarks, max_steps) # Add the distances from each landmark to the other - A, b = respect_number(L, max_landmarks) # Respect max number of visits (no more possible stops than landmarks). - A_ub = np.vstack((A_ub, A), dtype=np.int16) - b_ub += b - A, b = break_sym(L) # break the 'zig-zag' symmetry - A_ub = np.vstack((A_ub, A), dtype=np.int16) - b_ub += b - - - # SET CONSTRAINTS FOR EQUALITY - A_eq, b_eq = init_eq_not_stay(L) # Force solution not to stay in same place - A, b = respect_user_must_do(landmarks) # Check if there are user_defined must_see. Also takes care of start/goal - A_eq = np.vstack((A_eq, A), dtype=np.int8) - b_eq += b - A, b = respect_start_finish(L) # Force start and finish positions - A_eq = np.vstack((A_eq, A), dtype=np.int8) - b_eq += b - A, b = respect_order(L) # Respect order of visit (only works when max_steps is limiting factor) - A_eq = np.vstack((A_eq, A), dtype=np.int8) - b_eq += b - - # SET BOUNDS FOR DECISION VARIABLE (x can only be 0 or 1) - x_bounds = [(0, 1)]*L*L - - # Solve linear programming problem - res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3) - - # Raise error if no solution is found - if not res.success : - raise ArithmeticError("No solution could be found, the problem is overconstrained. Please adapt your must_dos") - - # If there is a solution, we're good to go, just check for connectiveness - else : - order, circles = is_connected(res.x) - #nodes, edges = is_connected(res.x) - i = 0 - timeout = 80 - while circles is not None and i < timeout: - A, b = prevent_config(res.x) - A_ub = np.vstack((A_ub, A)) - b_ub += b - #A_ub, b_ub = prevent_circle(order, len(landmarks), A_ub, b_ub) - for circle in circles : - A, b = prevent_circle(circle, len(landmarks)) - A_eq = np.vstack((A_eq, A)) - b_eq += b - res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3) - if not res.success : - print("Solving failed because of overconstrained problem") - return None - order, circles = is_connected(res.x) - #nodes, edges = is_connected(res.x) - if circles is None : - break - print(i) - i += 1 - - if i == timeout : - raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.") - - # Add the times to reach and stop optimizing - L, _ = link_list(order, landmarks) - - if printing_details is True : - if i != 0 : - print(f"Neded to recompute paths {i} times because of unconnected loops...") - print_res(L) - print("\nTotal score : " + str(int(-res.fun))) - - return L - - - - - - diff --git a/backend/src/parameters/landmark_parameters.yaml b/backend/src/parameters/landmark_parameters.yaml index b85c89a..5de9c48 100644 --- a/backend/src/parameters/landmark_parameters.yaml +++ b/backend/src/parameters/landmark_parameters.yaml @@ -1,6 +1,6 @@ city_bbox_side: 5000 #m -radius_close_to: 30 -church_coeff: 0.6 -park_coeff: 1.5 -tag_coeff: 100 +radius_close_to: 50 +church_coeff: 0.8 +park_coeff: 1.2 +tag_coeff: 10 N_important: 40 diff --git a/backend/src/refiner.py b/backend/src/refiner.py index 1971170..8fbb0f5 100644 --- a/backend/src/refiner.py +++ b/backend/src/refiner.py @@ -1,244 +1,354 @@ -import os, json +import yaml, logging from shapely import buffer, LineString, Point, Polygon, MultiPoint, concave_hull from typing import List, Tuple from math import pi from structs.landmarks import Landmark -from landmarks_manager import take_most_important -from optimizer_v4 import solve_optimization, link_list_simple, print_res, get_time +from optimizer import Optimizer +from utils import get_time, link_list_simple, take_most_important +import constants -# Create corridor from tour -def create_corridor(landmarks: List[Landmark], width: float) : - corrected_width = (180*width)/(6371000*pi) - - path = create_linestring(landmarks) - obj = buffer(path, corrected_width, join_style="mitre", cap_style="square", mitre_limit=2) +class Refiner : - return obj + logger = logging.getLogger(__name__) + + all_landmarks: List[Landmark] # list of all landmarks + base_tour: List[Landmark] # base tour that needs to be refined + max_time: int = None # max visit time (in minutes) + detour: int = None # accepted max detour time (in minutes) + detour_factor: float # detour factor of straight line vs real distance in cities + average_walking_speed: float # average walking speed of adult + max_landmarks: int # max number of landmarks to visit -# Create linestring from tour -def create_linestring(landmarks: List[Landmark])->List[Point] : + def __init__(self, max_time: int, detour: int, all_landmarks: List[Landmark], base_tour: List[Landmark]) : + self.max_time = max_time + self.detour = detour + self.all_landmarks = all_landmarks + self.base_tour = base_tour - points = [] - - for landmark in landmarks : - points.append(Point(landmark.location)) - - return LineString(points) + # load parameters from file + with constants.OPTIMIZER_PARAMETERS_PATH.open('r') as f: + parameters = yaml.safe_load(f) + self.detour_factor = parameters['detour_factor'] + self.average_walking_speed = parameters['average_walking_speed'] + self.max_landmarks = parameters['max_landmarks'] + 4 -# Check if some coordinates are in area. Used for the corridor -def is_in_area(area: Polygon, coordinates) -> bool : - point = Point(coordinates) - return point.within(area) + def create_corridor(self, landmarks: List[Landmark], width: float) : + """ + Create a corridor around the path connecting the landmarks. + + Args: + landmarks (List[Landmark]): the landmark path around which to create the corridor + width (float): Width of the corridor in meters. + + Returns: + Geometry: A buffered geometry object representing the corridor around the path. + """ + + corrected_width = (180*width)/(6371000*pi) + + path = self.create_linestring(landmarks) + obj = buffer(path, corrected_width, join_style="mitre", cap_style="square", mitre_limit=2) + + return obj -# Function to determine if two landmarks are close to each other -def is_close_to(location1: Tuple[float], location2: Tuple[float]): - """Determine if two locations are close by rounding their coordinates to 3 decimals.""" - absx = abs(location1[0] - location2[0]) - absy = abs(location1[1] - location2[1]) + def create_linestring(self, tour: List[Landmark])->LineString : + """ + Create a `LineString` object from a tour. - return absx < 0.001 and absy < 0.001 - #return (round(location1[0], 3), round(location1[1], 3)) == (round(location2[0], 3), round(location2[1], 3)) + Args: + tour (List[Landmark]): An ordered sequence of landmarks that represents the visiting order. + + Returns: + LineString: A `LineString` object representing the path through the landmarks. + """ + + points = [] + for landmark in tour : + points.append(Point(landmark.location)) + + return LineString(points) -# Rearrange some landmarks in the order of visit to group visit -def rearrange(landmarks: List[Landmark]) -> List[Landmark]: - - i = 1 - while i < len(landmarks): - j = i+1 - while j < len(landmarks): - if is_close_to(landmarks[i].location, landmarks[j].location) and landmarks[i].name not in ['start', 'finish'] and landmarks[j].name not in ['start', 'finish']: - # If they are not adjacent, move the j-th element to be adjacent to the i-th element - if j != i + 1: - landmarks.insert(i + 1, landmarks.pop(j)) - break # Move to the next i-th element after rearrangement - j += 1 - i += 1 - - return landmarks + # Check if some coordinates are in area. Used for the corridor + def is_in_area(self, area: Polygon, coordinates) -> bool : + """ + Check if a given point is within a specified area. + + Args: + area (Polygon): The polygon defining the area. + coordinates (Tuple[float, float]): The coordinates of the point to check. + + Returns: + bool: True if the point is within the area, otherwise False. + """ + point = Point(coordinates) + return point.within(area) -# Simple nearest neighbour planner to try to fix the path -def find_shortest_path_through_all_landmarks(landmarks: List[Landmark]) -> Tuple[List[Landmark], Polygon]: + # Function to determine if two landmarks are close to each other + def is_close_to(self, location1: Tuple[float], location2: Tuple[float]): + """ + Determine if two locations are close to each other by rounding their coordinates to 3 decimal places. + + Args: + location1 (Tuple[float, float]): The coordinates of the first location. + location2 (Tuple[float, float]): The coordinates of the second location. + + Returns: + bool: True if the locations are within 0.001 degrees of each other, otherwise False. + """ + + absx = abs(location1[0] - location2[0]) + absy = abs(location1[1] - location2[1]) + + return absx < 0.001 and absy < 0.001 + #return (round(location1[0], 3), round(location1[1], 3)) == (round(location2[0], 3), round(location2[1], 3)) + + + def rearrange(self, tour: List[Landmark]) -> List[Landmark]: + """ + Rearrange landmarks to group nearby visits together. + + This function reorders landmarks so that nearby landmarks are adjacent to each other in the list, + while keeping 'start' and 'finish' landmarks in their original positions. + + Args: + tour (List[Landmark]): Ordered list of landmarks to be rearranged. + + Returns: + List[Landmark]: The rearranged list of landmarks with grouped nearby visits. + """ - # Read from data - with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f : - parameters = json.loads(f.read()) - detour = parameters['detour factor'] - speed = parameters['average walking speed'] + i = 1 + while i < len(tour): + j = i+1 + while j < len(tour): + if self.is_close_to(tour[i].location, tour[j].location) and tour[i].name not in ['start', 'finish'] and tour[j].name not in ['start', 'finish']: + # If they are not adjacent, move the j-th element to be adjacent to the i-th element + if j != i + 1: + tour.insert(i + 1, tour.pop(j)) + break # Move to the next i-th element after rearrangement + j += 1 + i += 1 - # Step 1: Find 'start' and 'finish' landmarks - start_idx = next(i for i, lm in enumerate(landmarks) if lm.name == 'start') - finish_idx = next(i for i, lm in enumerate(landmarks) if lm.name == 'finish') - - start_landmark = landmarks[start_idx] - finish_landmark = landmarks[finish_idx] - - - # Step 2: Create a list of unvisited landmarks excluding 'start' and 'finish' - unvisited_landmarks = [lm for i, lm in enumerate(landmarks) if i not in [start_idx, finish_idx]] - - # Step 3: Initialize the path with the 'start' landmark - path = [start_landmark] - coordinates = [landmarks[start_idx].location] - - current_landmark = start_landmark - - # Step 4: Use nearest neighbor heuristic to visit all landmarks - while unvisited_landmarks: - nearest_landmark = min(unvisited_landmarks, key=lambda lm: get_time(current_landmark.location, lm.location, detour, speed)) - path.append(nearest_landmark) - coordinates.append(nearest_landmark.location) - current_landmark = nearest_landmark - unvisited_landmarks.remove(nearest_landmark) - - # Step 5: Finally add the 'finish' landmark to the path - path.append(finish_landmark) - coordinates.append(landmarks[finish_idx].location) - - path_poly = Polygon(coordinates) - - return path, path_poly + return tour -# Returns a list of minor landmarks around the planned path to enhance experience -def get_minor_landmarks(all_landmarks: List[Landmark], visited_landmarks: List[Landmark], width: float) -> List[Landmark] : + def find_shortest_path_through_all_landmarks(self, landmarks: List[Landmark]) -> Tuple[List[Landmark], Polygon]: + """ + Find the shortest path through all landmarks using a nearest neighbor heuristic. - second_order_landmarks = [] - visited_names = [] - area = create_corridor(visited_landmarks, width) + This function constructs a path that starts from the 'start' landmark, visits all other landmarks in the order + of their proximity, and ends at the 'finish' landmark. It returns both the ordered list of landmarks and a + polygon representing the path. - for visited in visited_landmarks : - visited_names.append(visited.name) - - for landmark in all_landmarks : - if is_in_area(area, landmark.location) and landmark.name not in visited_names: - second_order_landmarks.append(landmark) + Args: + landmarks (List[Landmark]): List of all landmarks including 'start' and 'finish'. - return take_most_important(second_order_landmarks, len(visited_landmarks)) + Returns: + Tuple[List[Landmark], Polygon]: A tuple where the first element is the list of landmarks in the order they + should be visited, and the second element is a `Polygon` representing + the path connecting all landmarks. + """ + + # Step 1: Find 'start' and 'finish' landmarks + start_idx = next(i for i, lm in enumerate(landmarks) if lm.type == 'start') + finish_idx = next(i for i, lm in enumerate(landmarks) if lm.type == 'finish') + + start_landmark = landmarks[start_idx] + finish_landmark = landmarks[finish_idx] + + + # Step 2: Create a list of unvisited landmarks excluding 'start' and 'finish' + unvisited_landmarks = [lm for i, lm in enumerate(landmarks) if i not in [start_idx, finish_idx]] + + # Step 3: Initialize the path with the 'start' landmark + path = [start_landmark] + coordinates = [landmarks[start_idx].location] + + current_landmark = start_landmark + + # Step 4: Use nearest neighbor heuristic to visit all landmarks + while unvisited_landmarks: + nearest_landmark = min(unvisited_landmarks, key=lambda lm: get_time(current_landmark.location, lm.location)) + path.append(nearest_landmark) + coordinates.append(nearest_landmark.location) + current_landmark = nearest_landmark + unvisited_landmarks.remove(nearest_landmark) + + # Step 5: Finally add the 'finish' landmark to the path + path.append(finish_landmark) + coordinates.append(landmarks[finish_idx].location) + + path_poly = Polygon(coordinates) + + return path, path_poly -# Try fix the shortest path using shapely -def fix_using_polygon(tour: List[Landmark])-> List[Landmark] : + # Returns a list of minor landmarks around the planned path to enhance experience + def get_minor_landmarks(self, all_landmarks: List[Landmark], visited_landmarks: List[Landmark], width: float) -> List[Landmark] : + """ + Identify landmarks within a specified corridor that have not been visited yet. - coords = [] - coords_dict = {} - for landmark in tour : - coords.append(landmark.location) - if landmark.name != 'finish' : - coords_dict[landmark.location] = landmark + This function creates a corridor around the path defined by visited landmarks and then finds landmarks that fall + within this corridor. It returns a list of these landmarks, excluding those already visited, sorted by their importance. - tour_poly = Polygon(coords) - - better_tour_poly = tour_poly.buffer(0) - try : - xs, ys = better_tour_poly.exterior.xy + Args: + all_landmarks (List[Landmark]): List of all available landmarks. + visited_landmarks (List[Landmark]): List of landmarks that have already been visited. + width (float): Width of the corridor around the visited landmarks. - if len(xs) != len(tour) : - better_tour_poly = concave_hull(MultiPoint(coords)) # Create concave hull with "core" of tour leaving out start and finish - xs, ys = better_tour_poly.exterior.xy + Returns: + List[Landmark]: List of important landmarks within the corridor that have not been visited yet. + """ - except : - better_tour_poly = concave_hull(MultiPoint(coords)) # Create concave hull with "core" of tour leaving out start and finish - xs, ys = better_tour_poly.exterior.xy + second_order_landmarks = [] + visited_names = [] + area = self.create_corridor(visited_landmarks, width) + + for visited in visited_landmarks : + visited_names.append(visited.name) + + for landmark in all_landmarks : + if self.is_in_area(area, landmark.location) and landmark.name not in visited_names: + second_order_landmarks.append(landmark) + + return take_most_important(second_order_landmarks, len(visited_landmarks)) - # reverse the xs and ys - xs.reverse() - ys.reverse() + # Try fix the shortest path using shapely + def fix_using_polygon(self, tour: List[Landmark])-> List[Landmark] : + """ + Improve the tour path using geometric methods to ensure it follows a more optimal shape. - better_tour = [] # List of ordered visit - name_index = {} # Maps the name of a landmark to its index in the concave polygon + This function creates a polygon from the given tour and attempts to refine it using a concave hull. It reorders + the landmarks to fit within this refined polygon and adjusts the tour to ensure the 'start' landmark is at the + beginning. It also checks if the final polygon is simple and rearranges the tour if necessary. - # Loop through the polygon and generate the better (ordered) tour - for i,x in enumerate(xs[:-1]) : - y = ys[i] - better_tour.append(coords_dict[tuple((x,y))]) - name_index[coords_dict[tuple((x,y))].name] = i + Args: + tour (List[Landmark]): List of landmarks representing the current tour path. + + Returns: + List[Landmark]: Refined list of landmarks in the order of visit to produce a better tour path. + """ + + coords = [] + coords_dict = {} + for landmark in tour : + coords.append(landmark.location) + if landmark.name != 'finish' : + coords_dict[landmark.location] = landmark + + tour_poly = Polygon(coords) + + better_tour_poly = tour_poly.buffer(0) + try : + xs, ys = better_tour_poly.exterior.xy + + if len(xs) != len(tour) : + better_tour_poly = concave_hull(MultiPoint(coords)) # Create concave hull with "core" of tour leaving out start and finish + xs, ys = better_tour_poly.exterior.xy + + except : + better_tour_poly = concave_hull(MultiPoint(coords)) # Create concave hull with "core" of tour leaving out start and finish + xs, ys = better_tour_poly.exterior.xy - # Scroll the list to have start in front again - start_index = name_index['start'] - better_tour = better_tour[start_index:] + better_tour[:start_index] + # reverse the xs and ys + xs.reverse() + ys.reverse() - # Append the finish back and correct the time to reach - better_tour.append(tour[-1]) + better_tour = [] # List of ordered visit + name_index = {} # Maps the name of a landmark to its index in the concave polygon - # Rearrange only if polygon still not simple - if not better_tour_poly.is_simple : - better_tour = rearrange(better_tour) - - return better_tour + # Loop through the polygon and generate the better (ordered) tour + for i,x in enumerate(xs[:-1]) : + y = ys[i] + better_tour.append(coords_dict[tuple((x,y))]) + name_index[coords_dict[tuple((x,y))].name] = i -# Second stage of the optimization. Use linear programming again to refine the path -def refine_optimization(landmarks: List[Landmark], base_tour: List[Landmark], max_time: int, detour: int, print_infos: bool) -> List[Landmark] : + # Scroll the list to have start in front again + start_index = name_index['start'] + better_tour = better_tour[start_index:] + better_tour[:start_index] - # Read from the file - with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f : - parameters = json.loads(f.read()) - max_landmarks = parameters['max landmarks'] + 4 + # Append the finish back and correct the time to reach + better_tour.append(tour[-1]) - # No need to refine if no detour is taken - # if detour == 0 : - if False : - new_tour = base_tour - - else : - minor_landmarks = get_minor_landmarks(landmarks, base_tour, 200) - - if print_infos : print("Using " + str(len(minor_landmarks)) + " minor landmarks around the predicted path") - - # full set of visitable landmarks - full_set = base_tour[:-1] + minor_landmarks # create full set of possible landmarks (without finish) - full_set.append(base_tour[-1]) # add finish back - - # get a new tour - new_tour = solve_optimization(full_set, max_time+detour, False, max_landmarks) - if new_tour is None : - new_tour = base_tour - - # Link the new tour - new_tour, new_dist = link_list_simple(new_tour) - - # If the tour contains only one landmark, return - if len(new_tour) < 4 : - return new_tour - - # Find shortest path using the nearest neighbor heuristic - better_tour, better_poly = find_shortest_path_through_all_landmarks(new_tour) - - # Fix the tour using Polygons if the path looks weird - if base_tour[0].location == base_tour[-1].location and not better_poly.is_valid : - better_tour = fix_using_polygon(better_tour) - - # Link the tour again - better_tour, better_dist = link_list_simple(better_tour) - - # Choose the better tour depending on walked distance - if new_dist < better_dist : - final_tour = new_tour - else : - final_tour = better_tour - - if print_infos : - print("\n\n\nRefined tour (result of second stage optimization): ") - print_res(final_tour) - total_score = 0 - for elem in final_tour : - total_score += elem.attractiveness - - print("\nTotal score : " + str(total_score)) + # Rearrange only if polygon still not simple + if not better_tour_poly.is_simple : + better_tour = self.rearrange(better_tour) + + return better_tour - - return final_tour + # Second stage of the optimization. Use linear programming again to refine the path + def refine_optimization(self) -> List[Landmark] : + """ + Refine the initial tour path by considering additional minor landmarks and optimizing the path. + + This method evaluates the need for further optimization based on the initial tour. If a detour is required, it adds + minor landmarks around the initial predicted path and solves a new optimization problem to find a potentially better + tour. It then links the new tour and adjusts it using a nearest neighbor heuristic and polygon-based methods to + ensure a valid path. The final tour is chosen based on the shortest distance. + + Returns: + List[Landmark]: The refined list of landmarks representing the optimized tour path. + """ + + # No need to refine if no detour is taken + # if detour == 0 : + if False : + new_tour = base_tour + + else : + minor_landmarks = self.get_minor_landmarks(self.all_landmarks, self.base_tour, 200) + + self.logger.info(f"Using {len(minor_landmarks)} minor landmarks around the predicted path") + + # full set of visitable landmarks + full_set = self.base_tour[:-1] + minor_landmarks # create full set of possible landmarks (without finish) + full_set.append(self.base_tour[-1]) # add finish back + + # get a new tour + optimizer = Optimizer(self.max_time + self.detour, full_set) + new_tour = optimizer.solve_optimization() + + if new_tour is None : + new_tour = self.base_tour + + # Link the new tour + new_tour, new_dist = link_list_simple(new_tour) + + # If the tour contains only one landmark, return + if len(new_tour) < 4 : + return new_tour + + # Find shortest path using the nearest neighbor heuristic + better_tour, better_poly = self.find_shortest_path_through_all_landmarks(new_tour) + + # Fix the tour using Polygons if the path looks weird + if self.base_tour[0].location == self.base_tour[-1].location and not better_poly.is_valid : + better_tour = self.fix_using_polygon(better_tour) + + # Link the tour again + better_tour, better_dist = link_list_simple(better_tour) + + # Choose the better tour depending on walked distance + if new_dist < better_dist : + final_tour = new_tour + else : + final_tour = better_tour + + self.logger.info("Refined tour (result of second stage optimization): ") + + return final_tour diff --git a/backend/src/structs/landmarks.py b/backend/src/structs/landmarks.py index f6db9ac..6f449a8 100644 --- a/backend/src/structs/landmarks.py +++ b/backend/src/structs/landmarks.py @@ -8,7 +8,7 @@ class Landmark(BaseModel) : # Properties of the landmark name : str - type: Literal['sightseeing', 'nature', 'shopping'] + type: Literal['sightseeing', 'nature', 'shopping', 'start', 'finish'] location : tuple osm_type : str osm_id : int @@ -22,8 +22,8 @@ class Landmark(BaseModel) : uuid: str = Field(default_factory=uuid4) # TODO implement this ASAP # Additional properties depending on specific tour - must_do : bool - must_avoid : bool + must_do : Optional[bool] = False + must_avoid : Optional[bool] = False is_secondary : Optional[bool] = False # TODO future time_to_reach_next : Optional[int] = 0 # TODO fix this in existing code diff --git a/backend/src/structs/preferences.py b/backend/src/structs/preferences.py index ff25007..d370fc8 100644 --- a/backend/src/structs/preferences.py +++ b/backend/src/structs/preferences.py @@ -3,7 +3,7 @@ from typing import Optional, Literal class Preference(BaseModel) : name: str - type: Literal['sightseeing', 'nature', 'shopping'] + type: Literal['sightseeing', 'nature', 'shopping', 'start', 'finish'] score: int # score could be from 1 to 5 # Input for optimization diff --git a/backend/src/tester.py b/backend/src/tester.py index cea6527..8f1f718 100644 --- a/backend/src/tester.py +++ b/backend/src/tester.py @@ -4,10 +4,9 @@ from typing import List from landmarks_manager import LandmarkManager from fastapi.encoders import jsonable_encoder -from optimizer_v4 import solve_optimization -from refiner import refine_optimization +from optimizer import Optimizer +from refiner import Refiner from structs.landmarks import Landmark -from structs.landmarktype import LandmarkType from structs.preferences import Preferences, Preference @@ -24,58 +23,66 @@ def write_data(L: List[Landmark], file_name: str): data.to_json(file_name, indent = 2, force_ascii=False) -def test4(coordinates: tuple[float, float]) -> List[Landmark]: +def test(start_coords: tuple[float, float], finish_coords: tuple[float, float] = None) -> List[Landmark]: - manager = LandmarkManager() + preferences = Preferences( sightseeing=Preference( name='sightseeing', - type=LandmarkType(landmark_type='sightseeing'), + type='sightseeing', score = 5), nature=Preference( name='nature', - type=LandmarkType(landmark_type='nature'), + type='nature', score = 0), shopping=Preference( name='shopping', - type=LandmarkType(landmark_type='shopping'), - score = 0)) + type='shopping', + score = 0), + max_time_minute=180, + detour_tolerance_minute=0 + ) # Create start and finish - start = Landmark(name='start', type=LandmarkType(landmark_type='start'), location=coordinates, osm_type='start', osm_id=0, attractiveness=0, must_do=False, n_tags = 0) - finish = Landmark(name='finish', type=LandmarkType(landmark_type='finish'), location=coordinates, osm_type='finish', osm_id=0, attractiveness=0, must_do=False, n_tags = 0) + if finish_coords is None : + finish_coords = start_coords + start = Landmark(name='start', type='start', location=start_coords, osm_type='', osm_id=0, attractiveness=0, n_tags = 0) + finish = Landmark(name='finish', type='start', location=finish_coords, osm_type='', osm_id=0, attractiveness=0, n_tags = 0) #finish = Landmark(name='finish', type=LandmarkType(landmark_type='finish'), location=(48.8777055, 2.3640967), osm_type='finish', osm_id=0, attractiveness=0, must_do=True, n_tags = 0) #start = Landmark(name='start', type=LandmarkType(landmark_type='start'), location=(48.847132, 2.312359), osm_type='start', osm_id=0, attractiveness=0, must_do=True, n_tags = 0) #finish = Landmark(name='finish', type=LandmarkType(landmark_type='finish'), location=(48.843185, 2.344533), osm_type='finish', osm_id=0, attractiveness=0, must_do=True, n_tags = 0) #finish = Landmark(name='finish', type=LandmarkType(landmark_type='finish'), location=(48.847132, 2.312359), osm_type='finish', osm_id=0, attractiveness=0, must_do=True, n_tags = 0) + + manager = LandmarkManager(preferences=preferences, coordinates=start_coords) + # Generate the landmarks from the start location - landmarks, landmarks_short = generate_landmarks(preferences=preferences, coordinates=start.location) + landmarks, landmarks_short = manager.generate_landmarks_list() + + # Store data to file for debug purposes #write_data(landmarks, "landmarks_Wien.txt") # Insert start and finish to the landmarks list landmarks_short.insert(0, start) landmarks_short.append(finish) - max_walking_time = 180 # minutes - detour = 0 # minutes - # First stage optimization - base_tour = solve_optimization(landmarks_short, max_walking_time, True) + optimizer = Optimizer(max_time=preferences.max_time_minute, landmarks=landmarks_short) + base_tour = optimizer.solve_optimization() # Second stage using linear optimization - - refined_tour = refine_optimization(landmarks, base_tour, max_walking_time, detour, True) + refiner = Refiner(max_time = preferences.max_time_minute, detour = preferences.detour_tolerance_minute, all_landmarks=landmarks, base_tour=base_tour) + refined_tour = refiner.refine_optimization() return refined_tour -#test4(tuple((48.8344400, 2.3220540))) # Café Chez César -#test4(tuple((48.8375946, 2.2949904))) # Point random -#test4(tuple((47.377859, 8.540585))) # Zurich HB -#test4(tuple((45.7576485, 4.8330241))) # Lyon Bellecour -test4(tuple((48.5848435, 7.7332974))) # Strasbourg Gare -#test4(tuple((48.2067858, 16.3692340))) # Vienne +#test(tuple((48.8344400, 2.3220540))) # Café Chez César +#test(tuple((48.8375946, 2.2949904))) # Point random +#test(tuple((47.377859, 8.540585))) # Zurich HB +#test(tuple((45.7576485, 4.8330241))) # Lyon Bellecour +test(tuple((48.5848435, 7.7332974))) # Strasbourg Gare +#test(tuple((48.2067858, 16.3692340))) # Vienne diff --git a/backend/src/utils.py b/backend/src/utils.py new file mode 100644 index 0000000..d145c6f --- /dev/null +++ b/backend/src/utils.py @@ -0,0 +1,106 @@ +import yaml +from typing import List, Tuple +from geopy.distance import geodesic + +from structs.landmarks import Landmark +import constants + +def get_time(p1: Tuple[float, float], p2: Tuple[float, float]) -> int : + """ + Calculate the time in minutes to travel from one location to another. + + Args: + p1 (Tuple[float, float]): Coordinates of the starting location. + p2 (Tuple[float, float]): Coordinates of the destination. + detour (float): Detour factor affecting the distance. + speed (float): Walking speed in kilometers per hour. + + Returns: + int: Time to travel from p1 to p2 in minutes. + """ + + with constants.OPTIMIZER_PARAMETERS_PATH.open('r') as f: + parameters = yaml.safe_load(f) + detour_factor = parameters['detour_factor'] + average_walking_speed = parameters['average_walking_speed'] + + # Compute the straight-line distance in km + if p1 == p2 : + return 0 + else: + dist = geodesic(p1, p2).kilometers + + # Consider the detour factor for average cityto deterline walking distance (in km) + walk_dist = dist*detour_factor + + # Time to walk this distance (in minutes) + walk_time = walk_dist/average_walking_speed*60 + + return round(walk_time) + + + + +# Same as link_list but does it on a already ordered list +def link_list_simple(ordered_visit: List[Landmark])-> List[Landmark] : + + L = [] + j = 0 + total_dist = 0 + while j < len(ordered_visit)-1 : + elem = ordered_visit[j] + next = ordered_visit[j+1] + + elem.next_uuid = next.uuid + d = get_time(elem.location, next.location) + elem.time_to_reach_next = d + if elem.name not in ['start', 'finish'] : + elem.must_do = True + L.append(elem) + j += 1 + total_dist += d + + L.append(next) + + return L, total_dist + + + +# Take the most important landmarks from the list +def take_most_important(landmarks: List[Landmark], N_important) -> List[Landmark] : + + L = len(landmarks) + L_copy = [] + L_clean = [] + scores = [0]*len(landmarks) + names = [] + name_id = {} + + for i, elem in enumerate(landmarks) : + if elem.name not in names : + names.append(elem.name) + name_id[elem.name] = [i] + L_copy.append(elem) + else : + name_id[elem.name] += [i] + scores = [] + for j in name_id[elem.name] : + scores.append(L[j].attractiveness) + best_id = max(range(len(scores)), key=scores.__getitem__) + t = name_id[elem.name][best_id] + if t == i : + for old in L_copy : + if old.name == elem.name : + old.attractiveness = L[t].attractiveness + + scores = [0]*len(L_copy) + for i, elem in enumerate(L_copy) : + scores[i] = elem.attractiveness + + res = sorted(range(len(scores)), key = lambda sub: scores[sub])[-(N_important-L):] + + for i, elem in enumerate(L_copy) : + if i in res : + L_clean.append(elem) + + return L_clean \ No newline at end of file