anyway/backend/src/example_refiner.py
Remy Moll 7d7a25e2f3
All checks were successful
Build and push docker image / Build (pull_request) Successful in 2m50s
Build and release APK / Build APK (pull_request) Successful in 4m11s
Build web / Build Web (pull_request) Successful in 1m28s
stage changes as reference implementation
2024-07-17 12:35:08 +02:00

298 lines
9.9 KiB
Python

from collections import defaultdict
from heapq import heappop, heappush
from itertools import permutations
import os
import yaml
from shapely import buffer, LineString, Point, Polygon, MultiPoint, convex_hull, concave_hull, LinearRing
from typing import List, Tuple
from math import pi
from structs.landmarks import Landmark
from landmarks_manager import take_most_important
from backend.src.example_optimizer import solve_optimization, link_list_simple, print_res, get_distance
import constants
def create_corridor(landmarks: List[Landmark], width: float) :
    """
    Build a corridor-shaped geometry around the path joining the landmarks.

    Args:
        landmarks: Landmarks whose locations, taken in order, form the centre
            line of the corridor.
        width: Buffer distance applied on either side of the centre line,
            before conversion to degrees (presumably metres, given the
            6371000 radius constant - confirm against callers).

    Returns:
        The geometry obtained by buffering the centre line with mitred joins
        (mitre limit 2) and square caps.
    """
    earth_radius = 6371000

    # Express the distance as the angle, in degrees, that it subtends on a
    # sphere of that radius, because the path is buffered in coordinate units.
    width_in_degrees = (180 * width) / (earth_radius * pi)

    centre_line = create_linestring(landmarks)
    return buffer(
        centre_line,
        width_in_degrees,
        join_style="mitre",
        cap_style="square",
        mitre_limit=2,
    )
def create_linestring(landmarks: List[Landmark]) -> LineString:
    """
    Join the locations of the landmarks into a single line.

    Args:
        landmarks: Landmarks to connect, in visiting order. Each one must
            expose a `location` that `Point` accepts.

    Returns:
        A `LineString` passing through every landmark location in order.
    """
    return LineString([Point(landmark.location) for landmark in landmarks])
def is_in_area(area: Polygon, coordinates) -> bool :
    """
    Tell whether a location lies within an area.

    Args:
        area: Geometry to test against.
        coordinates: Coordinates from which a `Point` is built.

    Returns:
        The result of the point's `within` test against `area`.
    """
    return Point(coordinates).within(area)
def is_close_to(location1: Tuple[float, float], location2: Tuple[float, float], tolerance: float = 0.001) -> bool:
    """
    Determine whether two locations are close to each other.

    The locations are considered close when each of their two coordinates
    differs by strictly less than `tolerance`. This is an absolute-difference
    test, not a comparison of rounded values: two points on either side of a
    rounding boundary are still reported as close.

    Args:
        location1: First location; only its first two components are read.
        location2: Second location; only its first two components are read.
        tolerance: Largest per-coordinate difference (exclusive) for which
            the locations still count as close. Defaults to 0.001.

    Returns:
        True if both coordinate differences are below `tolerance`.
    """
    return (
        abs(location1[0] - location2[0]) < tolerance
        and abs(location1[1] - location2[1]) < tolerance
    )
def rearrange(landmarks: List[Landmark]) -> List[Landmark]:
    """
    Move landmarks that are close to each other next to each other in the tour.

    The list is modified in place. The element at index 0 is never used as a
    reference, and landmarks named 'start' or 'finish' are never paired.

    Args:
        landmarks: Tour to reorder.

    Returns:
        The same list object, after reordering.
    """
    endpoint_names = ['start', 'finish']

    anchor = 1
    while anchor < len(landmarks):
        candidate = anchor + 1
        while candidate < len(landmarks):
            first, second = landmarks[anchor], landmarks[candidate]
            is_pair = (
                is_close_to(first.location, second.location)
                and first.name not in endpoint_names
                and second.name not in endpoint_names
            )
            # Only a non-adjacent partner needs moving; once it has been
            # pulled next to the anchor, continue with the next anchor.
            if is_pair and candidate != anchor + 1:
                landmarks.insert(anchor + 1, landmarks.pop(candidate))
                break
            candidate += 1
        anchor += 1

    return landmarks
"""
def find_shortest_path(landmarks: List[Landmark]) -> List[Landmark]:
# Read from data
with open (os.path.dirname(os.path.abspath(__file__)) + '/parameters/optimizer.params', "r") as f :
parameters = json.loads(f.read())
detour = parameters['detour factor']
speed = parameters['average walking speed']
# Step 1: Build the graph
graph = defaultdict(list)
for i in range(len(landmarks)):
for j in range(len(landmarks)):
if i != j:
distance = get_distance(landmarks[i].location, landmarks[j].location, detour, speed)[1]
graph[i].append((distance, j))
# Step 2: Dijkstra's algorithm to find the shortest path from start to finish
start_idx = next(i for i, lm in enumerate(landmarks) if lm.name == 'start')
finish_idx = next(i for i, lm in enumerate(landmarks) if lm.name == 'finish')
distances = {i: float('inf') for i in range(len(landmarks))}
previous_nodes = {i: None for i in range(len(landmarks))}
distances[start_idx] = 0
priority_queue = [(0, start_idx)]
while priority_queue:
current_distance, current_index = heappop(priority_queue)
if current_distance > distances[current_index]:
continue
for neighbor_distance, neighbor_index in graph[current_index]:
distance = current_distance + neighbor_distance
if distance < distances[neighbor_index]:
distances[neighbor_index] = distance
previous_nodes[neighbor_index] = current_index
heappush(priority_queue, (distance, neighbor_index))
# Step 3: Backtrack from finish to start to find the path
path = []
current_index = finish_idx
while current_index is not None:
path.append(landmarks[current_index])
current_index = previous_nodes[current_index]
path.reverse()
return path
"""
"""
def total_path_distance(path: List[Landmark], detour, speed) -> float:
total_distance = 0
for i in range(len(path) - 1):
total_distance += get_distance(path[i].location, path[i + 1].location, detour, speed)[1]
return total_distance
"""
def find_shortest_path_through_all_landmarks(landmarks: List[Landmark]) -> Tuple[List[Landmark], Polygon]:
    """
    Order the landmarks from 'start' to 'finish' with a nearest-neighbour heuristic.

    Starting at the landmark named 'start', the closest remaining landmark is
    repeatedly appended until none are left, then the landmark named 'finish'
    is added. This is a greedy heuristic, so the result is not guaranteed to
    be the shortest possible path.

    Args:
        landmarks: Landmarks to visit. Must contain one landmark named
            'start' and one named 'finish'.

    Returns:
        A tuple `(path, path_poly)` where `path` is the ordered list of
        landmarks and `path_poly` is the polygon whose ring follows the
        locations of `path`. `path_poly` is an empty polygon when the path
        has fewer than three locations, since no ring can be built from them.

    Raises:
        StopIteration: If no landmark is named 'start' or 'finish'.
    """
    # Read the parameters from the file
    with constants.OPTIMIZER_PARAMETERS_PATH.open('r') as f:
        parameters = yaml.safe_load(f)
    detour = parameters['detour_factor']
    speed = parameters['average_walking_speed']

    # Locate the two fixed endpoints of the path
    start_idx = next(i for i, lm in enumerate(landmarks) if lm.name == 'start')
    finish_idx = next(i for i, lm in enumerate(landmarks) if lm.name == 'finish')
    start_landmark = landmarks[start_idx]
    finish_landmark = landmarks[finish_idx]

    # Everything except the endpoints still has to be placed
    unvisited_landmarks = [lm for i, lm in enumerate(landmarks) if i not in (start_idx, finish_idx)]

    path = [start_landmark]
    current_landmark = start_landmark

    # Greedily hop to whichever remaining landmark minimises the second
    # element returned by get_distance
    while unvisited_landmarks:
        nearest_landmark = min(
            unvisited_landmarks,
            key=lambda lm: get_distance(current_landmark.location, lm.location, detour, speed)[1],
        )
        path.append(nearest_landmark)
        unvisited_landmarks.remove(nearest_landmark)
        current_landmark = nearest_landmark

    path.append(finish_landmark)

    # A polygon ring needs at least three coordinates; with fewer (for
    # example a tour holding only start and finish) fall back to an empty
    # polygon instead of letting the constructor raise.
    coordinates = [lm.location for lm in path]
    path_poly = Polygon(coordinates) if len(coordinates) >= 3 else Polygon()

    return path, path_poly
def get_minor_landmarks(all_landmarks: List[Landmark], visited_landmarks: List[Landmark], width: float) -> List[Landmark] :
    """
    Select additional landmarks located along an existing tour.

    A corridor is built around the visited landmarks. Every landmark that
    falls inside it and whose name is not already part of the tour becomes a
    candidate. The candidates are then narrowed down by `take_most_important`.

    Args:
        all_landmarks: Every landmark that may be considered.
        visited_landmarks: Landmarks of the current tour, in order.
        width: Width passed on to `create_corridor`.

    Returns:
        Whatever `take_most_important` returns for the candidates, the
        landmark parameters and the number of visited landmarks.
    """
    corridor = create_corridor(visited_landmarks, width)
    already_visited = [lm.name for lm in visited_landmarks]

    candidates = [
        lm
        for lm in all_landmarks
        if is_in_area(corridor, lm.location) and lm.name not in already_visited
    ]

    with constants.LANDMARK_PARAMETERS_PATH.open('r') as f:
        parameters = yaml.safe_load(f)

    return take_most_important(candidates, parameters, len(visited_landmarks))
"""def refine_optimization(landmarks: List[Landmark], base_tour: List[Landmark], max_time: int, print_infos: bool) -> List[Landmark] :
minor_landmarks = get_minor_landmarks(landmarks, base_tour, 200)
if print_infos : print("There are " + str(len(minor_landmarks)) + " minor landmarks around the predicted path")
full_set = base_tour[:-1] + minor_landmarks # create full set of possible landmarks (without finish)
full_set.append(base_tour[-1]) # add finish back
new_tour = solve_optimization(full_set, max_time, print_infos)
return new_tour"""
def refine_optimization(landmarks: List[Landmark], base_tour: List[Landmark], max_time: int, print_infos: bool) -> List[Landmark] :
    """
    Enrich a base tour with minor landmarks found along its path.

    The base tour is returned unchanged when it already holds at least
    `max_landmarks` stops besides its first and last element. Otherwise the
    minor landmarks around the tour are added to the candidate set, the
    optimization is solved again, and the result is compared with a
    nearest-neighbour reordering of the same landmarks. Whichever of the two
    has the smaller value reported by `link_list_simple` is kept; on a tie
    the reordered tour wins.

    Args:
        landmarks: All landmarks from which minor landmarks may be picked.
        base_tour: Tour from the first optimization stage. Its last element
            is treated as the finish.
        max_time: Time budget forwarded to `solve_optimization`.
        print_infos: Whether to print progress and the refined tour.

    Returns:
        The refined tour, or `base_tour` itself if no refinement is attempted.
    """
    # Read the parameters from the file
    with constants.OPTIMIZER_PARAMETERS_PATH.open('r') as f:
        parameters = yaml.safe_load(f)
    max_landmarks = parameters['max_landmarks']

    # The first and last element of the tour do not count towards the limit
    if len(base_tour) - 2 >= max_landmarks:
        return base_tour

    # Candidates are searched in a corridor of width 200 around the base tour
    minor_landmarks = get_minor_landmarks(landmarks, base_tour, 200)
    if print_infos:
        print(f"Using {len(minor_landmarks)} minor landmarks around the predicted path")

    # Full set of visitable landmarks: the finish must remain the last element
    full_set = base_tour[:-1] + minor_landmarks
    full_set.append(base_tour[-1])

    # Second optimization pass, always run without its own output
    new_tour = solve_optimization(full_set, max_time, False)
    new_tour, new_dist = link_list_simple(new_tour)

    # Alternative ordering of the same landmarks; its polygon is not needed here
    better_tour, _ = find_shortest_path_through_all_landmarks(new_tour)
    better_tour, better_dist = link_list_simple(better_tour)

    final_tour = new_tour if new_dist < better_dist else better_tour

    if print_infos:
        print("\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n")
        print("\nRefined tour (result of second stage optimization): ")
        print_res(final_tour, len(full_set))

    return final_tour