finally pulp is working !
Some checks failed
Build and deploy the backend to staging / Build and push image (pull_request) Successful in 2m31s
Run linting on the backend code / Build (pull_request) Successful in 25s
Run testing on the backend code / Build (pull_request) Failing after 5m30s
Build and deploy the backend to staging / Deploy to staging (pull_request) Successful in 23s

This commit is contained in:
Helldragon67 2025-01-16 07:34:55 +01:00
parent 2be7cd1e61
commit e2e54f5205
3 changed files with 107 additions and 107 deletions

View File

@ -21,7 +21,7 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
request: request:
""" """
start_time = time.time() # Start timer start_time = time.time() # Start timer
duration_minutes = 15 duration_minutes = 20
response = client.post( response = client.post(
"/trip/new", "/trip/new",
@ -35,7 +35,7 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
} }
) )
result = response.json() result = response.json()
print(result) # print(result)
landmarks = load_trip_landmarks(client, result['first_landmark_uuid']) landmarks = load_trip_landmarks(client, result['first_landmark_uuid'])
@ -45,8 +45,8 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
# Add details to report # Add details to report
log_trip_details(request, landmarks, result['total_time'], duration_minutes) log_trip_details(request, landmarks, result['total_time'], duration_minutes)
for elem in landmarks : # for elem in landmarks :
print(elem) # print(elem)
# checks : # checks :
assert response.status_code == 200 # check for successful planning assert response.status_code == 200 # check for successful planning
@ -54,9 +54,8 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2 assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
assert len(landmarks) > 2 # check that there is something to visit assert len(landmarks) > 2 # check that there is something to visit
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds" assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert 2==3 # assert 2==3
'''
def test_bellecour(client, request) : # pylint: disable=redefined-outer-name def test_bellecour(client, request) : # pylint: disable=redefined-outer-name
""" """
@ -219,6 +218,7 @@ def test_shopping(client, request) : # pylint: disable=redefined-outer-name
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds" assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2 assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
'''
''' '''
# def test_new_trip_single_prefs(client): # def test_new_trip_single_prefs(client):
# response = client.post( # response = client.post(

View File

@ -81,7 +81,7 @@ class LandmarkManager:
all_landmarks = set() all_landmarks = set()
# Create a bbox using the around technique # Create a bbox using the around technique
bbox = tuple((f"around:{reachable_bbox_side/2}", str(center_coordinates[0]), str(center_coordinates[1]))) bbox = tuple((f"around:{min(2000, reachable_bbox_side/2)}", str(center_coordinates[0]), str(center_coordinates[1])))
# list for sightseeing # list for sightseeing
if preferences.sightseeing.score != 0: if preferences.sightseeing.score != 0:

View File

@ -21,8 +21,6 @@ class Optimizer:
average_walking_speed: float # average walking speed of adult average_walking_speed: float # average walking speed of adult
max_landmarks: int # max number of landmarks to visit max_landmarks: int # max number of landmarks to visit
overshoot: float # overshoot to allow maxtime to overflow. Optimizer is a bit restrictive overshoot: float # overshoot to allow maxtime to overflow. Optimizer is a bit restrictive
prob: pl.LpProblem # linear optimization problem to solve
x: list[pl.LpVariable] # decision variables
def __init__(self) : def __init__(self) :
@ -34,11 +32,8 @@ class Optimizer:
self.max_landmarks = parameters['max_landmarks'] self.max_landmarks = parameters['max_landmarks']
self.overshoot = parameters['overshoot'] self.overshoot = parameters['overshoot']
# Initalize the optimization problem
self.prob = pl.LpProblem("OptimizationProblem", pl.LpMaximize)
def init_ub_time(self, prob: pl.LpProblem, x: pl.LpVariable, L: int, landmarks: list[Landmark], max_time: int):
def init_ub_time(self, L: int, landmarks: list[Landmark], max_time: int):
""" """
Initialize the objective function coefficients and inequality constraints. Initialize the objective function coefficients and inequality constraints.
-> Adds 1 row of constraints -> Adds 1 row of constraints
@ -80,7 +75,7 @@ class Optimizer:
if L > 22 : if L > 22 :
for i in range(L): for i in range(L):
# Get indices of the 4 smallest values in row i # Get indices of the 4 smallest values in row i
row_values = A_ub[0, i*L:i*L+L] row_values = A_ub[i*L:i*L+L]
closest_indices = np.argpartition(row_values, 22)[:22] closest_indices = np.argpartition(row_values, 22)[:22]
# Create a mask for non-closest landmarks # Create a mask for non-closest landmarks
@ -89,14 +84,14 @@ class Optimizer:
# Set non-closest landmarks to 32765 # Set non-closest landmarks to 32765
row_values[mask] = 32765 row_values[mask] = 32765
A_ub[0, i*L:i*L+L] = row_values A_ub[i*L:i*L+L] = row_values
# Add the objective and the distance constraint # Add the objective and the 1 distance constraint
self.prob += pl.lpSum([c[j] * self.x[j] for j in range(L*L)]) prob += pl.lpSum([c[j] * x[j] for j in range(L*L)])
self.prob += (pl.lpSum([A_ub[j] * self.x[j] for j in range(L*L)]) <= b_ub) prob += (pl.lpSum([A_ub[j] * x[j] for j in range(L*L)]) <= b_ub)
def respect_number(self, L: int, max_landmarks: int): def respect_number(self, prob: pl.LpProblem, x: pl.LpVariable, L: int, max_landmarks: int):
""" """
Generate constraints to ensure each landmark is visited only once and cap the total number of visited landmarks. Generate constraints to ensure each landmark is visited only once and cap the total number of visited landmarks.
-> Adds L-1 rows of constraints -> Adds L-1 rows of constraints
@ -107,17 +102,15 @@ class Optimizer:
Returns: Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
""" """
# First constraint: each landmark is visited exactly once # L-2 constraints: each landmark is visited exactly once
A_ub = np.zeros(L*L, dtype=np.int8) for i in range(1, L-1):
for i in range(0, L-2): prob += (pl.lpSum([x[L*i + j] for j in range(L)]) <= 1)
A_ub[L*i:L*(i+1)] = np.ones(L, dtype=np.int16)
self.prob += (pl.lpSum([A_ub[j] * self.x[j] for j in range(L*L)]) <= 1)
# Second constraint: cap the total number of visits # 1 constraint: cap the total number of visits
self.prob += (pl.lpSum([1 * self.x[j] for j in range(L*L)]) <= max_landmarks+2) prob += (pl.lpSum([1 * x[j] for j in range(L*L)]) <= max_landmarks+2)
def break_sym(self, L: int): def break_sym(self, prob: pl.LpProblem, x: pl.LpVariable, L: int):
""" """
Generate constraints to prevent simultaneous travel between two landmarks Generate constraints to prevent simultaneous travel between two landmarks
in both directions. Constraint to not have d14 and d41 simultaneously. in both directions. Constraint to not have d14 and d41 simultaneously.
@ -131,20 +124,18 @@ class Optimizer:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and tuple[np.ndarray, list[int]]: Inequality constraint coefficients and
the right-hand side of the inequality constraints. the right-hand side of the inequality constraints.
""" """
upper_ind = np.triu_indices(L,0,L) upper_ind = np.triu_indices(L, 0, L) # Get the upper triangular indices
up_ind_x = upper_ind[0] up_ind_x = upper_ind[0]
up_ind_y = upper_ind[1] up_ind_y = upper_ind[1]
A = np.zeros(L*L, dtype=np.int8)
# Fill-in rows L to 2*L-1 # Loop over the upper triangular indices, excluding diagonal elements
for i in range(int((L*L+L)/2)) : for i in range(len(up_ind_x)):
if up_ind_x[i] != up_ind_y[i] : if up_ind_x[i] != up_ind_y[i]:
A[up_ind_x[i]*L + up_ind_y[i]] = 1 # Add (L*L-L)/2 constraints to break symmetry
A[up_ind_y[i]*L + up_ind_x[i]] = 1 prob += (x[up_ind_x[i]*L + up_ind_y[i]] + x[up_ind_y[i]*L + up_ind_x[i]] <= 1)
self.prob += (pl.lpSum([A[j] * self.x[j] for j in range(L*L)]) <= 1)
def init_eq_not_stay(self, L: int): def init_eq_not_stay(self, prob: pl.LpProblem, x: pl.LpVariable, L: int):
""" """
Generate constraints to prevent staying in the same position (e.g., removing d11, d22, d33, etc.). Generate constraints to prevent staying in the same position (e.g., removing d11, d22, d33, etc.).
-> Adds 1 row of constraints -> Adds 1 row of constraints
@ -162,11 +153,12 @@ class Optimizer:
np.fill_diagonal(A_eq, 1) np.fill_diagonal(A_eq, 1)
A_eq = A_eq.flatten() A_eq = A_eq.flatten()
self.prob += (pl.lpSum([A_eq[j] * self.x[j] for j in range(L*L)]) == 1) # First equality constraint
prob += (pl.lpSum([A_eq[j] * x[j] for j in range(L*L)]) == 0)
# Constraint to ensure start at start and finish at goal # Constraint to ensure start at start and finish at goal
def respect_start_finish(self, L: int): def respect_start_finish(self, prob: pl.LpProblem, x: pl.LpVariable, L: int):
""" """
Generate constraints to ensure that the optimization starts at the designated Generate constraints to ensure that the optimization starts at the designated
start landmark and finishes at the goal landmark. start landmark and finishes at the goal landmark.
@ -193,9 +185,9 @@ class Optimizer:
# Add the constraints to pulp # Add the constraints to pulp
for i in range(3) : for i in range(3) :
self.prob += (pl.lpSum([A_eq[i][j] * self.x[j] for j in range(L*L)]) == b_eq[i]) prob += (pl.lpSum([A_eq[i][j] * x[j] for j in range(L*L)]) == b_eq[i])
def respect_order(self, L: int): def respect_order(self, prob: pl.LpProblem, x: pl.LpVariable, L: int):
""" """
Generate constraints to tie the optimization problem together and prevent Generate constraints to tie the optimization problem together and prevent
stacked ones, although this does not fully prevent circles. stacked ones, although this does not fully prevent circles.
@ -207,17 +199,24 @@ class Optimizer:
Returns: Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
""" """
A_eq = np.zeros(L*L, dtype=np.int8) # A_eq = np.zeros(L*L, dtype=np.int8)
ones = np.ones(L, dtype=np.int8) # ones = np.ones(L, dtype=np.int8)
# Fill-in rows 4 to L+2 # # Fill-in rows 4 to L+2
for i in range(1, L-1) : # Prevent stacked ones # for i in range(1, L-1) : # Prevent stacked ones
for j in range(L) : # for j in range(L) :
A_eq[i + j*L] = -1 # A_eq[i + j*L] = -1
A_eq[i*L:(i+1)*L] = ones # A_eq[i*L:(i+1)*L] = ones
self.prob += (pl.lpSum([A_eq[j] * self.x[j] for j in range(L*L)]) == 0) # prob += (pl.lpSum([A_eq[j] * x[j] for j in range(L*L)]) == 0)
# FIXME: weird 0 artifact in the coefficients popping up
# Loop through rows 1 to L-2 to prevent stacked ones
for i in range(1, L-1):
# Add the constraint that sums across each "row" or "block" in the decision variables
row_sum = -pl.lpSum(x[i + j*L] for j in range(L)) + pl.lpSum(x[i*L:(i+1)*L])
prob += (row_sum == 0)
def respect_user_must(self, L: int, landmarks: list[Landmark]) : def respect_user_must(self, prob: pl.LpProblem, x: pl.LpVariable, L: int, landmarks: list[Landmark]) :
""" """
Generate constraints to ensure that landmarks marked as 'must_do' are included in the optimization. Generate constraints to ensure that landmarks marked as 'must_do' are included in the optimization.
-> Adds a variable number of rows of constraints BUT CAN BE PRE COMPUTED -> Adds a variable number of rows of constraints BUT CAN BE PRE COMPUTED
@ -235,49 +234,49 @@ class Optimizer:
for i, elem in enumerate(landmarks) : for i, elem in enumerate(landmarks) :
if elem.must_do is True and i not in [0, L-1]: if elem.must_do is True and i not in [0, L-1]:
A_eq[i*L:i*L+L] = ones A_eq[i*L:i*L+L] = ones
self.prob += (pl.lpSum([A_eq[j] * self.x[j] for j in range(L*L)]) == 1) prob += (pl.lpSum([A_eq[j] * x[j] for j in range(L*L)]) == 1)
if elem.must_avoid is True and i not in [0, L-1]: if elem.must_avoid is True and i not in [0, L-1]:
A_eq[i*L:i*L+L] = ones A_eq[i*L:i*L+L] = ones
self.prob += (pl.lpSum([A_eq[j] * self.x[j] for j in range(L*L)]) == 2) prob += (pl.lpSum([A_eq[j] * x[j] for j in range(L*L)]) == 2)
# Prevent the use of a particular solution. TODO probably can be done faster just using resx # Prevent the use of a particular solution. TODO probably can be done faster just using resx
def prevent_config(self, resx): # def prevent_config(self, prob: pl.LpProblem, x: pl.LpVariable, resx):
""" # """
Prevent the use of a particular solution by adding constraints to the optimization. # Prevent the use of a particular solution by adding constraints to the optimization.
Args: # Args:
resx (list[float]): List of edge weights. # resx (list[float]): List of edge weights.
Returns: # Returns:
tuple[list[int], list[int]]: A tuple containing a new row for A and new value for ub. # tuple[list[int], list[int]]: A tuple containing a new row for A and new value for ub.
""" # """
for i, elem in enumerate(resx): # for i, elem in enumerate(resx):
resx[i] = round(elem) # resx[i] = round(elem)
N = len(resx) # Number of edges # N = len(resx) # Number of edges
L = int(np.sqrt(N)) # Number of landmarks # L = int(np.sqrt(N)) # Number of landmarks
nonzeroind = np.nonzero(resx)[0] # the return is a little funky so I use the [0] # nonzeroind = np.nonzero(resx)[0] # the return is a little funky so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L)) # nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0].tolist() # ind_a = nonzero_tup[0].tolist()
vertices_visited = ind_a # vertices_visited = ind_a
vertices_visited.remove(0) # vertices_visited.remove(0)
ones = np.ones(L, dtype=np.int8) # ones = np.ones(L, dtype=np.int8)
h = np.zeros(L*L, dtype=np.int8) # h = np.zeros(L*L, dtype=np.int8)
for i in range(L) : # for i in range(L) :
if i in vertices_visited : # if i in vertices_visited :
h[i*L:i*L+L] = ones # h[i*L:i*L+L] = ones
return h, len(vertices_visited)-1 # return h, len(vertices_visited)-1
# Prevents the creation of the same circle (both directions) # Prevents the creation of the same circle (both directions)
def prevent_circle(self, circle_vertices: list, L: int) : def prevent_circle(self, prob: pl.LpProblem, x: pl.LpVariable, circle_vertices: list, L: int) :
""" """
Prevent circular paths by by adding constraints to the optimization. Prevent circular paths by by adding constraints to the optimization.
@ -303,8 +302,8 @@ class Optimizer:
l[1, s*L + g] = 1 l[1, s*L + g] = 1
# Add the constraints # Add the constraints
self.prob += (pl.lpSum([l[0][j] * self.x[j] for j in range(L*L)]) == 0) prob += (pl.lpSum([l[0][j] * x[j] for j in range(L*L)]) == 0)
self.prob += (pl.lpSum([l[1][j] * self.x[j] for j in range(L*L)]) == 0) prob += (pl.lpSum([l[1][j] * x[j] for j in range(L*L)]) == 0)
def is_connected(self, resx) : def is_connected(self, resx) :
@ -474,21 +473,25 @@ class Optimizer:
if max_landmarks is None : if max_landmarks is None :
max_landmarks = self.max_landmarks max_landmarks = self.max_landmarks
# Initalize the optimization problem
prob = pl.LpProblem("OptimizationProblem", pl.LpMaximize)
# Define the problem # Define the problem
x_bounds = [(0, 1)]*L*L x_bounds = [(0, 1)]*L*L
self.x = [pl.LpVariable(f"x_{i}", lowBound=x_bounds[i][0], upBound=x_bounds[i][1], cat='Binary') for i in range(L*L)] x = [pl.LpVariable(f"x_{i}", lowBound=x_bounds[i][0], upBound=x_bounds[i][1], cat='Binary') for i in range(L*L)]
# Setup the inequality constraints # Setup the inequality constraints
self.init_ub_time(L, landmarks, max_time) # Adds the distances from each landmark to the other. self.init_ub_time(prob, x, L, landmarks, max_time) # Adds the distances from each landmark to the other.
self.respect_number(L, max_landmarks) # Respects max number of visits (no more possible stops than landmarks). self.respect_number(prob, x, L, max_landmarks) # Respects max number of visits (no more possible stops than landmarks).
self.break_sym(L) # Breaks the 'zig-zag' symmetry. Avoids d12 and d21 but not larger cirlces. self.break_sym(prob, x, L) # Breaks the 'zig-zag' symmetry. Avoids d12 and d21 but not larger cirlces.
# Setup the equality constraints # Setup the equality constraints
self.init_eq_not_stay(L) # Force solution not to stay in same place self.init_eq_not_stay(prob, x, L) # Force solution not to stay in same place
self.respect_start_finish(L) # Force start and finish positions self.respect_start_finish(prob, x, L) # Force start and finish positions
self.respect_order(L) # Respect order of visit (only works when max_time is limiting factor) self.respect_order(prob, x, L) # Respect order of visit (only works when max_time is limiting factor)
self.respect_user_must(L, landmarks) # Force to do/avoid landmarks set by user. self.respect_user_must(prob, x, L, landmarks) # Force to do/avoid landmarks set by user.
return prob, x
def solve_optimization( def solve_optimization(
self, self,
@ -511,14 +514,14 @@ class Optimizer:
""" """
# 1. Setup the optimization proplem. # 1. Setup the optimization proplem.
L = len(landmarks) L = len(landmarks)
self.pre_processing(L, landmarks, max_time, max_landmarks) prob, x = self.pre_processing(L, landmarks, max_time, max_landmarks)
# 2. Solve the problem # 2. Solve the problem
self.prob.solve(pl.PULP_CBC_CMD(msg=True, gapRel=0.1)) prob.solve(pl.PULP_CBC_CMD(msg=False, gapRel=0.1))
# 3. Extract Results # 3. Extract Results
status = pl.LpStatus[self.prob.status] status = pl.LpStatus[prob.status]
solution = [pl.value(var) for var in self.x] # The values of the decision variables (will be 0 or 1) solution = [pl.value(var) for var in x] # The values of the decision variables (will be 0 or 1)
self.logger.debug("First results are out. Looking out for circles and correcting.") self.logger.debug("First results are out. Looking out for circles and correcting.")
@ -531,39 +534,36 @@ class Optimizer:
circles = self.is_connected(solution) circles = self.is_connected(solution)
i = 0 i = 0
timeout = 80 timeout = 40
while circles is not None : while circles is not None :
i += 1 i += 1
# print(f"Iteration {i} of fixing circles") # print(f"Iteration {i} of fixing circles")
# l, b = self.prevent_config(solution) # l, b = self.prevent_config(solution)
# prob += (pl.lpSum([l[j] * x[j] for j in range(L*L)]) == b) # prob += (pl.lpSum([l[j] * x[j] for j in range(L*L)]) == b)
if i == timeout :
self.logger.error(f'Timeout: No solution found after {timeout} iterations.')
raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.")
for circle in circles : for circle in circles :
A, b = self.prevent_circle(circle, L) self.prevent_circle(prob, x, circle, L)
self.prob.solve(pl.PULP_CBC_CMD(msg=False)) # Solve the problem again
prob.solve(pl.PULP_CBC_CMD(msg=False))
solution = [pl.value(var) for var in x]
status = pl.LpStatus[self.prob.status] if pl.LpStatus[prob.status] != 'Optimal' :
solution = [pl.value(var) for var in self.x] # The values of the decision variables (will be 0 or 1)
if status != 'Optimal' :
self.logger.error("The problem is overconstrained, no solution after {i} cycles.") self.logger.error("The problem is overconstrained, no solution after {i} cycles.")
raise ArithmeticError("No solution could be found. Please try again with more time or different preferences.") raise ArithmeticError("No solution could be found. Please try again with more time or different preferences.")
if i == timeout :
self.logger.error(f'Unexpected error after {timeout} iterations of fixing circles.')
raise ArithmeticError("Solving failed because of overconstrained problem")
circles = self.is_connected(solution) circles = self.is_connected(solution)
if circles is None : if circles is None :
break break
if i == timeout :
self.logger.error(f'Timeout: No solution found after {timeout} iterations.')
raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.")
# Sort the landmarks in the order of the solution # Sort the landmarks in the order of the solution
order = self.get_order(solution) order = self.get_order(solution)
tour = [landmarks[i] for i in order] tour = [landmarks[i] for i in order]
self.logger.debug(f"Re-optimized {i} times, objective value : {int(pl.value(self.prob.objective))}") self.logger.debug(f"Re-optimized {i} times, objective value : {int(pl.value(prob.objective))}")
return tour return tour