better array handling in the optimizer
Some checks failed
Build and deploy the backend to staging / Build and push image (pull_request) Failing after 2m39s
Build and deploy the backend to staging / Deploy to staging (pull_request) Has been skipped
Run linting on the backend code / Build (pull_request) Successful in 25s
Run testing on the backend code / Build (pull_request) Failing after 1m37s

This commit is contained in:
Helldragon67 2025-01-14 11:59:23 +01:00
parent 41976e3e85
commit 4fae658dbb
6 changed files with 366 additions and 280 deletions

View File

@ -100,10 +100,11 @@ def new_trip(preferences: Preferences,
try:
base_tour = optimizer.solve_optimization(preferences.max_time_minute, landmarks_short)
except ArithmeticError as exc:
raise HTTPException(status_code=500, detail="No solution found") from exc
raise HTTPException(status_code=500) from exc
except TimeoutError as exc:
raise HTTPException(status_code=500, detail="Optimzation took too long") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(exc)}") from exc
t_first_stage = time.time() - start_time
start_time = time.time()

View File

@ -35,8 +35,10 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
}
)
result = response.json()
print(result)
landmarks = load_trip_landmarks(client, result['first_landmark_uuid'])
# Get computation time
comp_time = time.time() - start_time
@ -49,7 +51,7 @@ def test_turckheim(client, request): # pylint: disable=redefined-outer-name
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
assert len(landmarks) > 2 # check that there is something to visit
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert 2==3
def test_bellecour(client, request) : # pylint: disable=redefined-outer-name
"""
@ -91,7 +93,88 @@ def test_bellecour(client, request) : # pylint: disable=redefined-outer-name
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
'''
def test_Paris(client, request) : # pylint: disable=redefined-outer-name
"""
Test n°2 : Custom test in Paris (les Halles) centre to ensure proper decision making in crowded area.
Args:
client:
request:
"""
start_time = time.time() # Start timer
duration_minutes = 300
response = client.post(
"/trip/new",
json={
"preferences": {"sightseeing": {"type": "sightseeing", "score": 5},
"nature": {"type": "nature", "score": 5},
"shopping": {"type": "shopping", "score": 5},
"max_time_minute": duration_minutes,
"detour_tolerance_minute": 0},
"start": [48.86248803298562, 2.346451131285925]
}
)
result = response.json()
landmarks = load_trip_landmarks(client, result['first_landmark_uuid'])
# Get computation time
comp_time = time.time() - start_time
# Add details to report
log_trip_details(request, landmarks, result['total_time'], duration_minutes)
for elem in landmarks :
print(elem)
print(elem.osm_id)
# checks :
assert response.status_code == 200 # check for successful planning
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
def test_New_York(client, request) : # pylint: disable=redefined-outer-name
"""
Test n°2 : Custom test in New York (les Halles) centre to ensure proper decision making in crowded area.
Args:
client:
request:
"""
start_time = time.time() # Start timer
duration_minutes = 600
response = client.post(
"/trip/new",
json={
"preferences": {"sightseeing": {"type": "sightseeing", "score": 5},
"nature": {"type": "nature", "score": 5},
"shopping": {"type": "shopping", "score": 5},
"max_time_minute": duration_minutes,
"detour_tolerance_minute": 0},
"start": [40.72592726802, -73.9920434795]
}
)
result = response.json()
landmarks = load_trip_landmarks(client, result['first_landmark_uuid'])
# Get computation time
comp_time = time.time() - start_time
# Add details to report
log_trip_details(request, landmarks, result['total_time'], duration_minutes)
for elem in landmarks :
print(elem)
print(elem.osm_id)
# checks :
assert response.status_code == 200 # check for successful planning
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
def test_shopping(client, request) : # pylint: disable=redefined-outer-name
"""
@ -128,7 +211,7 @@ def test_shopping(client, request) : # pylint: disable=redefined-outer-name
assert response.status_code == 200 # check for successful planning
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
'''
# def test_new_trip_single_prefs(client):
# response = client.post(
# "/trip/new",

View File

@ -280,6 +280,6 @@ class ClusterManager:
filtered_cluster_labels.append(np.full((label_counts[label],), label)) # Replicate the label
# update the cluster points and labels with the filtered data
self.cluster_points = np.vstack(filtered_cluster_points)
self.cluster_points = np.vstack(filtered_cluster_points) # ValueError here
self.cluster_labels = np.concatenate(filtered_cluster_labels)

View File

@ -1,3 +1,4 @@
"""Module used to import data from OSM and arrange them in categories."""
import math, yaml, logging
from OSMPythonTools.overpass import Overpass, overpassQueryBuilder
from OSMPythonTools.cachingStrategy import CachingStrategy, JSON
@ -79,7 +80,7 @@ class LandmarkManager:
# Create a bbox using the around technique
bbox = tuple((f"around:{reachable_bbox_side/2}", str(center_coordinates[0]), str(center_coordinates[1])))
# list for sightseeing
if preferences.sightseeing.score != 0:
score_function = lambda score: score * 10 * preferences.sightseeing.score / 5
@ -101,7 +102,7 @@ class LandmarkManager:
if preferences.shopping.score != 0:
score_function = lambda score: score * 10 * preferences.shopping.score / 5
current_landmarks = self.fetch_landmarks(bbox, self.amenity_selectors['shopping'], preferences.shopping.type, score_function)
# set time for all shopping activites :
for landmark in current_landmarks : landmark.duration = 30
all_landmarks.update(current_landmarks)
@ -110,7 +111,7 @@ class LandmarkManager:
shopping_manager = ClusterManager(bbox, 'shopping')
shopping_clusters = shopping_manager.generate_clusters()
all_landmarks.update(shopping_clusters)
landmarks_constrained = take_most_important(all_landmarks, self.N_important)
@ -152,7 +153,7 @@ class LandmarkManager:
elementType=['node', 'way', 'relation']
)
try:
try:
radius_result = self.overpass.query(radius_query)
N_elem = radius_result.countWays() + radius_result.countRelations()
self.logger.debug(f"There are {N_elem} ways/relations within 50m")
@ -242,28 +243,28 @@ class LandmarkManager:
name = elem.tag('name')
location = (elem.centerLat(), elem.centerLon())
osm_type = elem.type() # Add type: 'way' or 'relation'
osm_id = elem.id() # Add OSM id
osm_id = elem.id() # Add OSM id
# TODO: exclude these from the get go
# handle unprecise and no-name locations
if name is None or location[0] is None:
if osm_type == 'node' and 'viewpoint' in elem.tags().values():
if osm_type == 'node' and 'viewpoint' in elem.tags().values():
name = 'Viewpoint'
name_en = 'Viewpoint'
location = (elem.lat(), elem.lon())
else :
else :
continue
# skip if part of another building
if 'building:part' in elem.tags().keys() and elem.tag('building:part') == 'yes':
continue
elem_type = landmarktype # Add the landmark type as 'sightseeing,
elem_type = landmarktype # Add the landmark type as 'sightseeing,
n_tags = len(elem.tags().keys()) # Add number of tags
score = n_tags**self.tag_exponent # Add score
duration = 5 # Set base duration to 5 minutes
skip = False # Set skipping parameter to false
tag_values = set(elem.tags().values()) # Store tag values
tag_values = set(elem.tags().values()) # Store tag values
# Retrieve image, name and website :
@ -275,7 +276,7 @@ class LandmarkManager:
if elem_type != "nature" and elem.tag('leisure') == "park":
elem_type = "nature"
if elem.tag('wikipedia') is not None :
score += self.wikipedia_bonus
@ -309,9 +310,9 @@ class LandmarkManager:
# continue
score = score_function(score)
if "place_of_worship" in tag_values :
if 'cathedral' in tag_values :
if 'cathedral' in tag_values :
duration = 10
else :
score *= self.church_coeff
@ -319,7 +320,7 @@ class LandmarkManager:
elif 'viewpoint' in tag_values :
# viewpoints must count more
score = score * self.viewpoint_bonus
elif "museum" in tag_values or "aquarium" in tag_values or "planetarium" in tag_values:
duration = 60
@ -339,7 +340,7 @@ class LandmarkManager:
website_url = website_url
)
return_list.append(landmark)
self.logger.debug(f"Fetched {len(return_list)} landmarks of type {landmarktype} in {bbox}")
return return_list

View File

@ -29,7 +29,232 @@ class Optimizer:
self.average_walking_speed = parameters['average_walking_speed']
self.max_landmarks = parameters['max_landmarks']
self.overshoot = parameters['overshoot']
def init_ub_time(self, landmarks: list[Landmark], max_time: int):
"""
Initialize the objective function coefficients and inequality constraints.
-> Adds 1 row of constraints
1 row
+ L-1 rows
-> Pre-allocates A_ub for the rest of the computations
This function computes the distances between all landmarks and stores
their attractiveness to maximize sightseeing. The goal is to maximize
the objective function subject to the constraints A*x < b and A_eq*x = b_eq.
Args:
landmarks (list[Landmark]): List of landmarks.
max_time (int): Maximum time of visit allowed.
Returns:
tuple[list[float], list[float], list[int]]: Objective function coefficients, inequality
constraint coefficients, and the right-hand side of the inequality constraint.
"""
L = len(landmarks)
# Objective function coefficients. a*x1 + b*x2 + c*x3 + ...
c = np.zeros(L, dtype=np.int16)
# Coefficients of inequality constraints (left-hand side)
A_first = np.zeros((L, L), dtype=np.int16)
for i, spot1 in enumerate(landmarks) :
c[i] = -spot1.attractiveness
for j in range(i+1, L) :
if i !=j :
t = get_time(spot1.location, landmarks[j].location) + spot1.duration
A_first[i,j] = t
A_first[j,i] = t
# Now sort and modify A_ub for each row
if L > 22 :
for i in range(L):
# Get indices of the 20 smallest values in row i
closest_indices = np.argpartition(A_first[i, :], 20)[:20]
# Create a mask for non-closest landmarks
mask = np.ones(L, dtype=bool)
mask[closest_indices] = False
# Set non-closest landmarks to 32700
A_first[i, mask] = 32765
# Replicate the objective function 'c' for each decision variable (L times)
c = np.tile(c, L) # This correctly expands 'c' to L*L
return c, A_first.flatten(), [max_time*self.overshoot]
def respect_number(self, L, max_landmarks: int):
"""
Generate constraints to ensure each landmark is visited only once and cap the total number of visited landmarks.
-> Adds L-1 rows of constraints
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
# First constraint: each landmark is visited exactly once
A = np.zeros((L-1, L*L), dtype=np.int8)
b = []
for i in range(1, L-1):
A[i-1, L*i:L*(i+1)] = np.ones(L, dtype=np.int8)
b.append(1)
# Second constraint: cap the total number of visits
A[-1, :] = np.ones(L*L, dtype=np.int8)
b.append(max_landmarks+2)
return A, b
def break_sym(self, L):
"""
Generate constraints to prevent simultaneous travel between two landmarks
in both directions. Constraint to not have d14 and d41 simultaneously.
Does not prevent cyclic paths with more elements
-> Adds a variable number of rows of constraints
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and
the right-hand side of the inequality constraints.
"""
b = []
upper_ind = np.triu_indices(L,0,L)
up_ind_x = upper_ind[0]
up_ind_y = upper_ind[1]
A = np.zeros((len(up_ind_x[1:]),L*L), dtype=np.int8)
for i, _ in enumerate(up_ind_x[1:]) :
if up_ind_x[i] != up_ind_y[i] :
A[i, up_ind_x[i]*L + up_ind_y[i]] = 1
A[i, up_ind_y[i]*L + up_ind_x[i]] = 1
b.append(1)
return A[~np.all(A == 0, axis=1)], b
def init_eq_not_stay(self, L: int):
"""
Generate constraints to prevent staying in the same position (e.g., removing d11, d22, d33, etc.).
-> Adds 1 row of constraints
Args:
L (int): Number of landmarks.
Returns:
tuple[list[np.ndarray], list[int]]: Equality constraint coefficients and the right-hand side of the equality constraints.
"""
l = np.zeros((L, L), dtype=np.int8)
# Set diagonal elements to 1 (to prevent staying in the same position)
np.fill_diagonal(l, 1)
return l.flatten(), [0]
def respect_user_must_do(self, landmarks: list[Landmark]) :
"""
Generate constraints to ensure that landmarks marked as 'must_do' are included in the optimization.
-> Adds a variable number of rows of constraints BUT CAN BE PRE COMPUTED
Args:
landmarks (list[Landmark]): List of landmarks, where some are marked as 'must_do'.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
L = len(landmarks)
A = np.zeros((L, L*L), dtype=np.int8)
b = []
for i, elem in enumerate(landmarks) :
if elem.must_do is True and elem.name not in ['finish', 'start']:
A[i, i*L:i*L+L] = np.ones(L, dtype=np.int8)
b.append(1)
return A[~np.all(A == 0, axis=1)], b
def respect_user_must_avoid(self, landmarks: list[Landmark]) :
"""
Generate constraints to ensure that landmarks marked as 'must_avoid' are skipped
in the optimization.
-> Adds a variable number of rows of constraints BUT CAN BE PRE COMPUTED
Args:
landmarks (list[Landmark]): List of landmarks, where some are marked as 'must_avoid'.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
L = len(landmarks)
A = np.zeros((L, L*L), dtype=np.int8)
b = []
for i, elem in enumerate(landmarks) :
if elem.must_do is True and i not in [0, L-1]:
A[i, i*L:i*L+L] = np.ones(L, dtype=np.int8)
b.append(0)
return A[~np.all(A == 0, axis=1)], b
# Constraint to ensure start at start and finish at goal
def respect_start_finish(self, L: int):
"""
Generate constraints to ensure that the optimization starts at the designated
start landmark and finishes at the goal landmark.
-> Adds 3 rows of constraints
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
A = np.zeros((3, L*L), dtype=np.int8)
A[0, :L] = np.ones(L, dtype=np.int8) # sets departures only for start (horizontal ones)
for k in range(L-1) :
A[2, k*L] = 1
if k != 0 :
A[1, k*L+L-1] = 1 # sets arrivals only for finish (vertical ones)
A[2, L*(L-1):] = np.ones(L, dtype=np.int8) # prevents arrivals at start and departures from goal
b = [1, 1, 0]
return A, b
def respect_order(self, L: int):
"""
Generate constraints to tie the optimization problem together and prevent
stacked ones, although this does not fully prevent circles.
-> Adds L-2 rows of constraints
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
A = np.zeros((L-2, L*L), dtype=np.int8)
b = [0]*(L-2)
for i in range(1, L-1) : # Prevent stacked ones
for j in range(L) :
A[i-1, i + j*L] = -1
A[i-1, i*L:(i+1)*L] = np.ones(L, dtype=np.int8)
return A, b
# Prevent the use of a particular solution
@ -164,236 +389,6 @@ class Optimizer:
return order, all_journeys_nodes
def init_ub_time(self, landmarks: list[Landmark], max_time: int):
"""
Initialize the objective function coefficients and inequality constraints.
This function computes the distances between all landmarks and stores
their attractiveness to maximize sightseeing. The goal is to maximize
the objective function subject to the constraints A*x < b and A_eq*x = b_eq.
Args:
landmarks (list[Landmark]): List of landmarks.
max_time (int): Maximum time of visit allowed.
Returns:
tuple[list[float], list[float], list[int]]: Objective function coefficients, inequality
constraint coefficients, and the right-hand side of the inequality constraint.
"""
# Objective function coefficients. a*x1 + b*x2 + c*x3 + ...
c = []
# Coefficients of inequality constraints (left-hand side)
A_ub = []
for spot1 in landmarks :
dist_table = [0]*len(landmarks)
c.append(-spot1.attractiveness)
for j, spot2 in enumerate(landmarks) :
t = get_time(spot1.location, spot2.location) + spot1.duration
dist_table[j] = t
closest = sorted(dist_table)[:15]
for i, dist in enumerate(dist_table) :
if dist not in closest :
dist_table[i] = 32700
A_ub += dist_table
c = c*len(landmarks)
return c, A_ub, [max_time*self.overshoot]
def respect_number(self, L, max_landmarks: int):
"""
Generate constraints to ensure each landmark is visited only once and cap the total number of visited landmarks.
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
ones = [1]*L
zeros = [0]*L
A = ones + zeros*(L-1)
b = [1]
for i in range(L-1) :
h_new = zeros*i + ones + zeros*(L-1-i)
A = np.vstack((A, h_new))
b.append(1)
A = np.vstack((A, ones*L))
b.append(max_landmarks+1)
return A, b
# Constraint to not have d14 and d41 simultaneously. Does not prevent cyclic paths with more elements
def break_sym(self, L):
"""
Generate constraints to prevent simultaneous travel between two landmarks in both directions.
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
upper_ind = np.triu_indices(L,0,L)
up_ind_x = upper_ind[0]
up_ind_y = upper_ind[1]
A = [0]*L*L
b = [1]
for i, _ in enumerate(up_ind_x[1:]) :
l = [0]*L*L
if up_ind_x[i] != up_ind_y[i] :
l[up_ind_x[i]*L + up_ind_y[i]] = 1
l[up_ind_y[i]*L + up_ind_x[i]] = 1
A = np.vstack((A,l))
b.append(1)
return A, b
def init_eq_not_stay(self, L: int):
"""
Generate constraints to prevent staying in the same position (e.g., removing d11, d22, d33, etc.).
Args:
L (int): Number of landmarks.
Returns:
tuple[list[np.ndarray], list[int]]: Equality constraint coefficients and the right-hand side of the equality constraints.
"""
l = [0]*L*L
for i in range(L) :
for j in range(L) :
if j == i :
l[j + i*L] = 1
l = np.array(np.array(l), dtype=np.int8)
return [l], [0]
def respect_user_must_do(self, landmarks: list[Landmark]) :
"""
Generate constraints to ensure that landmarks marked as 'must_do' are included in the optimization.
Args:
landmarks (list[Landmark]): List of landmarks, where some are marked as 'must_do'.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
L = len(landmarks)
A = [0]*L*L
b = [0]
for i, elem in enumerate(landmarks[1:]) :
if elem.must_do is True and elem.name not in ['finish', 'start']:
l = [0]*L*L
l[i*L:i*L+L] = [1]*L # set mandatory departures from landmarks tagged as 'must_do'
A = np.vstack((A,l))
b.append(1)
return A, b
def respect_user_must_avoid(self, landmarks: list[Landmark]) :
"""
Generate constraints to ensure that landmarks marked as 'must_avoid' are skipped in the optimization.
Args:
landmarks (list[Landmark]): List of landmarks, where some are marked as 'must_avoid'.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
L = len(landmarks)
A = [0]*L*L
b = [0]
for i, elem in enumerate(landmarks[1:]) :
if elem.must_avoid is True and elem.name not in ['finish', 'start']:
l = [0]*L*L
l[i*L:i*L+L] = [1]*L
A = np.vstack((A,l))
b.append(0) # prevent departures from landmarks tagged as 'must_do'
return A, b
# Constraint to ensure start at start and finish at goal
def respect_start_finish(self, L: int):
"""
Generate constraints to ensure that the optimization starts at the designated start landmark and finishes at the goal landmark.
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
l_start = [1]*L + [0]*L*(L-1) # sets departures only for start (horizontal ones)
l_start[L-1] = 0 # prevents the jump from start to finish
l_goal = [0]*L*L # sets arrivals only for finish (vertical ones)
l_L = [0]*L*(L-1) + [1]*L # prevents arrivals at start and departures from goal
for k in range(L-1) : # sets only vertical ones for goal (go to)
l_L[k*L] = 1
if k != 0 :
l_goal[k*L+L-1] = 1
A = np.vstack((l_start, l_goal))
b = [1, 1]
A = np.vstack((A,l_L))
b.append(0)
return A, b
def respect_order(self, L: int):
"""
Generate constraints to tie the optimization problem together and prevent stacked ones, although this does not fully prevent circles.
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
A = [0]*L*L
b = [0]
for i in range(L-1) : # Prevent stacked ones
if i == 0 or i == L-1: # Don't touch start or finish
continue
else :
l = [0]*L
l[i] = -1
l = l*L
for j in range(L) :
l[i*L + j] = 1
A = np.vstack((A,l))
b.append(0)
return A, b
def link_list(self, order: list[int], landmarks: list[Landmark])->list[Landmark] :
"""
Compute the time to reach from each landmark to the next and create a list of landmarks with updated travel times.
@ -455,28 +450,33 @@ class Optimizer:
# SET CONSTRAINTS FOR INEQUALITY
c, A_ub, b_ub = self.init_ub_time(landmarks, max_time) # Add the distances from each landmark to the other
A, b = self.respect_number(L, max_landmarks) # Respect max number of visits (no more possible stops than landmarks).
A_ub = np.vstack((A_ub, A), dtype=np.int16)
A, b = self.respect_number(L, max_landmarks) # Respect max number of visits (no more possible stops than landmarks).
A_ub = np.vstack((A_ub, A))
b_ub += b
A, b = self.break_sym(L) # break the 'zig-zag' symmetry
A_ub = np.vstack((A_ub, A), dtype=np.int16)
A_ub = np.vstack((A_ub, A))
b_ub += b
# SET CONSTRAINTS FOR EQUALITY
A_eq, b_eq = self.init_eq_not_stay(L) # Force solution not to stay in same place
A, b = self.respect_user_must_do(landmarks) # Check if there are user_defined must_see. Also takes care of start/goal
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
if len(b) > 0 :
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
A, b = self.respect_user_must_avoid(landmarks) # Check if there are user_defined must_see. Also takes care of start/goal
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
if len(b) > 0 :
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
A, b = self.respect_start_finish(L) # Force start and finish positions
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
A, b = self.respect_order(L) # Respect order of visit (only works when max_time is limiting factor)
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
# until here opti
# SET BOUNDS FOR DECISION VARIABLE (x can only be 0 or 1)
x_bounds = [(0, 1)]*L*L
@ -484,7 +484,7 @@ class Optimizer:
# Solve linear programming problem
res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3)
# Raise error if no solution is found
# Raise error if no solution is found. FIXME: for now this throws the internal server error
if not res.success :
raise ArithmeticError("No solution could be found, the problem is overconstrained. Try with a longer trip (>30 minutes).")
@ -505,7 +505,6 @@ class Optimizer:
res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3)
if not res.success :
raise ArithmeticError("Solving failed because of overconstrained problem")
return None
order, circles = self.is_connected(res.x)
#nodes, edges = is_connected(res.x)
if circles is None :

View File

@ -1,7 +1,9 @@
import yaml, logging
from shapely import buffer, LineString, Point, Polygon, MultiPoint, concave_hull
"""Allows to refine the tour by adding more landmarks and making the path easier to follow."""
import logging
from math import pi
import yaml
from shapely import buffer, LineString, Point, Polygon, MultiPoint, concave_hull
from ..structs.landmark import Landmark
from . import take_most_important, get_time_separation
@ -13,7 +15,7 @@ from ..constants import OPTIMIZER_PARAMETERS_PATH
class Refiner :
logger = logging.getLogger(__name__)
detour_factor: float # detour factor of straight line vs real distance in cities
detour_corridor_width: float # width of the corridor around the path
average_walking_speed: float # average walking speed of adult
@ -45,7 +47,7 @@ class Refiner :
"""
corrected_width = (180*width)/(6371000*pi)
path = self.create_linestring(landmarks)
obj = buffer(path, corrected_width, join_style="mitre", cap_style="square", mitre_limit=2)
@ -70,7 +72,7 @@ class Refiner :
return LineString(points)
# Check if some coordinates are in area. Used for the corridor
# Check if some coordinates are in area. Used for the corridor
def is_in_area(self, area: Polygon, coordinates) -> bool :
"""
Check if a given point is within a specified area.
@ -86,7 +88,7 @@ class Refiner :
return point.within(area)
# Function to determine if two landmarks are close to each other
# Function to determine if two landmarks are close to each other
def is_close_to(self, location1: tuple[float], location2: tuple[float]):
"""
Determine if two locations are close to each other by rounding their coordinates to 3 decimal places.
@ -119,7 +121,7 @@ class Refiner :
Returns:
list[Landmark]: The rearranged list of landmarks with grouped nearby visits.
"""
i = 1
while i < len(tour):
j = i+1
@ -131,9 +133,9 @@ class Refiner :
break # Move to the next i-th element after rearrangement
j += 1
i += 1
return tour
def integrate_landmarks(self, sub_list: list[Landmark], main_list: list[Landmark]) :
"""
Inserts 'sub_list' of Landmarks inside the 'main_list' by leaving the ends untouched.
@ -166,24 +168,24 @@ class Refiner :
should be visited, and the second element is a `Polygon` representing
the path connecting all landmarks.
"""
# Step 1: Find 'start' and 'finish' landmarks
start_idx = next(i for i, lm in enumerate(landmarks) if lm.type == 'start')
finish_idx = next(i for i, lm in enumerate(landmarks) if lm.type == 'finish')
start_landmark = landmarks[start_idx]
finish_landmark = landmarks[finish_idx]
# Step 2: Create a list of unvisited landmarks excluding 'start' and 'finish'
unvisited_landmarks = [lm for i, lm in enumerate(landmarks) if i not in [start_idx, finish_idx]]
# Step 3: Initialize the path with the 'start' landmark
path = [start_landmark]
coordinates = [landmarks[start_idx].location]
current_landmark = start_landmark
# Step 4: Use nearest neighbor heuristic to visit all landmarks
while unvisited_landmarks:
nearest_landmark = min(unvisited_landmarks, key=lambda lm: get_time_separation.get_time(current_landmark.location, lm.location))
@ -224,7 +226,7 @@ class Refiner :
for visited in visited_landmarks :
visited_names.append(visited.name)
for landmark in all_landmarks :
if self.is_in_area(area, landmark.location) and landmark.name not in visited_names:
second_order_landmarks.append(landmark)
@ -256,7 +258,7 @@ class Refiner :
coords_dict[landmark.location] = landmark
tour_poly = Polygon(coords)
better_tour_poly = tour_poly.buffer(0)
try :
xs, ys = better_tour_poly.exterior.xy
@ -299,7 +301,7 @@ class Refiner :
# Rearrange only if polygon still not simple
if not better_tour_poly.is_simple :
better_tour = self.rearrange(better_tour)
return better_tour
@ -330,7 +332,7 @@ class Refiner :
# No need to refine if no detour is taken
# if detour == 0:
# return base_tour
minor_landmarks = self.get_minor_landmarks(all_landmarks, base_tour, self.detour_corridor_width)
self.logger.debug(f"Using {len(minor_landmarks)} minor landmarks around the predicted path")
@ -341,7 +343,7 @@ class Refiner :
# Generate a new tour with the optimizer.
new_tour = self.optimizer.solve_optimization(
max_time = max_time + detour,
landmarks = full_set,
landmarks = full_set,
max_landmarks = self.max_landmarks_refiner
)
@ -357,7 +359,7 @@ class Refiner :
# Find shortest path using the nearest neighbor heuristic.
better_tour, better_poly = self.find_shortest_path_through_all_landmarks(new_tour)
# Fix the tour using Polygons if the path looks weird.
# Fix the tour using Polygons if the path looks weird.
# Conditions : circular trip and invalid polygon.
if base_tour[0].location == base_tour[-1].location and not better_poly.is_valid :
better_tour = self.fix_using_polygon(better_tour)