linting
Some checks failed
Build and deploy the backend to staging / Build and push image (pull_request) Successful in 2m22s
Run linting on the backend code / Build (pull_request) Successful in 43s
Run testing on the backend code / Build (pull_request) Failing after 2m23s
Build and deploy the backend to staging / Deploy to staging (pull_request) Successful in 24s

This commit is contained in:
Helldragon67 2025-01-16 12:22:36 +01:00
parent e5a4645f7a
commit 7027444602
12 changed files with 176 additions and 665 deletions

View File

@ -293,7 +293,7 @@ ignored-parents=
max-args=5
# Maximum number of attributes for a class (see R0902).
max-attributes=7
max-attributes=20
# Maximum number of boolean expressions in an if statement (see R0916).
max-bool-expr=5
@ -302,7 +302,7 @@ max-bool-expr=5
max-branches=12
# Maximum number of locals for function / method body.
max-locals=15
max-locals=30
# Maximum number of parents for a class (see R0901).
max-parents=7
@ -440,7 +440,12 @@ disable=raw-checker-failed,
use-implicit-booleaness-not-comparison-to-string,
use-implicit-booleaness-not-comparison-to-zero,
import-error,
line-too-long
multiple-statements,
line-too-long,
logging-fstring-interpolation,
duplicate-code,
relative-beyond-top-level,
invalid-name
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option

View File

@ -19,7 +19,6 @@ def configure_logging():
# in that case we want to log to stdout and also to loki
from loki_logger_handler.loki_logger_handler import LokiLoggerHandler
loki_url = os.getenv('LOKI_URL')
loki_url = "http://localhost:3100/loki/api/v1/push"
if loki_url is None:
raise ValueError("LOKI_URL environment variable is not set")

View File

@ -66,10 +66,10 @@ sightseeing:
- synagogue
- ruins
- temple
- government
# - government
- cathedral
- castle
- museum
# - museum
museums:
tourism:

View File

@ -11,7 +11,7 @@ def client():
"""Client used to call the app."""
return TestClient(app)
'''
def test_turckheim(client, request): # pylint: disable=redefined-outer-name
"""
Test n°1 : Custom test in Turckheim to ensure small villages are also supported.
@ -135,7 +135,7 @@ def test_cologne(client, request) : # pylint: disable=redefined-outer-name
assert response.status_code == 200 # check for successful planning
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
'''
def test_strasbourg(client, request) : # pylint: disable=redefined-outer-name
"""
@ -176,7 +176,7 @@ def test_strasbourg(client, request) : # pylint: disable=redefined-outer-name
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
'''
def test_zurich(client, request) : # pylint: disable=redefined-outer-name
"""
Test n°2 : Custom test in Lyon centre to ensure proper decision making in crowded area.
@ -335,7 +335,7 @@ def test_shopping(client, request) : # pylint: disable=redefined-outer-name
assert response.status_code == 200 # check for successful planning
assert comp_time < 30, f"Computation time exceeded 30 seconds: {comp_time:.2f} seconds"
assert duration_minutes*0.8 < int(result['total_time']) < duration_minutes*1.2
'''
# def test_new_trip_single_prefs(client):
# response = client.post(

View File

@ -1,3 +1,4 @@
"""Find clusters of interest to add more general areas of visit to the tour."""
import logging
from typing import Literal
@ -38,11 +39,24 @@ class Cluster(BaseModel):
class ClusterManager:
"""
A manager responsible for clustering points of interest, such as shops or historic sites,
to identify areas worth visiting. It uses the DBSCAN algorithm to detect clusters
based on a set of points retrieved from OpenStreetMap (OSM).
Attributes:
logger (logging.Logger): Logger for capturing relevant events and errors.
valid (bool): Indicates whether clusters were successfully identified.
all_points (list): All points retrieved from OSM, representing locations of interest.
cluster_points (list): Points identified as part of a cluster.
cluster_labels (list): Labels corresponding to the clusters each point belongs to.
cluster_type (Literal['sightseeing', 'shopping']): Type of clustering, either for sightseeing
landmarks or shopping areas.
"""
logger = logging.getLogger(__name__)
# NOTE: all points are in (lat, lon) format
valid: bool # Ensure the manager is valid (ie there are some clusters to be found)
valid: bool # Ensure the manager is valid (ie there are some clusters to be found)
all_points: list
cluster_points: list
cluster_labels: list
@ -65,8 +79,6 @@ class ClusterManager:
Args:
bbox: The bounding box coordinates (around:radius, center_lat, center_lon).
"""
# Initialize overpass and cache
self.overpass = Overpass()
CachingStrategy.use(JSON, cacheDir=OSM_CACHE_DIR)
@ -96,7 +108,7 @@ class ClusterManager:
if len(result.elements()) == 0 :
self.valid = False
else :
points = []
for elem in result.elements() :
@ -126,8 +138,8 @@ class ClusterManager:
self.filter_clusters() # ValueError here sometimes. I dont know why. # Filter the clusters to keep only the largest ones.
self.valid = True
else :
self.valid = False
else :
self.valid = False
def generate_clusters(self) -> list[Landmark]:
@ -155,7 +167,7 @@ class ClusterManager:
# Extract points belonging to the current cluster
current_cluster = self.cluster_points[self.cluster_labels == label]
# Calculate the centroid as the mean of the points
centroid = np.mean(current_cluster, axis=0)
@ -205,7 +217,7 @@ class ClusterManager:
selectors.append('"shop"="mall"')
new_name = 'Shopping Area'
t = 40
else :
else :
new_name = 'Neighborhood'
t = 15
@ -214,7 +226,7 @@ class ClusterManager:
osm_id = 0
osm_type = 'node'
for sel in selectors :
for sel in selectors :
query = overpassQueryBuilder(
bbox = bbox,
elementType = ['node', 'way', 'relation'],
@ -233,11 +245,11 @@ class ClusterManager:
location = (elem.centerLat(), elem.centerLon())
# Skip if element has neither name or location
if elem.tag('name') is None :
if elem.tag('name') is None :
continue
if location[0] is None :
if location[0] is None :
location = (elem.lat(), elem.lon())
if location[0] is None :
if location[0] is None :
continue
d = get_distance(cluster.centroid, location)
@ -245,14 +257,14 @@ class ClusterManager:
min_dist = d
new_name = elem.tag('name')
osm_type = elem.type() # Add type: 'way' or 'relation'
osm_id = elem.id() # Add OSM id
osm_id = elem.id() # Add OSM id
# Add english name if it exists
try :
new_name_en = elem.tag('name:en')
except:
pass
except Exception:
pass
return Landmark(
name=new_name,
type=self.cluster_type,
@ -290,4 +302,3 @@ class ClusterManager:
# update the cluster points and labels with the filtered data
self.cluster_points = np.vstack(filtered_cluster_points) # ValueError here
self.cluster_labels = np.concatenate(filtered_cluster_labels)

View File

@ -1,8 +1,10 @@
import yaml
"""Computes the distance (in meters) or the walking time (in minutes) between two coordinates."""
from math import sin, cos, sqrt, atan2, radians
import yaml
from ..constants import OPTIMIZER_PARAMETERS_PATH
with OPTIMIZER_PARAMETERS_PATH.open('r') as f:
parameters = yaml.safe_load(f)
DETOUR_FACTOR = parameters['detour_factor']
@ -10,6 +12,7 @@ with OPTIMIZER_PARAMETERS_PATH.open('r') as f:
EARTH_RADIUS_KM = 6373
def get_time(p1: tuple[float, float], p2: tuple[float, float]) -> int:
"""
Calculate the time in minutes to travel from one location to another.
@ -21,8 +24,6 @@ def get_time(p1: tuple[float, float], p2: tuple[float, float]) -> int:
Returns:
int: Time to travel from p1 to p2 in minutes.
"""
# if p1 == p2:
# return 0
# else:
@ -61,22 +62,19 @@ def get_distance(p1: tuple[float, float], p2: tuple[float, float]) -> int:
Returns:
int: Time to travel from p1 to p2 in minutes.
"""
if p1 == p2:
return 0
else:
# Compute the distance in km along the surface of the Earth
# (assume spherical Earth)
# this is the haversine formula, stolen from stackoverflow
# in order to not use any external libraries
lat1, lon1 = radians(p1[0]), radians(p1[1])
lat2, lon2 = radians(p2[0]), radians(p2[1])
# Compute the distance in km along the surface of the Earth
# (assume spherical Earth)
# this is the haversine formula, stolen from stackoverflow
# in order to not use any external libraries
lat1, lon1 = radians(p1[0]), radians(p1[1])
lat2, lon2 = radians(p2[0]), radians(p2[1])
dlon = lon2 - lon1
dlat = lat2 - lat1
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
return EARTH_RADIUS_KM * c
return EARTH_RADIUS_KM * c

View File

@ -1,5 +1,6 @@
"""Module used to import data from OSM and arrange them in categories."""
import math, yaml, logging
import logging
import yaml
from OSMPythonTools.overpass import Overpass, overpassQueryBuilder
from OSMPythonTools.cachingStrategy import CachingStrategy, JSON
@ -15,14 +16,17 @@ logging.getLogger('OSMPythonTools').setLevel(level=logging.CRITICAL)
class LandmarkManager:
"""
Use this to manage landmarks.
Uses the overpass api to fetch landmarks and classify them.
"""
logger = logging.getLogger(__name__)
radius_close_to: int # radius in meters
church_coeff: float # coeff to adjsut score of churches
nature_coeff: float # coeff to adjust score of parks
overall_coeff: float # coeff to adjust weight of tags
N_important: int # number of important landmarks to consider
n_important: int # number of important landmarks to consider
def __init__(self) -> None:
@ -43,7 +47,7 @@ class LandmarkManager:
self.wikipedia_bonus = parameters['wikipedia_bonus']
self.viewpoint_bonus = parameters['viewpoint_bonus']
self.pay_bonus = parameters['pay_bonus']
self.N_important = parameters['N_important']
self.n_important = parameters['N_important']
with OPTIMIZER_PARAMETERS_PATH.open('r') as f:
parameters = yaml.safe_load(f)
@ -113,7 +117,8 @@ class LandmarkManager:
self.logger.debug('Fetching shopping clusters...')
# set time for all shopping activites :
for landmark in current_landmarks : landmark.duration = 30
for landmark in current_landmarks :
landmark.duration = 30
all_landmarks.update(current_landmarks)
# special pipeline for shopping malls
@ -124,77 +129,12 @@ class LandmarkManager:
landmarks_constrained = take_most_important(all_landmarks, self.N_important)
landmarks_constrained = take_most_important(all_landmarks, self.n_important)
# self.logger.info(f'All landmarks generated : {len(all_landmarks)} landmarks around {center_coordinates}, and constrained to {len(landmarks_constrained)} most important ones.')
return all_landmarks, landmarks_constrained
"""
def count_elements_close_to(self, coordinates: tuple[float, float]) -> int:
Count the number of OpenStreetMap elements (nodes, ways, relations) within a specified radius of the given location.
This function constructs a bounding box around the specified coordinates based on the radius. It then queries
OpenStreetMap data to count the number of elements within that bounding box.
Args:
coordinates (tuple[float, float]): The latitude and longitude of the location to search around.
Returns:
int: The number of elements (nodes, ways, relations) within the specified radius. Returns 0 if no elements
are found or if an error occurs during the query.
lat = coordinates[0]
lon = coordinates[1]
radius = self.radius_close_to
alpha = (180 * radius) / (6371000 * math.pi)
bbox = {'latLower':lat-alpha,'lonLower':lon-alpha,'latHigher':lat+alpha,'lonHigher': lon+alpha}
# Build the query to find elements within the radius
radius_query = overpassQueryBuilder(
bbox=[bbox['latLower'],
bbox['lonLower'],
bbox['latHigher'],
bbox['lonHigher']],
elementType=['node', 'way', 'relation']
)
try:
radius_result = self.overpass.query(radius_query)
N_elem = radius_result.countWays() + radius_result.countRelations()
self.logger.debug(f"There are {N_elem} ways/relations within 50m")
if N_elem is None:
return 0
return N_elem
except:
return 0
"""
# def create_bbox(self, coordinates: tuple[float, float], reachable_bbox_side: int) -> tuple[float, float, float, float]:
# """
# Create a bounding box around the given coordinates.
# Args:
# coordinates (tuple[float, float]): The latitude and longitude of the center of the bounding box.
# reachable_bbox_side (int): The side length of the bounding box in meters.
# Returns:
# tuple[float, float, float, float]: The minimum latitude, minimum longitude, maximum latitude, and maximum longitude
# defining the bounding box.
# """
# # Half the side length in m (since it's a square bbox)
# half_side_length_m = reachable_bbox_side / 2
# return tuple((f"around:{half_side_length_m}", str(coordinates[0]), str(coordinates[1])))
def fetch_landmarks(self, bbox: tuple, amenity_selector: dict, landmarktype: str, score_function: callable) -> list[Landmark]:
"""
Fetches landmarks of a specified type from OpenStreetMap (OSM) within a bounding box centered on given coordinates.
@ -241,7 +181,7 @@ class LandmarkManager:
includeCenter = True,
out = 'center'
)
# self.logger.debug(f"Query: {query}")
self.logger.debug(f"Query: {query}")
try:
result = self.overpass.query(query)
@ -274,7 +214,7 @@ class LandmarkManager:
n_tags = len(elem.tags().keys()) # Add number of tags
score = n_tags**self.tag_exponent # Add score
duration = 5 # Set base duration to 5 minutes
skip = False # Set skipping parameter to false
# skip = False # Set skipping parameter to false
tag_values = set(elem.tags().values()) # Store tag values
@ -369,10 +309,10 @@ def dict_to_selector_list(d: dict) -> list:
"""
return_list = []
for key, value in d.items():
if type(value) == list:
if isinstance(value, list):
val = '|'.join(value)
return_list.append(f'{key}~"^({val})$"')
elif type(value) == str and len(value) == 0:
elif isinstance(value, str) and len(value) == 0:
return_list.append(f'{key}')
else:
return_list.append(f'{key}={value}')

View File

@ -1,524 +0,0 @@
import yaml, logging
import numpy as np
from scipy.optimize import linprog
from collections import defaultdict, deque
from ..structs.landmark import Landmark
from .get_time_separation import get_time
from ..constants import OPTIMIZER_PARAMETERS_PATH
class Optimizer:
logger = logging.getLogger(__name__)
detour: int = None # accepted max detour time (in minutes)
detour_factor: float # detour factor of straight line vs real distance in cities
average_walking_speed: float # average walking speed of adult
max_landmarks: int # max number of landmarks to visit
overshoot: float # overshoot to allow maxtime to overflow. Optimizer is a bit restrictive
def __init__(self) :
# load parameters from file
with OPTIMIZER_PARAMETERS_PATH.open('r') as f:
parameters = yaml.safe_load(f)
self.detour_factor = parameters['detour_factor']
self.average_walking_speed = parameters['average_walking_speed']
self.max_landmarks = parameters['max_landmarks']
self.overshoot = parameters['overshoot']
# Prevent the use of a particular solution
def prevent_config(self, resx):
"""
Prevent the use of a particular solution by adding constraints to the optimization.
Args:
resx (list[float]): List of edge weights.
Returns:
tuple[list[int], list[int]]: A tuple containing a new row for constraint matrix and new value for upper bound vector.
"""
for i, elem in enumerate(resx):
resx[i] = round(elem)
N = len(resx) # Number of edges
L = int(np.sqrt(N)) # Number of landmarks
nonzeroind = np.nonzero(resx)[0] # the return is a little funky so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0].tolist()
vertices_visited = ind_a
vertices_visited.remove(0)
ones = [1]*L
h = [0]*N
for i in range(L) :
if i in vertices_visited :
h[i*L:i*L+L] = ones
return h, [len(vertices_visited)-1]
# Prevents the creation of the same circle (both directions)
def prevent_circle(self, circle_vertices: list, L: int) :
"""
Prevent circular paths by by adding constraints to the optimization.
Args:
circle_vertices (list): List of vertices forming a circle.
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: A tuple containing a new row for constraint matrix and new value for upper bound vector.
"""
l1 = [0]*L*L
l2 = [0]*L*L
for i, node in enumerate(circle_vertices[:-1]) :
next = circle_vertices[i+1]
l1[node*L + next] = 1
l2[next*L + node] = 1
s = circle_vertices[0]
g = circle_vertices[-1]
l1[g*L + s] = 1
l2[s*L + g] = 1
return np.vstack((l1, l2)), [0, 0]
def is_connected(self, resx) :
"""
Determine the order of visits and detect any circular paths in the given configuration.
Args:
resx (list): List of edge weights.
Returns:
tuple[list[int], Optional[list[list[int]]]]: A tuple containing the visit order and a list of any detected circles.
"""
# first round the results to have only 0-1 values
for i, elem in enumerate(resx):
resx[i] = round(elem)
N = len(resx) # length of res
L = int(np.sqrt(N)) # number of landmarks. CAST INTO INT but should not be a problem because N = L**2 by def.
nonzeroind = np.nonzero(resx)[0] # the return is a little funny so I use the [0]
nonzero_tup = np.unravel_index(nonzeroind, (L,L))
ind_a = nonzero_tup[0].tolist()
ind_b = nonzero_tup[1].tolist()
# Step 1: Create a graph representation
graph = defaultdict(list)
for a, b in zip(ind_a, ind_b):
graph[a].append(b)
# Step 2: Function to perform BFS/DFS to extract journeys
def get_journey(start):
journey_nodes = []
visited = set()
stack = deque([start])
while stack:
node = stack.pop()
if node not in visited:
visited.add(node)
journey_nodes.append(node)
for neighbor in graph[node]:
if neighbor not in visited:
stack.append(neighbor)
return journey_nodes
# Step 3: Extract all journeys
all_journeys_nodes = []
visited_nodes = set()
for node in ind_a:
if node not in visited_nodes:
journey_nodes = get_journey(node)
all_journeys_nodes.append(journey_nodes)
visited_nodes.update(journey_nodes)
for l in all_journeys_nodes :
if 0 in l :
order = l
all_journeys_nodes.remove(l)
break
if len(all_journeys_nodes) == 0 :
return order, None
return order, all_journeys_nodes
def init_ub_dist(self, landmarks: list[Landmark], max_time: int):
"""
Initialize the objective function coefficients and inequality constraints for the optimization problem.
This function computes the distances between all landmarks and stores their attractiveness to maximize sightseeing.
The goal is to maximize the objective function subject to the constraints A*x < b and A_eq*x = b_eq.
Args:
landmarks (list[Landmark]): List of landmarks.
max_time (int): Maximum time of visit allowed.
Returns:
tuple[list[float], list[float], list[int]]: Objective function coefficients, inequality constraint coefficients, and the right-hand side of the inequality constraint.
"""
# Objective function coefficients. a*x1 + b*x2 + c*x3 + ...
c = []
# Coefficients of inequality constraints (left-hand side)
A_ub = []
for spot1 in landmarks :
dist_table = [0]*len(landmarks)
c.append(-spot1.attractiveness)
for j, spot2 in enumerate(landmarks) :
t = get_time(spot1.location, spot2.location) + spot1.duration
dist_table[j] = t
closest = sorted(dist_table)[:25]
for i, dist in enumerate(dist_table) :
if dist not in closest :
dist_table[i] = 32700
A_ub += dist_table
c = c*len(landmarks)
return c, A_ub, [max_time*self.overshoot]
def respect_number(self, L, max_landmarks: int):
"""
Generate constraints to ensure each landmark is visited only once and cap the total number of visited landmarks.
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
ones = [1]*L
zeros = [0]*L
A = ones + zeros*(L-1)
b = [1]
for i in range(L-1) :
h_new = zeros*i + ones + zeros*(L-1-i)
A = np.vstack((A, h_new))
b.append(1)
A = np.vstack((A, ones*L))
b.append(max_landmarks+1)
return A, b
# Constraint to not have d14 and d41 simultaneously. Does not prevent cyclic paths with more elements
def break_sym(self, L):
"""
Generate constraints to prevent simultaneous travel between two landmarks in both directions.
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
upper_ind = np.triu_indices(L,0,L)
up_ind_x = upper_ind[0]
up_ind_y = upper_ind[1]
A = [0]*L*L
b = [1]
for i, _ in enumerate(up_ind_x[1:]) :
l = [0]*L*L
if up_ind_x[i] != up_ind_y[i] :
l[up_ind_x[i]*L + up_ind_y[i]] = 1
l[up_ind_y[i]*L + up_ind_x[i]] = 1
A = np.vstack((A,l))
b.append(1)
return A, b
def init_eq_not_stay(self, L: int):
"""
Generate constraints to prevent staying in the same position (e.g., removing d11, d22, d33, etc.).
Args:
L (int): Number of landmarks.
Returns:
tuple[list[np.ndarray], list[int]]: Equality constraint coefficients and the right-hand side of the equality constraints.
"""
l = [0]*L*L
for i in range(L) :
for j in range(L) :
if j == i :
l[j + i*L] = 1
l = np.array(np.array(l), dtype=np.int8)
return [l], [0]
def respect_user_must_do(self, landmarks: list[Landmark]) :
"""
Generate constraints to ensure that landmarks marked as 'must_do' are included in the optimization.
Args:
landmarks (list[Landmark]): List of landmarks, where some are marked as 'must_do'.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
L = len(landmarks)
A = [0]*L*L
b = [0]
for i, elem in enumerate(landmarks[1:]) :
if elem.must_do is True and elem.name not in ['finish', 'start']:
l = [0]*L*L
l[i*L:i*L+L] = [1]*L # set mandatory departures from landmarks tagged as 'must_do'
A = np.vstack((A,l))
b.append(1)
return A, b
def respect_user_must_avoid(self, landmarks: list[Landmark]) :
"""
Generate constraints to ensure that landmarks marked as 'must_avoid' are skipped in the optimization.
Args:
landmarks (list[Landmark]): List of landmarks, where some are marked as 'must_avoid'.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
L = len(landmarks)
A = [0]*L*L
b = [0]
for i, elem in enumerate(landmarks[1:]) :
if elem.must_avoid is True and elem.name not in ['finish', 'start']:
l = [0]*L*L
l[i*L:i*L+L] = [1]*L
A = np.vstack((A,l))
b.append(0) # prevent departures from landmarks tagged as 'must_do'
return A, b
# Constraint to ensure start at start and finish at goal
def respect_start_finish(self, L: int):
"""
Generate constraints to ensure that the optimization starts at the designated start landmark and finishes at the goal landmark.
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
l_start = [1]*L + [0]*L*(L-1) # sets departures only for start (horizontal ones)
l_start[L-1] = 0 # prevents the jump from start to finish
l_goal = [0]*L*L # sets arrivals only for finish (vertical ones)
l_L = [0]*L*(L-1) + [1]*L # prevents arrivals at start and departures from goal
for k in range(L-1) : # sets only vertical ones for goal (go to)
l_L[k*L] = 1
if k != 0 :
l_goal[k*L+L-1] = 1
A = np.vstack((l_start, l_goal))
b = [1, 1]
A = np.vstack((A,l_L))
b.append(0)
return A, b
def respect_order(self, L: int):
"""
Generate constraints to tie the optimization problem together and prevent stacked ones, although this does not fully prevent circles.
Args:
L (int): Number of landmarks.
Returns:
tuple[np.ndarray, list[int]]: Inequality constraint coefficients and the right-hand side of the inequality constraints.
"""
A = [0]*L*L
b = [0]
for i in range(L-1) : # Prevent stacked ones
if i == 0 or i == L-1: # Don't touch start or finish
continue
else :
l = [0]*L
l[i] = -1
l = l*L
for j in range(L) :
l[i*L + j] = 1
A = np.vstack((A,l))
b.append(0)
return A, b
def link_list(self, order: list[int], landmarks: list[Landmark])->list[Landmark] :
"""
Compute the time to reach from each landmark to the next and create a list of landmarks with updated travel times.
Args:
order (list[int]): List of indices representing the order of landmarks to visit.
landmarks (list[Landmark]): List of all landmarks.
Returns:
list[Landmark]]: The updated linked list of landmarks with travel times
"""
L = []
j = 0
while j < len(order)-1 :
# get landmarks involved
elem = landmarks[order[j]]
next = landmarks[order[j+1]]
# get attributes
elem.time_to_reach_next = get_time(elem.location, next.location)
elem.must_do = True
elem.location = (round(elem.location[0], 5), round(elem.location[1], 5))
elem.next_uuid = next.uuid
L.append(elem)
j += 1
next.location = (round(next.location[0], 5), round(next.location[1], 5))
next.must_do = True
L.append(next)
return L
# Main optimization pipeline
def solve_optimization(
self,
max_time: int,
landmarks: list[Landmark],
max_landmarks: int = None
) -> list[Landmark]:
"""
Main optimization pipeline to solve the landmark visiting problem.
This method sets up and solves a linear programming problem with constraints to find an optimal tour of landmarks,
considering user-defined must-visit landmarks, start and finish points, and ensuring no cycles are present.
Args:
max_time (int): Maximum time allowed for the tour in minutes.
landmarks (list[Landmark]): List of landmarks to visit.
max_landmarks (int): Maximum number of landmarks visited
Returns:
list[Landmark]: The optimized tour of landmarks with updated travel times, or None if no valid solution is found.
"""
if max_landmarks is None :
max_landmarks = self.max_landmarks
L = len(landmarks)
# SET CONSTRAINTS FOR INEQUALITY
c, A_ub, b_ub = self.init_ub_dist(landmarks, max_time) # Add the distances from each landmark to the other
A, b = self.respect_number(L, max_landmarks) # Respect max number of visits (no more possible stops than landmarks).
A_ub = np.vstack((A_ub, A), dtype=np.int16)
b_ub += b
A, b = self.break_sym(L) # break the 'zig-zag' symmetry
A_ub = np.vstack((A_ub, A), dtype=np.int16)
b_ub += b
# SET CONSTRAINTS FOR EQUALITY
A_eq, b_eq = self.init_eq_not_stay(L) # Force solution not to stay in same place
A, b = self.respect_user_must_do(landmarks) # Check if there are user_defined must_see. Also takes care of start/goal
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
A, b = self.respect_user_must_avoid(landmarks) # Check if there are user_defined must_see. Also takes care of start/goal
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
A, b = self.respect_start_finish(L) # Force start and finish positions
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
A, b = self.respect_order(L) # Respect order of visit (only works when max_time is limiting factor)
A_eq = np.vstack((A_eq, A), dtype=np.int8)
b_eq += b
# SET BOUNDS FOR DECISION VARIABLE (x can only be 0 or 1)
x_bounds = [(0, 1)]*L*L
# Solve linear programming problem
res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3)
# Raise error if no solution is found
if not res.success :
raise ArithmeticError("No solution could be found, the problem is overconstrained. Try with a longer trip (>30 minutes).")
# If there is a solution, we're good to go, just check for connectiveness
order, circles = self.is_connected(res.x)
#nodes, edges = is_connected(res.x)
i = 0
timeout = 80
while circles is not None and i < timeout:
A, b = self.prevent_config(res.x)
A_ub = np.vstack((A_ub, A))
b_ub += b
#A_ub, b_ub = prevent_circle(order, len(landmarks), A_ub, b_ub)
for circle in circles :
A, b = self.prevent_circle(circle, L)
A_eq = np.vstack((A_eq, A))
b_eq += b
res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq = b_eq, bounds=x_bounds, method='highs', integrality=3)
if not res.success :
raise ArithmeticError("Solving failed because of overconstrained problem")
return None
order, circles = self.is_connected(res.x)
#nodes, edges = is_connected(res.x)
if circles is None :
break
# print(i)
i += 1
if i == timeout :
raise TimeoutError(f"Optimization took too long. No solution found after {timeout} iterations.")
#sort the landmarks in the order of the solution
tour = [landmarks[i] for i in order]
self.logger.debug(f"Re-optimized {i} times, score: {int(-res.fun)}")
return tour

View File

@ -1,19 +1,43 @@
import yaml, logging
"""Module responsible for sloving an MILP to find best tour around the given landmarks."""
import logging
from collections import defaultdict, deque
import yaml
import numpy as np
import pulp as pl
from scipy.optimize import linprog
from collections import defaultdict, deque
from ..structs.landmark import Landmark
from .get_time_separation import get_time
from ..constants import OPTIMIZER_PARAMETERS_PATH
# Silence the pupl logger
logging.getLogger('pulp').setLevel(level=logging.CRITICAL)
class Optimizer:
"""
Optimizes the balance between the efficiency of a tour and the inclusion of landmarks.
The `Optimizer` class is responsible for calculating the best possible detour adjustments
to a tour based on specific parameters such as detour time, walking speed, and the maximum
number of landmarks to visit. It helps refine a tour by determining whether adding additional
landmarks would significantly reduce the overall efficiency.
Responsibilities:
- Calculates the maximum detour time allowed for a given tour.
- Considers the detour factor, which accounts for real-world walking paths versus straight-line distance.
- Takes into account the average walking speed to estimate walking times.
- Limits the number of landmarks that can be added to the tour to prevent excessive detouring.
- Allows some overflow (overshoot) in the maximum detour time to accommodate for slight inefficiencies.
Attributes:
logger (logging.Logger): Logger for capturing relevant events and errors.
detour (int): The accepted maximum detour time in minutes.
detour_factor (float): The ratio between straight-line distance and actual walking distance in cities.
average_walking_speed (float): The average walking speed of an adult (in meters per second or kilometers per hour).
max_landmarks (int): The maximum number of landmarks to include in the tour.
overshoot (float): The overshoot allowance for exceeding the maximum detour time in a restrictive manner.
"""
logger = logging.getLogger(__name__)
detour: int = None # accepted max detour time (in minutes)
@ -135,7 +159,7 @@ class Optimizer:
prob += (x[up_ind_x[i]*L + up_ind_y[i]] + x[up_ind_y[i]*L + up_ind_x[i]] <= 1)
def init_eq_not_stay(self, prob: pl.LpProblem, x: pl.LpVariable, L: int):
def init_eq_not_stay(self, prob: pl.LpProblem, x: pl.LpVariable, L: int):
"""
Generate constraints to prevent staying in the same position (e.g., removing d11, d22, d33, etc.).
-> Adds 1 row of constraints
@ -187,7 +211,7 @@ class Optimizer:
for i in range(3) :
prob += (pl.lpSum([A_eq[i][j] * x[j] for j in range(L*L)]) == b_eq[i])
def respect_order(self, prob: pl.LpProblem, x: pl.LpVariable, L: int):
def respect_order(self, prob: pl.LpProblem, x: pl.LpVariable, L: int):
"""
Generate constraints to tie the optimization problem together and prevent
stacked ones, although this does not fully prevent circles.
@ -251,10 +275,10 @@ class Optimizer:
# Returns:
# tuple[list[int], list[int]]: A tuple containing a new row for A and new value for ub.
# """
# for i, elem in enumerate(resx):
# resx[i] = round(elem)
# N = len(resx) # Number of edges
# L = int(np.sqrt(N)) # Number of landmarks
@ -305,7 +329,7 @@ class Optimizer:
prob += (pl.lpSum([l[0][j] * x[j] for j in range(L*L)]) == 0)
prob += (pl.lpSum([l[1][j] * x[j] for j in range(L*L)]) == 0)
def is_connected(self, resx) :
"""
Determine the order of visits and detect any circular paths in the given configuration.
@ -462,13 +486,40 @@ class Optimizer:
j += 1
next.location = (round(next.location[0], 5), round(next.location[1], 5))
next.must_do = True
next.must_do = True
L.append(next)
return L
def pre_processing(self, L: int, landmarks: list[Landmark], max_time: int, max_landmarks: int | None) :
"""
Preprocesses the optimization problem by setting up constraints and variables for the tour optimization.
This method initializes and prepares the linear programming problem to optimize a tour that includes landmarks,
while respecting various constraints such as time limits, the number of landmarks to visit, and user preferences.
The pre-processing step sets up the problem before solving it using a linear programming solver.
Responsibilities:
- Defines the optimization problem using linear programming (LP) with the objective to maximize the tour value.
- Creates binary decision variables for each potential transition between landmarks.
- Sets up inequality constraints to respect the maximum time available for the tour and the maximum number of landmarks.
- Implements equality constraints to ensure the tour respects the start and finish positions, avoids staying in the same place,
and adheres to a visit order.
- Forces inclusion or exclusion of specific landmarks based on user preferences.
Attributes:
prob (pl.LpProblem): The linear programming problem to be solved.
x (list): A list of binary variables representing transitions between landmarks.
L (int): The total number of landmarks considered in the optimization.
landmarks (list[Landmark]): The list of landmarks to be visited in the tour.
max_time (int): The maximum allowable time for the entire tour.
max_landmarks (int | None): The maximum number of landmarks to visit in the tour, or None if no limit is set.
Returns:
prob (pl.LpProblem): The linear programming problem setup for optimization.
x (list): The list of binary variables for transitions between landmarks in the tour.
"""
if max_landmarks is None :
max_landmarks = self.max_landmarks
@ -490,7 +541,7 @@ class Optimizer:
self.respect_start_finish(prob, x, L) # Force start and finish positions
self.respect_order(prob, x, L) # Respect order of visit (only works when max_time is limiting factor)
self.respect_user_must(prob, x, L, landmarks) # Force to do/avoid landmarks set by user.
return prob, x
def solve_optimization(
@ -555,15 +606,15 @@ class Optimizer:
if pl.LpStatus[prob.status] != 'Optimal' :
self.logger.error("The problem is overconstrained, no solution after {i} cycles.")
raise ArithmeticError("No solution could be found. Please try again with more time or different preferences.")
circles = self.is_connected(solution)
if circles is None :
break
# Sort the landmarks in the order of the solution
order = self.get_order(solution)
tour = [landmarks[i] for i in order]
tour = [landmarks[i] for i in order]
self.logger.debug(f"Re-optimized {i} times, objective value : {int(pl.value(prob.objective))}")
return tour

View File

@ -13,7 +13,14 @@ from ..constants import OPTIMIZER_PARAMETERS_PATH
class Refiner :
"""
Refines a tour by incorporating smaller landmarks along the path to enhance the experience.
This class is designed to adjust an existing tour by considering additional,
smaller points of interest (landmarks) that may require minor detours but
improve the overall quality of the tour. It balances the efficiency of travel
with the added value of visiting these landmarks.
"""
logger = logging.getLogger(__name__)
detour_factor: float # detour factor of straight line vs real distance in cities
@ -267,7 +274,7 @@ class Refiner :
better_tour_poly = concave_hull(MultiPoint(coords)) # Create concave hull with "core" of tour leaving out start and finish
xs, ys = better_tour_poly.exterior.xy
except :
except Exception:
better_tour_poly = concave_hull(MultiPoint(coords)) # Create concave hull with "core" of tour leaving out start and finish
xs, ys = better_tour_poly.exterior.xy
"""

View File

@ -1,3 +1,4 @@
"""Helper function to return only the major landmarks from a large list."""
from ..structs.landmark import Landmark
def take_most_important(landmarks: list[Landmark], n_important) -> list[Landmark]:

View File

@ -1,16 +1,34 @@
import logging, yaml
"""Module for finding public toilets around given coordinates."""
import logging
from OSMPythonTools.overpass import Overpass, overpassQueryBuilder
from OSMPythonTools.cachingStrategy import CachingStrategy, JSON
from ..structs.landmark import Toilets
from ..constants import LANDMARK_PARAMETERS_PATH, OSM_CACHE_DIR
from ..constants import OSM_CACHE_DIR
# silence the overpass logger
logging.getLogger('OSMPythonTools').setLevel(level=logging.CRITICAL)
class ToiletsManager:
"""
Manages the process of fetching and caching toilet information from
OpenStreetMap (OSM) based on a specified location and radius.
This class is responsible for:
- Fetching toilet data from OSM using Overpass API around a given set of
coordinates (latitude, longitude).
- Using a caching strategy to optimize requests by saving and retrieving
data from a local cache.
- Logging important events and errors related to data fetching.
Attributes:
logger (logging.Logger): Logger for the class to capture events.
location (tuple[float, float]): Latitude and longitude representing the
location to search around.
radius (int): The search radius in meters for finding nearby toilets.
overpass (Overpass): The Overpass API instance used to query OSM.
"""
logger = logging.getLogger(__name__)
location: tuple[float, float]
@ -26,9 +44,14 @@ class ToiletsManager:
def generate_toilet_list(self) -> list[Toilets] :
"""
Generates a list of toilet locations by fetching data from OpenStreetMap (OSM)
around the given coordinates stored in `self.location`.
# Create a bbox using the around technique
Returns:
list[Toilets]: A list of `Toilets` objects containing detailed information
about the toilets found around the given coordinates.
"""
bbox = tuple((f"around:{self.radius}", str(self.location[0]), str(self.location[1])))
toilets_list = []
@ -55,12 +78,12 @@ class ToiletsManager:
# handle unprecise and no-name locations
if location[0] is None:
location = (elem.lat(), elem.lon())
else :
location = (elem.lat(), elem.lon())
else :
continue
toilets = Toilets(location=location)
if 'wheelchair' in elem.tags().keys() and elem.tag('wheelchair') == 'yes':
toilets.wheelchair = True