added some ideas

This commit is contained in:
Helldragon67 2024-07-07 10:17:50 +02:00
parent 006b80018a
commit e71c92da40
12 changed files with 4247 additions and 355 deletions

View File

@ -6,3 +6,5 @@ historic
'amenity'='place_of_worship' 'amenity'='place_of_worship'
'amenity'='fountain' 'amenity'='fountain'
'water'='reflecting_pool' 'water'='reflecting_pool'
# 'tourism'='attraction' might be a bit too broad
# historic as well

View File

@ -43,31 +43,6 @@ def generate_landmarks(preferences: Preferences, coordinates: Tuple[float, float
return L, take_most_important(L) return L, take_most_important(L)
"""def generate_landmarks(preferences: Preferences, city_country: str = None, coordinates: Tuple[float, float] = None) -> Tuple[List[Landmark], List[Landmark]] :
l_sights, l_nature, l_shop = get_amenities()
L = []
# List for sightseeing
if preferences.sightseeing.score != 0 :
L1 = get_landmarks(l_sights, SIGHTSEEING, city_country=city_country, coordinates=coordinates)
correct_score(L1, preferences.sightseeing)
L += L1
# List for nature
if preferences.nature.score != 0 :
L2 = get_landmarks(l_nature, NATURE, city_country=city_country, coordinates=coordinates)
correct_score(L2, preferences.nature)
L += L2
# List for shopping
if preferences.shopping.score != 0 :
L3 = get_landmarks(l_shop, SHOPPING, city_country=city_country, coordinates=coordinates)
correct_score(L3, preferences.shopping)
L += L3
return remove_duplicates(L), take_most_important(L)
"""
# Helper function to gather the amenities list # Helper function to gather the amenities list
def get_amenities() -> List[List[str]] : def get_amenities() -> List[List[str]] :
@ -87,6 +62,7 @@ def get_list(path: str) -> List[str] :
amenities = [] amenities = []
for line in content : for line in content :
if not line.startswith('#') :
amenities.append(line.strip('\n')) amenities.append(line.strip('\n'))
return amenities return amenities
@ -173,7 +149,7 @@ def correct_score(L: List[Landmark], preference: Preference) :
raise TypeError(f"LandmarkType {preference.type} does not match the type of Landmark {L[0].name}") raise TypeError(f"LandmarkType {preference.type} does not match the type of Landmark {L[0].name}")
for elem in L : for elem in L :
elem.attractiveness = int(elem.attractiveness*preference.score/500) # arbitrary computation elem.attractiveness = int(elem.attractiveness*preference.score/5) # arbitrary computation
# Function to count elements within a certain radius of a location # Function to count elements within a certain radius of a location
@ -192,10 +168,14 @@ def count_elements_within_radius(coordinates: Tuple[float, float], radius: int)
try : try :
overpass = Overpass() overpass = Overpass()
radius_result = overpass.query(radius_query) radius_result = overpass.query(radius_query)
return radius_result.countElements() N_elem = radius_result.countWays() + radius_result.countRelations()
#print(f"There are {N_elem} ways/relations within 50m")
if N_elem is None :
return 0
return N_elem
except : except :
return None return 0
# Creates a bounding box around given coordinates # Creates a bounding box around given coordinates
@ -267,99 +247,39 @@ def get_landmarks(list_amenity: list, landmarktype: LandmarkType, coordinates: T
elem_type = landmarktype # Add the landmark type as 'sightseeing elem_type = landmarktype # Add the landmark type as 'sightseeing
n_tags = len(elem.tags().keys()) # Add number of tags n_tags = len(elem.tags().keys()) # Add number of tags
# remove specific tags
skip = False
for tag in elem.tags().keys() :
if "pay" in tag :
n_tags -= 1 # discard payment options for tags
if "disused" in tag :
skip = True
break
if amenity not in ["'shop'='department_store'", "'shop'='mall'"] :
if "shop" in tag :
skip = True
break
if tag == "building" and elem.tag('building') in ['retail', 'supermarket', 'parking']:
skip = True
break
if skip:
continue
# Add score of given landmark based on the number of surrounding elements. Penalty for churches as there are A LOT # Add score of given landmark based on the number of surrounding elements. Penalty for churches as there are A LOT
if amenity == "'amenity'='place_of_worship'" : if amenity == "'amenity'='place_of_worship'" :
score = int((count_elements_within_radius(location, radius) + n_tags*tag_coeff )*church_coeff) score = int((count_elements_within_radius(location, radius) + (n_tags*tag_coeff) )*church_coeff)
elif amenity == "'leisure'='park'" : elif amenity == "'leisure'='park'" :
score = int((count_elements_within_radius(location, radius) + n_tags*tag_coeff )*park_coeff) score = int((count_elements_within_radius(location, radius) + (n_tags*tag_coeff) )*park_coeff)
else : else :
score = count_elements_within_radius(location, radius) + n_tags*tag_coeff score = count_elements_within_radius(location, radius) + (n_tags*tag_coeff)
if score is not None : if score is not None :
# Generate the landmark and append it to the list # Generate the landmark and append it to the list
#print(f"There are {n_tags} tags on this Landmark. Total score : {score}\n")
landmark = Landmark(name=name, type=elem_type, location=location, osm_type=osm_type, osm_id=osm_id, attractiveness=score, must_do=False, n_tags=n_tags) landmark = Landmark(name=name, type=elem_type, location=location, osm_type=osm_type, osm_id=osm_id, attractiveness=score, must_do=False, n_tags=n_tags)
L.append(landmark) L.append(landmark)
return L return L
"""def get_landmarks(list_amenity: list, landmarktype: LandmarkType, city_country: str = None, coordinates: Tuple[float, float] = None) -> List[Landmark] :
if city_country is None and coordinates is None :
raise ValueError("Either one of 'city_country' and 'coordinates' arguments must be specified")
if city_country is not None and coordinates is not None :
raise ValueError("Cannot specify both 'city_country' and 'coordinates' at the same time, please choose either one")
# Read the parameters from the file
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/landmarks_manager.params', "r") as f :
parameters = json.loads(f.read())
tag_coeff = parameters['tag coeff']
park_coeff = parameters['park coeff']
church_coeff = parameters['church coeff']
radius = parameters['radius close to']
bbox_side = parameters['city bbox side']
# If city_country is specified :
if city_country is not None :
nominatim = Nominatim()
areaId = nominatim.query(city_country).areaId()
bbox = None
# If coordinates are specified :
elif coordinates is not None :
bbox = create_bbox(coordinates, bbox_side)
areaId = None
else :
raise ValueError("Argument number is not corresponding.")
# Initialize some variables
N = 0
L = []
overpass = Overpass()
for amenity in list_amenity :
query = overpassQueryBuilder(area=areaId, bbox=bbox, elementType=['way', 'relation'], selector=amenity, includeCenter=True, out='body')
result = overpass.query(query)
N += result.countElements()
for elem in result.elements():
name = elem.tag('name') # Add name
location = (elem.centerLat(), elem.centerLon()) # Add coordinates (lat, lon)
# skip if unprecise location
if name is None or location[0] is None:
continue
# skip if unused
if 'disused:leisure' in elem.tags().keys():
continue
# skip if part of another building
if 'building:part' in elem.tags().keys() and elem.tag('building:part') == 'yes':
continue
else :
osm_type = elem.type() # Add type : 'way' or 'relation'
osm_id = elem.id() # Add OSM id
elem_type = landmarktype # Add the landmark type as 'sightseeing
n_tags = len(elem.tags().keys()) # Add number of tags
# Add score of given landmark based on the number of surrounding elements. Penalty for churches as there are A LOT
if amenity == "'amenity'='place_of_worship'" :
score = int((count_elements_within_radius(location, radius) + n_tags*tag_coeff )*church_coeff)
elif amenity == "'leisure'='park'" :
score = int((count_elements_within_radius(location, radius) + n_tags*tag_coeff )*park_coeff)
else :
score = count_elements_within_radius(location, radius) + n_tags*tag_coeff
if score is not None :
# Generate the landmark and append it to the list
landmark = Landmark(name=name, type=elem_type, location=location, osm_type=osm_type, osm_id=osm_id, attractiveness=score, must_do=False, n_tags=n_tags)
L.append(landmark)
return L
"""

View File

@ -1,23 +0,0 @@
import fastapi
from dataclasses import dataclass
@dataclass
class Destination:
name: str
location: tuple
attractiveness: int
d = Destination()
def get_route() -> list[Destination]:
return {"route": "Hello World"}
endpoint = ("/get_route", get_route)
end
if __name__ == "__main__":
fastapi.run()

View File

@ -5,23 +5,60 @@ from scipy.spatial import KDTree
import numpy as np import numpy as np
from itertools import combinations from itertools import combinations
from structs.landmarks import Landmark from structs.landmarks import Landmark
from optimizer import print_res, link_list_simple from optimizer_v4 import print_res, link_list_simple, get_time
import os import os
import json import json
import heapq import heapq
# Define the get_distance function
def get_distance(loc1: Tuple[float, float], loc2: Tuple[float, float], detour: float, speed: float) -> Tuple[float, float]:
# Placeholder implementation, should be replaced with the actual logic
distance = geodesic(loc1, loc2).meters
return distance, distance * detour / speed
# Heuristic function: distance to the goal # Heuristic function: distance to the goal
def heuristic(loc1: Tuple[float, float], loc2: Tuple[float, float]) -> float: def heuristic(loc1: Tuple[float, float], loc2: Tuple[float, float], score2: int) -> float:
return geodesic(loc1, loc2).meters return geodesic(loc1, loc2).meters
# A* planner to search through the graph
def a_star2(G, start_id, end_id, max_walking_time, must_do_nodes, max_landmarks, detour, speed):
open_set = []
heapq.heappush(open_set, (0, start_id, 0, [start_id], set([start_id])))
best_path = None
max_attractiveness = 0
visited_must_do = set()
while open_set:
_, current_node, current_length, path, visited = heapq.heappop(open_set)
# If current node is a must_do node and hasn't been visited yet, mark it as visited
if current_node in must_do_nodes and current_node not in visited_must_do:
visited_must_do.add(current_node)
# Check if path includes all must_do nodes and reaches the end
if current_node == end_id and all(node in visited for node in must_do_nodes):
attractiveness = sum(G.nodes[node]['weight'] for node in path)
if attractiveness > max_attractiveness:
best_path = path
max_attractiveness = attractiveness
continue
if len(path) > max_landmarks + 1:
continue
for neighbor in G.neighbors(current_node):
if neighbor not in visited:
#distance = int(geodesic(G.nodes[current_node]['pos'], G.nodes[neighbor]['pos']).meters * detour / (speed * 16.6666))
distance = get_time(G.nodes[current_node]['pos'], G.nodes[neighbor]['pos'], detour, speed)
if current_length + distance <= max_walking_time:
new_path = path + [neighbor]
new_visited = visited | {neighbor}
estimated_cost = current_length + distance + get_time(G.nodes[neighbor]['pos'], G.nodes[end_id]['pos'], detour, speed)
heapq.heappush(open_set, (estimated_cost, neighbor, current_length + distance, new_path, new_visited))
# Check if all must_do_nodes have been visited
if all(node in visited_must_do for node in must_do_nodes):
return best_path, max_attractiveness
else:
return None, 0
def a_star(G, start_id, end_id, max_walking_time, must_do_nodes, max_landmarks, detour, speed): def a_star(G, start_id, end_id, max_walking_time, must_do_nodes, max_landmarks, detour, speed):
open_set = [] open_set = []
heapq.heappush(open_set, (0, start_id, 0, [start_id], set([start_id]))) heapq.heappush(open_set, (0, start_id, 0, [start_id], set([start_id])))
@ -49,11 +86,11 @@ def a_star(G, start_id, end_id, max_walking_time, must_do_nodes, max_landmarks,
for neighbor in G.neighbors(current_node): for neighbor in G.neighbors(current_node):
if neighbor not in visited: if neighbor not in visited:
distance = int(geodesic(G.nodes[current_node]['pos'], G.nodes[neighbor]['pos']).meters * detour / (speed * 16.6666)) distance = get_time(G.nodes[current_node]['pos'], G.nodes[neighbor]['pos'], detour, speed)
if current_length + distance <= max_walking_time: if current_length + distance <= max_walking_time:
new_path = path + [neighbor] new_path = path + [neighbor]
new_visited = visited | {neighbor} new_visited = visited | {neighbor}
estimated_cost = current_length + distance + heuristic(G.nodes[neighbor]['pos'], G.nodes[end_id]['pos']) estimated_cost = current_length + distance + heuristic(G.nodes[neighbor]['pos'], G.nodes[end_id]['pos'], G.nodes[neighbor]['weight'])
heapq.heappush(open_set, (estimated_cost, neighbor, current_length + distance, new_path, new_visited)) heapq.heappush(open_set, (estimated_cost, neighbor, current_length + distance, new_path, new_visited))
# Check if all must_do_nodes have been visited # Check if all must_do_nodes have been visited
@ -64,31 +101,6 @@ def a_star(G, start_id, end_id, max_walking_time, must_do_nodes, max_landmarks,
def dfs(G, current_node, end_id, current_length, path, visited, max_walking_time, must_do_nodes, max_landmarks, detour, speed):
# If the path includes all must_do nodes and reaches the end
if current_node == end_id and all(node in path for node in must_do_nodes):
return path, sum(G.nodes[node]['weight'] for node in path)
# If the number of landmarks exceeds the maximum allowed, return None
if len(path) > max_landmarks+1:
return None, 0
best_path = None
max_attractiveness = 0
for neighbor in G.neighbors(current_node):
if neighbor not in visited:
distance = int(geodesic(G.nodes[current_node]['pos'], G.nodes[neighbor]['pos']).meters * detour / (speed*16.6666))
if current_length + distance <= max_walking_time:
new_path = path + [neighbor]
new_visited = visited | {neighbor}
result_path, attractiveness = dfs(G, neighbor, end_id, current_length + distance, new_path, new_visited, max_walking_time, must_do_nodes, max_landmarks, detour, speed)
if attractiveness > max_attractiveness:
best_path = result_path
max_attractiveness = attractiveness
return best_path, max_attractiveness
def find_path(G, start_id, finish_id, max_walking_time, must_do_nodes, max_landmarks) -> List[str]: def find_path(G, start_id, finish_id, max_walking_time, must_do_nodes, max_landmarks) -> List[str]:
# Read the parameters from the file # Read the parameters from the file
@ -97,15 +109,12 @@ def find_path(G, start_id, finish_id, max_walking_time, must_do_nodes, max_landm
detour = parameters['detour factor'] detour = parameters['detour factor']
speed = parameters['average walking speed'] speed = parameters['average walking speed']
"""if G[start_id]['pos'] == G[finish_id]['pos'] :
best_path, _ = dfs(G, start_id, finish_id, 0, [start_id], {start_id}, max_walking_time, must_do_nodes, max_landmarks, detour, speed)
else :"""
best_path, _ = a_star(G, start_id, finish_id, max_walking_time, must_do_nodes, max_landmarks, detour, speed) best_path, _ = a_star(G, start_id, finish_id, max_walking_time, must_do_nodes, max_landmarks, detour, speed)
return best_path if best_path else [] return best_path if best_path else []
# Function to dynamically adjust theta # Function to dynamically adjust theta
def adjust_theta(num_nodes, theta_opt, target_ratio=2.0): def adjust_theta(num_nodes, theta_opt, target_ratio=2.0):
# Start with an initial guess # Start with an initial guess
@ -114,62 +123,8 @@ def adjust_theta(num_nodes, theta_opt, target_ratio=2.0):
return initial_theta / (num_nodes ** (1 / target_ratio)) return initial_theta / (num_nodes ** (1 / target_ratio))
# Create a graph using NetworkX and generate the path # Create a graph using NetworkX and generate the path without must_do
def generate_path(landmarks: List[Landmark], max_walking_time: float, max_landmarks: int, theta_opt = 0.0008) -> List[List[Landmark]]: def generate_path(landmarks: List[Landmark], max_walking_time: float, max_landmarks: int) -> List[List[Landmark]]:
landmap = {}
pos_dict = {}
weight_dict = {}
# Add nodes to the graph with attractiveness
for i, landmark in enumerate(landmarks):
#G.nodes[i]['attractiveness'] = landmark.attractiveness
pos_dict[i] = landmark.location
weight_dict[i] = landmark.attractiveness
#G.nodes[i]['pos'] = landmark.location
landmap[i] = landmark
if landmark.name == 'start' :
start_id = i
elif landmark.name == 'finish' :
end_id = i
# Lambda version of get_distance
get_dist = lambda loc1, loc2: geodesic(loc1, loc2).meters + 0.001 #.meters*detour/speed +0.0000001
theta = adjust_theta(len(landmarks), theta_opt)
G = nx.geographical_threshold_graph(n=len(landmarks), theta=theta, pos=pos_dict, weight=weight_dict, metric=get_dist)
# good theta : 0.000125
# Define must_do nodes
must_do_nodes = [i for i in G.nodes() if landmap[i].must_do]
for node1, node2 in combinations(must_do_nodes, 2):
if not G.has_edge(node1, node2):
distance = geodesic(G.nodes[node1]['pos'], G.nodes[node2]['pos']).meters + 0.001
G.add_edge(node1, node2, weight=distance)
print(f"Graph with {G.number_of_nodes()} nodes")
print(f"Graph with {G.number_of_edges()} edges")
print("Computing path...")
# Find the valid path using the greedy algorithm
valid_path = find_path(G, start_id, end_id, max_walking_time, must_do_nodes, max_landmarks)
if not valid_path:
return [] # No valid path found
lis = [landmap[id] for id in valid_path]
lis, tot_dist = link_list_simple(lis)
print_res(lis, len(landmarks))
return lis
# Create a graph using NetworkX and generate the path
def generate_path2(landmarks: List[Landmark], max_walking_time: float, max_landmarks: int) -> List[List[Landmark]]:
# Read the parameters from the file # Read the parameters from the file
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f : with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
@ -177,11 +132,11 @@ def generate_path2(landmarks: List[Landmark], max_walking_time: float, max_landm
detour = parameters['detour factor'] detour = parameters['detour factor']
speed = parameters['average walking speed'] speed = parameters['average walking speed']
landmap = {} landmap = {}
pos_dict = {} pos_dict = {}
weight_dict = {} weight_dict = {}
G = nx.Graph() G = nx.Graph()
# Add nodes to the graph with attractiveness # Add nodes to the graph with attractiveness
for i, landmark in enumerate(landmarks): for i, landmark in enumerate(landmarks):
pos_dict[i] = landmark.location pos_dict[i] = landmark.location
@ -193,40 +148,89 @@ def generate_path2(landmarks: List[Landmark], max_walking_time: float, max_landm
elif landmark.name == 'finish' : elif landmark.name == 'finish' :
finish_id = i finish_id = i
"""# If start and finish are the same no need to add another node
if pos_dict[finish_id] == pos_dict[start_id] :
end_id = start_id
else :
G.add_node(finish_id, pos=pos_dict[finish_id], weight=weight_dict[finish_id])
end_id = finish_id"""
# Lambda version of get_distance
#get_dist = lambda loc1, loc2: geodesic(loc1, loc2).meters + 0.001 #.meters*detour/speed +0.0000001
coords = np.array(list(pos_dict.values())) coords = np.array(list(pos_dict.values()))
kdtree = KDTree(coords) kdtree = KDTree(coords)
k = 4 k = 5
if len(landmarks) <= k :
k = len(landmarks)-1
for node, coord in pos_dict.items(): for node, coord in pos_dict.items():
indices = kdtree.query(coord, k + 1)[1] # k+1 because the closest neighbor is the node itself indices = kdtree.query(coord, k + 1)[1] # k+1 because the closest neighbor is the node itself
for idx in indices[1:]: # skip the first one (itself) for idx in indices[1:]: # skip the first one (itself)
neighbor = list(pos_dict.keys())[idx] neighbor = list(pos_dict.keys())[idx]
distance = get_distance(coord, pos_dict[neighbor], detour, speed) distance = get_time(coord, pos_dict[neighbor], detour, speed)
G.add_edge(node, neighbor, weight=distance)
print(f"Graph with {G.number_of_nodes()} nodes and {G.number_of_edges()} edges")
print("Start computing path...")
# Find the valid path using the greedy algorithm
valid_path = find_path(G, start_id, finish_id, max_walking_time, [], max_landmarks)
if not valid_path:
return [] # No valid path found
lis = [landmap[id] for id in valid_path]
lis, _ = link_list_simple(lis)
print_res(lis, len(landmarks))
return lis
# Create a graph using NetworkX and generate the path
def generate_path2(landmarks: List[Landmark], max_walking_time: float, max_landmarks: int) -> List[List[Landmark]]:
# Read the parameters from the file
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
parameters = json.loads(f.read())
detour = parameters['detour factor']
speed = parameters['average walking speed']
landmap = {}
pos_dict = {}
weight_dict = {}
must_do_nodes = []
G = nx.Graph()
# Add nodes to the graph with attractiveness
for i, landmark in enumerate(landmarks):
pos_dict[i] = landmark.location
weight_dict[i] = landmark.attractiveness
landmap[i] = landmark
if landmark.must_do :
must_do_nodes.append(i)
G.add_node(i, pos=landmark.location, weight=landmark.attractiveness)
if landmark.name == 'start' :
start_id = i
elif landmark.name == 'finish' :
finish_id = i
coords = np.array(list(pos_dict.values()))
kdtree = KDTree(coords)
k = 3
for node, coord in pos_dict.items():
indices = kdtree.query(coord, k + 1)[1] # k+1 because the closest neighbor is the node itself
for idx in indices[1:]: # skip the first one (itself)
neighbor = list(pos_dict.keys())[idx]
distance = get_time(coord, pos_dict[neighbor], detour, speed)
G.add_edge(node, neighbor, weight=distance) G.add_edge(node, neighbor, weight=distance)
# Define must_do nodes
must_do_nodes = [i for i in G.nodes() if landmap[i].must_do]
# Add special edges between must_do nodes # Add special edges between must_do nodes
if len(must_do_nodes) > 0 : if len(must_do_nodes) > 0 :
for node1, node2 in combinations(must_do_nodes, 2): for node1, node2 in combinations(must_do_nodes, 2):
if not G.has_edge(node1, node2): if not G.has_edge(node1, node2):
distance = get_distance(G.nodes[node1]['pos'], G.nodes[node2]['pos'], detour, speed) distance = get_time(G.nodes[node1]['pos'], G.nodes[node2]['pos'], detour, speed)
G.add_edge(node1, node2, weight=distance) G.add_edge(node1, node2, weight=distance)
print(f"Graph with {G.number_of_nodes()} nodes") print(f"Graph with {G.number_of_nodes()} nodes and {G.number_of_edges()} edges")
print(f"Graph with {G.number_of_edges()} edges")
print("Computing path...") print("Computing path...")
# Find the valid path using the greedy algorithm # Find the valid path using the greedy algorithm
@ -249,12 +253,38 @@ def generate_path2(landmarks: List[Landmark], max_walking_time: float, max_landm
def correct_path(tour: List[Landmark]) -> List[Landmark] : def correct_path(tour: List[Landmark]) -> List[Landmark] :
# Read the parameters from the file
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
parameters = json.loads(f.read())
detour = parameters['detour factor']
speed = parameters['average walking speed']
G = nx.Graph()
coords = [] coords = []
for landmark in tour : landmap = {}
for i, landmark in enumerate(tour) :
coords.append(landmark.location) coords.append(landmark.location)
landmap[i] = landmark
G.add_node(i, pos=landmark.location, weight=landmark.attractiveness)
G = nx.circulant_graph(n=len(tour), create_using=coords) kdtree = KDTree(coords)
path = nx.shortest_path(G=G, source=tour[0].location, target=tour[-1].location) k = 3
for node, coord in coords:
indices = kdtree.query(coord, k + 1)[1] # k+1 because the closest neighbor is the node itself
for idx in indices[1:]: # skip the first one (itself)
neighbor = list(coords)[idx]
distance = get_time(coord, coords[neighbor], detour, speed)
G.add_edge(node, neighbor, weight=distance)
path = nx.approximation.traveling_salesman_problem(G, weight='weight', cycle=True)
lis = [landmap[id] for id in path]
lis, tot_dist = link_list_simple(lis)
print_res(lis, len(tour))
return path return path

326
backend/src/optimizer_v3.py Normal file
View File

@ -0,0 +1,326 @@
import numpy as np
import json, os
from typing import List, Tuple
from itertools import combinations
from scipy.optimize import linprog
from math import radians, sin, cos, acos
from shapely import Polygon
from geopy.distance import geodesic
from structs.landmarks import Landmark
# Function to print the result
def print_res(L: List[Landmark], L_tot):
if len(L) == L_tot:
print('\nAll landmarks can be visited within max_steps, the following order is suggested : ')
else :
print('Could not visit all the landmarks, the following order is suggested : ')
dist = 0
for elem in L :
if elem.time_to_reach_next is not None :
print('- ' + elem.name + ', time to reach next = ' + str(elem.time_to_reach_next))
dist += elem.time_to_reach_next
else :
print('- ' + elem.name)
print("\nMinutes walked : " + str(dist))
print(f"Visited {len(L)-2} out of {L_tot-2} landmarks")
# Function that returns the distance in meters from one location to another
def get_time(p1: Tuple[float, float], p2: Tuple[float, float], detour: float, speed: float) :
# Compute the straight-line distance in m
if p1 == p2 :
return 0
else:
#dist = 1000 * 6371.01 * acos(sin(radians(p1[0]))*sin(radians(p2[0])) + cos(radians(p1[0]))*cos(radians(p2[0]))*cos(radians(p1[1]) - radians(p2[1])))
dist = geodesic(p1, p2).meters
# Consider the detour factor for average city to determine walking distance (in m)
walk_dist = dist*detour
# Time to walk this distance (in minutes)
walk_time = walk_dist/speed*(60/1000)
"""if walk_time > 15 :
walk_time = 5*round(walk_time/5)
else :
walk_time = round(walk_time)"""
return round(walk_time)
# Checks if the path is connected, returns a circle if it finds one and the RESULT
def is_connected(resx) -> bool:
N = len(resx) # length of res
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
resx = resx[:L*L]
# first round the results to have only 0-1 values
for i, elem in enumerate(resx):
resx[i] = round(elem)
n_edges = resx.sum() # number of edges
nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0].tolist()
ind_b = nonzero_tup[1].tolist()
edges = []
edges_visited = []
vertices_visited = []
edge1 = (ind_a[0], ind_b[0])
edges_visited.append(edge1)
vertices_visited.append(edge1[0])
for i, a in enumerate(ind_a) :
edges.append((a, ind_b[i])) # Create the list of edges
remaining = edges
remaining.remove(edge1)
break_flag = False
while len(remaining) > 0 and not break_flag:
for edge2 in remaining :
if edge2[0] == edge1[1] :
if edge1[1] in vertices_visited :
edges_visited.append(edge2)
break_flag = True
break
else :
vertices_visited.append(edge1[1])
edges_visited.append(edge2)
remaining.remove(edge2)
edge1 = edge2
elif edge1[1] == L-1 or edge1[1] in vertices_visited:
break_flag = True
break
vertices_visited.append(edge1[1])
if len(vertices_visited) == n_edges +1 :
return vertices_visited, []
else:
return vertices_visited, edges_visited
# Computes the time to reach from each landmark to the next
def link_list(order: List[int], landmarks: List[Landmark])->List[Landmark] :
# Read the parameters from the file
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
parameters = json.loads(f.read())
detour_factor = parameters['detour factor']
speed = parameters['average walking speed']
L = []
j = 0
total_dist = 0
while j < len(order)-1 :
elem = landmarks[order[j]]
next = landmarks[order[j+1]]
d = get_time(elem.location, next.location, detour_factor, speed)
elem.time_to_reach_next = d
if elem.name not in ['start', 'finish'] :
elem.must_do = True
L.append(elem)
j += 1
total_dist += d
L.append(next)
return L, total_dist
# Constraint to respect only one travel per landmark. Also caps the total number of visited landmarks
def respect_number(L:int, A_ub, b_ub):
ones = [1]*L
zeros = [0]*L
for i in range(L) :
h = zeros*i + ones + zeros*(L-1-i) + [0]*(L-1)
A_ub.append(h)
b_ub.append(1)
# Read the parameters from the file
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
parameters = json.loads(f.read())
max_landmarks = parameters['max landmarks']
A_ub.append(ones*L + [0]*(L-1))
b_ub.append(max_landmarks+1)
return A_ub, b_ub
def solve_optimizationv3(landmarks, max_walking_time):
L = len(landmarks)
# Read the parameters from the file
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
parameters = json.loads(f.read())
detour = parameters['detour factor']
speed = parameters['average walking speed']
# Create distance matrix
A = np.zeros((L, L))
for i in range(L):
for j in range(L):
if i != j:
A[i, j] = get_time(landmarks[i].location, landmarks[j].location, detour, speed)
# Define the linear program
c = np.hstack((A.flatten(), [0]*(L-1)))
bounds = [(0, 1) for _ in range(L*L+L-1)]
# Flow conservation constraints
A_eq = []
b_eq = []
# Each node (except start and end) has one incoming and one outgoing edge
for i in range(L):
if i == 0 or i == L-1:
continue
A_eq.append([1 if j // L == i else 0 for j in range(L*L)] + [0]*(L-1))
b_eq.append(1)
A_eq.append([1 if j % L == i else 0 for j in range(L*L)] + [0]*(L-1))
b_eq.append(1)
# Start node constraint
A_eq.append([1 if j // L == 0 else 0 for j in range(L*L)] + [0]*(L-1))
b_eq.append(1)
# End node constraint
A_eq.append([1 if j % L == L-1 else 0 for j in range(L*L)] + [0]*(L-1))
b_eq.append(1)
# Subtour elimination constraints
A_ub = []
b_ub = []
# u_i - u_j + L*x_ij <= L-1 for all i != j
for i in range(1, L):
for j in range(1, L):
if i != j:
constraint = [0] * (L * L + L - 1)
constraint[i * L + j] = L
constraint[j * L + i] = -L
A_ub.append(constraint)
b_ub.append(L - 1)
A_ub, b_ub = respect_number(L, A_ub, b_ub) # Respect max number of visits (no more possible stops than landmarks).
# Convert constraints to numpy arrays
A_eq = np.array(A_eq)
A_ub = np.array(A_ub)
b_ub = np.array(b_ub)
b_eq = np.array(b_eq)
# Solve the linear program
result = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq=b_eq, bounds=bounds, method='highs')
if result.success:
x = result.x[:L*L].reshape((L, L))
path = []
for i in range(L):
for j in range(L):
if x[i, j] > 0.5:
path.append((i, j))
print(f"({i}, {j})")
order, _ = is_connected(result.x)
L, _ = link_list(order, landmarks)
print_res(L, len(landmarks))
print("\nTotal score : " + str(int(-result.fun)))
return L
else:
print("no results")
return []
# Main optimization pipeline
# def solve_optimization (landmarks :List[Landmark], max_steps: int, printing_details: bool) :
# L = len(landmarks)
# # SET CONSTRAINTS FOR INEQUALITY
# #c, A_ub, b_ub = init_ub_dist(landmarks, max_steps) # Add the distances from each landmark to the other
# A_ub, b_ub = respect_number(L, A_ub, b_ub) # Respect max number of visits (no more possible stops than landmarks).
# #A_ub, b_ub = break_sym(L, A_ub, b_ub) # break the 'zig-zag' symmetry
# #A_ub, b_ub = prevent_subtours(L, A_ub, b_ub)
# # SET CONSTRAINTS FOR EQUALITY
# #A_eq, b_eq = init_eq_not_stay(L) # Force solution not to stay in same place
# #A_eq, b_eq = respect_user_mustsee(landmarks, A_eq, b_eq) # Check if there are user_defined must_see. Also takes care of start/goal
# #A_eq, b_eq = respect_start_finish(L, A_eq, b_eq) # Force start and finish positions
# #A_eq, b_eq = respect_order(L, A_eq, b_eq) # Respect order of visit (only works when max_steps is limiting factor)
# # SET BOUNDS FOR DECISION VARIABLE (x can only be 0 or 1)
# x_bounds = [(0, 1)]*(L*L + L)
# # Solve linear programming problem
# res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3)
# # Raise error if no solution is found
# if not res.success :
# raise ArithmeticError("No solution could be found, the problem is overconstrained. Please adapt your must_dos")
# # If there is a solution, we're good to go, just check for connectiveness
# else :
# order, circle = is_connected(res.x)
# i = 0
# timeout = 80
# """while len(circle) != 0 and i < timeout:
# A_ub, b_ub = prevent_config(res.x, A_ub, b_ub)
# #A_ub, b_ub = break_cricle(order, len(landmarks), A_ub, b_ub)
# res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3)
# if not res.success :
# raise ArithmeticError(f"No solution found after {timeout} iterations.")
# order, circle = is_connected(res.x)
# if len(circle) == 0 :
# break
# print(i)
# i += 1
# if i == timeout :
# raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.")
# """
# # Add the times to reach and stop optimizing
# L, total_dist = link_list(order, landmarks)
# if printing_details is True :
# if i != 0 :
# print(f"Neded to recompute paths {i} times because of unconnected loops...")
# print_res(L, len(landmarks))
# print("\nTotal score : " + str(int(-res.fun)))
# return L

422
backend/src/optimizer_v4.py Normal file
View File

@ -0,0 +1,422 @@
import numpy as np
import json, os
from typing import List, Tuple
from scipy.optimize import linprog
from math import radians, sin, cos, acos
from geopy.distance import geodesic
from shapely import Polygon
from structs.landmarks import Landmark
# Function to print the result
def print_res(L: List[Landmark], L_tot):
if len(L) == L_tot:
print('\nAll landmarks can be visited within max_steps, the following order is suggested : ')
else :
print('Could not visit all the landmarks, the following order is suggested : ')
dist = 0
for elem in L :
if elem.time_to_reach_next is not None :
print('- ' + elem.name + ', time to reach next = ' + str(elem.time_to_reach_next))
dist += elem.time_to_reach_next
else :
print('- ' + elem.name)
print("\nMinutes walked : " + str(dist))
print(f"Visited {len(L)-2} out of {L_tot-2} landmarks")
# Prevent the use of a particular solution
def prevent_config(resx, A_ub, b_ub) -> bool:
for i, elem in enumerate(resx):
resx[i] = round(elem)
N = len(resx) # Number of edges
L = int(np.sqrt(N)) # Number of landmarks
nonzeroind = np.nonzero(resx)[0] # the return is a little funky so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0].tolist()
vertices_visited = ind_a
vertices_visited.remove(0)
ones = [1]*L
h = [0]*N
for i in range(L) :
if i in vertices_visited :
h[i*L:i*L+L] = ones
A_ub = np.vstack((A_ub, h))
b_ub.append(len(vertices_visited)-1)
return A_ub, b_ub
# Prevent the possibility of a given solution bit
def break_cricle(circle_vertices: list, L: int, A_ub: list, b_ub: list) -> bool:
if L-1 in circle_vertices :
circle_vertices.remove(L-1)
h = [0]*L*L
for i in range(L) :
if i in circle_vertices :
h[i*L:i*L+L] = [1]*L
A_ub = np.vstack((A_ub, h))
b_ub.append(len(circle_vertices)-1)
return A_ub, b_ub
# Checks if the path is connected, returns a circle if it finds one and the RESULT
def is_connected(resx) -> bool:
# first round the results to have only 0-1 values
for i, elem in enumerate(resx):
resx[i] = round(elem)
N = len(resx) # length of res
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
n_edges = resx.sum() # number of edges
nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0].tolist()
ind_b = nonzero_tup[1].tolist()
edges = []
edges_visited = []
vertices_visited = []
edge1 = (ind_a[0], ind_b[0])
edges_visited.append(edge1)
vertices_visited.append(edge1[0])
for i, a in enumerate(ind_a) :
edges.append((a, ind_b[i])) # Create the list of edges
remaining = edges
remaining.remove(edge1)
break_flag = False
while len(remaining) > 0 and not break_flag:
for edge2 in remaining :
if edge2[0] == edge1[1] :
if edge1[1] in vertices_visited :
edges_visited.append(edge2)
break_flag = True
break
else :
vertices_visited.append(edge1[1])
edges_visited.append(edge2)
remaining.remove(edge2)
edge1 = edge2
elif edge1[1] == L-1 or edge1[1] in vertices_visited:
break_flag = True
break
vertices_visited.append(edge1[1])
if len(vertices_visited) == n_edges +1 :
return vertices_visited, []
else:
return vertices_visited, edges_visited
# Function that returns the distance in meters from one location to another
def get_time(p1: Tuple[float, float], p2: Tuple[float, float], detour: float, speed: float) :
# Compute the straight-line distance in km
if p1 == p2 :
return 0
else:
#dist = 6371.01 * acos(sin(radians(p1[0]))*sin(radians(p2[0])) + cos(radians(p1[0]))*cos(radians(p2[0]))*cos(radians(p1[1]) - radians(p2[1])))
dist = geodesic(p1, p2).kilometers
# Consider the detour factor for average cityto deterline walking distance (in km)
walk_dist = dist*detour
# Time to walk this distance (in minutes)
walk_time = walk_dist/speed*60
"""if walk_time > 15 :
walk_time = 5*round(walk_time/5)
else :
walk_time = round(walk_time)"""
return round(walk_time)
# Initialize A and c. Compute the distances from all landmarks to each other and store attractiveness
# We want to maximize the sightseeing : max(c) st. A*x < b and A_eq*x = b_eq
def init_ub_dist(landmarks: List[Landmark], max_steps: int):
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
parameters = json.loads(f.read())
detour = parameters['detour factor']
speed = parameters['average walking speed']
# Objective function coefficients. a*x1 + b*x2 + c*x3 + ...
c = []
# Coefficients of inequality constraints (left-hand side)
A_ub = []
for spot1 in landmarks :
dist_table = [0]*len(landmarks)
c.append(-spot1.attractiveness)
for j, spot2 in enumerate(landmarks) :
t = get_time(spot1.location, spot2.location, detour, speed)
dist_table[j] = t
A_ub += dist_table
c = c*len(landmarks)
return c, A_ub, [max_steps]
# Constraint to respect only one travel per landmark. Also caps the total number of visited landmarks
def respect_number(L:int, A_ub, b_ub):
ones = [1]*L
zeros = [0]*L
for i in range(L) :
h = zeros*i + ones + zeros*(L-1-i)
A_ub = np.vstack((A_ub, h))
b_ub.append(1)
# Read the parameters from the file
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
parameters = json.loads(f.read())
max_landmarks = parameters['max landmarks']
A_ub = np.vstack((A_ub, ones*L))
b_ub.append(max_landmarks+1)
return A_ub, b_ub
# Constraint to not have d14 and d41 simultaneously. Does not prevent circular symmetry with more elements
def break_sym(L, A_ub, b_ub):
upper_ind = np.triu_indices(L,0,L)
up_ind_x = upper_ind[0]
up_ind_y = upper_ind[1]
for i, _ in enumerate(up_ind_x) :
l = [0]*L*L
if up_ind_x[i] != up_ind_y[i] :
l[up_ind_x[i]*L + up_ind_y[i]] = 1
l[up_ind_y[i]*L + up_ind_x[i]] = 1
A_ub = np.vstack((A_ub,l))
b_ub.append(1)
return A_ub, b_ub
# Constraint to not stay in position. Removes d11, d22, d33, etc.
def init_eq_not_stay(L: int):
l = [0]*L*L
for i in range(L) :
for j in range(L) :
if j == i :
l[j + i*L] = 1
l = np.array(np.array(l))
return [l], [0]
# Go through the landmarks and force the optimizer to use landmarks where attractiveness is set to -1
def respect_user_mustsee(landmarks: List[Landmark], A_eq: list, b_eq: list) :
L = len(landmarks)
for i, elem in enumerate(landmarks) :
if elem.must_do is True and elem.name not in ['finish', 'start']:
l = [0]*L*L
for j in range(L) : # sets the horizontal ones (go from)
l[j +i*L] = 1 # sets the vertical ones (go to) double check if good
for k in range(L-1) :
l[k*L+L-1] = 1
A_eq = np.vstack((A_eq,l))
b_eq.append(2)
return A_eq, b_eq
# Constraint to ensure start at start and finish at goal
def respect_start_finish(L: int, A_eq: list, b_eq: list):
ls = [1]*L + [0]*L*(L-1) # sets only horizontal ones for start (go from)
ljump = [0]*L*L
ljump[L-1] = 1 # Prevent start finish jump
lg = [0]*L*L
ll = [0]*L*(L-1) + [1]*L
for k in range(L-1) : # sets only vertical ones for goal (go to)
ll[k*L] = 1
if k != 0 : # Prevent the shortcut start -> finish
lg[k*L+L-1] = 1
A_eq = np.vstack((A_eq,ls))
A_eq = np.vstack((A_eq,ljump))
A_eq = np.vstack((A_eq,lg))
A_eq = np.vstack((A_eq,ll))
b_eq.append(1)
b_eq.append(0)
b_eq.append(1)
b_eq.append(0)
return A_eq, b_eq
# Constraint to tie the problem together. Necessary but not sufficient to avoid circles
def respect_order(N: int, A_eq, b_eq):
for i in range(N-1) : # Prevent stacked ones
if i == 0 or i == N-1: # Don't touch start or finish
continue
else :
l = [0]*N
l[i] = -1
l = l*N
for j in range(N) :
l[i*N + j] = 1
A_eq = np.vstack((A_eq,l))
b_eq.append(0)
return A_eq, b_eq
# Computes the time to reach from each landmark to the next
def link_list(order: List[int], landmarks: List[Landmark])->List[Landmark] :
# Read the parameters from the file
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
parameters = json.loads(f.read())
detour_factor = parameters['detour factor']
speed = parameters['average walking speed']
L = []
j = 0
total_dist = 0
while j < len(order)-1 :
elem = landmarks[order[j]]
next = landmarks[order[j+1]]
d = get_time(elem.location, next.location, detour_factor, speed)
elem.time_to_reach_next = d
elem.must_do = True
elem.location = (round(elem.location[0], 5), round(elem.location[1], 5))
L.append(elem)
j += 1
total_dist += d
next.location = (round(next.location[0], 5), round(next.location[1], 5))
L.append(next)
return L, total_dist
def link_list_simple(ordered_visit: List[Landmark])-> List[Landmark] :
# Read the parameters from the file
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
parameters = json.loads(f.read())
detour_factor = parameters['detour factor']
speed = parameters['average walking speed']
L = []
j = 0
total_dist = 0
while j < len(ordered_visit)-1 :
elem = ordered_visit[j]
next = ordered_visit[j+1]
elem.next_uuid = next.uuid
d = get_time(elem.location, next.location, detour_factor, speed)
elem.time_to_reach_next = d
if elem.name not in ['start', 'finish'] :
elem.must_do = True
L.append(elem)
j += 1
total_dist += d
L.append(next)
return L, total_dist
# Main optimization pipeline
def solve_optimization (landmarks :List[Landmark], max_steps: int, printing_details: bool) :
L = len(landmarks)
# SET CONSTRAINTS FOR INEQUALITY
c, A_ub, b_ub = init_ub_dist(landmarks, max_steps) # Add the distances from each landmark to the other
A_ub, b_ub = respect_number(L, A_ub, b_ub) # Respect max number of visits (no more possible stops than landmarks).
A_ub, b_ub = break_sym(L, A_ub, b_ub) # break the 'zig-zag' symmetry
# SET CONSTRAINTS FOR EQUALITY
A_eq, b_eq = init_eq_not_stay(L) # Force solution not to stay in same place
A_eq, b_eq = respect_user_mustsee(landmarks, A_eq, b_eq) # Check if there are user_defined must_see. Also takes care of start/goal
A_eq, b_eq = respect_start_finish(L, A_eq, b_eq) # Force start and finish positions
A_eq, b_eq = respect_order(L, A_eq, b_eq) # Respect order of visit (only works when max_steps is limiting factor)
# SET BOUNDS FOR DECISION VARIABLE (x can only be 0 or 1)
x_bounds = [(0, 1)]*L*L
# Solve linear programming problem
res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3)
# Raise error if no solution is found
if not res.success :
raise ArithmeticError("No solution could be found, the problem is overconstrained. Please adapt your must_dos")
# If there is a solution, we're good to go, just check for connectiveness
else :
order, circle = is_connected(res.x)
i = 0
timeout = 80
while len(circle) != 0 and i < timeout:
#A_ub, b_ub = prevent_config(res.x, A_ub, b_ub)
A_ub, b_ub = break_cricle(order, len(landmarks), A_ub, b_ub)
res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3)
order, circle = is_connected(res.x)
if len(circle) == 0 :
break
print(i)
i += 1
if i == timeout :
raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.")
# Add the times to reach and stop optimizing
L, total_dist = link_list(order, landmarks)
if printing_details is True :
if i != 0 :
print(f"Neded to recompute paths {i} times because of unconnected loops...")
print_res(L, len(landmarks))
print("\nTotal score : " + str(int(-res.fun)))
return L

View File

@ -1,8 +1,8 @@
{ {
"city bbox side" : 3, "city bbox side" : 3,
"radius close to" : 27.5, "radius close to" : 50,
"church coeff" : 0.7, "church coeff" : 0.9,
"park coeff" : 1.5, "park coeff" : 1.2,
"tag coeff" : 100, "tag coeff" : 10,
"N important" : 40 "N important" : 30
} }

View File

@ -1,5 +1,5 @@
{ {
"detour factor" : 1.4, "detour factor" : 1.4,
"average walking speed" : 4.8, "average walking speed" : 4.8,
"max landmarks" : 8 "max landmarks" : 7
} }

View File

@ -5,11 +5,13 @@ import os, json
from shapely import buffer, LineString, Point, Polygon, MultiPoint, convex_hull, concave_hull, LinearRing from shapely import buffer, LineString, Point, Polygon, MultiPoint, convex_hull, concave_hull, LinearRing
from typing import List, Tuple from typing import List, Tuple
from scipy.spatial import KDTree
from math import pi from math import pi
import networkx as nx
from structs.landmarks import Landmark from structs.landmarks import Landmark
from landmarks_manager import take_most_important from landmarks_manager import take_most_important
from optimizer import solve_optimization, link_list_simple, print_res, get_distance from optimizer_v4 import solve_optimization, link_list_simple, print_res, get_time
from optimizer_v2 import generate_path, generate_path2 from optimizer_v2 import generate_path, generate_path2
@ -63,7 +65,7 @@ def rearrange(landmarks: List[Landmark]) -> List[Landmark]:
return landmarks return landmarks
def find_shortest_path_through_all_landmarks(landmarks: List[Landmark]) -> List[Landmark]: def find_shortest_path_through_all_landmarks(landmarks: List[Landmark]) -> Tuple[List[Landmark], Polygon]:
# Read from data # Read from data
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f : with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
@ -90,7 +92,7 @@ def find_shortest_path_through_all_landmarks(landmarks: List[Landmark]) -> List[
# Step 4: Use nearest neighbor heuristic to visit all landmarks # Step 4: Use nearest neighbor heuristic to visit all landmarks
while unvisited_landmarks: while unvisited_landmarks:
nearest_landmark = min(unvisited_landmarks, key=lambda lm: get_distance(current_landmark.location, lm.location, detour, speed)[1]) nearest_landmark = min(unvisited_landmarks, key=lambda lm: get_time(current_landmark.location, lm.location, detour, speed))
path.append(nearest_landmark) path.append(nearest_landmark)
coordinates.append(nearest_landmark.location) coordinates.append(nearest_landmark.location)
current_landmark = nearest_landmark current_landmark = nearest_landmark
@ -120,23 +122,6 @@ def get_minor_landmarks(all_landmarks: List[Landmark], visited_landmarks: List[L
return take_most_important(second_order_landmarks, len(visited_landmarks)) return take_most_important(second_order_landmarks, len(visited_landmarks))
def get_minor_landmarks2(all_landmarks: List[Landmark], visited_landmarks: List[Landmark], width: float) -> List[Landmark] :
second_order_landmarks = []
visited_names = []
area = create_corridor(visited_landmarks, width)
for visited in visited_landmarks :
visited_names.append(visited.name)
for landmark in all_landmarks :
if is_in_area(area, landmark.location) and landmark.name not in visited_names:
second_order_landmarks.append(landmark)
return take_most_important(second_order_landmarks, len(visited_landmarks))
"""def refine_optimization(landmarks: List[Landmark], base_tour: List[Landmark], max_time: int, print_infos: bool) -> List[Landmark] : """def refine_optimization(landmarks: List[Landmark], base_tour: List[Landmark], max_time: int, print_infos: bool) -> List[Landmark] :
@ -152,6 +137,52 @@ def get_minor_landmarks2(all_landmarks: List[Landmark], visited_landmarks: List[
return new_tour""" return new_tour"""
def fix_using_polygon(tour: List[Landmark])-> List[Landmark] :
coords = []
coords_dict = {}
for landmark in tour :
coords.append(landmark.location)
if landmark.name != 'finish' :
coords_dict[landmark.location] = landmark
tour_poly = Polygon(coords)
better_tour_poly = tour_poly.buffer(0)
xs, ys = better_tour_poly.exterior.xy
if len(xs) != len(tour) :
better_tour_poly = concave_hull(MultiPoint(coords)) # Create concave hull with "core" of tour leaving out start and finish
xs, ys = better_tour_poly.exterior.xy
# reverse the xs and ys
xs.reverse()
ys.reverse()
better_tour = [] # List of ordered visit
name_index = {} # Maps the name of a landmark to its index in the concave polygon
# Loop through the polygon and generate the better (ordered) tour
for i,x in enumerate(xs[:-1]) :
y = ys[i]
better_tour.append(coords_dict[tuple((x,y))])
name_index[coords_dict[tuple((x,y))].name] = i
# Scroll the list to have start in front again
start_index = name_index['start']
better_tour = better_tour[start_index:] + better_tour[:start_index]
# Append the finish back and correct the time to reach
better_tour.append(tour[-1])
# Rearrange only if polygon
better_tour = rearrange(better_tour)
return better_tour
def refine_optimization(landmarks: List[Landmark], base_tour: List[Landmark], max_time: int, print_infos: bool) -> List[Landmark] : def refine_optimization(landmarks: List[Landmark], base_tour: List[Landmark], max_time: int, print_infos: bool) -> List[Landmark] :
# Read from the file # Read from the file
@ -159,10 +190,6 @@ def refine_optimization(landmarks: List[Landmark], base_tour: List[Landmark], ma
parameters = json.loads(f.read()) parameters = json.loads(f.read())
max_landmarks = parameters['max landmarks'] + 4 max_landmarks = parameters['max landmarks'] + 4
if len(base_tour)-2 >= max_landmarks :
return base_tour
minor_landmarks = get_minor_landmarks(landmarks, base_tour, 200) minor_landmarks = get_minor_landmarks(landmarks, base_tour, 200)
if print_infos : print("Using " + str(len(minor_landmarks)) + " minor landmarks around the predicted path") if print_infos : print("Using " + str(len(minor_landmarks)) + " minor landmarks around the predicted path")
@ -175,66 +202,12 @@ def refine_optimization(landmarks: List[Landmark], base_tour: List[Landmark], ma
new_tour = solve_optimization(full_set, max_time, False) new_tour = solve_optimization(full_set, max_time, False)
new_tour, new_dist = link_list_simple(new_tour) new_tour, new_dist = link_list_simple(new_tour)
"""#if base_tour[0].location == base_tour[-1].location :
if False :
coords = [] # Coordinates of the new tour
coords_dict = {} # maps the location of an element to the element itself. Used to access the elements back once we get the geometry
# Iterate through the new tour without finish
for elem in new_tour[:-1] :
coords.append(Point(elem.location))
coords_dict[elem.location] = elem # if start = goal, only finish remains
# Create a concave polygon using the coordinates
better_tour_poly = concave_hull(MultiPoint(coords)) # Create concave hull with "core" of tour leaving out start and finish
xs, ys = better_tour_poly.exterior.xy
# reverse the xs and ys
xs.reverse()
ys.reverse()
better_tour = [] # List of ordered visit
name_index = {} # Maps the name of a landmark to its index in the concave polygon
# Loop through the polygon and generate the better (ordered) tour
for i,x in enumerate(xs[:-1]) :
better_tour.append(coords_dict[tuple((x,ys[i]))])
name_index[coords_dict[tuple((x,ys[i]))].name] = i
# Scroll the list to have start in front again
start_index = name_index['start']
better_tour = better_tour[start_index:] + better_tour[:start_index]
# Append the finish back and correct the time to reach
better_tour.append(new_tour[-1])
# Rearrange only if polygon
better_tour = rearrange(better_tour)
# Add the time to reach
better_tour = add_time_to_reach_simple(better_tour)
"""
"""
if not better_poly.is_simple :
coords_dict = {}
better_tour2 = []
for elem in better_tour :
coords_dict[elem.location] = elem
better_poly2 = better_poly.buffer(0)
new_coords = better_poly2.exterior.coords[:]
start_coords = base_tour[0].location
start_index = new_coords.
#for point in new_coords :
"""
better_tour, better_poly = find_shortest_path_through_all_landmarks(new_tour) better_tour, better_poly = find_shortest_path_through_all_landmarks(new_tour)
if base_tour[0].location == base_tour[-1].location and not better_poly.is_valid :
better_tour = fix_using_polygon(better_tour)
better_tour, better_dist = link_list_simple(better_tour) better_tour, better_dist = link_list_simple(better_tour)
if new_dist < better_dist : if new_dist < better_dist :
@ -258,12 +231,12 @@ def refine_path(landmarks: List[Landmark], base_tour: List[Landmark], max_time:
# Read from the file # Read from the file
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f : with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
parameters = json.loads(f.read()) parameters = json.loads(f.read())
max_landmarks = parameters['max landmarks'] + 4 max_landmarks = parameters['max landmarks'] + 3
"""if len(base_tour)-2 >= max_landmarks : """if len(base_tour)-2 >= max_landmarks :
return base_tour""" return base_tour"""
minor_landmarks = get_minor_landmarks2(landmarks, base_tour, 200) minor_landmarks = get_minor_landmarks(landmarks, base_tour, 200)
if print_infos : print("Using " + str(len(minor_landmarks)) + " minor landmarks around the predicted path") if print_infos : print("Using " + str(len(minor_landmarks)) + " minor landmarks around the predicted path")
@ -277,3 +250,46 @@ def refine_path(landmarks: List[Landmark], base_tour: List[Landmark], max_time:
# If a tour is not connected
def correct_path(tour: List[Landmark]) -> List[Landmark] :
# Read the parameters from the file
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
parameters = json.loads(f.read())
detour = parameters['detour factor']
speed = parameters['average walking speed']
G = nx.Graph()
coords = []
landmap = {}
for i, landmark in enumerate(tour) :
coords.append(landmark.location)
landmap[i] = landmark
G.add_node(i, pos=landmark.location, weight=landmark.attractiveness)
kdtree = KDTree(coords)
k = 3
for node, coord in coords:
indices = kdtree.query(coord, k + 1)[1] # k+1 because the closest neighbor is the node itself
for idx in indices[1:]: # skip the first one (itself)
neighbor = list(coords)[idx]
distance = get_time(coord, coords[neighbor], detour, speed)
G.add_edge(node, neighbor, weight=distance)
path = nx.approximation.traveling_salesman_problem(G, weight='weight', cycle=True)
if len(path) != len(tour) :
print("nope")
lis = [landmap[id] for id in path]
lis, tot_dist = link_list_simple(lis)
print_res(lis, len(tour))
return path

View File

@ -7,8 +7,6 @@ from uuid import uuid4
# Output to frontend # Output to frontend
class Landmark(BaseModel) : class Landmark(BaseModel) :
# Unique ID of a given landmark
uuid: str = Field(default_factory=uuid4) # TODO implement this ASAP
# Properties of the landmark # Properties of the landmark
name : str name : str
@ -22,6 +20,9 @@ class Landmark(BaseModel) :
description : Optional[str] = None # TODO future description : Optional[str] = None # TODO future
duration : Optional[int] = 0 # TODO future duration : Optional[int] = 0 # TODO future
# Unique ID of a given landmark
uuid: str = Field(default_factory=uuid4) # TODO implement this ASAP
# Additional properties depending on specific tour # Additional properties depending on specific tour
must_do : bool must_do : bool
is_secondary : Optional[bool] = False # TODO future is_secondary : Optional[bool] = False # TODO future

View File

@ -6,8 +6,8 @@ from typing import List
from landmarks_manager import generate_landmarks from landmarks_manager import generate_landmarks
from fastapi.encoders import jsonable_encoder from fastapi.encoders import jsonable_encoder
from optimizer import solve_optimization from optimizer_v4 import solve_optimization
from optimizer_v2 import generate_path, generate_path2 from optimizer_v2 import generate_path
from refiner import refine_optimization, refine_path from refiner import refine_optimization, refine_path
from structs.landmarks import Landmark from structs.landmarks import Landmark
from structs.landmarktype import LandmarkType from structs.landmarktype import LandmarkType
@ -94,9 +94,10 @@ def test4(coordinates: tuple[float, float]) -> List[Landmark]:
# Generate the landmarks from the start location # Generate the landmarks from the start location
landmarks, landmarks_short = generate_landmarks(preferences=preferences, coordinates=start.location) landmarks, landmarks_short = generate_landmarks(preferences=preferences, coordinates=start.location)
#write_data(landmarks, "landmarks.txt") #write_data(landmarks, "landmarks_Lyon.txt")
# Insert start and finish to the landmarks list # Insert start and finish to the landmarks list
#landmarks_short = landmarks_short[:4]
landmarks_short.insert(0, start) landmarks_short.insert(0, start)
landmarks_short.append(finish) landmarks_short.append(finish)
@ -104,24 +105,29 @@ def test4(coordinates: tuple[float, float]) -> List[Landmark]:
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f : with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
parameters = json.loads(f.read()) parameters = json.loads(f.read())
max_landmarks = parameters['max landmarks'] max_landmarks = parameters['max landmarks']
max_walking_time = 45 # minutes max_walking_time = 120 # minutes
detour = 10 # minutes detour = 30 # minutes
# First stage optimization # First stage optimization
#base_tour = solve_optimization(landmarks_short, max_walking_time*60, True) #base_tour = solve_optimization(landmarks_short, max_walking_time, True)
#base_tour = solve_optimization(landmarks_short, max_walking_time, True)
# First stage using NetworkX # First stage using NetworkX
base_tour = generate_path2(landmarks_short, max_walking_time, max_landmarks) base_tour = generate_path(landmarks_short, max_walking_time, max_landmarks)
# Second stage using linear optimization # Second stage using linear optimization
#refined_tour = refine_optimization(landmarks, base_tour, max_walking_time+detour, True) refined_tour = refine_optimization(landmarks, base_tour, max_walking_time+detour, True)
# Second stage using NetworkX
#refined_tour = refine_path(landmarks, base_tour, max_walking_time+detour, True)
# Use NetworkX again to correct to shortest path # Use NetworkX again to correct to shortest path
refined_tour = refine_path(landmarks, base_tour, max_walking_time+detour, True) #refined_tour = refine_path(landmarks, base_tour, max_walking_time+detour, True)
return base_tour return refined_tour
#test4(tuple((48.8344400, 2.3220540))) # Café Chez César #test4(tuple((48.8344400, 2.3220540))) # Café Chez César

3192
landmarks_Lyon.txt Normal file

File diff suppressed because it is too large Load Diff