diff --git a/backend/src/tests/test_main.py b/backend/src/tests/test_main.py index bff5b00..710c21b 100644 --- a/backend/src/tests/test_main.py +++ b/backend/src/tests/test_main.py @@ -21,7 +21,7 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name request: """ start_time = time.time() # Start timer - duration_minutes = 20 + duration_minutes = 15 response = client.post( "/trip/new", @@ -45,8 +45,8 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name # Add details to report log_trip_details(request, landmarks, result['total_time'], duration_minutes) - # for elem in landmarks : - # print(elem) + for elem in landmarks : + print(elem) # checks : assert response.status_code == 200 # check for successful planning @@ -54,9 +54,9 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2 assert len(landmarks) > 2 # check that there is something to visit assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds" - # assert 2==3 - + assert 2==3 +''' def test_bellecour(client, request) : # pylint: disable=redefined-outer-name """ @@ -219,7 +219,7 @@ def test_shopping(client, request) : # pylint: disable=redefined-outer-name assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds" assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2 - +''' # def test_new_trip_single_prefs(client): # response = client.post( # "/trip/new", diff --git a/backend/src/utils/optimizer.py b/backend/src/utils/optimizer.py index 210559f..31bf343 100644 --- a/backend/src/utils/optimizer.py +++ b/backend/src/utils/optimizer.py @@ -21,7 +21,8 @@ class Optimizer: average_walking_speed: float # average walking speed of adult max_landmarks: int # max number of landmarks to visit overshoot: float # overshoot to allow maxtime to overflow. Optimizer is a bit restrictive - + prob: pl.LpProblem # linear optimization problem to solve + x: list[pl.LpVariable] # decision variables def __init__(self) : @@ -32,9 +33,12 @@ class Optimizer: self.average_walking_speed = parameters['average_walking_speed'] self.max_landmarks = parameters['max_landmarks'] self.overshoot = parameters['overshoot'] + + # Initalize the optimization problem + self.prob = pl.LpProblem("OptimizationProblem", pl.LpMaximize) - def init_ub_time(self, landmarks: list[Landmark], max_time: int): + def init_ub_time(self, L: int, landmarks: list[Landmark], max_time: int): """ Initialize the objective function coefficients and inequality constraints. -> Adds 1 row of constraints @@ -57,23 +61,20 @@ class Optimizer: # Objective function coefficients. a*x1 + b*x2 + c*x3 + ... c = np.zeros(L, dtype=np.int16) - # Coefficients of inequality constraints (left-hand side) - A_ub = np.zeros((L + int((L*L-L)/2), L*L), dtype=np.int16) - b_ub = np.zeros(L + int((L*L-L)/2), dtype=np.int16) - - # Fill in first row - b_ub[0] = round(max_time*self.overshoot) + # inequality matrix and vector + A_ub = np.zeros(L*L, dtype=np.int16) + b_ub = round(max_time*self.overshoot) for i, spot1 in enumerate(landmarks) : c[i] = spot1.attractiveness for j in range(i+1, L) : if i !=j : t = get_time(spot1.location, landmarks[j].location) - A_ub[0, i*L + j] = t + spot1.duration - A_ub[0, j*L + i] = t + landmarks[j].duration + A_ub[i*L + j] = t + spot1.duration + A_ub[j*L + i] = t + landmarks[j].duration - # Expand 'c' to L*L for every decision variable - c = np.tile(c, L) + # Expand 'c' to L*L for every decision variable and ad + c = np.tile(c, L) # Now sort and modify A_ub for each row if L > 22 : @@ -90,10 +91,12 @@ class Optimizer: row_values[mask] = 32765 A_ub[0, i*L:i*L+L] = row_values - return c, A_ub, b_ub + # Add the objective and the distance constraint + self.prob += pl.lpSum([c[j] * self.x[j] for j in range(L*L)]) + self.prob += (pl.lpSum([A_ub[j] * self.x[j] for j in range(L*L)]) <= b_ub) - def respect_number(self, A, b, L, max_landmarks: int): + def respect_number(self, L: int, max_landmarks: int): """ Generate constraints to ensure each landmark is visited only once and cap the total number of visited landmarks. -> Adds L-1 rows of constraints @@ -105,18 +108,16 @@ class Optimizer: tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. """ # First constraint: each landmark is visited exactly once - # Fill-in row 2 until row L-2 - for i in range(1, L-1): - A[i, L*i:L*(i+1)] = np.ones(L, dtype=np.int16) - b[i] = 1 + A_ub = np.zeros(L*L, dtype=np.int8) + for i in range(0, L-2): + A_ub[L*i:L*(i+1)] = np.ones(L, dtype=np.int16) + self.prob += (pl.lpSum([A_ub[j] * self.x[j] for j in range(L*L)]) <= 1) - # Fill-in row L-1 # Second constraint: cap the total number of visits - A[L-1, :] = np.ones(L*L, dtype=np.int16) - b[L-1] = max_landmarks+2 + self.prob += (pl.lpSum([1 * self.x[j] for j in range(L*L)]) <= max_landmarks+2) - def break_sym(self, A, b, L): + def break_sym(self, L: int): """ Generate constraints to prevent simultaneous travel between two landmarks in both directions. Constraint to not have d14 and d41 simultaneously. @@ -133,18 +134,17 @@ class Optimizer: upper_ind = np.triu_indices(L,0,L) up_ind_x = upper_ind[0] up_ind_y = upper_ind[1] + A = np.zeros(L*L, dtype=np.int8) # Fill-in rows L to 2*L-1 - incr = 0 for i in range(int((L*L+L)/2)) : if up_ind_x[i] != up_ind_y[i] : - A[L+incr, up_ind_x[i]*L + up_ind_y[i]] = 1 - A[L+incr, up_ind_y[i]*L + up_ind_x[i]] = 1 - b[L+incr] = 1 - incr += 1 + A[up_ind_x[i]*L + up_ind_y[i]] = 1 + A[up_ind_y[i]*L + up_ind_x[i]] = 1 + self.prob += (pl.lpSum([A[j] * self.x[j] for j in range(L*L)]) <= 1) - def init_eq_not_stay(self, landmarks: list): + def init_eq_not_stay(self, L: int): """ Generate constraints to prevent staying in the same position (e.g., removing d11, d22, d33, etc.). -> Adds 1 row of constraints @@ -156,28 +156,17 @@ class Optimizer: Returns: tuple[list[np.ndarray], list[int]]: Equality constraint coefficients and the right-hand side of the equality constraints. """ - L = len(landmarks) - incr = 0 - for i, elem in enumerate(landmarks) : - if (elem.must_do or elem.must_avoid) and i not in [0, L-1]: - incr += 1 - - A_eq = np.zeros((L+2+incr, L*L), dtype=np.int8) - b_eq = np.zeros(L+2+incr, dtype=np.int8) - l = np.zeros((L, L), dtype=np.int8) + A_eq = np.zeros((L, L), dtype=np.int8) # Set diagonal elements to 1 (to prevent staying in the same position) - np.fill_diagonal(l, 1) + np.fill_diagonal(A_eq, 1) + A_eq = A_eq.flatten() - # Fill-in first row - A_eq[0,:] = l.flatten() - b_eq[0] = 0 - - return A_eq, b_eq + self.prob += (pl.lpSum([A_eq[j] * self.x[j] for j in range(L*L)]) == 1) # Constraint to ensure start at start and finish at goal - def respect_start_finish(self, A, b, L: int): + def respect_start_finish(self, L: int): """ Generate constraints to ensure that the optimization starts at the designated start landmark and finishes at the goal landmark. @@ -189,22 +178,24 @@ class Optimizer: Returns: tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. """ - # Fill-in row 1. - A[1, :L] = np.ones(L, dtype=np.int8) # sets departures only for start (horizontal ones) + # Fill-in row 0. + A_eq = np.zeros((3,L*L), dtype=np.int8) + A_eq[0, :L] = np.ones(L, dtype=np.int8) # sets departures only for start (horizontal ones) for k in range(L-1) : if k != 0 : - # Fill-in row 2 - A[2, k*L+L-1] = 1 # sets arrivals only for finish (vertical ones) - # Fill-in row 3 - A[3, k*L] = 1 + # Fill-in row 1 + A_eq[1, k*L+L-1] = 1 # sets arrivals only for finish (vertical ones) + # Fill-in row 1 + A_eq[2, k*L] = 1 - A[3, L*(L-1):] = np.ones(L, dtype=np.int8) # prevents arrivals at start and departures from goal - b[1:4] = [1, 1, 0] + A_eq[2, L*(L-1):] = np.ones(L, dtype=np.int8) # prevents arrivals at start and departures from goal + b_eq= [1, 1, 0] - # return A, b + # Add the constraints to pulp + for i in range(3) : + self.prob += (pl.lpSum([A_eq[i][j] * self.x[j] for j in range(L*L)]) == b_eq[i]) - - def respect_order(self, A, b, L: int): + def respect_order(self, L: int): """ Generate constraints to tie the optimization problem together and prevent stacked ones, although this does not fully prevent circles. @@ -216,17 +207,17 @@ class Optimizer: Returns: tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. """ + A_eq = np.zeros(L*L, dtype=np.int8) ones = np.ones(L, dtype=np.int8) # Fill-in rows 4 to L+2 for i in range(1, L-1) : # Prevent stacked ones for j in range(L) : - A[i-1+4, i + j*L] = -1 - A[i-1+4, i*L:(i+1)*L] = ones - - b[4:L+2] = np.zeros(L-2, dtype=np.int8) + A_eq[i + j*L] = -1 + A_eq[i*L:(i+1)*L] = ones + self.prob += (pl.lpSum([A_eq[j] * self.x[j] for j in range(L*L)]) == 0) - def respect_user_must(self, A, b, landmarks: list[Landmark]) : + def respect_user_must(self, L: int, landmarks: list[Landmark]) : """ Generate constraints to ensure that landmarks marked as 'must_do' are included in the optimization. -> Adds a variable number of rows of constraints BUT CAN BE PRE COMPUTED @@ -238,21 +229,16 @@ class Optimizer: Returns: tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints. """ - L = len(landmarks) ones = np.ones(L, dtype=np.int8) - incr = 0 + A_eq = np.zeros(L*L, dtype=np.int8) for i, elem in enumerate(landmarks) : if elem.must_do is True and i not in [0, L-1]: - # First part of the dynamic infill - A[L+2+incr, i*L:i*L+L] = ones - b[L+2+incr] = 1 - incr += 1 + A_eq[i*L:i*L+L] = ones + self.prob += (pl.lpSum([A_eq[j] * self.x[j] for j in range(L*L)]) == 1) if elem.must_avoid is True and i not in [0, L-1]: - # Second part of the dynamic infill - A[L+2+incr, i*L:i*L+L] = ones - b[L+2+incr] = 0 - incr += 1 + A_eq[i*L:i*L+L] = ones + self.prob += (pl.lpSum([A_eq[j] * self.x[j] for j in range(L*L)]) == 2) # Prevent the use of a particular solution. TODO probably can be done faster just using resx @@ -316,9 +302,11 @@ class Optimizer: l[0, g*L + s] = 1 l[1, s*L + g] = 1 - return l, [0, 0] - + # Add the constraints + self.prob += (pl.lpSum([l[0][j] * self.x[j] for j in range(L*L)]) == 0) + self.prob += (pl.lpSum([l[1][j] * self.x[j] for j in range(L*L)]) == 0) + def is_connected(self, resx) : """ Determine the order of visits and detect any circular paths in the given configuration. @@ -481,6 +469,27 @@ class Optimizer: return L + def pre_processing(self, L: int, landmarks: list[Landmark], max_time: int, max_landmarks: int | None) : + + if max_landmarks is None : + max_landmarks = self.max_landmarks + + # Define the problem + x_bounds = [(0, 1)]*L*L + self.x = [pl.LpVariable(f"x_{i}", lowBound=x_bounds[i][0], upBound=x_bounds[i][1], cat='Binary') for i in range(L*L)] + + # Setup the inequality constraints + self.init_ub_time(L, landmarks, max_time) # Adds the distances from each landmark to the other. + self.respect_number(L, max_landmarks) # Respects max number of visits (no more possible stops than landmarks). + self.break_sym(L) # Breaks the 'zig-zag' symmetry. Avoids d12 and d21 but not larger cirlces. + + # Setup the equality constraints + self.init_eq_not_stay(L) # Force solution not to stay in same place + self.respect_start_finish(L) # Force start and finish positions + self.respect_order(L) # Respect order of visit (only works when max_time is limiting factor) + self.respect_user_must(L, landmarks) # Force to do/avoid landmarks set by user. + + def solve_optimization( self, max_time: int, @@ -500,48 +509,16 @@ class Optimizer: Returns: list[Landmark]: The optimized tour of landmarks with updated travel times, or None if no valid solution is found. """ - if max_landmarks is None : - max_landmarks = self.max_landmarks - + # 1. Setup the optimization proplem. L = len(landmarks) + self.pre_processing(L, landmarks, max_time, max_landmarks) - # SET CONSTRAINTS FOR INEQUALITY - c, A_ub, b_ub = self.init_ub_time(landmarks, max_time) # Adds the distances from each landmark to the other. - self.respect_number(A_ub, b_ub, L, max_landmarks) # Respects max number of visits (no more possible stops than landmarks). - self.break_sym(A_ub, b_ub, L) # Breaks the 'zig-zag' symmetry. Avoids d12 and d21 but not larger cirlces. + # 2. Solve the problem + self.prob.solve(pl.PULP_CBC_CMD(msg=True, gapRel=0.1)) - # SET CONSTRAINTS FOR EQUALITY - A_eq, b_eq = self.init_eq_not_stay(landmarks) # Force solution not to stay in same place - self.respect_start_finish(A_eq, b_eq, L) # Force start and finish positions - self.respect_order(A_eq, b_eq, L) # Respect order of visit (only works when max_time is limiting factor) - self.respect_user_must(A_eq, b_eq, landmarks) # Force to do/avoid landmarks set by user. - - self.logger.debug(f"Optimizing with {A_ub.shape[0]} + {A_eq.shape[0]} = {A_ub.shape[0] + A_eq.shape[0]} constraints.") - - # SET BOUNDS FOR DECISION VARIABLE (x can only be 0 or 1) - x_bounds = [(0, 1)]*L*L - - # Solve linear programming problem with PulP - prob = pl.LpProblem("OptimizationProblem", pl.LpMaximize) - x = [pl.LpVariable(f"x_{i}", lowBound=x_bounds[i][0], upBound=x_bounds[i][1], cat='Binary') for i in range(L*L)] - - # Add the objective function - prob += pl.lpSum([c[i] * x[i] for i in range(L*L)]) - - # Add Inequality Constraints (A_ub @ x <= b_ub) - for i in range(len(b_ub)): # Iterate over rows of A_ub - prob += (pl.lpSum([A_ub[i][j] * x[j] for j in range(L*L)]) <= b_ub[i]) - - # 5. Add Equality Constraints (A_eq @ x == b_eq) - for i in range(len(b_eq)): # Iterate over rows of A_eq - prob += (pl.lpSum([A_eq[i][j] * x[j] for j in range(L*L)]) == b_eq[i]) - - # 6. Solve the problem - prob.solve(pl.PULP_CBC_CMD(msg=False, gapRel=0.3)) - - # 7. Extract Results - status = pl.LpStatus[prob.status] - solution = [pl.value(var) for var in x] # The values of the decision variables (will be 0 or 1) + # 3. Extract Results + status = pl.LpStatus[self.prob.status] + solution = [pl.value(var) for var in self.x] # The values of the decision variables (will be 0 or 1) self.logger.debug("First results are out. Looking out for circles and correcting.") @@ -563,12 +540,11 @@ class Optimizer: for circle in circles : A, b = self.prevent_circle(circle, L) - prob += (pl.lpSum([A[0][j] * x[j] for j in range(L*L)]) == b[0]) - prob += (pl.lpSum([A[1][j] * x[j] for j in range(L*L)]) == b[1]) - prob.solve(pl.PULP_CBC_CMD(msg=False)) + + self.prob.solve(pl.PULP_CBC_CMD(msg=False)) - status = pl.LpStatus[prob.status] - solution = [pl.value(var) for var in x] # The values of the decision variables (will be 0 or 1) + status = pl.LpStatus[self.prob.status] + solution = [pl.value(var) for var in self.x] # The values of the decision variables (will be 0 or 1) if status != 'Optimal' : self.logger.error("The problem is overconstrained, no solution after {i} cycles.") @@ -589,5 +565,5 @@ class Optimizer: order = self.get_order(solution) tour = [landmarks[i] for i in order] - self.logger.debug(f"Re-optimized {i} times, objective value : {int(pl.value(prob.objective))}") + self.logger.debug(f"Re-optimized {i} times, objective value : {int(pl.value(self.prob.objective))}") return tour