cleanup-backend #13
| @@ -1,128 +1,104 @@ | ||||
| import math as m | ||||
| import json, os | ||||
| import yaml | ||||
| import logging | ||||
|  | ||||
| from typing import List, Tuple, Optional | ||||
| from typing import List, Tuple | ||||
| from OSMPythonTools.overpass import Overpass, overpassQueryBuilder | ||||
| from OSMPythonTools import cachingStrategy | ||||
| from pywikibot import ItemPage, Site | ||||
| from pywikibot import config | ||||
| config.put_throttle = 0 | ||||
| config.maxlag = 0 | ||||
|  | ||||
| from structs.landmarks import Landmark, LandmarkType | ||||
| from structs.preferences import Preferences, Preference | ||||
| from structs.landmarks import Landmark | ||||
| from utils import take_most_important | ||||
| import constants | ||||
|  | ||||
|  | ||||
| SIGHTSEEING = LandmarkType(landmark_type='sightseeing') | ||||
| NATURE = LandmarkType(landmark_type='nature') | ||||
| SHOPPING = LandmarkType(landmark_type='shopping') | ||||
| SIGHTSEEING = 'sightseeing' | ||||
| NATURE = 'nature' | ||||
| SHOPPING = 'shopping' | ||||
|  | ||||
|  | ||||
| # Include the json here | ||||
| # Create a list of all things to visit given some preferences and a city. Ready for the optimizer | ||||
| def generate_landmarks(preferences: Preferences, coordinates: Tuple[float, float]) : | ||||
|  | ||||
|     l_sights, l_nature, l_shop = get_amenities() | ||||
| class LandmarkManager: | ||||
|  | ||||
|     logger = logging.getLogger(__name__) | ||||
|  | ||||
|     city_bbox_side: int     # bbox side in meters | ||||
|     radius_close_to: int    # radius in meters | ||||
|     church_coeff: float     # coeff to adjsut score of churches | ||||
|     park_coeff: float       # coeff to adjust score of parks | ||||
|     tag_coeff: float        # coeff to adjust weight of tags | ||||
|     N_important: int        # number of important landmarks to consider | ||||
|  | ||||
|     preferences: Preferences    # preferences of visit | ||||
|     location: Tuple[float]      # coordinates around which to find a path | ||||
|  | ||||
|     def __init__(self, preferences: Preferences, coordinates: Tuple[float, float]) -> None: | ||||
|  | ||||
|         with constants.AMENITY_SELECTORS_PATH.open('r') as f: | ||||
|             self.amenity_selectors = yaml.safe_load(f) | ||||
|  | ||||
|         with constants.LANDMARK_PARAMETERS_PATH.open('r') as f: | ||||
|             parameters = yaml.safe_load(f) | ||||
|             self.city_bbox_side = parameters['city_bbox_side'] | ||||
|             self.radius_close_to = parameters['radius_close_to'] | ||||
|             self.church_coeff = parameters['church_coeff'] | ||||
|             self.park_coeff = parameters['park_coeff'] | ||||
|             self.tag_coeff = parameters['tag_coeff'] | ||||
|             self.N_important = parameters['N_important'] | ||||
|          | ||||
|         self.preferences = preferences | ||||
|         self.location = coordinates | ||||
|  | ||||
|  | ||||
|     def generate_landmarks_list(self) -> Tuple[List[Landmark], List[Landmark]] : | ||||
|         """ | ||||
|         Generate and prioritize a list of landmarks based on user preferences. | ||||
|  | ||||
|         This method fetches landmarks from various categories (sightseeing, nature, shopping) based on the user's preferences | ||||
|         and current location. It scores and corrects these landmarks, removes duplicates, and then selects the most important | ||||
|         landmarks based on a predefined criterion. | ||||
|  | ||||
|         Returns: | ||||
|             Tuple[List[Landmark], List[Landmark]]: | ||||
|                 - A list of all existing landmarks. | ||||
|                 - A list of the most important landmarks based on the user's preferences. | ||||
|         """ | ||||
|  | ||||
|         L = [] | ||||
|  | ||||
|         # List for sightseeing | ||||
|     if preferences.sightseeing.score != 0 : | ||||
|         L1 = get_landmarks(l_sights, SIGHTSEEING, coordinates=coordinates) | ||||
|         correct_score(L1, preferences.sightseeing) | ||||
|         if self.preferences.sightseeing.score != 0 : | ||||
|             L1 = self.fetch_landmarks(self.amenity_selectors['sightseeing'], SIGHTSEEING, coordinates=self.location) | ||||
|             self.correct_score(L1, self.preferences.sightseeing) | ||||
|             L += L1 | ||||
|          | ||||
|         # List for nature | ||||
|     if preferences.nature.score != 0 : | ||||
|         L2 = get_landmarks(l_nature, NATURE, coordinates=coordinates) | ||||
|         correct_score(L2, preferences.nature) | ||||
|         if self.preferences.nature.score != 0 : | ||||
|             L2 = self.fetch_landmarks(self.amenity_selectors['nature'], NATURE, coordinates=self.location) | ||||
|             self.correct_score(L2, self.preferences.nature) | ||||
|             L += L2 | ||||
|          | ||||
|         # List for shopping | ||||
|     if preferences.shopping.score != 0 : | ||||
|         L3 = get_landmarks(l_shop, SHOPPING, coordinates=coordinates) | ||||
|         correct_score(L3, preferences.shopping) | ||||
|         if self.preferences.shopping.score != 0 : | ||||
|             L3 = self.fetch_landmarks(self.amenity_selectors['shopping'], SHOPPING, coordinates=self.location) | ||||
|             self.correct_score(L3, self.preferences.shopping) | ||||
|             L += L3 | ||||
|  | ||||
|     L = remove_duplicates(L) | ||||
|         L = self.remove_duplicates(L) | ||||
|  | ||||
|     return L, take_most_important(L) | ||||
|         return L, take_most_important(L, self.N_important) | ||||
|  | ||||
|  | ||||
| # Helper function to gather the amenities list | ||||
| def get_amenities() -> List[List[str]] : | ||||
|      | ||||
|     # Get the list of amenities from the files | ||||
|     sightseeing = get_list('/amenities/sightseeing.am') | ||||
|     nature = get_list('/amenities/nature.am') | ||||
|     shopping = get_list('/amenities/shopping.am') | ||||
|  | ||||
|     return sightseeing, nature, shopping | ||||
|  | ||||
|  | ||||
| # Helper function to read a .am file and generate the corresponding list | ||||
| def get_list(path: str) -> List[str] : | ||||
|  | ||||
|     with open(os.path.dirname(os.path.abspath(__file__)) + path) as f : | ||||
|         content = f.readlines() | ||||
|  | ||||
|         amenities = [] | ||||
|         for line in content : | ||||
|             if not line.startswith('#') : | ||||
|                 amenities.append(line.strip('\n')) | ||||
|  | ||||
|     return amenities | ||||
|  | ||||
|  | ||||
| # Take the most important landmarks from the list | ||||
| def take_most_important(L: List[Landmark], N = 0) -> List[Landmark] : | ||||
|      | ||||
|     # Read the parameters from the file | ||||
|     with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/landmarks_manager.params', "r") as f : | ||||
|         parameters = json.loads(f.read()) | ||||
|         N_important = parameters['N important'] | ||||
|      | ||||
|     L_copy = [] | ||||
|     L_clean = [] | ||||
|     scores = [0]*len(L) | ||||
|     names = [] | ||||
|     name_id = {} | ||||
|  | ||||
|     for i, elem in enumerate(L) : | ||||
|         if elem.name not in names : | ||||
|             names.append(elem.name) | ||||
|             name_id[elem.name] = [i] | ||||
|             L_copy.append(elem) | ||||
|         else : | ||||
|             name_id[elem.name] += [i] | ||||
|             scores = [] | ||||
|             for j in name_id[elem.name] : | ||||
|                 scores.append(L[j].attractiveness) | ||||
|             best_id = max(range(len(scores)), key=scores.__getitem__) | ||||
|             t = name_id[elem.name][best_id] | ||||
|             if t == i : | ||||
|                 for old in L_copy : | ||||
|                     if old.name == elem.name : | ||||
|                         old.attractiveness = L[t].attractiveness | ||||
|      | ||||
|     scores = [0]*len(L_copy) | ||||
|     for i, elem in enumerate(L_copy) : | ||||
|         scores[i] = elem.attractiveness | ||||
|  | ||||
|     res = sorted(range(len(scores)), key = lambda sub: scores[sub])[-(N_important-N):] | ||||
|  | ||||
|     for i, elem in enumerate(L_copy) : | ||||
|         if i in res : | ||||
|             L_clean.append(elem) | ||||
|  | ||||
|     return L_clean | ||||
|  | ||||
|  | ||||
| # Remove duplicate elements and elements with low score | ||||
| def remove_duplicates(L: List[Landmark]) -> List[Landmark] : | ||||
|     def remove_duplicates(self, landmarks: List[Landmark]) -> List[Landmark] : | ||||
|         """ | ||||
|     Removes duplicate landmarks based on their names from the given list. | ||||
|         Removes duplicate landmarks based on their names from the given list. Only retains the landmark with highest score | ||||
|  | ||||
|         Parameters: | ||||
|     L (List[Landmark]): A list of Landmark objects. | ||||
|         landmarks (List[Landmark]): A list of Landmark objects. | ||||
|  | ||||
|         Returns: | ||||
|         List[Landmark]: A list of unique Landmark objects based on their names. | ||||
| @@ -131,11 +107,9 @@ def remove_duplicates(L: List[Landmark]) -> List[Landmark] : | ||||
|         L_clean = [] | ||||
|         names = [] | ||||
|  | ||||
|     for landmark in L : | ||||
|         for landmark in landmarks : | ||||
|             if landmark.name in names:  | ||||
|                 continue   | ||||
|  | ||||
|  | ||||
|             else : | ||||
|                 names.append(landmark.name) | ||||
|                 L_clean.append(landmark) | ||||
| @@ -143,25 +117,51 @@ def remove_duplicates(L: List[Landmark]) -> List[Landmark] : | ||||
|         return L_clean | ||||
|          | ||||
|  | ||||
| # Correct the score of a list of landmarks by taking into account preference settings | ||||
| def correct_score(L: List[Landmark], preference: Preference) : | ||||
|     def correct_score(self, landmarks: List[Landmark], preference: Preference) : | ||||
|         """ | ||||
|         Adjust the attractiveness score of each landmark in the list based on user preferences. | ||||
|  | ||||
|     if len(L) == 0 : | ||||
|         This method updates the attractiveness of each landmark by scaling it according to the user's preference score. | ||||
|         The score adjustment is computed using a simple linear transformation based on the preference score. | ||||
|  | ||||
|         Args: | ||||
|             landmarks (List[Landmark]): A list of landmarks whose scores need to be corrected. | ||||
|             preference (Preference): The user's preference settings that influence the attractiveness score adjustment. | ||||
|  | ||||
|         Raises: | ||||
|             TypeError: If the type of any landmark in the list does not match the expected type in the preference. | ||||
|         """ | ||||
|  | ||||
|         if len(landmarks) == 0 : | ||||
|             return | ||||
|          | ||||
|     if L[0].type != preference.type : | ||||
|         raise TypeError(f"LandmarkType {preference.type} does not match the type of Landmark {L[0].name}") | ||||
|         if landmarks[0].type != preference.type : | ||||
|             raise TypeError(f"LandmarkType {preference.type} does not match the type of Landmark {landmarks[0].name}") | ||||
|  | ||||
|     for elem in L : | ||||
|         for elem in landmarks : | ||||
|             elem.attractiveness = int(elem.attractiveness*preference.score/5)     # arbitrary computation | ||||
|  | ||||
|  | ||||
| # Function to count elements within a certain radius of a location | ||||
| def count_elements_within_radius(coordinates: Tuple[float, float], radius: int) -> int: | ||||
|     def count_elements_close_to(self, coordinates: Tuple[float, float]) -> int: | ||||
|         """ | ||||
|         Count the number of OpenStreetMap elements (nodes, ways, relations) within a specified radius of the given location. | ||||
|  | ||||
|         This function constructs a bounding box around the specified coordinates based on the radius. It then queries | ||||
|         OpenStreetMap data to count the number of elements within that bounding box. | ||||
|  | ||||
|         Args: | ||||
|             coordinates (Tuple[float, float]): The latitude and longitude of the location to search around. | ||||
|  | ||||
|         Returns: | ||||
|             int: The number of elements (nodes, ways, relations) within the specified radius. Returns 0 if no elements | ||||
|                 are found or if an error occurs during the query. | ||||
|         """ | ||||
|          | ||||
|         lat = coordinates[0] | ||||
|         lon = coordinates[1] | ||||
|  | ||||
|         radius = self.radius_close_to | ||||
|  | ||||
|         alpha = (180*radius) / (6371000*m.pi) | ||||
|         bbox = {'latLower':lat-alpha,'lonLower':lon-alpha,'latHigher':lat+alpha,'lonHigher': lon+alpha} | ||||
|              | ||||
| @@ -177,19 +177,27 @@ def count_elements_within_radius(coordinates: Tuple[float, float], radius: int) | ||||
|             if N_elem is None : | ||||
|                 return 0 | ||||
|             return N_elem | ||||
|      | ||||
|         except : | ||||
|             return 0 | ||||
|  | ||||
|  | ||||
| # Creates a bounding box around given coordinates, side_length in meters | ||||
| def create_bbox(coordinates: Tuple[float, float], side_length: int) -> Tuple[float, float, float, float]: | ||||
|     def create_bbox(self, coordinates: Tuple[float, float]) -> Tuple[float, float, float, float]: | ||||
|         """ | ||||
|         Create a bounding box around the given coordinates. | ||||
|  | ||||
|         Args: | ||||
|             coordinates (Tuple[float, float]): The latitude and longitude of the center of the bounding box. | ||||
|  | ||||
|         Returns: | ||||
|             Tuple[float, float, float, float]: The minimum latitude, minimum longitude, maximum latitude, and maximum longitude | ||||
|                                                 defining the bounding box. | ||||
|         """ | ||||
|          | ||||
|         lat = coordinates[0] | ||||
|         lon = coordinates[1] | ||||
|  | ||||
|         # Half the side length in km (since it's a square bbox) | ||||
|     half_side_length_km = side_length / 2 / 1000 | ||||
|         half_side_length_km = self.city_bbox_side / 2 / 1000 | ||||
|  | ||||
|         # Convert distance to degrees | ||||
|         lat_diff = half_side_length_km / 111  # 1 degree latitude is approximately 111 km | ||||
| @@ -204,19 +212,28 @@ def create_bbox(coordinates: Tuple[float, float], side_length: int) -> Tuple[flo | ||||
|         return min_lat, min_lon, max_lat, max_lon | ||||
|  | ||||
|  | ||||
| def get_landmarks(list_amenity: list, landmarktype: LandmarkType, coordinates: Tuple[float, float]) -> List[Landmark] : | ||||
|     def fetch_landmarks(self, list_amenity: list, landmarktype: str, coordinates: Tuple[float, float]) -> List[Landmark] : | ||||
|         """ | ||||
|         Fetches landmarks of a specified type from OpenStreetMap (OSM) within a bounding box centered on given coordinates. | ||||
|  | ||||
|     # Read the parameters from the file | ||||
|     with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/landmarks_manager.params', "r") as f : | ||||
|         parameters = json.loads(f.read()) | ||||
|         tag_coeff = parameters['tag coeff'] | ||||
|         park_coeff = parameters['park coeff'] | ||||
|         church_coeff = parameters['church coeff'] | ||||
|         radius = parameters['radius close to'] | ||||
|         bbox_side = parameters['city bbox side'] | ||||
|         Args: | ||||
|             list_amenity (list): A list of OSM amenity queries to be used for fetching landmarks.  | ||||
|                                 These queries are typically used to filter results (e.g., [''amenity'='place_of_worship']). | ||||
|             landmarktype (str): The type of the landmark (e.g., 'sightseeing', 'nature', 'shopping'). | ||||
|             coordinates (Tuple[float, float]): The central coordinates (latitude, longitude) for the bounding box. | ||||
|  | ||||
|         Returns: | ||||
|             List[Landmark]: A list of Landmark objects that were fetched and filtered based on the provided criteria. | ||||
|  | ||||
|         Notes: | ||||
|             - The bounding box is created around the given coordinates with a side length defined by `self.city_bbox_side`. | ||||
|             - Landmarks are fetched using Overpass API queries. | ||||
|             - Landmarks are filtered based on various conditions including tags and type. | ||||
|             - Scores are assigned to landmarks based on their attributes and surrounding elements. | ||||
|         """ | ||||
|          | ||||
|         # Create bbox around start location | ||||
|     bbox = create_bbox(coordinates, bbox_side) | ||||
|         bbox = self.create_bbox(coordinates) | ||||
|  | ||||
|         # Initialize some variables | ||||
|         N = 0 | ||||
| @@ -272,9 +289,9 @@ def get_landmarks(list_amenity: list, landmarktype: LandmarkType, coordinates: T | ||||
|                             n_languages = len(item.labels) | ||||
|                             n_tags += n_languages/10 | ||||
|  | ||||
|                     if elem_type != LandmarkType(landmark_type="nature") : | ||||
|                         if elem_type != "nature" : | ||||
|                             if "leisure" in tag and elem.tag('leisure') == "park": | ||||
|                             elem_type = LandmarkType(landmark_type="nature") | ||||
|                                 elem_type = "nature" | ||||
|  | ||||
|                         if amenity not in ["'shop'='department_store'", "'shop'='mall'"] : | ||||
|                             if "shop" in tag : | ||||
| @@ -290,14 +307,15 @@ def get_landmarks(list_amenity: list, landmarktype: LandmarkType, coordinates: T | ||||
|  | ||||
|                     # Add score of given landmark based on the number of surrounding elements. Penalty for churches as there are A LOT | ||||
|                     if amenity == "'amenity'='place_of_worship'" : | ||||
|                     #score = int((count_elements_within_radius(location, radius) + (n_tags*tag_coeff) )*church_coeff)   | ||||
|                     score = int((count_elements_within_radius(location, radius) + ((n_tags**1.2)*tag_coeff) )*church_coeff)   | ||||
|                         #score = int((count_elements_close_to(location, radius) + (n_tags*tag_coeff) )*church_coeff)   | ||||
|                         score = int((self.count_elements_close_to(location) + ((n_tags**1.2)*self.tag_coeff) )*self.church_coeff)   | ||||
|                     elif amenity == "'leisure'='park'" : | ||||
|                     score = int((count_elements_within_radius(location, radius) + ((n_tags**1.2)*tag_coeff) )*park_coeff)   | ||||
|                         score = int((self.count_elements_close_to(location) + ((n_tags**1.2)*self.tag_coeff) )*self.park_coeff)   | ||||
|                     else : | ||||
|                     score = int(count_elements_within_radius(location, radius) + ((n_tags**1.2)*tag_coeff)) | ||||
|                         score = int(self.count_elements_close_to(location) + ((n_tags**1.2)*self.tag_coeff)) | ||||
|  | ||||
|                     if score is not None : | ||||
|  | ||||
|                         # Generate the landmark and append it to the list | ||||
|                         #print(f"There are {n_tags} tags on this Landmark. Total score : {score}\n") | ||||
|                         landmark = Landmark(name=name, type=elem_type, location=location, osm_type=osm_type, osm_id=osm_id, attractiveness=score, must_do=False, n_tags=int(n_tags)) | ||||
|   | ||||
							
								
								
									
										570
									
								
								backend/src/optimizer.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										570
									
								
								backend/src/optimizer.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,570 @@ | ||||
| import yaml, logging | ||||
| import numpy as np | ||||
|  | ||||
| from typing import List, Tuple | ||||
| from scipy.optimize import linprog | ||||
| from collections import defaultdict, deque | ||||
| from geopy.distance import geodesic | ||||
|  | ||||
| from structs.landmarks import Landmark | ||||
| import constants | ||||
|  | ||||
|      | ||||
|  | ||||
|  | ||||
|  | ||||
| class Optimizer: | ||||
|  | ||||
|     logger = logging.getLogger(__name__) | ||||
|  | ||||
|     landmarks: List[Landmark]   # list of landmarks | ||||
|     max_time: int = None        # max visit time (in minutes) | ||||
|     detour: int = None          # accepted max detour time (in minutes) | ||||
|     detour_factor: float        # detour factor of straight line vs real distance in cities | ||||
|     average_walking_speed: float        # average walking speed of adult | ||||
|     max_landmarks: int          # max number of landmarks to visit | ||||
|  | ||||
|  | ||||
|     def __init__(self, max_time: int, landmarks: List[Landmark]) : | ||||
|         self.max_time = max_time | ||||
|         self.landmarks = landmarks | ||||
|  | ||||
|         # load parameters from file | ||||
|         with constants.OPTIMIZER_PARAMETERS_PATH.open('r') as f: | ||||
|             parameters = yaml.safe_load(f) | ||||
|             self.detour_factor = parameters['detour_factor'] | ||||
|             self.average_walking_speed = parameters['average_walking_speed'] | ||||
|             self.max_landmarks = parameters['max_landmarks'] | ||||
|          | ||||
|  | ||||
|     def print_res(self, L: List[Landmark]): | ||||
|         """ | ||||
|         Print the suggested order of landmarks to visit and log the total time walked. | ||||
|  | ||||
|         Args: | ||||
|             L (List[Landmark]): List of landmarks in the suggested visit order. | ||||
|         """ | ||||
|  | ||||
|         self.logger.info(f'The following order is suggested : ') | ||||
|         dist = 0 | ||||
|         for elem in L :  | ||||
|             if elem.time_to_reach_next is not None : | ||||
|                 print('- ' + elem.name + ', time to reach next = ' + str(elem.time_to_reach_next)) | ||||
|                 dist += elem.time_to_reach_next | ||||
|             else :  | ||||
|                 print('- ' + elem.name) | ||||
|         self.logger.info(f'Minutes walked : {dist}') | ||||
|         self.logger.info(f'Visited {len(L)-2} landmarks') | ||||
|  | ||||
|  | ||||
|     # Prevent the use of a particular solution | ||||
|     def prevent_config(self, resx): | ||||
|         """ | ||||
|         Prevent the use of a particular solution by adding constraints to the optimization. | ||||
|  | ||||
|         Args: | ||||
|             resx (List[float]): List of edge weights. | ||||
|  | ||||
|         Returns: | ||||
|             Tuple[List[int], List[int]]: A tuple containing a new row for constraint matrix and new value for upper bound vector. | ||||
|         """ | ||||
|          | ||||
|         for i, elem in enumerate(resx): | ||||
|             resx[i] = round(elem) | ||||
|          | ||||
|         N = len(resx)               # Number of edges | ||||
|         L = int(np.sqrt(N))         # Number of landmarks | ||||
|  | ||||
|         nonzeroind = np.nonzero(resx)[0]                    # the return is a little funky so I use the [0] | ||||
|         nonzero_tup = np.unravel_index(nonzeroind, (L,L)) | ||||
|  | ||||
|         ind_a = nonzero_tup[0].tolist() | ||||
|         vertices_visited = ind_a | ||||
|         vertices_visited.remove(0) | ||||
|  | ||||
|         ones = [1]*L | ||||
|         h = [0]*N | ||||
|         for i in range(L) : | ||||
|             if i in vertices_visited : | ||||
|                 h[i*L:i*L+L] = ones | ||||
|  | ||||
|         return h, [len(vertices_visited)-1] | ||||
|  | ||||
|  | ||||
|     # Prevents the creation of the same circle (both directions) | ||||
|     def prevent_circle(self, circle_vertices: list, L: int) : | ||||
|         """ | ||||
|         Prevent circular paths by by adding constraints to the optimization. | ||||
|  | ||||
|         Args: | ||||
|             circle_vertices (list): List of vertices forming a circle. | ||||
|             L (int): Number of landmarks. | ||||
|  | ||||
|         Returns: | ||||
|             Tuple[np.ndarray, List[int]]: A tuple containing a new row for constraint matrix and new value for upper bound vector. | ||||
|         """ | ||||
|  | ||||
|         l1 = [0]*L*L | ||||
|         l2 = [0]*L*L | ||||
|         for i, node in enumerate(circle_vertices[:-1]) : | ||||
|             next = circle_vertices[i+1] | ||||
|  | ||||
|             l1[node*L + next] = 1 | ||||
|             l2[next*L + node] = 1 | ||||
|  | ||||
|         s = circle_vertices[0] | ||||
|         g = circle_vertices[-1] | ||||
|  | ||||
|         l1[g*L + s] = 1 | ||||
|         l2[s*L + g] = 1 | ||||
|  | ||||
|         return np.vstack((l1, l2)), [0, 0] | ||||
|  | ||||
|  | ||||
|     def is_connected(self, resx) : | ||||
|         """ | ||||
|         Determine the order of visits and detect any circular paths in the given configuration. | ||||
|  | ||||
|         Args: | ||||
|             resx (list): List of edge weights. | ||||
|  | ||||
|         Returns: | ||||
|             Tuple[List[int], Optional[List[List[int]]]]: A tuple containing the visit order and a list of any detected circles. | ||||
|         """ | ||||
|  | ||||
|         # first round the results to have only 0-1 values | ||||
|         for i, elem in enumerate(resx): | ||||
|             resx[i] = round(elem) | ||||
|          | ||||
|         N = len(resx)               # length of res | ||||
|         L = int(np.sqrt(N))         # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def. | ||||
|  | ||||
|         nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0] | ||||
|         nonzero_tup = np.unravel_index(nonzeroind, (L,L)) | ||||
|  | ||||
|         ind_a = nonzero_tup[0].tolist() | ||||
|         ind_b = nonzero_tup[1].tolist() | ||||
|  | ||||
|         # Step 1: Create a graph representation | ||||
|         graph = defaultdict(list) | ||||
|         for a, b in zip(ind_a, ind_b): | ||||
|             graph[a].append(b) | ||||
|  | ||||
|         # Step 2: Function to perform BFS/DFS to extract journeys | ||||
|         def get_journey(start): | ||||
|             journey_nodes = [] | ||||
|             visited = set() | ||||
|             stack = deque([start]) | ||||
|  | ||||
|             while stack: | ||||
|                 node = stack.pop() | ||||
|                 if node not in visited: | ||||
|                     visited.add(node) | ||||
|                     journey_nodes.append(node) | ||||
|                     for neighbor in graph[node]: | ||||
|                         if neighbor not in visited: | ||||
|                             stack.append(neighbor) | ||||
|  | ||||
|             return journey_nodes | ||||
|  | ||||
|         # Step 3: Extract all journeys | ||||
|         all_journeys_nodes = [] | ||||
|         visited_nodes = set() | ||||
|  | ||||
|         for node in ind_a: | ||||
|             if node not in visited_nodes: | ||||
|                 journey_nodes = get_journey(node) | ||||
|                 all_journeys_nodes.append(journey_nodes) | ||||
|                 visited_nodes.update(journey_nodes) | ||||
|  | ||||
|         for l in all_journeys_nodes : | ||||
|             if 0 in l : | ||||
|                 order = l | ||||
|                 all_journeys_nodes.remove(l) | ||||
|                 break | ||||
|  | ||||
|         if len(all_journeys_nodes) == 0 : | ||||
|             return order, None | ||||
|  | ||||
|         return order, all_journeys_nodes | ||||
|  | ||||
|  | ||||
|     def get_time(self, p1: Tuple[float, float], p2: Tuple[float, float]) -> int : | ||||
|         """ | ||||
|         Calculate the time in minutes to travel from one location to another. | ||||
|  | ||||
|         Args: | ||||
|             p1 (Tuple[float, float]): Coordinates of the starting location. | ||||
|             p2 (Tuple[float, float]): Coordinates of the destination. | ||||
|  | ||||
|         Returns: | ||||
|             int: Time to travel from p1 to p2 in minutes. | ||||
|         """ | ||||
|  | ||||
|         # Compute the straight-line distance in km | ||||
|         if p1 == p2 : | ||||
|             return 0 | ||||
|         else:  | ||||
|             dist = geodesic(p1, p2).kilometers | ||||
|  | ||||
|         # Consider the detour factor for average cityto deterline walking distance (in km) | ||||
|         walk_dist = dist*self.detour_factor | ||||
|  | ||||
|         # Time to walk this distance (in minutes) | ||||
|         walk_time = walk_dist/self.average_walking_speed*60 | ||||
|  | ||||
|         return round(walk_time) | ||||
|  | ||||
|  | ||||
|     def init_ub_dist(self, landmarks: List[Landmark], max_steps: int): | ||||
|         """ | ||||
|         Initialize the objective function coefficients and inequality constraints for the optimization problem. | ||||
|  | ||||
|         This function computes the distances between all landmarks and stores their attractiveness to maximize sightseeing.  | ||||
|         The goal is to maximize the objective function subject to the constraints A*x < b and A_eq*x = b_eq. | ||||
|  | ||||
|         Args: | ||||
|             landmarks (List[Landmark]): List of landmarks. | ||||
|             max_steps (int): Maximum number of steps allowed. | ||||
|  | ||||
|         Returns: | ||||
|             Tuple[List[float], List[float], List[int]]: Objective function coefficients, inequality constraint coefficients, and the right-hand side of the inequality constraint. | ||||
|         """ | ||||
|          | ||||
|         # Objective function coefficients. a*x1 + b*x2 + c*x3 + ... | ||||
|         c = [] | ||||
|         # Coefficients of inequality constraints (left-hand side) | ||||
|         A_ub = [] | ||||
|  | ||||
|         for spot1 in landmarks : | ||||
|             dist_table = [0]*len(landmarks) | ||||
|             c.append(-spot1.attractiveness) | ||||
|             for j, spot2 in enumerate(landmarks) : | ||||
|                 t = self.get_time(spot1.location, spot2.location) | ||||
|                 dist_table[j] = t | ||||
|             closest = sorted(dist_table)[:22] | ||||
|             for i, dist in enumerate(dist_table) : | ||||
|                 if dist not in closest : | ||||
|                     dist_table[i] = 32700 | ||||
|             A_ub += dist_table | ||||
|         c = c*len(landmarks) | ||||
|  | ||||
|         return c, A_ub, [max_steps] | ||||
|  | ||||
|  | ||||
|     def respect_number(self, L: int): | ||||
|         """ | ||||
|         Generate constraints to ensure each landmark is visited only once and cap the total number of visited landmarks. | ||||
|  | ||||
|         Args: | ||||
|             L (int): Number of landmarks. | ||||
|  | ||||
|         Returns: | ||||
|             Tuple[np.ndarray, List[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. | ||||
|         """ | ||||
|  | ||||
|         ones = [1]*L | ||||
|         zeros = [0]*L | ||||
|         A = ones + zeros*(L-1) | ||||
|         b = [1] | ||||
|         for i in range(L-1) : | ||||
|             h_new = zeros*i + ones + zeros*(L-1-i) | ||||
|             A = np.vstack((A, h_new)) | ||||
|             b.append(1) | ||||
|  | ||||
|         A = np.vstack((A, ones*L)) | ||||
|         b.append(self.max_landmarks+1) | ||||
|  | ||||
|         return A, b | ||||
|  | ||||
|  | ||||
|     # Constraint to not have d14 and d41 simultaneously. Does not prevent cyclic paths with more elements | ||||
|     def break_sym(self, L): | ||||
|         """ | ||||
|         Generate constraints to prevent simultaneous travel between two landmarks in both directions. | ||||
|  | ||||
|         Args: | ||||
|             L (int): Number of landmarks. | ||||
|  | ||||
|         Returns: | ||||
|             Tuple[np.ndarray, List[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. | ||||
|         """ | ||||
|  | ||||
|         upper_ind = np.triu_indices(L,0,L) | ||||
|  | ||||
|         up_ind_x = upper_ind[0] | ||||
|         up_ind_y = upper_ind[1] | ||||
|  | ||||
|         A = [0]*L*L | ||||
|         b = [1] | ||||
|  | ||||
|         for i, _ in enumerate(up_ind_x[1:]) : | ||||
|             l = [0]*L*L | ||||
|             if up_ind_x[i] != up_ind_y[i] : | ||||
|                 l[up_ind_x[i]*L + up_ind_y[i]] = 1 | ||||
|                 l[up_ind_y[i]*L + up_ind_x[i]] = 1 | ||||
|  | ||||
|                 A = np.vstack((A,l)) | ||||
|                 b.append(1) | ||||
|  | ||||
|         return A, b | ||||
|  | ||||
|  | ||||
|     def init_eq_not_stay(self, L: int):  | ||||
|         """ | ||||
|         Generate constraints to prevent staying in the same position (e.g., removing d11, d22, d33, etc.). | ||||
|  | ||||
|         Args: | ||||
|             L (int): Number of landmarks. | ||||
|  | ||||
|         Returns: | ||||
|             Tuple[List[np.ndarray], List[int]]: Equality constraint coefficients and the right-hand side of the equality constraints. | ||||
|         """ | ||||
|  | ||||
|         l = [0]*L*L | ||||
|  | ||||
|         for i in range(L) : | ||||
|             for j in range(L) : | ||||
|                 if j == i : | ||||
|                     l[j + i*L] = 1 | ||||
|          | ||||
|         l = np.array(np.array(l), dtype=np.int8) | ||||
|  | ||||
|         return [l], [0] | ||||
|  | ||||
|  | ||||
|     def respect_user_must_do(self, landmarks: List[Landmark]) : | ||||
|         """ | ||||
|         Generate constraints to ensure that landmarks marked as 'must_do' are included in the optimization. | ||||
|  | ||||
|         Args: | ||||
|             landmarks (List[Landmark]): List of landmarks, where some are marked as 'must_do'. | ||||
|  | ||||
|         Returns: | ||||
|             Tuple[np.ndarray, List[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. | ||||
|         """ | ||||
|  | ||||
|         L = len(landmarks) | ||||
|         A = [0]*L*L | ||||
|         b = [0] | ||||
|  | ||||
|         for i, elem in enumerate(landmarks[1:]) : | ||||
|             if elem.must_do is True and elem.name not in ['finish', 'start']: | ||||
|                 l = [0]*L*L | ||||
|                 l[i*L:i*L+L] = [1]*L        # set mandatory departures from landmarks tagged as 'must_do' | ||||
|  | ||||
|                 A = np.vstack((A,l)) | ||||
|                 b.append(1) | ||||
|  | ||||
|         return A, b | ||||
|      | ||||
|  | ||||
|     def respect_user_must_avoid(self, landmarks: List[Landmark]) : | ||||
|         """ | ||||
|         Generate constraints to ensure that landmarks marked as 'must_avoid' are skipped in the optimization. | ||||
|  | ||||
|         Args: | ||||
|             landmarks (List[Landmark]): List of landmarks, where some are marked as 'must_avoid'. | ||||
|  | ||||
|         Returns: | ||||
|             Tuple[np.ndarray, List[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. | ||||
|         """ | ||||
|  | ||||
|         L = len(landmarks) | ||||
|         A = [0]*L*L | ||||
|         b = [0] | ||||
|  | ||||
|         for i, elem in enumerate(landmarks[1:]) : | ||||
|             if elem.must_avoid is True and elem.name not in ['finish', 'start']: | ||||
|                 l = [0]*L*L | ||||
|                 l[i*L:i*L+L] = [1]*L         | ||||
|  | ||||
|                 A = np.vstack((A,l)) | ||||
|                 b.append(0)             # prevent departures from landmarks tagged as 'must_do' | ||||
|  | ||||
|         return A, b | ||||
|  | ||||
|  | ||||
|     # Constraint to ensure start at start and finish at goal | ||||
|     def respect_start_finish(self, L: int): | ||||
|         """ | ||||
|         Generate constraints to ensure that the optimization starts at the designated start landmark and finishes at the goal landmark. | ||||
|  | ||||
|         Args: | ||||
|             L (int): Number of landmarks. | ||||
|  | ||||
|         Returns: | ||||
|             Tuple[np.ndarray, List[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. | ||||
|         """ | ||||
|  | ||||
|         l_start = [1]*L + [0]*L*(L-1)   # sets departures only for start (horizontal ones) | ||||
|         l_start[L-1] = 0                # prevents the jump from start to finish | ||||
|         l_goal = [0]*L*L                # sets arrivals only for finish (vertical ones) | ||||
|         l_L = [0]*L*(L-1) + [1]*L       # prevents arrivals at start and departures from goal | ||||
|         for k in range(L-1) :           # sets only vertical ones for goal (go to) | ||||
|             l_L[k*L] = 1 | ||||
|             if k != 0 : | ||||
|                 l_goal[k*L+L-1] = 1      | ||||
|  | ||||
|         A = np.vstack((l_start, l_goal)) | ||||
|         b = [1, 1] | ||||
|         A = np.vstack((A,l_L)) | ||||
|         b.append(0) | ||||
|  | ||||
|         return A, b | ||||
|  | ||||
|  | ||||
|     def respect_order(self, L: int):  | ||||
|         """ | ||||
|         Generate constraints to tie the optimization problem together and prevent stacked ones, although this does not fully prevent circles. | ||||
|  | ||||
|         Args: | ||||
|             L (int): Number of landmarks. | ||||
|  | ||||
|         Returns: | ||||
|             Tuple[np.ndarray, List[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. | ||||
|         """ | ||||
|  | ||||
|         A = [0]*L*L | ||||
|         b = [0] | ||||
|         for i in range(L-1) :           # Prevent stacked ones | ||||
|             if i == 0 or i == L-1:      # Don't touch start or finish | ||||
|                 continue | ||||
|             else :  | ||||
|                 l = [0]*L | ||||
|                 l[i] = -1 | ||||
|                 l = l*L | ||||
|                 for j in range(L) : | ||||
|                     l[i*L + j] = 1 | ||||
|  | ||||
|                 A = np.vstack((A,l)) | ||||
|                 b.append(0) | ||||
|  | ||||
|         return A, b | ||||
|  | ||||
|  | ||||
|     def link_list(self, order: List[int], landmarks: List[Landmark])->List[Landmark] : | ||||
|         """ | ||||
|         Compute the time to reach from each landmark to the next and create a list of landmarks with updated travel times. | ||||
|  | ||||
|         Args: | ||||
|             order (List[int]): List of indices representing the order of landmarks to visit. | ||||
|             landmarks (List[Landmark]): List of all landmarks. | ||||
|  | ||||
|         Returns: | ||||
|             List[Landmark]]: The updated linked list of landmarks with travel times | ||||
|         """ | ||||
|          | ||||
|         L =  [] | ||||
|         j = 0 | ||||
|         while j < len(order)-1 : | ||||
|             # get landmarks involved | ||||
|             elem = landmarks[order[j]] | ||||
|             next = landmarks[order[j+1]] | ||||
|  | ||||
|             # get attributes | ||||
|             elem.time_to_reach_next = self.get_time(elem.location, next.location) | ||||
|             elem.must_do = True | ||||
|             elem.location = (round(elem.location[0], 5), round(elem.location[1], 5)) | ||||
|             elem.next_uuid = next.uuid | ||||
|             L.append(elem) | ||||
|             j += 1 | ||||
|  | ||||
|         next.location = (round(next.location[0], 5), round(next.location[1], 5)) | ||||
|         next.must_do = True    | ||||
|         L.append(next) | ||||
|          | ||||
|         return L | ||||
|  | ||||
|  | ||||
|     # Main optimization pipeline | ||||
|     def solve_optimization (self) : | ||||
|         """ | ||||
|         Main optimization pipeline to solve the landmark visiting problem. | ||||
|  | ||||
|         This method sets up and solves a linear programming problem with constraints to find an optimal tour of landmarks, | ||||
|         considering user-defined must-visit landmarks, start and finish points, and ensuring no cycles are present. | ||||
|  | ||||
|         Returns: | ||||
|             List[Landmark]: The optimized tour of landmarks with updated travel times, or None if no valid solution is found. | ||||
|         """ | ||||
|  | ||||
|         L = len(self.landmarks) | ||||
|  | ||||
|         # SET CONSTRAINTS FOR INEQUALITY | ||||
|         c, A_ub, b_ub = self.init_ub_dist(self.landmarks, self.max_time)          # Add the distances from each landmark to the other | ||||
|         A, b = self.respect_number(L)                     # Respect max number of visits (no more possible stops than landmarks).  | ||||
|         A_ub = np.vstack((A_ub, A), dtype=np.int16) | ||||
|         b_ub += b | ||||
|         A, b = self.break_sym(L)                                         # break the 'zig-zag' symmetry | ||||
|         A_ub = np.vstack((A_ub, A), dtype=np.int16) | ||||
|         b_ub += b | ||||
|  | ||||
|  | ||||
|         # SET CONSTRAINTS FOR EQUALITY | ||||
|         A_eq, b_eq = self.init_eq_not_stay(L)                            # Force solution not to stay in same place | ||||
|         A, b = self.respect_user_must_do(self.landmarks)                      # Check if there are user_defined must_see. Also takes care of start/goal | ||||
|         A_eq = np.vstack((A_eq, A), dtype=np.int8) | ||||
|         b_eq += b | ||||
|         A, b = self.respect_start_finish(L)                  # Force start and finish positions | ||||
|         A_eq = np.vstack((A_eq, A), dtype=np.int8) | ||||
|         b_eq += b | ||||
|         A, b = self.respect_order(L)                         # Respect order of visit (only works when max_steps is limiting factor) | ||||
|         A_eq = np.vstack((A_eq, A), dtype=np.int8) | ||||
|         b_eq += b | ||||
|          | ||||
|         # SET BOUNDS FOR DECISION VARIABLE (x can only be 0 or 1) | ||||
|         x_bounds = [(0, 1)]*L*L | ||||
|  | ||||
|         # Solve linear programming problem | ||||
|         res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3) | ||||
|  | ||||
|         # Raise error if no solution is found | ||||
|         if not res.success : | ||||
|                 raise ArithmeticError("No solution could be found, the problem is overconstrained. Please adapt your must_dos") | ||||
|  | ||||
|         # If there is a solution, we're good to go, just check for connectiveness | ||||
|         else : | ||||
|             order, circles = self.is_connected(res.x) | ||||
|             #nodes, edges = is_connected(res.x) | ||||
|             i = 0 | ||||
|             timeout = 80 | ||||
|             while circles is not None and i < timeout: | ||||
|                 A, b = self.prevent_config(res.x) | ||||
|                 A_ub = np.vstack((A_ub, A)) | ||||
|                 b_ub += b | ||||
|                 #A_ub, b_ub = prevent_circle(order, len(landmarks), A_ub, b_ub) | ||||
|                 for circle in circles : | ||||
|                     A, b = self.prevent_circle(circle, L) | ||||
|                     A_eq = np.vstack((A_eq, A)) | ||||
|                     b_eq += b | ||||
|                 res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3) | ||||
|                 if not res.success : | ||||
|                     self.logger.info("Solving failed because of overconstrained problem") | ||||
|                     return None | ||||
|                 order, circles = self.is_connected(res.x) | ||||
|                 #nodes, edges = is_connected(res.x) | ||||
|                 if circles is None : | ||||
|                     break | ||||
|                 print(i) | ||||
|                 i += 1 | ||||
|              | ||||
|             if i == timeout : | ||||
|                 raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.") | ||||
|  | ||||
|             # Add the times to reach and stop optimizing | ||||
|             tour = self.link_list(order, self.landmarks) | ||||
|              | ||||
|             # logging | ||||
|             if i != 0 : | ||||
|                 self.logger.info(f"Neded to recompute paths {i} times because of unconnected loops...") | ||||
|             self.print_res(tour)  # how to do better ?   | ||||
|             self.logger.info(f"Total score : {int(-res.fun)}") | ||||
|  | ||||
|             return tour | ||||
|  | ||||
|  | ||||
|  | ||||
|  | ||||
|  | ||||
|  | ||||
| @@ -1,446 +0,0 @@ | ||||
| import numpy as np | ||||
| import json, os | ||||
|  | ||||
| from typing import List, Tuple | ||||
| from scipy.optimize import linprog | ||||
| from collections import defaultdict, deque | ||||
| from geopy.distance import geodesic | ||||
|  | ||||
| from structs.landmarks import Landmark | ||||
|  | ||||
|      | ||||
| # Function to print the result | ||||
| def print_res(L: List[Landmark]): | ||||
|  | ||||
|     print('The following order is suggested : ') | ||||
|  | ||||
|     dist = 0 | ||||
|     for elem in L :  | ||||
|         if elem.time_to_reach_next is not None : | ||||
|             print('- ' + elem.name + ', time to reach next = ' + str(elem.time_to_reach_next)) | ||||
|             dist += elem.time_to_reach_next | ||||
|         else :  | ||||
|             print('- ' + elem.name) | ||||
|  | ||||
|     print("\nMinutes walked : " + str(dist)) | ||||
|     print(f"Visited {len(L)-2} landmarks") | ||||
|  | ||||
|  | ||||
| # Prevent the use of a particular solution | ||||
| def prevent_config(resx): | ||||
|      | ||||
|     for i, elem in enumerate(resx): | ||||
|         resx[i] = round(elem) | ||||
|      | ||||
|     N = len(resx)               # Number of edges | ||||
|     L = int(np.sqrt(N))         # Number of landmarks | ||||
|  | ||||
|     nonzeroind = np.nonzero(resx)[0]                    # the return is a little funky so I use the [0] | ||||
|     nonzero_tup = np.unravel_index(nonzeroind, (L,L)) | ||||
|  | ||||
|     ind_a = nonzero_tup[0].tolist() | ||||
|     vertices_visited = ind_a | ||||
|     vertices_visited.remove(0) | ||||
|  | ||||
|     ones = [1]*L | ||||
|     h = [0]*N | ||||
|     for i in range(L) : | ||||
|         if i in vertices_visited : | ||||
|             h[i*L:i*L+L] = ones | ||||
|  | ||||
|     return h, [len(vertices_visited)-1] | ||||
|  | ||||
|  | ||||
| # Prevents the creation of the same circle (both directions) | ||||
| def prevent_circle(circle_vertices: list, L: int) : | ||||
|  | ||||
|     l1 = [0]*L*L | ||||
|     l2 = [0]*L*L | ||||
|     for i, node in enumerate(circle_vertices[:-1]) : | ||||
|         next = circle_vertices[i+1] | ||||
|  | ||||
|         l1[node*L + next] = 1 | ||||
|         l2[next*L + node] = 1 | ||||
|  | ||||
|     s = circle_vertices[0] | ||||
|     g = circle_vertices[-1] | ||||
|  | ||||
|     l1[g*L + s] = 1 | ||||
|     l2[s*L + g] = 1 | ||||
|  | ||||
|     return np.vstack((l1, l2)), [0, 0] | ||||
|  | ||||
|  | ||||
| # Returns the order of visit as well as any circles if there are some | ||||
| def is_connected(resx) : | ||||
|  | ||||
|     # first round the results to have only 0-1 values | ||||
|     for i, elem in enumerate(resx): | ||||
|         resx[i] = round(elem) | ||||
|      | ||||
|     N = len(resx)               # length of res | ||||
|     L = int(np.sqrt(N))         # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def. | ||||
|     n_edges = resx.sum()        # number of edges | ||||
|  | ||||
|     nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0] | ||||
|  | ||||
|     nonzero_tup = np.unravel_index(nonzeroind, (L,L)) | ||||
|  | ||||
|     ind_a = nonzero_tup[0].tolist() | ||||
|     ind_b = nonzero_tup[1].tolist() | ||||
|  | ||||
|     # Step 1: Create a graph representation | ||||
|     graph = defaultdict(list) | ||||
|     for a, b in zip(ind_a, ind_b): | ||||
|         graph[a].append(b) | ||||
|  | ||||
|     # Step 2: Function to perform BFS/DFS to extract journeys | ||||
|     def get_journey(start): | ||||
|         journey_nodes = [] | ||||
|         visited = set() | ||||
|         stack = deque([start]) | ||||
|  | ||||
|         while stack: | ||||
|             node = stack.pop() | ||||
|             if node not in visited: | ||||
|                 visited.add(node) | ||||
|                 journey_nodes.append(node) | ||||
|                 for neighbor in graph[node]: | ||||
|                     if neighbor not in visited: | ||||
|                         stack.append(neighbor) | ||||
|  | ||||
|         return journey_nodes | ||||
|  | ||||
|     # Step 3: Extract all journeys | ||||
|     all_journeys_nodes = [] | ||||
|     visited_nodes = set() | ||||
|  | ||||
|     for node in ind_a: | ||||
|         if node not in visited_nodes: | ||||
|             journey_nodes = get_journey(node) | ||||
|             all_journeys_nodes.append(journey_nodes) | ||||
|             visited_nodes.update(journey_nodes) | ||||
|  | ||||
|     for l in all_journeys_nodes : | ||||
|         if 0 in l : | ||||
|             order = l | ||||
|             all_journeys_nodes.remove(l) | ||||
|             break | ||||
|  | ||||
|     if len(all_journeys_nodes) == 0 : | ||||
|         return order, None | ||||
|  | ||||
|     return order, all_journeys_nodes | ||||
|  | ||||
|  | ||||
| # Function that returns the time in minutes from one location to another | ||||
| def get_time(p1: Tuple[float, float], p2: Tuple[float, float], detour: float, speed: float) : | ||||
|      | ||||
|     # Compute the straight-line distance in km | ||||
|     if p1 == p2 : | ||||
|         return 0 | ||||
|     else:  | ||||
|         dist = geodesic(p1, p2).kilometers | ||||
|  | ||||
|     # Consider the detour factor for average cityto deterline walking distance (in km) | ||||
|     walk_dist = dist*detour | ||||
|  | ||||
|     # Time to walk this distance (in minutes) | ||||
|     walk_time = walk_dist/speed*60 | ||||
|  | ||||
|     return round(walk_time) | ||||
|  | ||||
|  | ||||
| # Initialize A and c. Compute the distances from all landmarks to each other and store attractiveness | ||||
| # We want to maximize the sightseeing :  max(c) st. A*x < b   and   A_eq*x = b_eq | ||||
| def init_ub_dist(landmarks: List[Landmark], max_steps: int): | ||||
|      | ||||
|     with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f : | ||||
|         parameters = json.loads(f.read()) | ||||
|         detour = parameters['detour factor'] | ||||
|         speed = parameters['average walking speed'] | ||||
|      | ||||
|     # Objective function coefficients. a*x1 + b*x2 + c*x3 + ... | ||||
|     c = [] | ||||
|     # Coefficients of inequality constraints (left-hand side) | ||||
|     A_ub = [] | ||||
|  | ||||
|     for spot1 in landmarks : | ||||
|         dist_table = [0]*len(landmarks) | ||||
|         c.append(-spot1.attractiveness) | ||||
|         for j, spot2 in enumerate(landmarks) : | ||||
|             t = get_time(spot1.location, spot2.location, detour, speed) | ||||
|             dist_table[j] = t | ||||
|         closest = sorted(dist_table)[:22] | ||||
|         for i, dist in enumerate(dist_table) : | ||||
|             if dist not in closest : | ||||
|                 dist_table[i] = 32700 | ||||
|         A_ub += dist_table | ||||
|     c = c*len(landmarks) | ||||
|  | ||||
|     return c, A_ub, [max_steps] | ||||
|  | ||||
|  | ||||
| # Constraint to respect only one travel per landmark. Also caps the total number of visited landmarks | ||||
| def respect_number(L: int, max_landmarks: int): | ||||
|  | ||||
|     ones = [1]*L | ||||
|     zeros = [0]*L | ||||
|     A = ones + zeros*(L-1) | ||||
|     b = [1] | ||||
|     for i in range(L-1) : | ||||
|         h_new = zeros*i + ones + zeros*(L-1-i) | ||||
|         A = np.vstack((A, h_new)) | ||||
|         b.append(1) | ||||
|  | ||||
|     if max_landmarks is None : | ||||
|         # Read the parameters from the file | ||||
|         with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f : | ||||
|             parameters = json.loads(f.read()) | ||||
|             max_landmarks = parameters['max landmarks'] | ||||
|  | ||||
|     A = np.vstack((A, ones*L)) | ||||
|     b.append(max_landmarks+1) | ||||
|  | ||||
|     return A, b | ||||
|  | ||||
|  | ||||
| # Constraint to not have d14 and d41 simultaneously. Does not prevent cyclic paths with more elements | ||||
| def break_sym(L): | ||||
|  | ||||
|     upper_ind = np.triu_indices(L,0,L) | ||||
|  | ||||
|     up_ind_x = upper_ind[0] | ||||
|     up_ind_y = upper_ind[1] | ||||
|  | ||||
|     A = [0]*L*L         # useless row to prevent overhead ? better solution welcomed | ||||
|     # A[up_ind_x[0]*L + up_ind_y[0]] = 1 | ||||
|     # A[up_ind_y[0]*L + up_ind_x[0]] = 1 | ||||
|     b = [1] | ||||
|  | ||||
|     for i, _ in enumerate(up_ind_x[1:]) : | ||||
|         l = [0]*L*L | ||||
|         if up_ind_x[i] != up_ind_y[i] : | ||||
|             l[up_ind_x[i]*L + up_ind_y[i]] = 1 | ||||
|             l[up_ind_y[i]*L + up_ind_x[i]] = 1 | ||||
|  | ||||
|             A = np.vstack((A,l)) | ||||
|             b.append(1) | ||||
|  | ||||
|     return A, b | ||||
|  | ||||
|  | ||||
| # Constraint to not stay in position. Removes d11, d22, d33, etc. | ||||
| def init_eq_not_stay(L: int):  | ||||
|     l = [0]*L*L | ||||
|  | ||||
|     for i in range(L) : | ||||
|         for j in range(L) : | ||||
|             if j == i : | ||||
|                 l[j + i*L] = 1 | ||||
|      | ||||
|     l = np.array(np.array(l), dtype=np.int8) | ||||
|  | ||||
|     return [l], [0] | ||||
|  | ||||
|  | ||||
| # Go through the landmarks and force the optimizer to use landmarks marked as must_do | ||||
| def respect_user_must_do(landmarks: List[Landmark]) : | ||||
|     L = len(landmarks) | ||||
|  | ||||
|     A = [0]*L*L | ||||
|     b = [0] | ||||
|     for i, elem in enumerate(landmarks[1:]) : | ||||
|         if elem.must_do is True and elem.name not in ['finish', 'start']: | ||||
|             l = [0]*L*L | ||||
|             l[i*L:i*L+L] = [1]*L        # set mandatory departures from landmarks tagged as 'must_do' | ||||
|  | ||||
|             A = np.vstack((A,l)) | ||||
|             b.append(1) | ||||
|  | ||||
|     return A, b | ||||
|  | ||||
|  | ||||
| # Constraint to ensure start at start and finish at goal | ||||
| def respect_start_finish(L: int): | ||||
|     l_start = [1]*L + [0]*L*(L-1)   # sets departures only for start (horizontal ones) | ||||
|     l_start[L-1] = 0                # prevents the jump from start to finish | ||||
|     l_goal = [0]*L*L                # sets arrivals only for finish (vertical ones) | ||||
|     l_L = [0]*L*(L-1) + [1]*L       # prevents arrivals at start and departures from goal | ||||
|     for k in range(L-1) :           # sets only vertical ones for goal (go to) | ||||
|         l_L[k*L] = 1 | ||||
|         if k != 0 : | ||||
|             l_goal[k*L+L-1] = 1  | ||||
|              | ||||
|  | ||||
|     A = np.vstack((l_start, l_goal)) | ||||
|     b = [1, 1] | ||||
|     A = np.vstack((A,l_L)) | ||||
|     b.append(0) | ||||
|  | ||||
|     return A, b | ||||
|  | ||||
|  | ||||
| # Constraint to tie the problem together. Necessary but not sufficient to avoid circles | ||||
| def respect_order(L: int):  | ||||
|  | ||||
|     A = [0]*L*L                     # useless row to reduce overhead ? better solution is welcome | ||||
|     b = [0] | ||||
|     for i in range(L-1) :           # Prevent stacked ones | ||||
|         if i == 0 or i == L-1:      # Don't touch start or finish | ||||
|             continue | ||||
|         else :  | ||||
|             l = [0]*L | ||||
|             l[i] = -1 | ||||
|             l = l*L | ||||
|             for j in range(L) : | ||||
|                 l[i*L + j] = 1 | ||||
|  | ||||
|             A = np.vstack((A,l)) | ||||
|             b.append(0) | ||||
|  | ||||
|     return A, b | ||||
|  | ||||
|  | ||||
| # Computes the time to reach from each landmark to the next | ||||
| def link_list(order: List[int], landmarks: List[Landmark])->List[Landmark] : | ||||
|      | ||||
|     # Read the parameters from the file | ||||
|     with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f : | ||||
|         parameters = json.loads(f.read()) | ||||
|         detour_factor = parameters['detour factor'] | ||||
|         speed = parameters['average walking speed'] | ||||
|      | ||||
|     L =  [] | ||||
|     j = 0 | ||||
|     total_dist = 0 | ||||
|     while j < len(order)-1 : | ||||
|         elem = landmarks[order[j]] | ||||
|         next = landmarks[order[j+1]] | ||||
|  | ||||
|         d = get_time(elem.location, next.location, detour_factor, speed) | ||||
|         elem.time_to_reach_next = d | ||||
|         elem.must_do = True | ||||
|         elem.location = (round(elem.location[0], 5), round(elem.location[1], 5)) | ||||
|         L.append(elem) | ||||
|         j += 1 | ||||
|         total_dist += d | ||||
|  | ||||
|     next.location = (round(next.location[0], 5), round(next.location[1], 5))    | ||||
|     L.append(next) | ||||
|      | ||||
|     return L, total_dist | ||||
|  | ||||
|  | ||||
| # Same as link_list but does it on a already ordered list | ||||
| def link_list_simple(ordered_visit: List[Landmark])-> List[Landmark] : | ||||
|      | ||||
|     # Read the parameters from the file | ||||
|     with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f : | ||||
|         parameters = json.loads(f.read()) | ||||
|         detour_factor = parameters['detour factor'] | ||||
|         speed = parameters['average walking speed'] | ||||
|  | ||||
|     L =  [] | ||||
|     j = 0 | ||||
|     total_dist = 0 | ||||
|     while j < len(ordered_visit)-1 : | ||||
|         elem = ordered_visit[j] | ||||
|         next = ordered_visit[j+1] | ||||
|  | ||||
|         elem.next_uuid = next.uuid | ||||
|         d = get_time(elem.location, next.location, detour_factor, speed) | ||||
|         elem.time_to_reach_next = d | ||||
|         if elem.name not in ['start', 'finish'] : | ||||
|             elem.must_do = True | ||||
|         L.append(elem) | ||||
|         j += 1 | ||||
|         total_dist += d | ||||
|  | ||||
|     L.append(next) | ||||
|      | ||||
|     return L, total_dist | ||||
|  | ||||
|  | ||||
| # Main optimization pipeline | ||||
| def solve_optimization (landmarks :List[Landmark], max_steps: int, printing_details: bool, max_landmarks = None) : | ||||
|  | ||||
|     L = len(landmarks) | ||||
|  | ||||
|     # SET CONSTRAINTS FOR INEQUALITY | ||||
|     c, A_ub, b_ub = init_ub_dist(landmarks, max_steps)          # Add the distances from each landmark to the other | ||||
|     A, b = respect_number(L, max_landmarks)                     # Respect max number of visits (no more possible stops than landmarks).  | ||||
|     A_ub = np.vstack((A_ub, A), dtype=np.int16) | ||||
|     b_ub += b | ||||
|     A, b = break_sym(L)                                         # break the 'zig-zag' symmetry | ||||
|     A_ub = np.vstack((A_ub, A), dtype=np.int16) | ||||
|     b_ub += b | ||||
|  | ||||
|  | ||||
|     # SET CONSTRAINTS FOR EQUALITY | ||||
|     A_eq, b_eq = init_eq_not_stay(L)                            # Force solution not to stay in same place | ||||
|     A, b = respect_user_must_do(landmarks)                      # Check if there are user_defined must_see. Also takes care of start/goal | ||||
|     A_eq = np.vstack((A_eq, A), dtype=np.int8) | ||||
|     b_eq += b | ||||
|     A, b = respect_start_finish(L)                  # Force start and finish positions | ||||
|     A_eq = np.vstack((A_eq, A), dtype=np.int8) | ||||
|     b_eq += b | ||||
|     A, b = respect_order(L)                         # Respect order of visit (only works when max_steps is limiting factor) | ||||
|     A_eq = np.vstack((A_eq, A), dtype=np.int8) | ||||
|     b_eq += b | ||||
|      | ||||
|     # SET BOUNDS FOR DECISION VARIABLE (x can only be 0 or 1) | ||||
|     x_bounds = [(0, 1)]*L*L | ||||
|  | ||||
|     # Solve linear programming problem | ||||
|     res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3) | ||||
|  | ||||
|     # Raise error if no solution is found | ||||
|     if not res.success : | ||||
|             raise ArithmeticError("No solution could be found, the problem is overconstrained. Please adapt your must_dos") | ||||
|  | ||||
|     # If there is a solution, we're good to go, just check for connectiveness | ||||
|     else : | ||||
|         order, circles = is_connected(res.x) | ||||
|         #nodes, edges = is_connected(res.x) | ||||
|         i = 0 | ||||
|         timeout = 80 | ||||
|         while circles is not None and i < timeout: | ||||
|             A, b = prevent_config(res.x) | ||||
|             A_ub = np.vstack((A_ub, A)) | ||||
|             b_ub += b | ||||
|             #A_ub, b_ub = prevent_circle(order, len(landmarks), A_ub, b_ub) | ||||
|             for circle in circles : | ||||
|                 A, b = prevent_circle(circle, len(landmarks)) | ||||
|                 A_eq = np.vstack((A_eq, A)) | ||||
|                 b_eq += b | ||||
|             res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3) | ||||
|             if not res.success : | ||||
|                 print("Solving failed because of overconstrained problem") | ||||
|                 return None | ||||
|             order, circles = is_connected(res.x) | ||||
|             #nodes, edges = is_connected(res.x) | ||||
|             if circles is None : | ||||
|                 break | ||||
|             print(i) | ||||
|             i += 1 | ||||
|          | ||||
|         if i == timeout : | ||||
|             raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.") | ||||
|  | ||||
|         # Add the times to reach and stop optimizing | ||||
|         L, _ = link_list(order, landmarks) | ||||
|              | ||||
|         if printing_details is True : | ||||
|             if i != 0 : | ||||
|                 print(f"Neded to recompute paths {i} times because of unconnected loops...") | ||||
|             print_res(L) | ||||
|             print("\nTotal score : " + str(int(-res.fun))) | ||||
|  | ||||
|         return L | ||||
|  | ||||
|  | ||||
|  | ||||
|  | ||||
|  | ||||
|  | ||||
| @@ -1,6 +1,6 @@ | ||||
| city_bbox_side: 5000 #m | ||||
| radius_close_to: 30 | ||||
| church_coeff: 0.6 | ||||
| park_coeff: 1.5 | ||||
| tag_coeff: 100 | ||||
| radius_close_to: 50 | ||||
| church_coeff: 0.8 | ||||
| park_coeff: 1.2 | ||||
| tag_coeff: 10 | ||||
| N_important: 40 | ||||
|   | ||||
| @@ -1,45 +1,110 @@ | ||||
| import os, json | ||||
| import yaml, logging | ||||
|  | ||||
| from shapely import buffer, LineString, Point, Polygon, MultiPoint, concave_hull | ||||
| from typing import List, Tuple | ||||
| from math import pi | ||||
|  | ||||
| from structs.landmarks import Landmark | ||||
| from landmarks_manager import take_most_important | ||||
| from optimizer_v4 import solve_optimization, link_list_simple, print_res, get_time | ||||
| from optimizer import Optimizer | ||||
| from utils import get_time, link_list_simple, take_most_important | ||||
| import constants | ||||
|  | ||||
|  | ||||
| # Create corridor from tour | ||||
| def create_corridor(landmarks: List[Landmark], width: float) : | ||||
|  | ||||
| class Refiner : | ||||
|  | ||||
|     logger = logging.getLogger(__name__) | ||||
|  | ||||
|     all_landmarks: List[Landmark]   # list of all landmarks | ||||
|     base_tour: List[Landmark]       # base tour that needs to be refined | ||||
|     max_time: int = None            # max visit time (in minutes) | ||||
|     detour: int = None              # accepted max detour time (in minutes) | ||||
|     detour_factor: float            # detour factor of straight line vs real distance in cities | ||||
|     average_walking_speed: float    # average walking speed of adult | ||||
|     max_landmarks: int              # max number of landmarks to visit | ||||
|  | ||||
|  | ||||
|     def __init__(self, max_time: int, detour: int, all_landmarks: List[Landmark], base_tour: List[Landmark]) : | ||||
|         self.max_time = max_time | ||||
|         self.detour = detour | ||||
|         self.all_landmarks = all_landmarks | ||||
|         self.base_tour = base_tour | ||||
|  | ||||
|         # load parameters from file | ||||
|         with constants.OPTIMIZER_PARAMETERS_PATH.open('r') as f: | ||||
|             parameters = yaml.safe_load(f) | ||||
|             self.detour_factor = parameters['detour_factor'] | ||||
|             self.average_walking_speed = parameters['average_walking_speed'] | ||||
|             self.max_landmarks = parameters['max_landmarks'] + 4 | ||||
|  | ||||
|  | ||||
|     def create_corridor(self, landmarks: List[Landmark], width: float) : | ||||
|         """ | ||||
|         Create a corridor around the path connecting the landmarks. | ||||
|  | ||||
|         Args: | ||||
|             landmarks (List[Landmark]): the landmark path around which to create the corridor | ||||
|             width (float): Width of the corridor in meters. | ||||
|  | ||||
|         Returns: | ||||
|             Geometry: A buffered geometry object representing the corridor around the path. | ||||
|         """ | ||||
|  | ||||
|         corrected_width = (180*width)/(6371000*pi) | ||||
|          | ||||
|   path = create_linestring(landmarks) | ||||
|         path = self.create_linestring(landmarks) | ||||
|         obj = buffer(path, corrected_width, join_style="mitre", cap_style="square", mitre_limit=2) | ||||
|  | ||||
|         return obj | ||||
|  | ||||
|  | ||||
| # Create linestring from tour | ||||
| def create_linestring(landmarks: List[Landmark])->List[Point] : | ||||
|     def create_linestring(self, tour: List[Landmark])->LineString : | ||||
|         """ | ||||
|         Create a `LineString` object from a tour. | ||||
|  | ||||
|         Args: | ||||
|             tour (List[Landmark]): An ordered sequence of landmarks that represents the visiting order. | ||||
|  | ||||
|         Returns: | ||||
|             LineString: A `LineString` object representing the path through the landmarks. | ||||
|         """ | ||||
|  | ||||
|         points = [] | ||||
|  | ||||
|   for landmark in landmarks : | ||||
|         for landmark in tour : | ||||
|             points.append(Point(landmark.location)) | ||||
|  | ||||
|         return LineString(points) | ||||
|  | ||||
|  | ||||
|     # Check if some coordinates are in area. Used for the corridor  | ||||
| def is_in_area(area: Polygon, coordinates) -> bool : | ||||
|     def is_in_area(self, area: Polygon, coordinates) -> bool : | ||||
|         """ | ||||
|         Check if a given point is within a specified area. | ||||
|  | ||||
|         Args: | ||||
|             area (Polygon): The polygon defining the area. | ||||
|             coordinates (Tuple[float, float]): The coordinates of the point to check. | ||||
|  | ||||
|         Returns: | ||||
|             bool: True if the point is within the area, otherwise False. | ||||
|         """ | ||||
|         point = Point(coordinates) | ||||
|         return point.within(area) | ||||
|  | ||||
|  | ||||
|     # Function to determine if two landmarks are close to each other  | ||||
| def is_close_to(location1: Tuple[float], location2: Tuple[float]): | ||||
|     """Determine if two locations are close by rounding their coordinates to 3 decimals.""" | ||||
|     def is_close_to(self, location1: Tuple[float], location2: Tuple[float]): | ||||
|         """ | ||||
|         Determine if two locations are close to each other by rounding their coordinates to 3 decimal places. | ||||
|  | ||||
|         Args: | ||||
|             location1 (Tuple[float, float]): The coordinates of the first location. | ||||
|             location2 (Tuple[float, float]): The coordinates of the second location. | ||||
|  | ||||
|         Returns: | ||||
|             bool: True if the locations are within 0.001 degrees of each other, otherwise False. | ||||
|         """ | ||||
|  | ||||
|         absx = abs(location1[0] - location2[0]) | ||||
|         absy = abs(location1[1] - location2[1]) | ||||
|  | ||||
| @@ -47,36 +112,55 @@ def is_close_to(location1: Tuple[float], location2: Tuple[float]): | ||||
|         #return (round(location1[0], 3), round(location1[1], 3)) == (round(location2[0], 3), round(location2[1], 3)) | ||||
|  | ||||
|  | ||||
| # Rearrange some landmarks in the order of visit to group visit  | ||||
| def rearrange(landmarks: List[Landmark]) -> List[Landmark]: | ||||
|     def rearrange(self, tour: List[Landmark]) -> List[Landmark]: | ||||
|         """ | ||||
|         Rearrange landmarks to group nearby visits together. | ||||
|  | ||||
|         This function reorders landmarks so that nearby landmarks are adjacent to each other in the list, | ||||
|         while keeping 'start' and 'finish' landmarks in their original positions. | ||||
|  | ||||
|         Args: | ||||
|             tour (List[Landmark]): Ordered list of landmarks to be rearranged. | ||||
|  | ||||
|         Returns: | ||||
|             List[Landmark]: The rearranged list of landmarks with grouped nearby visits. | ||||
|         """ | ||||
|      | ||||
|         i = 1 | ||||
|   while i < len(landmarks): | ||||
|         while i < len(tour): | ||||
|             j = i+1 | ||||
|     while j < len(landmarks): | ||||
|       if is_close_to(landmarks[i].location, landmarks[j].location) and landmarks[i].name not in ['start', 'finish'] and landmarks[j].name not in ['start', 'finish']: | ||||
|             while j < len(tour): | ||||
|                 if self.is_close_to(tour[i].location, tour[j].location) and tour[i].name not in ['start', 'finish'] and tour[j].name not in ['start', 'finish']: | ||||
|                     # If they are not adjacent, move the j-th element to be adjacent to the i-th element | ||||
|                     if j != i + 1: | ||||
|           landmarks.insert(i + 1, landmarks.pop(j)) | ||||
|                         tour.insert(i + 1, tour.pop(j)) | ||||
|                         break  # Move to the next i-th element after rearrangement | ||||
|             j += 1 | ||||
|             i += 1 | ||||
|      | ||||
|   return landmarks | ||||
|         return tour | ||||
|  | ||||
|  | ||||
| # Simple nearest neighbour planner to try to fix the path | ||||
| def find_shortest_path_through_all_landmarks(landmarks: List[Landmark]) -> Tuple[List[Landmark], Polygon]: | ||||
|     def find_shortest_path_through_all_landmarks(self, landmarks: List[Landmark]) -> Tuple[List[Landmark], Polygon]: | ||||
|         """ | ||||
|         Find the shortest path through all landmarks using a nearest neighbor heuristic. | ||||
|  | ||||
|   # Read from data | ||||
|   with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f : | ||||
|     parameters = json.loads(f.read()) | ||||
|     detour = parameters['detour factor'] | ||||
|     speed = parameters['average walking speed'] | ||||
|         This function constructs a path that starts from the 'start' landmark, visits all other landmarks in the order | ||||
|         of their proximity, and ends at the 'finish' landmark. It returns both the ordered list of landmarks and a  | ||||
|         polygon representing the path. | ||||
|  | ||||
|         Args: | ||||
|             landmarks (List[Landmark]): List of all landmarks including 'start' and 'finish'. | ||||
|  | ||||
|         Returns: | ||||
|             Tuple[List[Landmark], Polygon]: A tuple where the first element is the list of landmarks in the order they  | ||||
|                                              should be visited, and the second element is a `Polygon` representing  | ||||
|                                              the path connecting all landmarks. | ||||
|         """ | ||||
|              | ||||
|         # Step 1: Find 'start' and 'finish' landmarks | ||||
|   start_idx = next(i for i, lm in enumerate(landmarks) if lm.name == 'start') | ||||
|   finish_idx = next(i for i, lm in enumerate(landmarks) if lm.name == 'finish') | ||||
|         start_idx = next(i for i, lm in enumerate(landmarks) if lm.type == 'start') | ||||
|         finish_idx = next(i for i, lm in enumerate(landmarks) if lm.type == 'finish') | ||||
|          | ||||
|         start_landmark = landmarks[start_idx] | ||||
|         finish_landmark = landmarks[finish_idx] | ||||
| @@ -93,7 +177,7 @@ def find_shortest_path_through_all_landmarks(landmarks: List[Landmark]) -> Tuple | ||||
|          | ||||
|         # Step 4: Use nearest neighbor heuristic to visit all landmarks | ||||
|         while unvisited_landmarks: | ||||
|     nearest_landmark = min(unvisited_landmarks, key=lambda lm: get_time(current_landmark.location, lm.location, detour, speed)) | ||||
|             nearest_landmark = min(unvisited_landmarks, key=lambda lm: get_time(current_landmark.location, lm.location)) | ||||
|             path.append(nearest_landmark) | ||||
|             coordinates.append(nearest_landmark.location) | ||||
|             current_landmark = nearest_landmark | ||||
| @@ -109,24 +193,51 @@ def find_shortest_path_through_all_landmarks(landmarks: List[Landmark]) -> Tuple | ||||
|  | ||||
|  | ||||
|     # Returns a list of minor landmarks around the planned path to enhance experience | ||||
| def get_minor_landmarks(all_landmarks: List[Landmark], visited_landmarks: List[Landmark], width: float) -> List[Landmark] : | ||||
|     def get_minor_landmarks(self, all_landmarks: List[Landmark], visited_landmarks: List[Landmark], width: float) -> List[Landmark] : | ||||
|         """ | ||||
|         Identify landmarks within a specified corridor that have not been visited yet. | ||||
|  | ||||
|         This function creates a corridor around the path defined by visited landmarks and then finds landmarks that fall | ||||
|         within this corridor. It returns a list of these landmarks, excluding those already visited, sorted by their importance. | ||||
|  | ||||
|         Args: | ||||
|             all_landmarks (List[Landmark]): List of all available landmarks. | ||||
|             visited_landmarks (List[Landmark]): List of landmarks that have already been visited. | ||||
|             width (float): Width of the corridor around the visited landmarks. | ||||
|  | ||||
|         Returns: | ||||
|             List[Landmark]: List of important landmarks within the corridor that have not been visited yet. | ||||
|         """ | ||||
|  | ||||
|         second_order_landmarks = [] | ||||
|         visited_names = [] | ||||
|   area = create_corridor(visited_landmarks, width) | ||||
|         area = self.create_corridor(visited_landmarks, width) | ||||
|  | ||||
|         for visited in visited_landmarks : | ||||
|             visited_names.append(visited.name) | ||||
|          | ||||
|         for landmark in all_landmarks : | ||||
|     if is_in_area(area, landmark.location) and landmark.name not in visited_names: | ||||
|             if self.is_in_area(area, landmark.location) and landmark.name not in visited_names: | ||||
|                 second_order_landmarks.append(landmark) | ||||
|  | ||||
|         return take_most_important(second_order_landmarks, len(visited_landmarks)) | ||||
|  | ||||
|  | ||||
|     # Try fix the shortest path using shapely | ||||
| def fix_using_polygon(tour: List[Landmark])-> List[Landmark] : | ||||
|     def fix_using_polygon(self, tour: List[Landmark])-> List[Landmark] : | ||||
|         """ | ||||
|         Improve the tour path using geometric methods to ensure it follows a more optimal shape. | ||||
|  | ||||
|         This function creates a polygon from the given tour and attempts to refine it using a concave hull. It reorders | ||||
|         the landmarks to fit within this refined polygon and adjusts the tour to ensure the 'start' landmark is at the | ||||
|         beginning. It also checks if the final polygon is simple and rearranges the tour if necessary. | ||||
|  | ||||
|         Args: | ||||
|             tour (List[Landmark]): List of landmarks representing the current tour path. | ||||
|  | ||||
|         Returns: | ||||
|             List[Landmark]: Refined list of landmarks in the order of visit to produce a better tour path. | ||||
|         """ | ||||
|  | ||||
|         coords = [] | ||||
|         coords_dict = {} | ||||
| @@ -173,18 +284,24 @@ def fix_using_polygon(tour: List[Landmark])-> List[Landmark] : | ||||
|  | ||||
|         # Rearrange only if polygon still not simple | ||||
|         if not better_tour_poly.is_simple : | ||||
|     better_tour = rearrange(better_tour) | ||||
|             better_tour = self.rearrange(better_tour) | ||||
|          | ||||
|         return better_tour | ||||
|  | ||||
|  | ||||
|     # Second stage of the optimization. Use linear programming again to refine the path | ||||
| def refine_optimization(landmarks: List[Landmark], base_tour: List[Landmark], max_time: int, detour: int, print_infos: bool) -> List[Landmark] : | ||||
|     def refine_optimization(self) -> List[Landmark] : | ||||
|         """ | ||||
|         Refine the initial tour path by considering additional minor landmarks and optimizing the path. | ||||
|  | ||||
|   # Read from the file | ||||
|   with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f : | ||||
|     parameters = json.loads(f.read()) | ||||
|     max_landmarks = parameters['max landmarks'] + 4 | ||||
|         This method evaluates the need for further optimization based on the initial tour. If a detour is required, it adds | ||||
|         minor landmarks around the initial predicted path and solves a new optimization problem to find a potentially better | ||||
|         tour. It then links the new tour and adjusts it using a nearest neighbor heuristic and polygon-based methods to | ||||
|         ensure a valid path. The final tour is chosen based on the shortest distance. | ||||
|  | ||||
|         Returns: | ||||
|             List[Landmark]: The refined list of landmarks representing the optimized tour path. | ||||
|         """ | ||||
|  | ||||
|         # No need to refine if no detour is taken | ||||
|         # if detour == 0 : | ||||
| @@ -192,18 +309,20 @@ def refine_optimization(landmarks: List[Landmark], base_tour: List[Landmark], ma | ||||
|             new_tour = base_tour | ||||
|          | ||||
|         else : | ||||
|     minor_landmarks = get_minor_landmarks(landmarks, base_tour, 200) | ||||
|             minor_landmarks = self.get_minor_landmarks(self.all_landmarks, self.base_tour, 200) | ||||
|  | ||||
|     if print_infos : print("Using " + str(len(minor_landmarks)) + " minor landmarks around the predicted path") | ||||
|             self.logger.info(f"Using {len(minor_landmarks)} minor landmarks around the predicted path") | ||||
|  | ||||
|             # full set of visitable landmarks | ||||
|     full_set = base_tour[:-1] + minor_landmarks   # create full set of possible landmarks (without finish) | ||||
|     full_set.append(base_tour[-1])                # add finish back | ||||
|             full_set = self.base_tour[:-1] + minor_landmarks   # create full set of possible landmarks (without finish) | ||||
|             full_set.append(self.base_tour[-1])                # add finish back | ||||
|  | ||||
|             # get a new tour | ||||
|     new_tour = solve_optimization(full_set, max_time+detour, False, max_landmarks) | ||||
|             optimizer = Optimizer(self.max_time + self.detour, full_set) | ||||
|             new_tour = optimizer.solve_optimization() | ||||
|  | ||||
|             if new_tour is None : | ||||
|       new_tour = base_tour | ||||
|                 new_tour = self.base_tour | ||||
|  | ||||
|         # Link the new tour | ||||
|         new_tour, new_dist = link_list_simple(new_tour) | ||||
| @@ -213,11 +332,11 @@ def refine_optimization(landmarks: List[Landmark], base_tour: List[Landmark], ma | ||||
|             return new_tour | ||||
|          | ||||
|         # Find shortest path using the nearest neighbor heuristic | ||||
|   better_tour, better_poly = find_shortest_path_through_all_landmarks(new_tour) | ||||
|         better_tour, better_poly = self.find_shortest_path_through_all_landmarks(new_tour) | ||||
|  | ||||
|         # Fix the tour using Polygons if the path looks weird | ||||
|   if base_tour[0].location == base_tour[-1].location and not better_poly.is_valid : | ||||
|     better_tour = fix_using_polygon(better_tour) | ||||
|         if self.base_tour[0].location == self.base_tour[-1].location and not better_poly.is_valid : | ||||
|             better_tour = self.fix_using_polygon(better_tour) | ||||
|          | ||||
|         # Link the tour again | ||||
|         better_tour, better_dist = link_list_simple(better_tour) | ||||
| @@ -228,16 +347,7 @@ def refine_optimization(landmarks: List[Landmark], base_tour: List[Landmark], ma | ||||
|         else : | ||||
|             final_tour = better_tour | ||||
|  | ||||
|   if print_infos : | ||||
|     print("\n\n\nRefined tour (result of second stage optimization): ") | ||||
|     print_res(final_tour) | ||||
|     total_score = 0 | ||||
|     for elem in final_tour :  | ||||
|       total_score += elem.attractiveness | ||||
|  | ||||
|     print("\nTotal score : " + str(total_score)) | ||||
|  | ||||
|  | ||||
|         self.logger.info("Refined tour (result of second stage optimization): ") | ||||
|          | ||||
|         return final_tour | ||||
|  | ||||
|   | ||||
| @@ -8,7 +8,7 @@ class Landmark(BaseModel) : | ||||
|      | ||||
|     # Properties of the landmark | ||||
|     name : str | ||||
|     type: Literal['sightseeing', 'nature', 'shopping'] | ||||
|     type: Literal['sightseeing', 'nature', 'shopping', 'start', 'finish'] | ||||
|     location : tuple | ||||
|     osm_type : str | ||||
|     osm_id : int | ||||
| @@ -22,8 +22,8 @@ class Landmark(BaseModel) : | ||||
|     uuid: str = Field(default_factory=uuid4)                    # TODO implement this ASAP | ||||
|      | ||||
|     # Additional properties depending on specific tour | ||||
|     must_do : bool | ||||
|     must_avoid : bool | ||||
|     must_do : Optional[bool] = False | ||||
|     must_avoid : Optional[bool] = False | ||||
|     is_secondary : Optional[bool] = False                       # TODO future    | ||||
|      | ||||
|     time_to_reach_next : Optional[int] = 0                      # TODO fix this in existing code | ||||
|   | ||||
| @@ -3,7 +3,7 @@ from typing import Optional, Literal | ||||
|  | ||||
| class Preference(BaseModel) : | ||||
|     name: str | ||||
|     type: Literal['sightseeing', 'nature', 'shopping'] | ||||
|     type: Literal['sightseeing', 'nature', 'shopping', 'start', 'finish'] | ||||
|     score: int          # score could be from 1 to 5 | ||||
|  | ||||
| # Input for optimization | ||||
|   | ||||
| @@ -4,10 +4,9 @@ from typing import List | ||||
| from landmarks_manager import LandmarkManager | ||||
| from fastapi.encoders import jsonable_encoder | ||||
|  | ||||
| from optimizer_v4 import solve_optimization | ||||
| from refiner import refine_optimization | ||||
| from optimizer import Optimizer | ||||
| from refiner import Refiner | ||||
| from structs.landmarks import Landmark | ||||
| from structs.landmarktype import LandmarkType | ||||
| from structs.preferences import Preferences, Preference | ||||
|  | ||||
|  | ||||
| @@ -24,58 +23,66 @@ def write_data(L: List[Landmark], file_name: str): | ||||
|     data.to_json(file_name, indent = 2, force_ascii=False) | ||||
|  | ||||
|  | ||||
| def test4(coordinates: tuple[float, float]) -> List[Landmark]: | ||||
| def test(start_coords: tuple[float, float], finish_coords: tuple[float, float] = None) -> List[Landmark]: | ||||
|  | ||||
|      | ||||
|     manager = LandmarkManager() | ||||
|  | ||||
|     preferences = Preferences( | ||||
|                     sightseeing=Preference( | ||||
|                                   name='sightseeing',  | ||||
|                                   type=LandmarkType(landmark_type='sightseeing'), | ||||
|                                   type='sightseeing', | ||||
|                                   score = 5), | ||||
|                     nature=Preference( | ||||
|                                   name='nature',  | ||||
|                                   type=LandmarkType(landmark_type='nature'), | ||||
|                                   type='nature', | ||||
|                                   score = 0), | ||||
|                     shopping=Preference( | ||||
|                                   name='shopping',  | ||||
|                                   type=LandmarkType(landmark_type='shopping'), | ||||
|                                   score = 0)) | ||||
|                                   type='shopping', | ||||
|                                   score = 0), | ||||
|  | ||||
|                     max_time_minute=180, | ||||
|                     detour_tolerance_minute=0 | ||||
|                     ) | ||||
|  | ||||
|     # Create start and finish  | ||||
|     start = Landmark(name='start', type=LandmarkType(landmark_type='start'), location=coordinates, osm_type='start', osm_id=0, attractiveness=0, must_do=False, n_tags = 0) | ||||
|     finish = Landmark(name='finish', type=LandmarkType(landmark_type='finish'), location=coordinates, osm_type='finish', osm_id=0, attractiveness=0, must_do=False, n_tags = 0) | ||||
|     if finish_coords is None : | ||||
|         finish_coords = start_coords | ||||
|     start = Landmark(name='start', type='start', location=start_coords, osm_type='', osm_id=0, attractiveness=0, n_tags = 0) | ||||
|     finish = Landmark(name='finish', type='start', location=finish_coords, osm_type='', osm_id=0, attractiveness=0, n_tags = 0) | ||||
|     #finish = Landmark(name='finish', type=LandmarkType(landmark_type='finish'), location=(48.8777055, 2.3640967), osm_type='finish', osm_id=0, attractiveness=0, must_do=True, n_tags = 0) | ||||
|     #start = Landmark(name='start', type=LandmarkType(landmark_type='start'), location=(48.847132, 2.312359), osm_type='start', osm_id=0, attractiveness=0, must_do=True, n_tags = 0) | ||||
|     #finish = Landmark(name='finish', type=LandmarkType(landmark_type='finish'), location=(48.843185, 2.344533), osm_type='finish', osm_id=0, attractiveness=0, must_do=True, n_tags = 0) | ||||
|     #finish = Landmark(name='finish', type=LandmarkType(landmark_type='finish'), location=(48.847132, 2.312359), osm_type='finish', osm_id=0, attractiveness=0, must_do=True, n_tags = 0) | ||||
|      | ||||
|  | ||||
|     manager = LandmarkManager(preferences=preferences, coordinates=start_coords) | ||||
|  | ||||
|     # Generate the landmarks from the start location | ||||
|     landmarks, landmarks_short = generate_landmarks(preferences=preferences, coordinates=start.location) | ||||
|     landmarks, landmarks_short = manager.generate_landmarks_list() | ||||
|      | ||||
|     # Store data to file for debug purposes | ||||
|     #write_data(landmarks, "landmarks_Wien.txt") | ||||
|  | ||||
|     # Insert start and finish to the landmarks list | ||||
|     landmarks_short.insert(0, start) | ||||
|     landmarks_short.append(finish) | ||||
|  | ||||
|     max_walking_time = 180      # minutes | ||||
|     detour = 0                 # minutes | ||||
|  | ||||
|     # First stage optimization | ||||
|     base_tour = solve_optimization(landmarks_short, max_walking_time, True) | ||||
|     optimizer = Optimizer(max_time=preferences.max_time_minute, landmarks=landmarks_short) | ||||
|     base_tour = optimizer.solve_optimization() | ||||
|  | ||||
|     # Second stage using linear optimization | ||||
|      | ||||
|     refined_tour = refine_optimization(landmarks, base_tour, max_walking_time, detour, True) | ||||
|     refiner = Refiner(max_time = preferences.max_time_minute, detour = preferences.detour_tolerance_minute, all_landmarks=landmarks, base_tour=base_tour) | ||||
|     refined_tour = refiner.refine_optimization() | ||||
|  | ||||
|  | ||||
|     return refined_tour | ||||
|  | ||||
|  | ||||
| #test4(tuple((48.8344400, 2.3220540)))       # Café Chez César  | ||||
| #test4(tuple((48.8375946, 2.2949904)))       # Point random | ||||
| #test4(tuple((47.377859, 8.540585)))         # Zurich HB | ||||
| #test4(tuple((45.7576485, 4.8330241)))      # Lyon Bellecour | ||||
| test4(tuple((48.5848435, 7.7332974)))      # Strasbourg Gare | ||||
| #test4(tuple((48.2067858, 16.3692340)))      # Vienne | ||||
| #test(tuple((48.8344400, 2.3220540)))       # Café Chez César  | ||||
| #test(tuple((48.8375946, 2.2949904)))       # Point random | ||||
| #test(tuple((47.377859, 8.540585)))         # Zurich HB | ||||
| #test(tuple((45.7576485, 4.8330241)))      # Lyon Bellecour | ||||
| test(tuple((48.5848435, 7.7332974)))      # Strasbourg Gare | ||||
| #test(tuple((48.2067858, 16.3692340)))      # Vienne | ||||
|   | ||||
							
								
								
									
										106
									
								
								backend/src/utils.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										106
									
								
								backend/src/utils.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,106 @@ | ||||
| import yaml | ||||
| from typing import List, Tuple | ||||
| from geopy.distance import geodesic | ||||
|  | ||||
| from structs.landmarks import Landmark | ||||
| import constants | ||||
|  | ||||
| def get_time(p1: Tuple[float, float], p2: Tuple[float, float]) -> int : | ||||
|     """ | ||||
|     Calculate the time in minutes to travel from one location to another. | ||||
|  | ||||
|     Args: | ||||
|         p1 (Tuple[float, float]): Coordinates of the starting location. | ||||
|         p2 (Tuple[float, float]): Coordinates of the destination. | ||||
|         detour (float): Detour factor affecting the distance. | ||||
|         speed (float): Walking speed in kilometers per hour. | ||||
|  | ||||
|     Returns: | ||||
|         int: Time to travel from p1 to p2 in minutes. | ||||
|     """ | ||||
|  | ||||
|     with constants.OPTIMIZER_PARAMETERS_PATH.open('r') as f: | ||||
|         parameters = yaml.safe_load(f) | ||||
|         detour_factor = parameters['detour_factor'] | ||||
|         average_walking_speed = parameters['average_walking_speed'] | ||||
|  | ||||
|     # Compute the straight-line distance in km | ||||
|     if p1 == p2 : | ||||
|         return 0 | ||||
|     else:  | ||||
|         dist = geodesic(p1, p2).kilometers | ||||
|  | ||||
|     # Consider the detour factor for average cityto deterline walking distance (in km) | ||||
|     walk_dist = dist*detour_factor | ||||
|  | ||||
|     # Time to walk this distance (in minutes) | ||||
|     walk_time = walk_dist/average_walking_speed*60 | ||||
|  | ||||
|     return round(walk_time) | ||||
|  | ||||
|  | ||||
|  | ||||
|  | ||||
| # Same as link_list but does it on a already ordered list | ||||
| def link_list_simple(ordered_visit: List[Landmark])-> List[Landmark] : | ||||
|  | ||||
|     L =  [] | ||||
|     j = 0 | ||||
|     total_dist = 0 | ||||
|     while j < len(ordered_visit)-1 : | ||||
|         elem = ordered_visit[j] | ||||
|         next = ordered_visit[j+1] | ||||
|  | ||||
|         elem.next_uuid = next.uuid | ||||
|         d = get_time(elem.location, next.location) | ||||
|         elem.time_to_reach_next = d | ||||
|         if elem.name not in ['start', 'finish'] : | ||||
|             elem.must_do = True | ||||
|         L.append(elem) | ||||
|         j += 1 | ||||
|         total_dist += d | ||||
|  | ||||
|     L.append(next) | ||||
|      | ||||
|     return L, total_dist | ||||
|  | ||||
|  | ||||
|  | ||||
| # Take the most important landmarks from the list | ||||
| def take_most_important(landmarks: List[Landmark], N_important) -> List[Landmark] : | ||||
|  | ||||
|     L = len(landmarks) | ||||
|     L_copy = [] | ||||
|     L_clean = [] | ||||
|     scores = [0]*len(landmarks) | ||||
|     names = [] | ||||
|     name_id = {} | ||||
|  | ||||
|     for i, elem in enumerate(landmarks) : | ||||
|         if elem.name not in names : | ||||
|             names.append(elem.name) | ||||
|             name_id[elem.name] = [i] | ||||
|             L_copy.append(elem) | ||||
|         else : | ||||
|             name_id[elem.name] += [i] | ||||
|             scores = [] | ||||
|             for j in name_id[elem.name] : | ||||
|                 scores.append(L[j].attractiveness) | ||||
|             best_id = max(range(len(scores)), key=scores.__getitem__) | ||||
|             t = name_id[elem.name][best_id] | ||||
|             if t == i : | ||||
|                 for old in L_copy : | ||||
|                     if old.name == elem.name : | ||||
|                         old.attractiveness = L[t].attractiveness | ||||
|      | ||||
|     scores = [0]*len(L_copy) | ||||
|     for i, elem in enumerate(L_copy) : | ||||
|         scores[i] = elem.attractiveness | ||||
|  | ||||
|     res = sorted(range(len(scores)), key = lambda sub: scores[sub])[-(N_important-L):] | ||||
|  | ||||
|     for i, elem in enumerate(L_copy) : | ||||
|         if i in res : | ||||
|             L_clean.append(elem) | ||||
|  | ||||
|     return L_clean | ||||
		Reference in New Issue
	
	Block a user