Some checks failed
Build and deploy the backend to staging / Build and push image (pull_request) Successful in 1m57s
Run linting on the backend code / Build (pull_request) Successful in 28s
Run testing on the backend code / Build (pull_request) Failing after 53s
Build and deploy the backend to staging / Deploy to staging (pull_request) Successful in 24s
375 lines
15 KiB
Python
375 lines
15 KiB
Python
"""Allows to refine the tour by adding more landmarks and making the path easier to follow."""
|
|
import logging
|
|
from math import pi
|
|
import yaml
|
|
from shapely import buffer, LineString, Point, Polygon, MultiPoint, concave_hull
|
|
|
|
from ..structs.landmark import Landmark
|
|
from ..utils.get_time_distance import get_time
|
|
from ..utils.take_most_important import take_most_important
|
|
from .optimizer import Optimizer
|
|
from ..constants import OPTIMIZER_PARAMETERS_PATH
|
|
|
|
|
|
|
|
class Refiner:
    """
    Refines a tour by incorporating smaller landmarks along the path to enhance the experience.

    This class adjusts an existing tour by considering additional, smaller points of
    interest (landmarks) that may require minor detours but improve the overall quality
    of the tour. It balances the efficiency of travel with the added value of visiting
    these landmarks.
    """

    logger = logging.getLogger(__name__)

    # Sphere radius, in meters, used to convert a corridor width from meters to degrees.
    _EARTH_RADIUS_M = 6371000
    # Per-axis coordinate difference below which two locations count as "close".
    # NOTE(review): presumably expressed in degrees, like the landmark locations - confirm.
    _CLOSE_THRESHOLD = 0.001
    # Landmark names that mark the endpoints of a tour; they are never regrouped.
    _ENDPOINT_NAMES = ('start', 'finish')

    detour_factor: float            # detour factor of straight line vs real distance in cities
    detour_corridor_width: float    # width of the corridor around the path
    average_walking_speed: float    # average walking speed of adult
    max_landmarks_refiner: int      # max number of landmarks to visit
    optimizer: Optimizer            # optimizer object

    def __init__(self, optimizer: Optimizer) -> None:
        """
        Store the optimizer and load the refiner parameters from the parameter file.

        Args:
            optimizer (Optimizer): optimizer used to solve the refined tour problem.
        """
        self.optimizer = optimizer

        # The file only needs to stay open while it is being parsed.
        with OPTIMIZER_PARAMETERS_PATH.open('r') as f:
            parameters = yaml.safe_load(f)

        self.detour_factor = parameters['detour_factor']
        self.detour_corridor_width = parameters['detour_corridor_width']
        self.average_walking_speed = parameters['average_walking_speed']
        self.max_landmarks_refiner = parameters['max_landmarks_refiner']

    def create_corridor(self, landmarks: list[Landmark], width: float):
        """
        Create a corridor around the path connecting the landmarks.

        Args:
            landmarks (list[Landmark]): the landmark path around which to create the corridor.
            width (float): width of the corridor in meters.

        Returns:
            Geometry: a buffered geometry object representing the corridor around the path.
        """
        # Convert the arc length `width` (meters) into the angle, in degrees, that it
        # subtends on a sphere of radius `_EARTH_RADIUS_M`, so that the buffer distance
        # is in the same unit as the path coordinates.
        corrected_width = (180 * width) / (self._EARTH_RADIUS_M * pi)

        path = self.create_linestring(landmarks)
        return buffer(path, corrected_width, join_style="mitre", cap_style="square", mitre_limit=2)

    def create_linestring(self, tour: list[Landmark]) -> LineString:
        """
        Create a `LineString` object from a tour.

        Args:
            tour (list[Landmark]): An ordered sequence of landmarks that represents the visiting order.

        Returns:
            LineString: A `LineString` object representing the path through the landmarks.
        """
        return LineString([Point(landmark.location) for landmark in tour])

    def is_in_area(self, area: Polygon, coordinates) -> bool:
        """
        Check if a given point is within a specified area. Used for the corridor.

        Args:
            area (Polygon): The polygon defining the area.
            coordinates (tuple[float, float]): The coordinates of the point to check.

        Returns:
            bool: True if the point is within the area, otherwise False.
        """
        return Point(coordinates).within(area)

    def is_close_to(self, location1: tuple[float, float], location2: tuple[float, float]) -> bool:
        """
        Determine if two locations are close to each other.

        Both coordinates are compared independently: the locations are close when the
        absolute difference on each axis is strictly below `_CLOSE_THRESHOLD`.

        Args:
            location1 (tuple[float, float]): The coordinates of the first location.
            location2 (tuple[float, float]): The coordinates of the second location.

        Returns:
            bool: True if both coordinate differences are below the threshold, otherwise False.
        """
        return (
            abs(location1[0] - location2[0]) < self._CLOSE_THRESHOLD
            and abs(location1[1] - location2[1]) < self._CLOSE_THRESHOLD
        )

    def rearrange(self, tour: list[Landmark]) -> list[Landmark]:
        """
        Rearrange landmarks to group nearby visits together.

        Landmarks are reordered so that nearby landmarks end up adjacent in the list.
        Landmarks named 'start' or 'finish' are never moved and never used as an anchor.

        Note:
            The list is modified in place; the same list object is returned.

        Args:
            tour (list[Landmark]): Ordered list of landmarks to be rearranged.

        Returns:
            list[Landmark]: The rearranged list of landmarks with grouped nearby visits.
        """
        # Index 0 is skipped: the first element is never used as an anchor.
        # Moving an element keeps the list length constant, so the ranges stay valid.
        for i in range(1, len(tour)):
            anchor = tour[i]
            if anchor.name in self._ENDPOINT_NAMES:
                continue

            # A close landmark already sitting at i + 1 needs no move, so the search
            # for a landmark to pull next to the anchor starts at i + 2.
            for j in range(i + 2, len(tour)):
                candidate = tour[j]
                if candidate.name in self._ENDPOINT_NAMES:
                    continue
                if self.is_close_to(anchor.location, candidate.location):
                    tour.insert(i + 1, tour.pop(j))
                    break   # at most one move per anchor, then go to the next anchor

        return tour

    def integrate_landmarks(self, sub_list: list[Landmark], main_list: list[Landmark]) -> list[Landmark]:
        """
        Insert 'sub_list' of Landmarks inside the 'main_list' by leaving the ends untouched.

        Neither input list is modified; a new list is returned.

        Args:
            sub_list (list[Landmark]): the landmarks to be inserted inside of the 'main_list'.
            main_list (list[Landmark]): the original list with start and finish.

        Returns:
            list[Landmark]: every element of 'main_list' except the last one, followed by
                            'sub_list', followed by the last element of 'main_list'.
        """
        if not main_list:
            # Nothing to wrap around: there is no finish landmark to put back.
            return list(sub_list)

        return main_list[:-1] + list(sub_list) + [main_list[-1]]

    def find_shortest_path_through_all_landmarks(self, landmarks: list[Landmark]) -> tuple[list[Landmark], Polygon]:
        """
        Find a short path through all landmarks using a nearest neighbor heuristic.

        The path starts at the landmark whose type is 'start', repeatedly moves to the
        unvisited landmark with the smallest `get_time` from the current one, and ends
        at the landmark whose type is 'finish'.

        Args:
            landmarks (list[Landmark]): list of all landmarks including 'start' and 'finish'.

        Returns:
            tuple[list[Landmark], Polygon]: the landmarks in visiting order, and a `Polygon`
                                            built from their locations in that same order.

        Raises:
            StopIteration: if no landmark has type 'start' or no landmark has type 'finish'.
        """
        # Step 1: locate the endpoints.
        start_idx = next(i for i, lm in enumerate(landmarks) if lm.type == 'start')
        finish_idx = next(i for i, lm in enumerate(landmarks) if lm.type == 'finish')

        start_landmark = landmarks[start_idx]
        finish_landmark = landmarks[finish_idx]

        # Step 2: everything except the endpoints still has to be visited.
        unvisited_landmarks = [lm for i, lm in enumerate(landmarks) if i not in (start_idx, finish_idx)]

        # Step 3: greedy nearest-neighbor walk starting from 'start'.
        path = [start_landmark]
        current_landmark = start_landmark
        while unvisited_landmarks:
            # The origin is bound as a default argument so the key function does not
            # depend on the loop variable being rebound later.
            nearest_landmark = min(
                unvisited_landmarks,
                key=lambda lm, origin=current_landmark.location: get_time(origin, lm.location),
            )
            path.append(nearest_landmark)
            unvisited_landmarks.remove(nearest_landmark)
            current_landmark = nearest_landmark

        # Step 4: close the path with 'finish'.
        path.append(finish_landmark)

        path_poly = Polygon([lm.location for lm in path])

        return path, path_poly

    def get_minor_landmarks(self, all_landmarks: list[Landmark], visited_landmarks: list[Landmark], width: float) -> list[Landmark]:
        """
        Identify landmarks within a specified corridor that have not been visited yet.

        A corridor is created around the path defined by the visited landmarks. Every
        landmark located inside it whose name is not the name of a visited landmark is
        kept, and the selection is then reduced by `take_most_important`.

        Args:
            all_landmarks (list[Landmark]): list of all available landmarks.
            visited_landmarks (list[Landmark]): list of landmarks that have already been visited.
            width (float): Width of the corridor around the visited landmarks.

        Returns:
            list[Landmark]: result of `take_most_important` applied to the candidates, with a
                            limit of 75% of `max_landmarks_refiner` (truncated to an int).
        """
        area = self.create_corridor(visited_landmarks, width)
        visited_names = {visited.name for visited in visited_landmarks}

        # The cheap name test runs first so the geometric test is skipped for visited landmarks.
        second_order_landmarks = [
            landmark
            for landmark in all_landmarks
            if landmark.name not in visited_names and self.is_in_area(area, landmark.location)
        ]

        return take_most_important(second_order_landmarks, int(self.max_landmarks_refiner*0.75))

    @staticmethod
    def _exterior_ring(geometry):
        """
        Return the exterior ring of a geometry as a list of (x, y) tuples.

        Returns None when the geometry exposes no `exterior` attribute, which is the case
        for non-polygon results such as a `LineString`.
        """
        exterior = getattr(geometry, 'exterior', None)
        if exterior is None:
            return None
        xs, ys = exterior.xy
        return list(zip(xs, ys))

    def fix_using_polygon(self, tour: list[Landmark]) -> list[Landmark]:
        """
        Improve the tour path using geometric methods to ensure it follows a more optimal shape.

        A polygon is built from the tour and cleaned with `buffer(0)`. If its exterior ring
        does not have exactly one vertex per tour entry, a concave hull of the tour locations
        is used instead. The landmarks are then reordered along the ring (walked in reverse),
        the order is rotated so that 'start' comes first, and the last element of the input
        tour is appended. If the resulting polygon is not simple, the tour is additionally
        passed through `rearrange`.

        If no usable ring can be derived (the geometry is not a polygon, a ring vertex does
        not match any landmark location, or 'start' is not on the ring), the input tour is
        returned unchanged and a warning is logged.

        Args:
            tour (list[Landmark]): list of landmarks representing the current tour path.

        Returns:
            list[Landmark]: Refined list of landmarks in the order of visit, or `tour` itself
                            when the polygon-based fix is not applicable.
        """
        coords = [landmark.location for landmark in tour]
        # 'finish' is excluded from the lookup, so a location shared with another
        # landmark (e.g. 'start' on a round trip) resolves to that other landmark.
        coords_dict = {landmark.location: landmark for landmark in tour if landmark.name != 'finish'}

        better_tour_poly = Polygon(coords).buffer(0)
        ring = self._exterior_ring(better_tour_poly)

        if ring is None or len(ring) != len(tour):
            # The cleaned polygon does not map one-to-one onto the tour: use a concave hull.
            better_tour_poly = concave_hull(MultiPoint(coords))
            ring = self._exterior_ring(better_tour_poly)

        if ring is None:
            # The hull can degenerate to a geometry without an exterior (e.g. a LineString).
            self.logger.warning("Could not build a polygon from the tour. Keeping the tour order unchanged.")
            return tour

        better_tour = []    # list of ordered visit
        name_index = {}     # maps the name of a landmark to its index along the ring

        try:
            # Walk the ring in reverse and skip the closing vertex, which repeats the first one.
            for i, location in enumerate(ring[::-1][:-1]):
                landmark = coords_dict[location]
                better_tour.append(landmark)
                name_index[landmark.name] = i

            start_index = name_index['start']
        except KeyError:
            self.logger.warning("Polygon vertices do not match the tour landmarks. Keeping the tour order unchanged.")
            return tour

        # Scroll the list to have start in front again.
        better_tour = better_tour[start_index:] + better_tour[:start_index]

        # Append the finish back.
        better_tour.append(tour[-1])

        # Rearrange only if polygon still not simple.
        if not better_tour_poly.is_simple:
            better_tour = self.rearrange(better_tour)

        return better_tour

    def refine_optimization(
        self,
        all_landmarks: list[Landmark],
        base_tour: list[Landmark],
        max_time: int,
        detour: int
    ) -> list[Landmark]:
        """
        This is the second stage of the optimization. It refines the initial tour path by considering additional minor landmarks and optimizes the path.

        Minor landmarks located in a corridor around the initial tour are added to the
        landmarks of that tour, and a new optimization problem is solved with the time
        budget extended by the allowed detour. If the optimizer finds no solution, the
        initial tour is used instead. The resulting tour is then reordered with a nearest
        neighbor heuristic and, for a circular trip whose path polygon is invalid, fixed
        with polygon-based methods.

        Args:
            all_landmarks (list[Landmark]): The full list of landmarks available for the optimization.
            base_tour (list[Landmark]): The initial tour path to be refined.
            max_time (int): The maximum time available for the tour in minutes.
            detour (int): The maximum detour time allowed for the tour in minutes.

        Returns:
            list[Landmark]: The refined list of landmarks representing the optimized tour path.
        """
        # No need to refine if no detour is taken
        # if detour == 0:
        #     return base_tour

        minor_landmarks = self.get_minor_landmarks(all_landmarks, base_tour, self.detour_corridor_width)

        self.logger.debug("Using %d minor landmarks around the predicted path", len(minor_landmarks))

        # Full set of visitable landmarks.
        full_set = self.integrate_landmarks(minor_landmarks, base_tour)

        # Generate a new tour with the optimizer.
        new_tour = self.optimizer.solve_optimization(
            max_time = max_time + detour,
            landmarks = full_set,
            max_landmarks = self.max_landmarks_refiner
        )

        # If unsuccessful optimization, use the base_tour.
        if new_tour is None:
            self.logger.warning("No solution found for the refined tour. Returning the initial tour.")
            new_tour = base_tour

        # Fewer than 4 entries leaves nothing to reorder, so the tour is returned as is.
        if len(new_tour) < 4:
            return new_tour

        # Find shortest path using the nearest neighbor heuristic.
        better_tour, better_poly = self.find_shortest_path_through_all_landmarks(new_tour)

        # Fix the tour using Polygons if the path looks weird.
        # Conditions : circular trip and invalid polygon.
        if base_tour[0].location == base_tour[-1].location and not better_poly.is_valid:
            better_tour = self.fix_using_polygon(better_tour)

        return better_tour