some rpoblem

This commit is contained in:
Helldragon67 2025-01-15 21:12:47 +01:00
parent 3ebe0b7191
commit 2be7cd1e61
2 changed files with 99 additions and 123 deletions

View File

@ -21,7 +21,7 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
request: request:
""" """
start_time = time.time() # Start timer start_time = time.time() # Start timer
duration_minutes = 20 duration_minutes = 15
response = client.post( response = client.post(
"/trip/new", "/trip/new",
@ -45,8 +45,8 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
# Add details to report # Add details to report
log_trip_details(request, landmarks, result['total_time'], duration_minutes) log_trip_details(request, landmarks, result['total_time'], duration_minutes)
# for elem in landmarks : for elem in landmarks :
# print(elem) print(elem)
# checks : # checks :
assert response.status_code == 200 # check for successful planning assert response.status_code == 200 # check for successful planning
@ -54,9 +54,9 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2 assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
assert len(landmarks) > 2 # check that there is something to visit assert len(landmarks) > 2 # check that there is something to visit
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds" assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
# assert 2==3 assert 2==3
'''
def test_bellecour(client, request) : # pylint: disable=redefined-outer-name def test_bellecour(client, request) : # pylint: disable=redefined-outer-name
""" """
@ -219,7 +219,7 @@ def test_shopping(client, request) : # pylint: disable=redefined-outer-name
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds" assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2 assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
'''
# def test_new_trip_single_prefs(client): # def test_new_trip_single_prefs(client):
# response = client.post( # response = client.post(
# "/trip/new", # "/trip/new",

View File

@ -21,7 +21,8 @@ class Optimizer:
average_walking_speed: float # average walking speed of adult average_walking_speed: float # average walking speed of adult
max_landmarks: int # max number of landmarks to visit max_landmarks: int # max number of landmarks to visit
overshoot: float # overshoot to allow maxtime to overflow. Optimizer is a bit restrictive overshoot: float # overshoot to allow maxtime to overflow. Optimizer is a bit restrictive
prob: pl.LpProblem # linear optimization problem to solve
x: list[pl.LpVariable] # decision variables
def __init__(self) : def __init__(self) :
@ -33,8 +34,11 @@ class Optimizer:
self.max_landmarks = parameters['max_landmarks'] self.max_landmarks = parameters['max_landmarks']
self.overshoot = parameters['overshoot'] self.overshoot = parameters['overshoot']
# Initalize the optimization problem
self.prob = pl.LpProblem("OptimizationProblem", pl.LpMaximize)
def init_ub_time(self, landmarks: list[Landmark], max_time: int):
def init_ub_time(self, L: int, landmarks: list[Landmark], max_time: int):
""" """
Initialize the objective function coefficients and inequality constraints. Initialize the objective function coefficients and inequality constraints.
-> Adds 1 row of constraints -> Adds 1 row of constraints
@ -57,22 +61,19 @@ class Optimizer:
# Objective function coefficients. a*x1 + b*x2 + c*x3 + ... # Objective function coefficients. a*x1 + b*x2 + c*x3 + ...
c = np.zeros(L, dtype=np.int16) c = np.zeros(L, dtype=np.int16)
# Coefficients of inequality constraints (left-hand side) # inequality matrix and vector
A_ub = np.zeros((L + int((L*L-L)/2), L*L), dtype=np.int16) A_ub = np.zeros(L*L, dtype=np.int16)
b_ub = np.zeros(L + int((L*L-L)/2), dtype=np.int16) b_ub = round(max_time*self.overshoot)
# Fill in first row
b_ub[0] = round(max_time*self.overshoot)
for i, spot1 in enumerate(landmarks) : for i, spot1 in enumerate(landmarks) :
c[i] = spot1.attractiveness c[i] = spot1.attractiveness
for j in range(i+1, L) : for j in range(i+1, L) :
if i !=j : if i !=j :
t = get_time(spot1.location, landmarks[j].location) t = get_time(spot1.location, landmarks[j].location)
A_ub[0, i*L + j] = t + spot1.duration A_ub[i*L + j] = t + spot1.duration
A_ub[0, j*L + i] = t + landmarks[j].duration A_ub[j*L + i] = t + landmarks[j].duration
# Expand 'c' to L*L for every decision variable # Expand 'c' to L*L for every decision variable and ad
c = np.tile(c, L) c = np.tile(c, L)
# Now sort and modify A_ub for each row # Now sort and modify A_ub for each row
@ -90,10 +91,12 @@ class Optimizer:
row_values[mask] = 32765 row_values[mask] = 32765
A_ub[0, i*L:i*L+L] = row_values A_ub[0, i*L:i*L+L] = row_values
return c, A_ub, b_ub # Add the objective and the distance constraint
self.prob += pl.lpSum([c[j] * self.x[j] for j in range(L*L)])
self.prob += (pl.lpSum([A_ub[j] * self.x[j] for j in range(L*L)]) <= b_ub)
def respect_number(self, A, b, L, max_landmarks: int): def respect_number(self, L: int, max_landmarks: int):
""" """
Generate constraints to ensure each landmark is visited only once and cap the total number of visited landmarks. Generate constraints to ensure each landmark is visited only once and cap the total number of visited landmarks.
-> Adds L-1 rows of constraints -> Adds L-1 rows of constraints
@ -105,18 +108,16 @@ class Optimizer:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
""" """
# First constraint: each landmark is visited exactly once # First constraint: each landmark is visited exactly once
# Fill-in row 2 until row L-2 A_ub = np.zeros(L*L, dtype=np.int8)
for i in range(1, L-1): for i in range(0, L-2):
A[i, L*i:L*(i+1)] = np.ones(L, dtype=np.int16) A_ub[L*i:L*(i+1)] = np.ones(L, dtype=np.int16)
b[i] = 1 self.prob += (pl.lpSum([A_ub[j] * self.x[j] for j in range(L*L)]) <= 1)
# Fill-in row L-1
# Second constraint: cap the total number of visits # Second constraint: cap the total number of visits
A[L-1, :] = np.ones(L*L, dtype=np.int16) self.prob += (pl.lpSum([1 * self.x[j] for j in range(L*L)]) <= max_landmarks+2)
b[L-1] = max_landmarks+2
def break_sym(self, A, b, L): def break_sym(self, L: int):
""" """
Generate constraints to prevent simultaneous travel between two landmarks Generate constraints to prevent simultaneous travel between two landmarks
in both directions. Constraint to not have d14 and d41 simultaneously. in both directions. Constraint to not have d14 and d41 simultaneously.
@ -133,18 +134,17 @@ class Optimizer:
upper_ind = np.triu_indices(L,0,L) upper_ind = np.triu_indices(L,0,L)
up_ind_x = upper_ind[0] up_ind_x = upper_ind[0]
up_ind_y = upper_ind[1] up_ind_y = upper_ind[1]
A = np.zeros(L*L, dtype=np.int8)
# Fill-in rows L to 2*L-1 # Fill-in rows L to 2*L-1
incr = 0
for i in range(int((L*L+L)/2)) : for i in range(int((L*L+L)/2)) :
if up_ind_x[i] != up_ind_y[i] : if up_ind_x[i] != up_ind_y[i] :
A[L+incr, up_ind_x[i]*L + up_ind_y[i]] = 1 A[up_ind_x[i]*L + up_ind_y[i]] = 1
A[L+incr, up_ind_y[i]*L + up_ind_x[i]] = 1 A[up_ind_y[i]*L + up_ind_x[i]] = 1
b[L+incr] = 1 self.prob += (pl.lpSum([A[j] * self.x[j] for j in range(L*L)]) <= 1)
incr += 1
def init_eq_not_stay(self, landmarks: list): def init_eq_not_stay(self, L: int):
""" """
Generate constraints to prevent staying in the same position (e.g., removing d11, d22, d33, etc.). Generate constraints to prevent staying in the same position (e.g., removing d11, d22, d33, etc.).
-> Adds 1 row of constraints -> Adds 1 row of constraints
@ -156,28 +156,17 @@ class Optimizer:
Returns: Returns:
tuple[list[np.ndarray], list[int]]: Equality constraint coefficients and the right-hand side of the equality constraints. tuple[list[np.ndarray], list[int]]: Equality constraint coefficients and the right-hand side of the equality constraints.
""" """
L = len(landmarks) A_eq = np.zeros((L, L), dtype=np.int8)
incr = 0
for i, elem in enumerate(landmarks) :
if (elem.must_do or elem.must_avoid) and i not in [0, L-1]:
incr += 1
A_eq = np.zeros((L+2+incr, L*L), dtype=np.int8)
b_eq = np.zeros(L+2+incr, dtype=np.int8)
l = np.zeros((L, L), dtype=np.int8)
# Set diagonal elements to 1 (to prevent staying in the same position) # Set diagonal elements to 1 (to prevent staying in the same position)
np.fill_diagonal(l, 1) np.fill_diagonal(A_eq, 1)
A_eq = A_eq.flatten()
# Fill-in first row self.prob += (pl.lpSum([A_eq[j] * self.x[j] for j in range(L*L)]) == 1)
A_eq[0,:] = l.flatten()
b_eq[0] = 0
return A_eq, b_eq
# Constraint to ensure start at start and finish at goal # Constraint to ensure start at start and finish at goal
def respect_start_finish(self, A, b, L: int): def respect_start_finish(self, L: int):
""" """
Generate constraints to ensure that the optimization starts at the designated Generate constraints to ensure that the optimization starts at the designated
start landmark and finishes at the goal landmark. start landmark and finishes at the goal landmark.
@ -189,22 +178,24 @@ class Optimizer:
Returns: Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
""" """
# Fill-in row 1. # Fill-in row 0.
A[1, :L] = np.ones(L, dtype=np.int8) # sets departures only for start (horizontal ones) A_eq = np.zeros((3,L*L), dtype=np.int8)
A_eq[0, :L] = np.ones(L, dtype=np.int8) # sets departures only for start (horizontal ones)
for k in range(L-1) : for k in range(L-1) :
if k != 0 : if k != 0 :
# Fill-in row 2 # Fill-in row 1
A[2, k*L+L-1] = 1 # sets arrivals only for finish (vertical ones) A_eq[1, k*L+L-1] = 1 # sets arrivals only for finish (vertical ones)
# Fill-in row 3 # Fill-in row 1
A[3, k*L] = 1 A_eq[2, k*L] = 1
A[3, L*(L-1):] = np.ones(L, dtype=np.int8) # prevents arrivals at start and departures from goal A_eq[2, L*(L-1):] = np.ones(L, dtype=np.int8) # prevents arrivals at start and departures from goal
b[1:4] = [1, 1, 0] b_eq= [1, 1, 0]
# return A, b # Add the constraints to pulp
for i in range(3) :
self.prob += (pl.lpSum([A_eq[i][j] * self.x[j] for j in range(L*L)]) == b_eq[i])
def respect_order(self, L: int):
def respect_order(self, A, b, L: int):
""" """
Generate constraints to tie the optimization problem together and prevent Generate constraints to tie the optimization problem together and prevent
stacked ones, although this does not fully prevent circles. stacked ones, although this does not fully prevent circles.
@ -216,17 +207,17 @@ class Optimizer:
Returns: Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
""" """
A_eq = np.zeros(L*L, dtype=np.int8)
ones = np.ones(L, dtype=np.int8) ones = np.ones(L, dtype=np.int8)
# Fill-in rows 4 to L+2 # Fill-in rows 4 to L+2
for i in range(1, L-1) : # Prevent stacked ones for i in range(1, L-1) : # Prevent stacked ones
for j in range(L) : for j in range(L) :
A[i-1+4, i + j*L] = -1 A_eq[i + j*L] = -1
A[i-1+4, i*L:(i+1)*L] = ones A_eq[i*L:(i+1)*L] = ones
self.prob += (pl.lpSum([A_eq[j] * self.x[j] for j in range(L*L)]) == 0)
b[4:L+2] = np.zeros(L-2, dtype=np.int8)
def respect_user_must(self, A, b, landmarks: list[Landmark]) : def respect_user_must(self, L: int, landmarks: list[Landmark]) :
""" """
Generate constraints to ensure that landmarks marked as 'must_do' are included in the optimization. Generate constraints to ensure that landmarks marked as 'must_do' are included in the optimization.
-> Adds a variable number of rows of constraints BUT CAN BE PRE COMPUTED -> Adds a variable number of rows of constraints BUT CAN BE PRE COMPUTED
@ -238,21 +229,16 @@ class Optimizer:
Returns: Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
""" """
L = len(landmarks)
ones = np.ones(L, dtype=np.int8) ones = np.ones(L, dtype=np.int8)
incr = 0 A_eq = np.zeros(L*L, dtype=np.int8)
for i, elem in enumerate(landmarks) : for i, elem in enumerate(landmarks) :
if elem.must_do is True and i not in [0, L-1]: if elem.must_do is True and i not in [0, L-1]:
# First part of the dynamic infill A_eq[i*L:i*L+L] = ones
A[L+2+incr, i*L:i*L+L] = ones self.prob += (pl.lpSum([A_eq[j] * self.x[j] for j in range(L*L)]) == 1)
b[L+2+incr] = 1
incr += 1
if elem.must_avoid is True and i not in [0, L-1]: if elem.must_avoid is True and i not in [0, L-1]:
# Second part of the dynamic infill A_eq[i*L:i*L+L] = ones
A[L+2+incr, i*L:i*L+L] = ones self.prob += (pl.lpSum([A_eq[j] * self.x[j] for j in range(L*L)]) == 2)
b[L+2+incr] = 0
incr += 1
# Prevent the use of a particular solution. TODO probably can be done faster just using resx # Prevent the use of a particular solution. TODO probably can be done faster just using resx
@ -316,7 +302,9 @@ class Optimizer:
l[0, g*L + s] = 1 l[0, g*L + s] = 1
l[1, s*L + g] = 1 l[1, s*L + g] = 1
return l, [0, 0] # Add the constraints
self.prob += (pl.lpSum([l[0][j] * self.x[j] for j in range(L*L)]) == 0)
self.prob += (pl.lpSum([l[1][j] * self.x[j] for j in range(L*L)]) == 0)
def is_connected(self, resx) : def is_connected(self, resx) :
@ -481,6 +469,27 @@ class Optimizer:
return L return L
def pre_processing(self, L: int, landmarks: list[Landmark], max_time: int, max_landmarks: int | None) :
if max_landmarks is None :
max_landmarks = self.max_landmarks
# Define the problem
x_bounds = [(0, 1)]*L*L
self.x = [pl.LpVariable(f"x_{i}", lowBound=x_bounds[i][0], upBound=x_bounds[i][1], cat='Binary') for i in range(L*L)]
# Setup the inequality constraints
self.init_ub_time(L, landmarks, max_time) # Adds the distances from each landmark to the other.
self.respect_number(L, max_landmarks) # Respects max number of visits (no more possible stops than landmarks).
self.break_sym(L) # Breaks the 'zig-zag' symmetry. Avoids d12 and d21 but not larger cirlces.
# Setup the equality constraints
self.init_eq_not_stay(L) # Force solution not to stay in same place
self.respect_start_finish(L) # Force start and finish positions
self.respect_order(L) # Respect order of visit (only works when max_time is limiting factor)
self.respect_user_must(L, landmarks) # Force to do/avoid landmarks set by user.
def solve_optimization( def solve_optimization(
self, self,
max_time: int, max_time: int,
@ -500,48 +509,16 @@ class Optimizer:
Returns: Returns:
list[Landmark]: The optimized tour of landmarks with updated travel times, or None if no valid solution is found. list[Landmark]: The optimized tour of landmarks with updated travel times, or None if no valid solution is found.
""" """
if max_landmarks is None : # 1. Setup the optimization proplem.
max_landmarks = self.max_landmarks
L = len(landmarks) L = len(landmarks)
self.pre_processing(L, landmarks, max_time, max_landmarks)
# SET CONSTRAINTS FOR INEQUALITY # 2. Solve the problem
c, A_ub, b_ub = self.init_ub_time(landmarks, max_time) # Adds the distances from each landmark to the other. self.prob.solve(pl.PULP_CBC_CMD(msg=True, gapRel=0.1))
self.respect_number(A_ub, b_ub, L, max_landmarks) # Respects max number of visits (no more possible stops than landmarks).
self.break_sym(A_ub, b_ub, L) # Breaks the 'zig-zag' symmetry. Avoids d12 and d21 but not larger cirlces.
# SET CONSTRAINTS FOR EQUALITY # 3. Extract Results
A_eq, b_eq = self.init_eq_not_stay(landmarks) # Force solution not to stay in same place status = pl.LpStatus[self.prob.status]
self.respect_start_finish(A_eq, b_eq, L) # Force start and finish positions solution = [pl.value(var) for var in self.x] # The values of the decision variables (will be 0 or 1)
self.respect_order(A_eq, b_eq, L) # Respect order of visit (only works when max_time is limiting factor)
self.respect_user_must(A_eq, b_eq, landmarks) # Force to do/avoid landmarks set by user.
self.logger.debug(f"Optimizing with {A_ub.shape[0]} + {A_eq.shape[0]} = {A_ub.shape[0] + A_eq.shape[0]} constraints.")
# SET BOUNDS FOR DECISION VARIABLE (x can only be 0 or 1)
x_bounds = [(0, 1)]*L*L
# Solve linear programming problem with PulP
prob = pl.LpProblem("OptimizationProblem", pl.LpMaximize)
x = [pl.LpVariable(f"x_{i}", lowBound=x_bounds[i][0], upBound=x_bounds[i][1], cat='Binary') for i in range(L*L)]
# Add the objective function
prob += pl.lpSum([c[i] * x[i] for i in range(L*L)])
# Add Inequality Constraints (A_ub @ x <= b_ub)
for i in range(len(b_ub)): # Iterate over rows of A_ub
prob += (pl.lpSum([A_ub[i][j] * x[j] for j in range(L*L)]) <= b_ub[i])
# 5. Add Equality Constraints (A_eq @ x == b_eq)
for i in range(len(b_eq)): # Iterate over rows of A_eq
prob += (pl.lpSum([A_eq[i][j] * x[j] for j in range(L*L)]) == b_eq[i])
# 6. Solve the problem
prob.solve(pl.PULP_CBC_CMD(msg=False, gapRel=0.3))
# 7. Extract Results
status = pl.LpStatus[prob.status]
solution = [pl.value(var) for var in x] # The values of the decision variables (will be 0 or 1)
self.logger.debug("First results are out. Looking out for circles and correcting.") self.logger.debug("First results are out. Looking out for circles and correcting.")
@ -563,12 +540,11 @@ class Optimizer:
for circle in circles : for circle in circles :
A, b = self.prevent_circle(circle, L) A, b = self.prevent_circle(circle, L)
prob += (pl.lpSum([A[0][j] * x[j] for j in range(L*L)]) == b[0])
prob += (pl.lpSum([A[1][j] * x[j] for j in range(L*L)]) == b[1])
prob.solve(pl.PULP_CBC_CMD(msg=False))
status = pl.LpStatus[prob.status] self.prob.solve(pl.PULP_CBC_CMD(msg=False))
solution = [pl.value(var) for var in x] # The values of the decision variables (will be 0 or 1)
status = pl.LpStatus[self.prob.status]
solution = [pl.value(var) for var in self.x] # The values of the decision variables (will be 0 or 1)
if status != 'Optimal' : if status != 'Optimal' :
self.logger.error("The problem is overconstrained, no solution after {i} cycles.") self.logger.error("The problem is overconstrained, no solution after {i} cycles.")
@ -589,5 +565,5 @@ class Optimizer:
order = self.get_order(solution) order = self.get_order(solution)
tour = [landmarks[i] for i in order] tour = [landmarks[i] for i in order]
self.logger.debug(f"Re-optimized {i} times, objective value : {int(pl.value(prob.objective))}") self.logger.debug(f"Re-optimized {i} times, objective value : {int(pl.value(self.prob.objective))}")
return tour return tour