cleanup before prod
Some checks failed
Build and deploy the backend to staging / Build and push image (pull_request) Successful in 1m57s
Run linting on the backend code / Build (pull_request) Successful in 28s
Run testing on the backend code / Build (pull_request) Failing after 53s
Build and deploy the backend to staging / Deploy to staging (pull_request) Successful in 24s

This commit is contained in:
2025-01-23 12:53:01 +01:00
parent b30fa1f02e
commit 4818bde820
8 changed files with 148 additions and 1001 deletions

View File

@@ -1,606 +0,0 @@
"""Module responsible for sloving an MILP to find best tour around the given landmarks."""
import logging
from collections import defaultdict, deque
import yaml
import numpy as np
import pulp as pl
from ..structs.landmark import Landmark
from .get_time_distance import get_time
from ..constants import OPTIMIZER_PARAMETERS_PATH
# Silence the pupl logger
logging.getLogger('pulp').setLevel(level=logging.CRITICAL)
class Optimizer:
"""
Optimizes the balance between the efficiency of a tour and the inclusion of landmarks.
The `Optimizer` class is responsible for calculating the best possible detour adjustments
to a tour based on specific parameters such as detour time, walking speed, and the maximum
number of landmarks to visit. It helps refine a tour by determining whether adding additional
landmarks would significantly reduce the overall efficiency.
Responsibilities:
- Calculates the maximum detour time allowed for a given tour.
- Considers the detour factor, which accounts for real-world walking paths versus straight-line distance.
- Takes into account the average walking speed to estimate walking times.
- Limits the number of landmarks that can be added to the tour to prevent excessive detouring.
- Allows some overflow (overshoot) in the maximum detour time to accommodate for slight inefficiencies.
Attributes:
logger (logging.Logger): Logger for capturing relevant events and errors.
detour (int): The accepted maximum detour time in minutes.
detour_factor (float): The ratio between straight-line distance and actual walking distance in cities.
average_walking_speed (float): The average walking speed of an adult (in meters per second or kilometers per hour).
max_landmarks (int): The maximum number of landmarks to include in the tour.
overshoot (float): The overshoot allowance for exceeding the maximum detour time in a restrictive manner.
"""
logger = logging.getLogger(__name__)
detour: int = None # accepted max detour time (in minutes)
detour_factor: float # detour factor of straight line vs real distance in cities
average_walking_speed: float # average walking speed of adult
max_landmarks: int # max number of landmarks to visit
overshoot: float # overshoot to allow maxtime to overflow. Optimizer is a bit restrictive
def __init__(self) :
# load parameters from file
with OPTIMIZER_PARAMETERS_PATH.open('r') as f:
parameters = yaml.safe_load(f)
self.detour_factor = parameters['detour_factor']
self.average_walking_speed = parameters['average_walking_speed']
self.max_landmarks = parameters['max_landmarks']
self.overshoot = parameters['overshoot']
def init_ub_time(self, prob: pl.LpProblem, x: pl.LpVariable, L: int, landmarks: list[Landmark], max_time: int):
"""
Initialize the objective function and inequality constraints for the linear program.
This function sets up the objective to maximize the attractiveness of visiting landmarks,
while ensuring that the total time (including travel and visit duration) does not exceed
the maximum allowed time. It calculates the pairwise travel times between landmarks and
incorporates visit duration to form the inequality constraints.
The objective is to maximize sightseeing by selecting the most attractive landmarks within
the time limit.
Args:
prob (pl.LpProblem): The linear programming problem where constraints and the objective will be added.
x (pl.LpVariable): A decision variable representing whether a landmark is visited.
L (int): The number of landmarks.
landmarks (list[Landmark]): List of landmarks to visit.
max_time (int): Maximum allowable time for sightseeing, including travel and visit duration.
Returns:
None: Adds the objective function and constraints to the LP problem directly.
constraint coefficients, and the right-hand side of the inequality constraint.
"""
L = len(landmarks)
# Objective function coefficients. a*x1 + b*x2 + c*x3 + ...
c = np.zeros(L, dtype=np.int16)
# inequality matrix and vector
A_ub = np.zeros(L*L, dtype=np.int16)
b_ub = round(max_time*self.overshoot)
for i, spot1 in enumerate(landmarks) :
c[i] = spot1.attractiveness
for j in range(i+1, L) :
if i !=j :
t = get_time(spot1.location, landmarks[j].location)
A_ub[i*L + j] = t + spot1.duration
A_ub[j*L + i] = t + landmarks[j].duration
# Expand 'c' to L*L for every decision variable and ad
c = np.tile(c, L)
# Now sort and modify A_ub for each row
if L > 22 :
for i in range(L):
# Get indices of the 4 smallest values in row i
row_values = A_ub[i*L:i*L+L]
closest_indices = np.argpartition(row_values, 22)[:22]
# Create a mask for non-closest landmarks
mask = np.ones(L, dtype=bool)
mask[closest_indices] = False
# Set non-closest landmarks to 32765
row_values[mask] = 32765
A_ub[i*L:i*L+L] = row_values
# Add the objective and the 1 distance constraint
prob += pl.lpSum([c[j] * x[j] for j in range(L*L)])
prob += (pl.lpSum([A_ub[j] * x[j] for j in range(L*L)]) <= b_ub)
def respect_number(self, prob: pl.LpProblem, x: pl.LpVariable, L: int, max_landmarks: int):
"""
Generate constraints to ensure each landmark is visited at most once and cap the total number of visited landmarks.
This function adds the following constraints to the linear program:
1. Each landmark is visited at most once by creating L-2 constraints (one for each landmark).
2. The total number of visited landmarks is capped by the specified maximum number (`max_landmarks`) plus 2.
Args:
prob (pl.LpProblem): The linear programming problem where constraints will be added.
x (pl.LpVariable): Decision variable indicating whether a landmark is visited.
L (int): The total number of landmarks.
max_landmarks (int): The maximum number of landmarks that can be visited.
Returns:
None: This function directly modifies the `prob` object by adding constraints.
"""
# L-2 constraints: each landmark is visited exactly once
for i in range(1, L-1):
prob += (pl.lpSum([x[L*i + j] for j in range(L)]) <= 1)
# 1 constraint: cap the total number of visits
prob += (pl.lpSum([1 * x[j] for j in range(L*L)]) <= max_landmarks+2)
def break_sym(self, prob: pl.LpProblem, x: pl.LpVariable, L: int):
"""
Generate constraints to prevent simultaneous travel between two landmarks
in both directions. This constraint ensures that, for any pair of landmarks,
travel from landmark i to landmark j (dij) and travel from landmark j to landmark i (dji)
cannot happen simultaneously.
This method adds constraints to break symmetry, specifically to prevent
cyclic paths with only two elements. It does not prevent cyclic paths involving more than two elements.
Args:
prob (pl.LpProblem): The linear programming problem where constraints will be added.
x (pl.LpVariable): Decision variable representing travel between landmarks.
L (int): The total number of landmarks.
Returns:
None: This function modifies the `prob` object by adding constraints in-place.
"""
upper_ind = np.triu_indices(L, 0, L) # Get the upper triangular indices
up_ind_x = upper_ind[0]
up_ind_y = upper_ind[1]
# Loop over the upper triangular indices, excluding diagonal elements
for i, up_ind in enumerate(up_ind_x):
if up_ind != up_ind_y[i]:
# Add (L*L-L)/2 constraints to break symmetry
prob += (x[up_ind*L + up_ind_y[i]] + x[up_ind_y[i]*L + up_ind] <= 1)
def init_eq_not_stay(self, prob: pl.LpProblem, x: pl.LpVariable, L: int):
"""
Generate constraints to prevent staying at the same position during travel.
Specifically, it removes travel from a landmark to itself (e.g., d11, d22, d33, etc.).
This function adds one equality constraint to the optimization problem that ensures
no decision variable corresponding to staying at the same landmark is included
in the solution. This helps in ensuring that the path does not include self-loops.
Args:
prob (pl.LpProblem): The linear programming problem where constraints will be added.
x (pl.LpVariable): Decision variable representing travel between landmarks.
L (int): The total number of landmarks.
Returns:
None: This function modifies the `prob` object by adding an equality constraint in-place.
"""
A_eq = np.zeros((L, L), dtype=np.int8)
# Set diagonal elements to 1 (to prevent staying in the same position)
np.fill_diagonal(A_eq, 1)
A_eq = A_eq.flatten()
# First equality constraint
prob += (pl.lpSum([A_eq[j] * x[j] for j in range(L*L)]) == 0)
def respect_start_finish(self, prob: pl.LpProblem, x: pl.LpVariable, L: int):
"""
Generate constraints to ensure that the optimization starts at the designated
start landmark and finishes at the goal landmark.
Specifically, this function adds three equality constraints:
1. Ensures that the path starts at the designated start landmark (row 0).
2. Ensures that the path finishes at the designated goal landmark (row 1).
3. Prevents any arrivals at the start landmark or departures from the goal landmark (row 2).
Args:
prob (pl.LpProblem): The linear programming problem where constraints will be added.
x (pl.LpVariable): Decision variable representing travel between landmarks.
L (int): The total number of landmarks.
Returns:
None: This function modifies the `prob` object by adding three equality constraints in-place.
"""
# Fill-in row 0.
A_eq = np.zeros((3,L*L), dtype=np.int8)
A_eq[0, :L] = np.ones(L, dtype=np.int8) # sets departures only for start (horizontal ones)
for k in range(L-1) :
if k != 0 :
# Fill-in row 1
A_eq[1, k*L+L-1] = 1 # sets arrivals only for finish (vertical ones)
# Fill-in row 1
A_eq[2, k*L] = 1
A_eq[2, L*(L-1):] = np.ones(L, dtype=np.int8) # prevents arrivals at start and departures from goal
b_eq= [1, 1, 0]
# Add the constraints to pulp
for i in range(3) :
prob += (pl.lpSum([A_eq[i][j] * x[j] for j in range(L*L)]) == b_eq[i])
def respect_order(self, prob: pl.LpProblem, x: pl.LpVariable, L: int):
"""
Generate constraints to tie the optimization problem together and prevent
stacked ones, although this does not fully prevent circles.
This function adds constraints to the optimization problem that prevent
simultaneous travel between landmarks in a way that would result in stacked ones.
However, it does not fully prevent circular paths.
Args:
prob (pl.LpProblem): The linear programming problem where constraints will be added.
x (pl.LpVariable): Decision variable representing travel between landmarks.
L (int): The total number of landmarks.
Returns:
None: This function modifies the `prob` object by adding L-2 equality constraints in-place.
"""
# FIXME: weird 0 artifact in the coefficients popping up
# Loop through rows 1 to L-2 to prevent stacked ones
for i in range(1, L-1):
# Add the constraint that sums across each "row" or "block" in the decision variables
row_sum = -pl.lpSum(x[i + j*L] for j in range(L)) + pl.lpSum(x[i*L:(i+1)*L])
prob += (row_sum == 0)
def respect_user_must(self, prob: pl.LpProblem, x: pl.LpVariable, L: int, landmarks: list[Landmark]) :
"""
Generate constraints to ensure that landmarks marked as 'must_do' are included in the optimization.
This function adds constraints to the optimization problem to ensure that landmarks marked as
'must_do' are included in the solution. It precomputes the constraints and adds them to the
problem accordingly.
Args:
prob (pl.LpProblem): The linear programming problem where constraints will be added.
x (pl.LpVariable): Decision variable representing travel between landmarks.
L (int): The total number of landmarks.
landmarks (list[Landmark]): List of landmarks, where some are marked as 'must_do'.
Returns:
None: This function modifies the `prob` object by adding equality constraints in-place.
"""
ones = np.ones(L, dtype=np.int8)
A_eq = np.zeros(L*L, dtype=np.int8)
for i, elem in enumerate(landmarks) :
if elem.must_do is True and i not in [0, L-1]:
A_eq[i*L:i*L+L] = ones
prob += (pl.lpSum([A_eq[j] * x[j] for j in range(L*L)]) == 1)
if elem.must_avoid is True and i not in [0, L-1]:
A_eq[i*L:i*L+L] = ones
prob += (pl.lpSum([A_eq[j] * x[j] for j in range(L*L)]) == 2)
def prevent_circle(self, prob: pl.LpProblem, x: pl.LpVariable, circle_vertices: list, L: int) :
"""
Prevent circular paths by adding constraints to the optimization.
This function ensures that circular paths in both directions (i.e., forward and reverse)
between landmarks are avoided in the optimization problem by adding the corresponding constraints.
Args:
prob (pl.LpProblem): The linear programming problem instance to which the constraints will be added.
x (pl.LpVariable): Decision variable representing the travel between landmarks in the problem.
circle_vertices (list): List of indices representing the landmarks that form a circular path.
L (int): The total number of landmarks.
Returns:
None: This function modifies the `prob` object by adding two equality constraints that
prevent circular paths in both directions for the specified circle vertices.
"""
l = np.zeros((2, L*L), dtype=np.int8)
for i, node in enumerate(circle_vertices[:-1]) :
next = circle_vertices[i+1]
l[0, node*L + next] = 1
l[1, next*L + node] = 1
s = circle_vertices[0]
g = circle_vertices[-1]
l[0, g*L + s] = 1
l[1, s*L + g] = 1
# Add the constraints
prob += (pl.lpSum([l[0][j] * x[j] for j in range(L*L)]) == 0)
prob += (pl.lpSum([l[1][j] * x[j] for j in range(L*L)]) == 0)
def is_connected(self, resx) :
"""
Determine the order of visits and detect any circular paths in the given configuration.
Args:
resx (list): List of edge weights.
Returns:
tuple[list[int], Optional[list[list[int]]]]: A tuple containing the visit order and a list of any detected circles.
"""
resx = np.round(resx).astype(np.int8) # round all elements and cast them to int
N = len(resx) # length of res
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0]
ind_b = nonzero_tup[1]
# Extract all journeys
all_journeys_nodes = []
visited_nodes = set()
for node in ind_a:
if node not in visited_nodes:
journey_nodes = self.get_journey(node, ind_a, ind_b)
all_journeys_nodes.append(journey_nodes)
visited_nodes.update(journey_nodes)
for l in all_journeys_nodes :
if 0 in l :
all_journeys_nodes.remove(l)
break
if not all_journeys_nodes :
return None
return all_journeys_nodes
def get_journey(self, start, ind_a, ind_b):
"""
Trace the journey starting from a given node and follow the connections between landmarks.
This method constructs a graph from two lists of landmark connections, `ind_a` and `ind_b`,
where each element in `ind_a` is connected to the corresponding element in `ind_b`.
It then performs a depth-first search (DFS) starting from the `start` node to determine
the path (journey) by following the connections.
Args:
start (int): The starting node of the journey.
ind_a (list[int]): List of "from" nodes, representing the starting points of each connection.
ind_b (list[int]): List of "to" nodes, representing the endpoints of each connection.
Returns:
list[int]: A list of nodes representing the order of the journey, starting from the `start` node.
Example:
If `ind_a = [0, 1, 2]` and `ind_b = [1, 2, 3]`, starting from node 0, the journey would be `[0, 1, 2, 3]`.
"""
graph = defaultdict(list)
for a, b in zip(ind_a, ind_b):
graph[a].append(b)
journey_nodes = []
visited = set()
stack = deque([start])
while stack:
node = stack.pop()
if node not in visited:
visited.add(node)
journey_nodes.append(node)
for neighbor in graph[node]:
if neighbor not in visited:
stack.append(neighbor)
return journey_nodes
def get_order(self, resx):
"""
Determine the order of visits given the result of the optimization.
Args:
resx (list): List of edge weights.
Returns:
list[int]: A list containing the visit order.
"""
resx = np.round(resx).astype(np.uint8) # must contain only 0 and 1
N = len(resx) # length of res
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0].tolist()
ind_b = nonzero_tup[1].tolist()
order = [0]
current = 0
used_indices = set() # Track visited index pairs
while True:
# Find index of the current node in ind_a
try:
i = ind_a.index(current)
except ValueError:
break # No more links, stop the search
if i in used_indices:
break # Prevent infinite loops
used_indices.add(i) # Mark this index as visited
next_node = ind_b[i] # Get the corresponding node in ind_b
order.append(next_node) # Add it to the path
# Switch roles, now look for next_node in ind_a
try:
current = next_node
except ValueError:
break # No further connections, end the path
return order
def link_list(self, order: list[int], landmarks: list[Landmark])->list[Landmark] :
"""
Compute the time to reach from each landmark to the next and create a list of landmarks with updated travel times.
Args:
order (list[int]): List of indices representing the order of landmarks to visit.
landmarks (list[Landmark]): List of all landmarks.
Returns:
list[Landmark]]: The updated linked list of landmarks with travel times
"""
L = []
j = 0
while j < len(order)-1 :
# get landmarks involved
elem = landmarks[order[j]]
next = landmarks[order[j+1]]
# get attributes
elem.time_to_reach_next = get_time(elem.location, next.location)
elem.must_do = True
elem.location = (round(elem.location[0], 5), round(elem.location[1], 5))
elem.next_uuid = next.uuid
L.append(elem)
j += 1
next.location = (round(next.location[0], 5), round(next.location[1], 5))
next.must_do = True
L.append(next)
return L
def pre_processing(self, L: int, landmarks: list[Landmark], max_time: int, max_landmarks: int | None) :
"""
Preprocesses the optimization problem by setting up constraints and variables for the tour optimization.
This method initializes and prepares the linear programming problem to optimize a tour that includes landmarks,
while respecting various constraints such as time limits, the number of landmarks to visit, and user preferences.
The pre-processing step sets up the problem before solving it using a linear programming solver.
Responsibilities:
- Defines the optimization problem using linear programming (LP) with the objective to maximize the tour value.
- Creates binary decision variables for each potential transition between landmarks.
- Sets up inequality constraints to respect the maximum time available for the tour and the maximum number of landmarks.
- Implements equality constraints to ensure the tour respects the start and finish positions, avoids staying in the same place,
and adheres to a visit order.
- Forces inclusion or exclusion of specific landmarks based on user preferences.
Attributes:
prob (pl.LpProblem): The linear programming problem to be solved.
x (list): A list of binary variables representing transitions between landmarks.
L (int): The total number of landmarks considered in the optimization.
landmarks (list[Landmark]): The list of landmarks to be visited in the tour.
max_time (int): The maximum allowable time for the entire tour.
max_landmarks (int | None): The maximum number of landmarks to visit in the tour, or None if no limit is set.
Returns:
prob (pl.LpProblem): The linear programming problem setup for optimization.
x (list): The list of binary variables for transitions between landmarks in the tour.
"""
if max_landmarks is None :
max_landmarks = self.max_landmarks
# Initalize the optimization problem
prob = pl.LpProblem("OptimizationProblem", pl.LpMaximize)
# Define the problem
x_bounds = [(0, 1)]*L*L
x = [pl.LpVariable(f"x_{i}", lowBound=x_bounds[i][0], upBound=x_bounds[i][1], cat='Binary') for i in range(L*L)]
# Setup the inequality constraints
self.init_ub_time(prob, x, L, landmarks, max_time) # Adds the distances from each landmark to the other.
self.respect_number(prob, x, L, max_landmarks) # Respects max number of visits (no more possible stops than landmarks).
self.break_sym(prob, x, L) # Breaks the 'zig-zag' symmetry. Avoids d12 and d21 but not larger cirlces.
# Setup the equality constraints
self.init_eq_not_stay(prob, x, L) # Force solution not to stay in same place
self.respect_start_finish(prob, x, L) # Force start and finish positions
self.respect_order(prob, x, L) # Respect order of visit (only works when max_time is limiting factor)
self.respect_user_must(prob, x, L, landmarks) # Force to do/avoid landmarks set by user.
return prob, x
def solve_optimization(self, max_time: int, landmarks: list[Landmark], max_landmarks: int = None) -> list[Landmark]:
"""
Main optimization pipeline to solve the landmark visiting problem.
This method sets up and solves a linear programming problem with constraints to find an optimal tour of landmarks,
considering user-defined must-visit landmarks, start and finish points, and ensuring no cycles are present.
Args:
max_time (int): Maximum time allowed for the tour in minutes.
landmarks (list[Landmark]): List of landmarks to visit.
max_landmarks (int): Maximum number of landmarks visited
Returns:
list[Landmark]: The optimized tour of landmarks with updated travel times, or None if no valid solution is found.
"""
# Setup the optimization proplem.
L = len(landmarks)
prob, x = self.pre_processing(L, landmarks, max_time, max_landmarks)
# Solve the problem and extract results.
prob.solve(pl.PULP_CBC_CMD(msg=False, gapRel=0.1))
status = pl.LpStatus[prob.status]
solution = [pl.value(var) for var in x] # The values of the decision variables (will be 0 or 1)
self.logger.debug("First results are out. Looking out for circles and correcting.")
# Raise error if no solution is found. FIXME: for now this throws the internal server error
if status != 'Optimal' :
self.logger.error("The problem is overconstrained, no solution on first try.")
raise ArithmeticError("No solution could be found. Please try again with more time or different preferences.")
# If there is a solution, we're good to go, just check for connectiveness
circles = self.is_connected(solution)
i = 0
timeout = 40
while circles is not None :
i += 1
if i == timeout :
self.logger.error(f'Timeout: No solution found after {timeout} iterations.')
raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.")
for circle in circles :
self.prevent_circle(prob, x, circle, L)
# Solve the problem again
prob.solve(pl.PULP_CBC_CMD(msg=False))
solution = [pl.value(var) for var in x]
if pl.LpStatus[prob.status] != 'Optimal' :
self.logger.error("The problem is overconstrained, no solution after {i} cycles.")
raise ArithmeticError("No solution could be found. Please try again with more time or different preferences.")
circles = self.is_connected(solution)
if circles is None :
break
# Sort the landmarks in the order of the solution
order = self.get_order(solution)
tour = [landmarks[i] for i in order]
self.logger.debug(f"Re-optimized {i} times, objective value : {int(pl.value(prob.objective))}")
return tour

View File

@@ -1,374 +0,0 @@
"""Allows to refine the tour by adding more landmarks and making the path easier to follow."""
import logging
from math import pi
import yaml
from shapely import buffer, LineString, Point, Polygon, MultiPoint, concave_hull
from ..structs.landmark import Landmark
from .get_time_distance import get_time
from .take_most_important import take_most_important
from .optimizer import Optimizer
from ..constants import OPTIMIZER_PARAMETERS_PATH
class Refiner :
"""
Refines a tour by incorporating smaller landmarks along the path to enhance the experience.
This class is designed to adjust an existing tour by considering additional,
smaller points of interest (landmarks) that may require minor detours but
improve the overall quality of the tour. It balances the efficiency of travel
with the added value of visiting these landmarks.
"""
logger = logging.getLogger(__name__)
detour_factor: float # detour factor of straight line vs real distance in cities
detour_corridor_width: float # width of the corridor around the path
average_walking_speed: float # average walking speed of adult
max_landmarks_refiner: int # max number of landmarks to visit
optimizer: Optimizer # optimizer object
def __init__(self, optimizer: Optimizer) :
self.optimizer = optimizer
# load parameters from file
with OPTIMIZER_PARAMETERS_PATH.open('r') as f:
parameters = yaml.safe_load(f)
self.detour_factor = parameters['detour_factor']
self.detour_corridor_width = parameters['detour_corridor_width']
self.average_walking_speed = parameters['average_walking_speed']
self.max_landmarks_refiner = parameters['max_landmarks_refiner']
def create_corridor(self, landmarks: list[Landmark], width: float) :
"""
Create a corridor around the path connecting the landmarks.
Args:
landmarks (list[Landmark]) : the landmark path around which to create the corridor
width (float) : width of the corridor in meters.
Returns:
Geometry: a buffered geometry object representing the corridor around the path.
"""
corrected_width = (180*width)/(6371000*pi)
path = self.create_linestring(landmarks)
obj = buffer(path, corrected_width, join_style="mitre", cap_style="square", mitre_limit=2)
return obj
def create_linestring(self, tour: list[Landmark]) -> LineString :
"""
Create a `LineString` object from a tour.
Args:
tour (list[Landmark]): An ordered sequence of landmarks that represents the visiting order.
Returns:
LineString: A `LineString` object representing the path through the landmarks.
"""
points = []
for landmark in tour :
points.append(Point(landmark.location))
return LineString(points)
# Check if some coordinates are in area. Used for the corridor
def is_in_area(self, area: Polygon, coordinates) -> bool :
"""
Check if a given point is within a specified area.
Args:
area (Polygon): The polygon defining the area.
coordinates (tuple[float, float]): The coordinates of the point to check.
Returns:
bool: True if the point is within the area, otherwise False.
"""
point = Point(coordinates)
return point.within(area)
# Function to determine if two landmarks are close to each other
def is_close_to(self, location1: tuple[float], location2: tuple[float]):
"""
Determine if two locations are close to each other by rounding their coordinates to 3 decimal places.
Args:
location1 (tuple[float, float]): The coordinates of the first location.
location2 (tuple[float, float]): The coordinates of the second location.
Returns:
bool: True if the locations are within 0.001 degrees of each other, otherwise False.
"""
absx = abs(location1[0] - location2[0])
absy = abs(location1[1] - location2[1])
return absx < 0.001 and absy < 0.001
#return (round(location1[0], 3), round(location1[1], 3)) == (round(location2[0], 3), round(location2[1], 3))
def rearrange(self, tour: list[Landmark]) -> list[Landmark]:
"""
Rearrange landmarks to group nearby visits together.
This function reorders landmarks so that nearby landmarks are adjacent to each other in the list,
while keeping 'start' and 'finish' landmarks in their original positions.
Args:
tour (list[Landmark]): Ordered list of landmarks to be rearranged.
Returns:
list[Landmark]: The rearranged list of landmarks with grouped nearby visits.
"""
i = 1
while i < len(tour):
j = i+1
while j < len(tour):
if self.is_close_to(tour[i].location, tour[j].location) and tour[i].name not in ['start', 'finish'] and tour[j].name not in ['start', 'finish']:
# If they are not adjacent, move the j-th element to be adjacent to the i-th element
if j != i + 1:
tour.insert(i + 1, tour.pop(j))
break # Move to the next i-th element after rearrangement
j += 1
i += 1
return tour
def integrate_landmarks(self, sub_list: list[Landmark], main_list: list[Landmark]) :
"""
Inserts 'sub_list' of Landmarks inside the 'main_list' by leaving the ends untouched.
Args:
sub_list : the list of Landmarks to be inserted inside of the 'main_list'.
main_list : the original list with start and finish.
Returns:
the full list.
"""
sub_list.append(main_list[-1]) # add finish back
return main_list[:-1] + sub_list # create full set of possible landmarks
def find_shortest_path_through_all_landmarks(self, landmarks: list[Landmark]) -> tuple[list[Landmark], Polygon]:
"""
Find the shortest path through all landmarks using a nearest neighbor heuristic.
This function constructs a path that starts from the 'start' landmark, visits all other landmarks in the order
of their proximity, and ends at the 'finish' landmark. It returns both the ordered list of landmarks and a
polygon representing the path.
Args:
landmarks (list[Landmark]): list of all landmarks including 'start' and 'finish'.
Returns:
tuple[list[Landmark], Polygon]: A tuple where the first element is the list of landmarks in the order they
should be visited, and the second element is a `Polygon` representing
the path connecting all landmarks.
"""
# Step 1: Find 'start' and 'finish' landmarks
start_idx = next(i for i, lm in enumerate(landmarks) if lm.type == 'start')
finish_idx = next(i for i, lm in enumerate(landmarks) if lm.type == 'finish')
start_landmark = landmarks[start_idx]
finish_landmark = landmarks[finish_idx]
# Step 2: Create a list of unvisited landmarks excluding 'start' and 'finish'
unvisited_landmarks = [lm for i, lm in enumerate(landmarks) if i not in [start_idx, finish_idx]]
# Step 3: Initialize the path with the 'start' landmark
path = [start_landmark]
coordinates = [landmarks[start_idx].location]
current_landmark = start_landmark
# Step 4: Use nearest neighbor heuristic to visit all landmarks
while unvisited_landmarks:
nearest_landmark = min(unvisited_landmarks, key=lambda lm: get_time(current_landmark.location, lm.location))
path.append(nearest_landmark)
coordinates.append(nearest_landmark.location)
current_landmark = nearest_landmark
unvisited_landmarks.remove(nearest_landmark)
# Step 5: Finally add the 'finish' landmark to the path
path.append(finish_landmark)
coordinates.append(landmarks[finish_idx].location)
path_poly = Polygon(coordinates)
return path, path_poly
# Returns a list of minor landmarks around the planned path to enhance experience
def get_minor_landmarks(self, all_landmarks: list[Landmark], visited_landmarks: list[Landmark], width: float) -> list[Landmark] :
"""
Identify landmarks within a specified corridor that have not been visited yet.
This function creates a corridor around the path defined by visited landmarks and then finds landmarks that fall
within this corridor. It returns a list of these landmarks, excluding those already visited, sorted by their importance.
Args:
all_landmarks (list[Landmark]): list of all available landmarks.
visited_landmarks (list[Landmark]): list of landmarks that have already been visited.
width (float): Width of the corridor around the visited landmarks.
Returns:
list[Landmark]: list of important landmarks within the corridor that have not been visited yet.
"""
second_order_landmarks = []
visited_names = []
area = self.create_corridor(visited_landmarks, width)
for visited in visited_landmarks :
visited_names.append(visited.name)
for landmark in all_landmarks :
if self.is_in_area(area, landmark.location) and landmark.name not in visited_names:
second_order_landmarks.append(landmark)
return take_most_important(second_order_landmarks, int(self.max_landmarks_refiner*0.75))
# Try fix the shortest path using shapely
def fix_using_polygon(self, tour: list[Landmark])-> list[Landmark] :
"""
Improve the tour path using geometric methods to ensure it follows a more optimal shape.
This function creates a polygon from the given tour and attempts to refine it using a concave hull. It reorders
the landmarks to fit within this refined polygon and adjusts the tour to ensure the 'start' landmark is at the
beginning. It also checks if the final polygon is simple and rearranges the tour if necessary.
Args:
tour (list[Landmark]): list of landmarks representing the current tour path.
Returns:
list[Landmark]: Refined list of landmarks in the order of visit to produce a better tour path.
"""
coords = []
coords_dict = {}
for landmark in tour :
coords.append(landmark.location)
if landmark.name != 'finish' :
coords_dict[landmark.location] = landmark
tour_poly = Polygon(coords)
better_tour_poly = tour_poly.buffer(0)
try :
xs, ys = better_tour_poly.exterior.xy
if len(xs) != len(tour) :
better_tour_poly = concave_hull(MultiPoint(coords)) # Create concave hull with "core" of tour leaving out start and finish
xs, ys = better_tour_poly.exterior.xy
except Exception:
better_tour_poly = concave_hull(MultiPoint(coords)) # Create concave hull with "core" of tour leaving out start and finish
xs, ys = better_tour_poly.exterior.xy
"""
ERROR HERE :
Exception has occurred: AttributeError
'LineString' object has no attribute 'exterior'
"""
# reverse the xs and ys
xs.reverse()
ys.reverse()
better_tour = [] # list of ordered visit
name_index = {} # Maps the name of a landmark to its index in the concave polygon
# Loop through the polygon and generate the better (ordered) tour
for i,x in enumerate(xs[:-1]) :
y = ys[i]
better_tour.append(coords_dict[tuple((x,y))])
name_index[coords_dict[tuple((x,y))].name] = i
# Scroll the list to have start in front again
start_index = name_index['start']
better_tour = better_tour[start_index:] + better_tour[:start_index]
# Append the finish back and correct the time to reach
better_tour.append(tour[-1])
# Rearrange only if polygon still not simple
if not better_tour_poly.is_simple :
better_tour = self.rearrange(better_tour)
return better_tour
def refine_optimization(
self,
all_landmarks: list[Landmark],
base_tour: list[Landmark],
max_time: int,
detour: int
) -> list[Landmark]:
"""
This is the second stage of the optimization. It refines the initial tour path by considering additional minor landmarks and optimizes the path.
This method evaluates the need for further optimization based on the initial tour. If a detour is required
it adds minor landmarks around the initial predicted path and solves a new optimization problem to find a potentially better
tour. It then links the new tour and adjusts it using a nearest neighbor heuristic and polygon-based methods to
ensure a valid path. The final tour is chosen based on the shortest distance.
Args:
all_landmarks (list[Landmark]): The full list of landmarks available for the optimization.
base_tour (list[Landmark]): The initial tour path to be refined.
max_time (int): The maximum time available for the tour in minutes.
detour (int): The maximum detour time allowed for the tour in minutes.
Returns:
list[Landmark]: The refined list of landmarks representing the optimized tour path.
"""
# No need to refine if no detour is taken
# if detour == 0:
# return base_tour
minor_landmarks = self.get_minor_landmarks(all_landmarks, base_tour, self.detour_corridor_width)
self.logger.debug(f"Using {len(minor_landmarks)} minor landmarks around the predicted path")
# Full set of visitable landmarks.
full_set = self.integrate_landmarks(minor_landmarks, base_tour) # could probably be optimized with less overhead
# Generate a new tour with the optimizer.
new_tour = self.optimizer.solve_optimization(
max_time = max_time + detour,
landmarks = full_set,
max_landmarks = self.max_landmarks_refiner
)
# If unsuccessful optimization, use the base_tour.
if new_tour is None:
self.logger.warning("No solution found for the refined tour. Returning the initial tour.")
new_tour = base_tour
# If only one landmark, return it.
if len(new_tour) < 4 :
return new_tour
# Find shortest path using the nearest neighbor heuristic.
better_tour, better_poly = self.find_shortest_path_through_all_landmarks(new_tour)
# Fix the tour using Polygons if the path looks weird.
# Conditions : circular trip and invalid polygon.
if base_tour[0].location == base_tour[-1].location and not better_poly.is_valid :
better_tour = self.fix_using_polygon(better_tour)
return better_tour