Added
@@ -344,6 +344,8 @@ def link_list_simple(ordered_visit: List[Landmark])-> List[Landmark] :
        elem.next_uuid = next.uuid
        d = get_distance(elem.location, next.location, detour_factor, speed)[1]
        elem.time_to_reach_next = d
        if elem.name not in ['start', 'finish'] :
            elem.must_do = True
        L.append(elem)
        j += 1
        total_dist += d
backend/src/optimizer_v2.py (new file, 260 lines)
@@ -0,0 +1,260 @@
import networkx as nx
from typing import List, Tuple
from geopy.distance import geodesic
from scipy.spatial import KDTree
import numpy as np
from itertools import combinations
from structs.landmarks import Landmark
from optimizer import print_res, link_list_simple
import os
import json
import heapq


# Define the get_distance function
def get_distance(loc1: Tuple[float, float], loc2: Tuple[float, float], detour: float, speed: float) -> Tuple[float, float]:
    # Placeholder implementation, should be replaced with the actual logic
    distance = geodesic(loc1, loc2).meters
    return distance, distance * detour / speed


# Heuristic function: distance to the goal
def heuristic(loc1: Tuple[float, float], loc2: Tuple[float, float]) -> float:
    return geodesic(loc1, loc2).meters


def a_star(G, start_id, end_id, max_walking_time, must_do_nodes, max_landmarks, detour, speed):
    open_set = []
    # Heap entries: (estimated cost, current node, walking time so far, path, visited nodes)
    heapq.heappush(open_set, (0, start_id, 0, [start_id], set([start_id])))
    best_path = None
    max_attractiveness = 0
    visited_must_do = set()

    while open_set:
        _, current_node, current_length, path, visited = heapq.heappop(open_set)

        # If current node is a must_do node and hasn't been visited yet, mark it as visited
        if current_node in must_do_nodes and current_node not in visited_must_do:
            visited_must_do.add(current_node)

        # Check if path includes all must_do nodes and reaches the end
        if current_node == end_id and all(node in visited for node in must_do_nodes):
            attractiveness = sum(G.nodes[node]['weight'] for node in path)
            if attractiveness > max_attractiveness:
                best_path = path
                max_attractiveness = attractiveness
            continue

        if len(path) > max_landmarks + 1:
            continue

        for neighbor in G.neighbors(current_node):
            if neighbor not in visited:
                # Detour-adjusted walking time in minutes (speed is in km/h; speed * 16.6666 is metres per minute)
                distance = int(geodesic(G.nodes[current_node]['pos'], G.nodes[neighbor]['pos']).meters * detour / (speed * 16.6666))
                if current_length + distance <= max_walking_time:
                    new_path = path + [neighbor]
                    new_visited = visited | {neighbor}
                    estimated_cost = current_length + distance + heuristic(G.nodes[neighbor]['pos'], G.nodes[end_id]['pos'])
                    heapq.heappush(open_set, (estimated_cost, neighbor, current_length + distance, new_path, new_visited))

    # Check if all must_do nodes have been visited
    if all(node in visited_must_do for node in must_do_nodes):
        return best_path, max_attractiveness
    else:
        return None, 0


def dfs(G, current_node, end_id, current_length, path, visited, max_walking_time, must_do_nodes, max_landmarks, detour, speed):
    # If the path includes all must_do nodes and reaches the end
    if current_node == end_id and all(node in path for node in must_do_nodes):
        return path, sum(G.nodes[node]['weight'] for node in path)

    # If the number of landmarks exceeds the maximum allowed, return None
    if len(path) > max_landmarks+1:
        return None, 0

    best_path = None
    max_attractiveness = 0

    for neighbor in G.neighbors(current_node):
        if neighbor not in visited:
            distance = int(geodesic(G.nodes[current_node]['pos'], G.nodes[neighbor]['pos']).meters * detour / (speed*16.6666))
            if current_length + distance <= max_walking_time:
                new_path = path + [neighbor]
                new_visited = visited | {neighbor}
                result_path, attractiveness = dfs(G, neighbor, end_id, current_length + distance, new_path, new_visited, max_walking_time, must_do_nodes, max_landmarks, detour, speed)
                if attractiveness > max_attractiveness:
                    best_path = result_path
                    max_attractiveness = attractiveness

    return best_path, max_attractiveness


def find_path(G, start_id, finish_id, max_walking_time, must_do_nodes, max_landmarks) -> List[str]:
    # Read the parameters from the file
    with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
        parameters = json.loads(f.read())
        detour = parameters['detour factor']
        speed = parameters['average walking speed']

    """if G[start_id]['pos'] == G[finish_id]['pos'] :
        best_path, _ = dfs(G, start_id, finish_id, 0, [start_id], {start_id}, max_walking_time, must_do_nodes, max_landmarks, detour, speed)
    else :"""
    best_path, _ = a_star(G, start_id, finish_id, max_walking_time, must_do_nodes, max_landmarks, detour, speed)

    return best_path if best_path else []


# Function to dynamically adjust theta
def adjust_theta(num_nodes, theta_opt, target_ratio=2.0):
    # Start with an initial guess
    initial_theta = theta_opt
    # Adjust theta to aim for the target ratio of edges to nodes
    return initial_theta / (num_nodes ** (1 / target_ratio))
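
# Worked example (illustrative numbers only): with the default theta_opt = 0.0008,
# target_ratio = 2.0 and a graph of 100 landmarks, adjust_theta returns
# 0.0008 / 100 ** 0.5 = 8e-05, so the threshold shrinks as the landmark count grows.
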
# Create a graph using NetworkX and generate the path
def generate_path(landmarks: List[Landmark], max_walking_time: float, max_landmarks: int, theta_opt = 0.0008) -> List[Landmark]:

    landmap = {}
    pos_dict = {}
    weight_dict = {}
    # Add nodes to the graph with attractiveness
    for i, landmark in enumerate(landmarks):
        #G.nodes[i]['attractiveness'] = landmark.attractiveness
        pos_dict[i] = landmark.location
        weight_dict[i] = landmark.attractiveness
        #G.nodes[i]['pos'] = landmark.location
        landmap[i] = landmark
        if landmark.name == 'start' :
            start_id = i
        elif landmark.name == 'finish' :
            end_id = i

    # Lambda version of get_distance
    get_dist = lambda loc1, loc2: geodesic(loc1, loc2).meters + 0.001 #.meters*detour/speed +0.0000001

    theta = adjust_theta(len(landmarks), theta_opt)
    G = nx.geographical_threshold_graph(n=len(landmarks), theta=theta, pos=pos_dict, weight=weight_dict, metric=get_dist)

    # good theta : 0.000125
    # Define must_do nodes
    must_do_nodes = [i for i in G.nodes() if landmap[i].must_do]

    # Add special edges between must_do nodes
    for node1, node2 in combinations(must_do_nodes, 2):
        if not G.has_edge(node1, node2):
            distance = geodesic(G.nodes[node1]['pos'], G.nodes[node2]['pos']).meters + 0.001
            G.add_edge(node1, node2, weight=distance)

    print(f"Graph with {G.number_of_nodes()} nodes")
    print(f"Graph with {G.number_of_edges()} edges")
    print("Computing path...")

    # Find a valid path using A*
    valid_path = find_path(G, start_id, end_id, max_walking_time, must_do_nodes, max_landmarks)

    if not valid_path:
        return []   # No valid path found

    lis = [landmap[id] for id in valid_path]

    lis, tot_dist = link_list_simple(lis)

    print_res(lis, len(landmarks))

    return lis


# Create a graph using NetworkX and generate the path
def generate_path2(landmarks: List[Landmark], max_walking_time: float, max_landmarks: int) -> List[Landmark]:

    # Read the parameters from the file
    with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
        parameters = json.loads(f.read())
        detour = parameters['detour factor']
        speed = parameters['average walking speed']

    landmap = {}
    pos_dict = {}
    weight_dict = {}
    G = nx.Graph()
    # Add nodes to the graph with attractiveness
    for i, landmark in enumerate(landmarks):
        pos_dict[i] = landmark.location
        weight_dict[i] = landmark.attractiveness
        landmap[i] = landmark
        G.add_node(i, pos=landmark.location, weight=landmark.attractiveness)
        if landmark.name == 'start' :
            start_id = i
        elif landmark.name == 'finish' :
            finish_id = i

    """# If start and finish are the same no need to add another node
    if pos_dict[finish_id] == pos_dict[start_id] :
        end_id = start_id
    else :
        G.add_node(finish_id, pos=pos_dict[finish_id], weight=weight_dict[finish_id])
        end_id = finish_id"""

    # Lambda version of get_distance
    #get_dist = lambda loc1, loc2: geodesic(loc1, loc2).meters + 0.001 #.meters*detour/speed +0.0000001

    # Connect every node to its k nearest neighbours
    coords = np.array(list(pos_dict.values()))
    kdtree = KDTree(coords)

    k = 4
    for node, coord in pos_dict.items():
        indices = kdtree.query(coord, k + 1)[1]    # k+1 because the closest neighbor is the node itself
        for idx in indices[1:]:                    # skip the first one (itself)
            neighbor = list(pos_dict.keys())[idx]
            distance = get_distance(coord, pos_dict[neighbor], detour, speed)[1]    # keep only the walking time as edge weight
            G.add_edge(node, neighbor, weight=distance)

    # Define must_do nodes
    must_do_nodes = [i for i in G.nodes() if landmap[i].must_do]

    # Add special edges between must_do nodes
    if len(must_do_nodes) > 0 :
        for node1, node2 in combinations(must_do_nodes, 2):
            if not G.has_edge(node1, node2):
                distance = get_distance(G.nodes[node1]['pos'], G.nodes[node2]['pos'], detour, speed)[1]
                G.add_edge(node1, node2, weight=distance)

    print(f"Graph with {G.number_of_nodes()} nodes")
    print(f"Graph with {G.number_of_edges()} edges")
    print("Computing path...")

    # Find a valid path using A*
    valid_path = find_path(G, start_id, finish_id, max_walking_time, must_do_nodes, max_landmarks)

    if not valid_path:
        return []   # No valid path found

    lis = [landmap[id] for id in valid_path]

    lis, tot_dist = link_list_simple(lis)

    print_res(lis, len(landmarks))

    return lis


def correct_path(tour: List[Landmark]) -> List[Landmark] :

    coords = []
    for landmark in tour :
        coords.append(landmark.location)

    # Cycle (circulant) graph over the tour indices, keeping the coordinates as node positions
    G = nx.circulant_graph(len(tour), offsets=[1])
    nx.set_node_attributes(G, dict(enumerate(coords)), 'pos')

    # Shortest chain of hops from the first landmark of the tour to the last
    node_path = nx.shortest_path(G, source=0, target=len(tour) - 1)

    return [tour[i] for i in node_path]

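For orientation, here is a minimal sketch of how generate_path2 might be driven on its own. The Landmark constructor mirrors the calls in tester.py further down; the POI names, coordinates, attractiveness values, the 'sightseeing' landmark type and the osm_type values are invented for illustration. At least k+1 = 5 landmarks are needed so the KDTree query above can return k distinct neighbours.

from structs.landmarks import Landmark
from structs.landmarktype import LandmarkType
from optimizer_v2 import generate_path2

center = (48.8344400, 2.3220540)
start = Landmark(name='start', type=LandmarkType(landmark_type='start'), location=center, osm_type='start', osm_id=0, attractiveness=0, must_do=False, n_tags=0)
finish = Landmark(name='finish', type=LandmarkType(landmark_type='finish'), location=center, osm_type='finish', osm_id=0, attractiveness=0, must_do=False, n_tags=0)

# Six fictional points of interest scattered around the start position
pois = [
    Landmark(name=f'poi {i}', type=LandmarkType(landmark_type='sightseeing'), location=(center[0] + 0.003 * (i + 1), center[1] + 0.002 * (i + 1)), osm_type='node', osm_id=i + 1, attractiveness=10 * (i + 1), must_do=False, n_tags=5)
    for i in range(6)
]

# 45 minutes of walking budget and at most 8 landmarks, matching the updated optimizer.params
tour = generate_path2([start] + pois + [finish], max_walking_time=45, max_landmarks=8)
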
@@ -1,7 +1,7 @@
{
    "city bbox side" : 10,
    "city bbox side" : 3,
    "radius close to" : 27.5,
    "church coeff" : 0.6,
    "church coeff" : 0.7,
    "park coeff" : 1.5,
    "tag coeff" : 100,
    "N important" : 40
@@ -1,5 +1,5 @@
{
    "detour factor" : 1.4,
    "average walking speed" : 4.8,
    "max landmarks" : 10
    "max landmarks" : 8
}
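Since this hunk covers the whole five-line file, the optimizer parameters after the change presumably read as follows (the path, backend/src/parameters/optimizer.params, is inferred from the open() calls above):

{
    "detour factor" : 1.4,
    "average walking speed" : 4.8,
    "max landmarks" : 8
}
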
@@ -10,6 +10,7 @@ from math import pi
from structs.landmarks import Landmark
from landmarks_manager import take_most_important
from optimizer import solve_optimization, link_list_simple, print_res, get_distance
from optimizer_v2 import generate_path, generate_path2


def create_corridor(landmarks: List[Landmark], width: float) :
@@ -62,65 +63,6 @@ def rearrange(landmarks: List[Landmark]) -> List[Landmark]:

    return landmarks

"""
def find_shortest_path(landmarks: List[Landmark]) -> List[Landmark]:

    # Read from data
    with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
        parameters = json.loads(f.read())
        detour = parameters['detour factor']
        speed = parameters['average walking speed']

    # Step 1: Build the graph
    graph = defaultdict(list)
    for i in range(len(landmarks)):
        for j in range(len(landmarks)):
            if i != j:
                distance = get_distance(landmarks[i].location, landmarks[j].location, detour, speed)[1]
                graph[i].append((distance, j))

    # Step 2: Dijkstra's algorithm to find the shortest path from start to finish
    start_idx = next(i for i, lm in enumerate(landmarks) if lm.name == 'start')
    finish_idx = next(i for i, lm in enumerate(landmarks) if lm.name == 'finish')

    distances = {i: float('inf') for i in range(len(landmarks))}
    previous_nodes = {i: None for i in range(len(landmarks))}
    distances[start_idx] = 0
    priority_queue = [(0, start_idx)]

    while priority_queue:
        current_distance, current_index = heappop(priority_queue)

        if current_distance > distances[current_index]:
            continue

        for neighbor_distance, neighbor_index in graph[current_index]:
            distance = current_distance + neighbor_distance

            if distance < distances[neighbor_index]:
                distances[neighbor_index] = distance
                previous_nodes[neighbor_index] = current_index
                heappush(priority_queue, (distance, neighbor_index))

    # Step 3: Backtrack from finish to start to find the path
    path = []
    current_index = finish_idx
    while current_index is not None:
        path.append(landmarks[current_index])
        current_index = previous_nodes[current_index]
    path.reverse()

    return path
"""
"""
def total_path_distance(path: List[Landmark], detour, speed) -> float:
    total_distance = 0
    for i in range(len(path) - 1):
        total_distance += get_distance(path[i].location, path[i + 1].location, detour, speed)[1]
    return total_distance
"""


def find_shortest_path_through_all_landmarks(landmarks: List[Landmark]) -> List[Landmark]:

    # Read from data
@@ -178,6 +120,23 @@ def get_minor_landmarks(all_landmarks: List[Landmark], visited_landmarks: List[L
    return take_most_important(second_order_landmarks, len(visited_landmarks))


def get_minor_landmarks2(all_landmarks: List[Landmark], visited_landmarks: List[Landmark], width: float) -> List[Landmark] :

    second_order_landmarks = []
    visited_names = []
    area = create_corridor(visited_landmarks, width)

    for visited in visited_landmarks :
        visited_names.append(visited.name)

    for landmark in all_landmarks :
        if is_in_area(area, landmark.location) and landmark.name not in visited_names:
            second_order_landmarks.append(landmark)

    return take_most_important(second_order_landmarks, len(visited_landmarks))




"""def refine_optimization(landmarks: List[Landmark], base_tour: List[Landmark], max_time: int, print_infos: bool) -> List[Landmark] :
@@ -198,7 +157,7 @@ def refine_optimization(landmarks: List[Landmark], base_tour: List[Landmark], ma
    # Read from the file
    with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
        parameters = json.loads(f.read())
        max_landmarks = parameters['max landmarks']
        max_landmarks = parameters['max landmarks'] + 4

    if len(base_tour)-2 >= max_landmarks :
        return base_tour
@@ -284,10 +243,37 @@ def refine_optimization(landmarks: List[Landmark], base_tour: List[Landmark], ma
            final_tour = better_tour

    if print_infos :
        print("\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n")
        print("\nRefined tour (result of second stage optimization): ")
        print("\n\n\nRefined tour (result of second stage optimization): ")
        print_res(final_tour, len(full_set))

    return final_tour


def refine_path(landmarks: List[Landmark], base_tour: List[Landmark], max_time: int, print_infos: bool) -> List[Landmark] :

    print("\nRefining the base tour...")
    # Read from the file
    with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
        parameters = json.loads(f.read())
        max_landmarks = parameters['max landmarks'] + 4

    """if len(base_tour)-2 >= max_landmarks :
        return base_tour"""

    minor_landmarks = get_minor_landmarks2(landmarks, base_tour, 200)

    if print_infos : print("Using " + str(len(minor_landmarks)) + " minor landmarks around the predicted path")

    full_set = base_tour + minor_landmarks  # create full set of possible landmarks

    print("\nRefined tour (result of second stage optimization): ")

    new_path = generate_path2(full_set, max_time, max_landmarks)

    return new_path

@@ -1,11 +1,14 @@
import pandas as pd
import os
import json

from typing import List
from landmarks_manager import generate_landmarks
from fastapi.encoders import jsonable_encoder

from optimizer import solve_optimization
from refiner import refine_optimization
from optimizer_v2 import generate_path, generate_path2
from refiner import refine_optimization, refine_path
from structs.landmarks import Landmark
from structs.landmarktype import LandmarkType
from structs.preferences import Preferences, Preference
@@ -82,8 +85,8 @@ def test4(coordinates: tuple[float, float]) -> List[Landmark]:
    # Create start and finish
    start = Landmark(name='start', type=LandmarkType(landmark_type='start'), location=coordinates, osm_type='start', osm_id=0, attractiveness=0, must_do=True, n_tags = 0)
    finish = Landmark(name='finish', type=LandmarkType(landmark_type='finish'), location=coordinates, osm_type='finish', osm_id=0, attractiveness=0, must_do=True, n_tags = 0)
    start = Landmark(name='start', type=LandmarkType(landmark_type='start'), location=coordinates, osm_type='start', osm_id=0, attractiveness=0, must_do=False, n_tags = 0)
    finish = Landmark(name='finish', type=LandmarkType(landmark_type='finish'), location=coordinates, osm_type='finish', osm_id=0, attractiveness=0, must_do=False, n_tags = 0)
    #finish = Landmark(name='finish', type=LandmarkType(landmark_type='finish'), location=(48.8777055, 2.3640967), osm_type='finish', osm_id=0, attractiveness=0, must_do=True, n_tags = 0)
    #start = Landmark(name='start', type=LandmarkType(landmark_type='start'), location=(48.847132, 2.312359), osm_type='start', osm_id=0, attractiveness=0, must_do=True, n_tags = 0)
    #finish = Landmark(name='finish', type=LandmarkType(landmark_type='finish'), location=(48.843185, 2.344533), osm_type='finish', osm_id=0, attractiveness=0, must_do=True, n_tags = 0)
@@ -98,19 +101,31 @@ def test4(coordinates: tuple[float, float]) -> List[Landmark]:
    landmarks_short.append(finish)

    # TODO use these parameters in another way
    max_walking_time = 2 # hours
    detour = 30 # minutes
    with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
        parameters = json.loads(f.read())
        max_landmarks = parameters['max landmarks']
    max_walking_time = 45 # minutes
    detour = 10 # minutes


    # First stage optimization
    base_tour = solve_optimization(landmarks_short, max_walking_time*60, True)

    # Second stage optimization
    refined_tour = refine_optimization(landmarks, base_tour, max_walking_time*60+detour, True)

    return refined_tour
    #base_tour = solve_optimization(landmarks_short, max_walking_time*60, True)


test4(tuple((48.8344400, 2.3220540))) # Café Chez César
    # First stage using NetworkX
    base_tour = generate_path2(landmarks_short, max_walking_time, max_landmarks)

    # Second stage using linear optimization
    #refined_tour = refine_optimization(landmarks, base_tour, max_walking_time+detour, True)

    # Use NetworkX again to correct to shortest path
    refined_tour = refine_path(landmarks, base_tour, max_walking_time+detour, True)

    return base_tour


#test4(tuple((48.8344400, 2.3220540))) # Café Chez César
#test4(tuple((48.8375946, 2.2949904))) # Point random
#test4(tuple((47.377859, 8.540585))) # Zurich HB
test4(tuple((45.7576485, 4.8330241))) # Lyon Bellecour
#test3('Vienna, Austria')