anyway/backend/src/utils/optimizer.py

605 lines
22 KiB
Python

import yaml, logging
import numpy as np
import pulp as pl
from scipy.optimize import linprog
from collections import defaultdict, deque
from ..structs.landmark import Landmark
from .get_time_separation import get_time
from ..constants import OPTIMIZER_PARAMETERS_PATH
logging.getLogger('pulp').setLevel(level=logging.CRITICAL)
class Optimizer:
logger = logging.getLogger(__name__)
detour: int = None # accepted max detour time (in minutes)
detour_factor: float # detour factor of straight line vs real distance in cities
average_walking_speed: float # average walking speed of adult
max_landmarks: int # max number of landmarks to visit
overshoot: float # overshoot to allow maxtime to overflow. Optimizer is a bit restrictive
def __init__(self) :
# load parameters from file
with OPTIMIZER_PARAMETERS_PATH.open('r') as f:
parameters = yaml.safe_load(f)
self.detour_factor = parameters['detour_factor']
self.average_walking_speed = parameters['average_walking_speed']
self.max_landmarks = parameters['max_landmarks']
self.overshoot = parameters['overshoot']
def init_ub_time(self, landmarks: list[Landmark], max_time: int):
"""
Initialize the objective function coefficients and inequality constraints.
-> Adds 1 row of constraints
-> Pre-allocates A_ub for the rest of the computations with L + (L*L-L)/2 rows
This function computes the distances between all landmarks and stores
their attractiveness to maximize sightseeing. The goal is to maximize
the objective function subject to the constraints A*x < b and A_eq*x = b_eq.
Args:
landmarks (list[Landmark]): List of landmarks.
max_time (int): Maximum time of visit allowed.
Returns:
tuple[list[float], list[float], list[int]]: Objective function coefficients, inequality
constraint coefficients, and the right-hand side of the inequality constraint.
"""
L = len(landmarks)
# Objective function coefficients. a*x1 + b*x2 + c*x3 + ...
c = np.zeros(L, dtype=np.int16)
# Coefficients of inequality constraints (left-hand side)
A_ub = np.zeros((L + int((L*L-L)/2), L*L), dtype=np.int16)
b_ub = np.zeros(L + int((L*L-L)/2), dtype=np.int16)
# Fill in first row
b_ub[0] = round(max_time*self.overshoot)
for i, spot1 in enumerate(landmarks) :
c[i] = spot1.attractiveness
for j in range(i+1, L) :
if i !=j :
t = get_time(spot1.location, landmarks[j].location)
A_ub[0, i*L + j] = t + spot1.duration
A_ub[0, j*L + i] = t + landmarks[j].duration
# Expand 'c' to L*L for every decision variable
c = np.tile(c, L)
# Now sort and modify A_ub for each row
if L > 22 :
for i in range(L):
# Get indices of the 4 smallest values in row i
row_values = A_ub[0, i*L:i*L+L]
closest_indices = np.argpartition(row_values, 22)[:22]
# Create a mask for non-closest landmarks
mask = np.ones(L, dtype=bool)
mask[closest_indices] = False
# Set non-closest landmarks to 32765
row_values[mask] = 32765
A_ub[0, i*L:i*L+L] = row_values
return c, A_ub, b_ub
def respect_number(self, A, b, L, max_landmarks: int):
"""
Generate constraints to ensure each landmark is visited only once and cap the total number of visited landmarks.
-> Adds L-1 rows of constraints
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
# First constraint: each landmark is visited exactly once
# Fill-in row 2 until row L-2
for i in range(1, L-1):
A[i, L*i:L*(i+1)] = np.ones(L, dtype=np.int16)
b[i] = 1
# Fill-in row L-1
# Second constraint: cap the total number of visits
A[L-1, :] = np.ones(L*L, dtype=np.int16)
b[L-1] = max_landmarks+2
def break_sym(self, A, b, L):
"""
Generate constraints to prevent simultaneous travel between two landmarks
in both directions. Constraint to not have d14 and d41 simultaneously.
Does not prevent cyclic paths with more elements
-> Adds (L*L-L)/2 rows of constraints (some of which might be zero)
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and
the right-hand side of the inequality constraints.
"""
upper_ind = np.triu_indices(L,0,L)
up_ind_x = upper_ind[0]
up_ind_y = upper_ind[1]
# Fill-in rows L to 2*L-1
incr = 0
for i in range(int((L*L+L)/2)) :
if up_ind_x[i] != up_ind_y[i] :
A[L+incr, up_ind_x[i]*L + up_ind_y[i]] = 1
A[L+incr, up_ind_y[i]*L + up_ind_x[i]] = 1
b[L+incr] = 1
incr += 1
def init_eq_not_stay(self, landmarks: list):
"""
Generate constraints to prevent staying in the same position (e.g., removing d11, d22, d33, etc.).
-> Adds 1 row of constraints
-> Pre-allocates A_eq for the rest of the computations with (L+ 2 + dynamic incr) rows
Args:
L (int): Number of landmarks.
Returns:
tuple[list[np.ndarray], list[int]]: Equality constraint coefficients and the right-hand side of the equality constraints.
"""
L = len(landmarks)
incr = 0
for i, elem in enumerate(landmarks) :
if (elem.must_do or elem.must_avoid) and i not in [0, L-1]:
incr += 1
A_eq = np.zeros((L+2+incr, L*L), dtype=np.int8)
b_eq = np.zeros(L+2+incr, dtype=np.int8)
l = np.zeros((L, L), dtype=np.int8)
# Set diagonal elements to 1 (to prevent staying in the same position)
np.fill_diagonal(l, 1)
# Fill-in first row
A_eq[0,:] = l.flatten()
b_eq[0] = 0
return A_eq, b_eq
# Constraint to ensure start at start and finish at goal
def respect_start_finish(self, A, b, L: int):
"""
Generate constraints to ensure that the optimization starts at the designated
start landmark and finishes at the goal landmark.
-> Adds 3 rows of constraints
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
# Fill-in row 1.
A[1, :L] = np.ones(L, dtype=np.int8) # sets departures only for start (horizontal ones)
for k in range(L-1) :
if k != 0 :
# Fill-in row 2
A[2, k*L+L-1] = 1 # sets arrivals only for finish (vertical ones)
# Fill-in row 3
A[3, k*L] = 1
A[3, L*(L-1):] = np.ones(L, dtype=np.int8) # prevents arrivals at start and departures from goal
b[1:4] = [1, 1, 0]
# return A, b
def respect_order(self, A, b, L: int):
"""
Generate constraints to tie the optimization problem together and prevent
stacked ones, although this does not fully prevent circles.
-> Adds L-2 rows of constraints
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
ones = np.ones(L, dtype=np.int8)
# Fill-in rows 4 to L+2
for i in range(1, L-1) : # Prevent stacked ones
for j in range(L) :
A[i-1+4, i + j*L] = -1
A[i-1+4, i*L:(i+1)*L] = ones
b[4:L+2] = np.zeros(L-2, dtype=np.int8)
def respect_user_must(self, A, b, landmarks: list[Landmark]) :
"""
Generate constraints to ensure that landmarks marked as 'must_do' are included in the optimization.
-> Adds a variable number of rows of constraints BUT CAN BE PRE COMPUTED
Args:
landmarks (list[Landmark]): List of landmarks, where some are marked as 'must_do'.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
L = len(landmarks)
ones = np.ones(L, dtype=np.int8)
incr = 0
for i, elem in enumerate(landmarks) :
if elem.must_do is True and i not in [0, L-1]:
# First part of the dynamic infill
A[L+2+incr, i*L:i*L+L] = ones
b[L+2+incr] = 1
incr += 1
if elem.must_avoid is True and i not in [0, L-1]:
# Second part of the dynamic infill
A[L+2+incr, i*L:i*L+L] = ones
b[L+2+incr] = 0
incr += 1
# Prevent the use of a particular solution
def prevent_config(self, resx):
"""
Prevent the use of a particular solution by adding constraints to the optimization.
Args:
resx (list[float]): List of edge weights.
Returns:
tuple[list[int], list[int]]: A tuple containing a new row for A and new value for ub.
"""
for i, elem in enumerate(resx):
resx[i] = round(elem)
N = len(resx) # Number of edges
L = int(np.sqrt(N)) # Number of landmarks
nonzeroind = np.nonzero(resx)[0] # the return is a little funky so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0].tolist()
vertices_visited = ind_a
vertices_visited.remove(0)
ones = np.ones(L, dtype=np.int8)
h = np.zeros(L*L, dtype=np.int8)
for i in range(L) :
if i in vertices_visited :
h[i*L:i*L+L] = ones
return h, len(vertices_visited)-1
# Prevents the creation of the same circle (both directions)
def prevent_circle(self, circle_vertices: list, L: int) :
"""
Prevent circular paths by by adding constraints to the optimization.
Args:
circle_vertices (list): List of vertices forming a circle.
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: A tuple containing a new row for constraint matrix and new value for upper bound vector.
"""
l = np.zeros((2, L*L), dtype=np.int8)
for i, node in enumerate(circle_vertices[:-1]) :
next = circle_vertices[i+1]
l[0, node*L + next] = 1
l[1, next*L + node] = 1
s = circle_vertices[0]
g = circle_vertices[-1]
l[0, g*L + s] = 1
l[1, s*L + g] = 1
return l, [0, 0]
def is_connected(self, resx) :
"""
Determine the order of visits and detect any circular paths in the given configuration.
Args:
resx (list): List of edge weights.
Returns:
tuple[list[int], Optional[list[list[int]]]]: A tuple containing the visit order and a list of any detected circles.
"""
resx = np.round(resx).astype(np.int8) # round all elements and cast them to int
print(f"resx = ")
# for r in resx : print(r)
N = len(resx) # length of res
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0]
ind_b = nonzero_tup[1]
# Extract all journeys
all_journeys_nodes = []
visited_nodes = set()
for node in ind_a:
if node not in visited_nodes:
journey_nodes = self.get_journey(node, ind_a, ind_b)
all_journeys_nodes.append(journey_nodes)
visited_nodes.update(journey_nodes)
for l in all_journeys_nodes :
if 0 in l :
all_journeys_nodes.remove(l)
break
if not all_journeys_nodes :
return None
return all_journeys_nodes
def get_journey(self, start, ind_a, ind_b):
"""
Trace the journey starting from a given node and follow the connections between landmarks.
This method constructs a graph from two lists of landmark connections, `ind_a` and `ind_b`,
where each element in `ind_a` is connected to the corresponding element in `ind_b`.
It then performs a depth-first search (DFS) starting from the `start` node to determine
the path (journey) by following the connections.
Args:
start (int): The starting node of the journey.
ind_a (list[int]): List of "from" nodes, representing the starting points of each connection.
ind_b (list[int]): List of "to" nodes, representing the endpoints of each connection.
Returns:
list[int]: A list of nodes representing the order of the journey, starting from the `start` node.
Example:
If `ind_a = [0, 1, 2]` and `ind_b = [1, 2, 3]`, starting from node 0, the journey would be `[0, 1, 2, 3]`.
"""
graph = defaultdict(list)
for a, b in zip(ind_a, ind_b):
graph[a].append(b)
journey_nodes = []
visited = set()
stack = deque([start])
while stack:
node = stack.pop()
if node not in visited:
visited.add(node)
journey_nodes.append(node)
for neighbor in graph[node]:
if neighbor not in visited:
stack.append(neighbor)
return journey_nodes
def get_order(self, resx):
"""
Determine the order of visits given the result of the optimization.
Args:
resx (list): List of edge weights.
Returns:
list[int]: A list containing the visit order.
"""
resx = np.round(resx).astype(np.uint8) # must contain only 0 and 1
N = len(resx) # length of res
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0].tolist()
ind_b = nonzero_tup[1].tolist()
order = [0]
current = 0
used_indices = set() # Track visited index pairs
while True:
# Find index of the current node in ind_a
try:
i = ind_a.index(current)
except ValueError:
break # No more links, stop the search
if i in used_indices:
break # Prevent infinite loops
used_indices.add(i) # Mark this index as visited
next_node = ind_b[i] # Get the corresponding node in ind_b
order.append(next_node) # Add it to the path
# Switch roles, now look for next_node in ind_a
try:
current = next_node
except ValueError:
break # No further connections, end the path
return order
def link_list(self, order: list[int], landmarks: list[Landmark])->list[Landmark] :
"""
Compute the time to reach from each landmark to the next and create a list of landmarks with updated travel times.
Args:
order (list[int]): List of indices representing the order of landmarks to visit.
landmarks (list[Landmark]): List of all landmarks.
Returns:
list[Landmark]]: The updated linked list of landmarks with travel times
"""
L = []
j = 0
while j < len(order)-1 :
# get landmarks involved
elem = landmarks[order[j]]
next = landmarks[order[j+1]]
# get attributes
elem.time_to_reach_next = get_time(elem.location, next.location)
elem.must_do = True
elem.location = (round(elem.location[0], 5), round(elem.location[1], 5))
elem.next_uuid = next.uuid
L.append(elem)
j += 1
next.location = (round(next.location[0], 5), round(next.location[1], 5))
next.must_do = True
L.append(next)
return L
def solve_optimization(
self,
max_time: int,
landmarks: list[Landmark],
max_landmarks: int = None
) -> list[Landmark]:
"""
Main optimization pipeline to solve the landmark visiting problem.
This method sets up and solves a linear programming problem with constraints to find an optimal tour of landmarks,
considering user-defined must-visit landmarks, start and finish points, and ensuring no cycles are present.
Args:
max_time (int): Maximum time allowed for the tour in minutes.
landmarks (list[Landmark]): List of landmarks to visit.
max_landmarks (int): Maximum number of landmarks visited
Returns:
list[Landmark]: The optimized tour of landmarks with updated travel times, or None if no valid solution is found.
"""
if max_landmarks is None :
max_landmarks = self.max_landmarks
L = len(landmarks)
# SET CONSTRAINTS FOR INEQUALITY
c, A_ub, b_ub = self.init_ub_time(landmarks, max_time) # Adds the distances from each landmark to the other.
self.respect_number(A_ub, b_ub, L, max_landmarks) # Respects max number of visits (no more possible stops than landmarks).
self.break_sym(A_ub, b_ub, L) # Breaks the 'zig-zag' symmetry. Avoids d12 and d21 but not larger cirlces.
# SET CONSTRAINTS FOR EQUALITY
A_eq, b_eq = self.init_eq_not_stay(landmarks) # Force solution not to stay in same place
self.respect_start_finish(A_eq, b_eq, L) # Force start and finish positions
self.respect_order(A_eq, b_eq, L) # Respect order of visit (only works when max_time is limiting factor)
self.respect_user_must(A_eq, b_eq, landmarks) # Force to do/avoid landmarks set by user.
self.logger.debug(f"Optimizing with {A_ub.shape[0]} + {A_eq.shape[0]} = {A_ub.shape[0] + A_eq.shape[0]} constraints.")
# SET BOUNDS FOR DECISION VARIABLE (x can only be 0 or 1)
x_bounds = [(0, 1)]*L*L
# Solve linear programming problem with PulP
prob = pl.LpProblem("OptimizationProblem", pl.LpMaximize)
x = [pl.LpVariable(f"x_{i}", lowBound=x_bounds[i][0], upBound=x_bounds[i][1], cat='Binary') for i in range(L*L)]
# Add the objective function
prob += pl.lpSum([c[i] * x[i] for i in range(L*L)])
# Add Inequality Constraints (A_ub @ x <= b_ub)
for i in range(len(b_ub)): # Iterate over rows of A_ub
prob += (pl.lpSum([A_ub[i][j] * x[j] for j in range(L*L)]) <= b_ub[i])
# 5. Add Equality Constraints (A_eq @ x == b_eq)
for i in range(len(b_eq)): # Iterate over rows of A_eq
prob += (pl.lpSum([A_eq[i][j] * x[j] for j in range(L*L)]) == b_eq[i])
# 6. Solve the problem
prob.solve(pl.PULP_CBC_CMD(msg=True))
# 7. Extract Results
status = pl.LpStatus[prob.status]
solution = [pl.value(var) for var in x] # The values of the decision variables (will be 0 or 1)
print(status)
self.logger.debug("First results are out. Looking out for circles and correcting.")
# Raise error if no solution is found. FIXME: for now this throws the internal server error
if status != 'Optimal' :
self.logger.error("The problem is overconstrained, no solution on first try.")
raise ArithmeticError("No solution could be found. Please try again with more time or different preferences.")
# If there is a solution, we're good to go, just check for connectiveness
circles = self.is_connected(solution)
i = 0
timeout = 80
while circles is not None and i < timeout:
i += 1
# print(f"Iteration {i} of fixing circles")
print("ok1")
A, b = self.prevent_config(solution)
print("ok2")
print(f"A: {A}")
print(f"b: {b}")
try :
prob += pl.lpSum([A[j] * x[j // L][j % L] for j in range(L * L)]) == b
except Exception as exc :
self.logger.error(f'Unexpected error occured', details=exc)
raise Exception from exc
print("ok3")
for circle in circles :
A, b = self.prevent_circle(circle, L)
prob += (pl.lpSum([A[0][j] * x[j // L][j % L] for j in range(L*L)]) == b[0])
prob += (pl.lpSum([A[1][j] * x[j // L][j % L] for j in range(L*L)]) == b[1])
print("ok4")
prob.solve(pl.PULP_CBC_CMD(msg=True))
status = pl.LpStatus[prob.status]
solution = [pl.value(var) for var in x] # The values of the decision variables (will be 0 or 1)
if status != 'Optimal' :
self.logger.error(f'Unexpected error after {timeout} iterations of fixing circles.')
raise ArithmeticError("Solving failed because of overconstrained problem")
circles = self.is_connected(solution)
if circles is None :
break
if i == timeout :
self.logger.error(f'Timeout: No solution found after {timeout} iterations.')
raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.")
# Sort the landmarks in the order of the solution
order = self.get_order(solution)
tour = [landmarks[i] for i in order]
self.logger.debug(f"Re-optimized {i} times, score: {int(pl.value(prob.objective))}")
return tour